This project has retired. For details, please refer to its Attic page.
AbstractProcessor xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
20
21
22 import java.nio.charset.Charset;
23 import java.util.Calendar;
24
25 import org.apache.hadoop.chukwa.ChukwaArchiveKey;
26 import org.apache.hadoop.chukwa.Chunk;
27 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
28 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
29 import org.apache.hadoop.chukwa.extraction.engine.Record;
30 import org.apache.hadoop.chukwa.util.RecordConstants;
31 import org.apache.hadoop.mapred.OutputCollector;
32 import org.apache.hadoop.mapred.Reporter;
33 import org.apache.log4j.Logger;
34
35 public abstract class AbstractProcessor implements MapProcessor {
36 static Logger log = Logger.getLogger(AbstractProcessor.class);
37
38 Calendar calendar = Calendar.getInstance();
39 byte[] bytes;
40 int[] recordOffsets;
41 protected int currentPos = 0;
42 protected int startOffset = 0;
43
44 protected ChukwaArchiveKey archiveKey = null;
45 protected ChukwaRecordKey key = new ChukwaRecordKey();
46 protected Chunk chunk = null;
47
48 boolean chunkInErrorSaved = false;
49 OutputCollector<ChukwaRecordKey, ChukwaRecord> output = null;
50 Reporter reporter = null;
51
52 public AbstractProcessor() {
53 }
54
55 protected abstract void parse(String recordEntry,
56 OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
57 throws Throwable;
58
59 protected void saveChunkInError(Throwable throwable) {
60 if (chunkInErrorSaved == false) {
61 try {
62 ChunkSaver.saveChunk(chunk, throwable, output, reporter);
63 chunkInErrorSaved = true;
64 } catch (Exception e) {
65 e.printStackTrace();
66 }
67 }
68
69 }
70
71 public void process(ChukwaArchiveKey archiveKey, Chunk chunk,
72 OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter) {
73 chunkInErrorSaved = false;
74
75 this.archiveKey = archiveKey;
76 this.output = output;
77 this.reporter = reporter;
78
79 reset(chunk);
80
81 while (hasNext()) {
82 try {
83 parse(nextLine(), output, reporter);
84 } catch (Throwable e) {
85 saveChunkInError(e);
86 }
87 }
88 }
89
90 protected void buildGenericRecord(ChukwaRecord record, String body,
91 long timestamp, String dataSource) {
92 calendar.setTimeInMillis(timestamp);
93 calendar.set(Calendar.MINUTE, 0);
94 calendar.set(Calendar.SECOND, 0);
95 calendar.set(Calendar.MILLISECOND, 0);
96
97 key.setKey("" + calendar.getTimeInMillis() + "/" + chunk.getSource() + "/"
98 + timestamp);
99 key.setReduceType(dataSource);
100
101 if (body != null) {
102 record.add(Record.bodyField, body);
103 }
104 record.setTime(timestamp);
105
106 record.add(Record.tagsField, chunk.getTags());
107 record.add(Record.sourceField, chunk.getSource());
108 record.add(Record.applicationField, chunk.getStreamName());
109
110 }
111
112 protected void reset(Chunk chunk) {
113 this.chunk = chunk;
114 this.bytes = chunk.getData();
115 this.recordOffsets = chunk.getRecordOffsets();
116 currentPos = 0;
117 startOffset = 0;
118 }
119
120 protected boolean hasNext() {
121 return (currentPos < recordOffsets.length);
122 }
123
124 protected String nextLine() {
125 String log = new String(bytes, startOffset, (recordOffsets[currentPos]
126 - startOffset + 1), Charset.forName("UTF-8"));
127 startOffset = recordOffsets[currentPos] + 1;
128 currentPos++;
129 return RecordConstants.recoverRecordSeparators("\n", log);
130 }
131
132 public int getCurrentPos() {
133 return currentPos;
134 }
135
136 public void setCurrentPos(int currentPos) {
137 this.currentPos = currentPos;
138 }
139
140 public int getStartOffset() {
141 return startOffset;
142 }
143
144 public void setStartOffset(int startOffset) {
145 this.startOffset = startOffset;
146 }
147 }