This project has retired. For details please refer to its Attic page.
AbstractProcessor xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
20  
21  
22  import java.nio.charset.Charset;
23  import java.util.Calendar;
24  
25  import org.apache.hadoop.chukwa.ChukwaArchiveKey;
26  import org.apache.hadoop.chukwa.Chunk;
27  import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
28  import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
29  import org.apache.hadoop.chukwa.extraction.engine.Record;
30  import org.apache.hadoop.chukwa.util.RecordConstants;
31  import org.apache.hadoop.mapred.OutputCollector;
32  import org.apache.hadoop.mapred.Reporter;
33  import org.apache.log4j.Logger;
34  
35  public abstract class AbstractProcessor implements MapProcessor {
36    static Logger log = Logger.getLogger(AbstractProcessor.class);
37  
38    Calendar calendar = Calendar.getInstance();
39    byte[] bytes;
40    int[] recordOffsets;
41    protected int currentPos = 0;
42    protected int startOffset = 0;
43  
44    protected ChukwaArchiveKey archiveKey = null;
45    protected ChukwaRecordKey key = new ChukwaRecordKey();
46    protected Chunk chunk = null;
47  
48    boolean chunkInErrorSaved = false;
49    OutputCollector<ChukwaRecordKey, ChukwaRecord> output = null;
50    Reporter reporter = null;
51  
52    public AbstractProcessor() {
53    }
54  
55    protected abstract void parse(String recordEntry,
56        OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
57        throws Throwable;
58  
59    protected void saveChunkInError(Throwable throwable) {
60      if (chunkInErrorSaved == false) {
61        try {
62          ChunkSaver.saveChunk(chunk, throwable, output, reporter);
63          chunkInErrorSaved = true;
64        } catch (Exception e) {
65          e.printStackTrace();
66        }
67      }
68  
69    }
70  
71    public void process(ChukwaArchiveKey archiveKey, Chunk chunk,
72        OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter) {
73      chunkInErrorSaved = false;
74  
75      this.archiveKey = archiveKey;
76      this.output = output;
77      this.reporter = reporter;
78  
79      reset(chunk);
80  
81      while (hasNext()) {
82        try {
83          parse(nextLine(), output, reporter);
84        } catch (Throwable e) {
85          saveChunkInError(e);
86        }
87      }
88    }
89  
90    protected void buildGenericRecord(ChukwaRecord record, String body,
91        long timestamp, String dataSource) {
92      calendar.setTimeInMillis(timestamp);
93      calendar.set(Calendar.MINUTE, 0);
94      calendar.set(Calendar.SECOND, 0);
95      calendar.set(Calendar.MILLISECOND, 0);
96  
97      key.setKey("" + calendar.getTimeInMillis() + "/" + chunk.getSource() + "/"
98          + timestamp);
99      key.setReduceType(dataSource);
100 
101     if (body != null) {
102       record.add(Record.bodyField, body);
103     }
104     record.setTime(timestamp);
105 
106     record.add(Record.tagsField, chunk.getTags());
107     record.add(Record.sourceField, chunk.getSource());
108     record.add(Record.applicationField, chunk.getStreamName());
109 
110   }
111 
112   protected void reset(Chunk chunk) {
113     this.chunk = chunk;
114     this.bytes = chunk.getData();
115     this.recordOffsets = chunk.getRecordOffsets();
116     currentPos = 0;
117     startOffset = 0;
118   }
119 
120   protected boolean hasNext() {
121     return (currentPos < recordOffsets.length);
122   }
123 
124   protected String nextLine() {
125     String log = new String(bytes, startOffset, (recordOffsets[currentPos]
126         - startOffset + 1), Charset.forName("UTF-8"));
127     startOffset = recordOffsets[currentPos] + 1;
128     currentPos++;
129     return RecordConstants.recoverRecordSeparators("\n", log);
130   }
131 
132 public int getCurrentPos() {
133 	return currentPos;
134 }
135 
136 public void setCurrentPos(int currentPos) {
137 	this.currentPos = currentPos;
138 }
139 
140 public int getStartOffset() {
141 	return startOffset;
142 }
143 
144 public void setStartOffset(int startOffset) {
145 	this.startOffset = startOffset;
146 }  
147 }