This project has retired. For details please refer to its
Attic page.
AbstractProcessor xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
20
21
22 import java.util.Calendar;
23 import org.apache.hadoop.chukwa.ChukwaArchiveKey;
24 import org.apache.hadoop.chukwa.Chunk;
25 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
26 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
27 import org.apache.hadoop.chukwa.extraction.engine.Record;
28 import org.apache.hadoop.chukwa.util.RecordConstants;
29 import org.apache.hadoop.mapred.OutputCollector;
30 import org.apache.hadoop.mapred.Reporter;
31 import org.apache.log4j.Logger;
32
33 public abstract class AbstractProcessor implements MapProcessor {
34 static Logger log = Logger.getLogger(AbstractProcessor.class);
35
36 Calendar calendar = Calendar.getInstance();
37 byte[] bytes;
38 int[] recordOffsets;
39 protected int currentPos = 0;
40 protected int startOffset = 0;
41
42 protected ChukwaArchiveKey archiveKey = null;
43 protected ChukwaRecordKey key = new ChukwaRecordKey();
44 protected Chunk chunk = null;
45
46 boolean chunkInErrorSaved = false;
47 OutputCollector<ChukwaRecordKey, ChukwaRecord> output = null;
48 Reporter reporter = null;
49
50 public AbstractProcessor() {
51 }
52
53 protected abstract void parse(String recordEntry,
54 OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
55 throws Throwable;
56
57 protected void saveChunkInError(Throwable throwable) {
58 if (chunkInErrorSaved == false) {
59 try {
60 ChunkSaver.saveChunk(chunk, throwable, output, reporter);
61 chunkInErrorSaved = true;
62 } catch (Exception e) {
63 e.printStackTrace();
64 }
65 }
66
67 }
68
69 public void process(ChukwaArchiveKey archiveKey, Chunk chunk,
70 OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter) {
71 chunkInErrorSaved = false;
72
73 this.archiveKey = archiveKey;
74 this.output = output;
75 this.reporter = reporter;
76
77 reset(chunk);
78
79 while (hasNext()) {
80 try {
81 parse(nextLine(), output, reporter);
82 } catch (Throwable e) {
83 saveChunkInError(e);
84 }
85 }
86 }
87
88 protected void buildGenericRecord(ChukwaRecord record, String body,
89 long timestamp, String dataSource) {
90 calendar.setTimeInMillis(timestamp);
91 calendar.set(Calendar.MINUTE, 0);
92 calendar.set(Calendar.SECOND, 0);
93 calendar.set(Calendar.MILLISECOND, 0);
94
95 key.setKey("" + calendar.getTimeInMillis() + "/" + chunk.getSource() + "/"
96 + timestamp);
97 key.setReduceType(dataSource);
98
99 if (body != null) {
100 record.add(Record.bodyField, body);
101 }
102 record.setTime(timestamp);
103
104 record.add(Record.tagsField, chunk.getTags());
105 record.add(Record.sourceField, chunk.getSource());
106 record.add(Record.applicationField, chunk.getStreamName());
107
108 }
109
110 protected void reset(Chunk chunk) {
111 this.chunk = chunk;
112 this.bytes = chunk.getData();
113 this.recordOffsets = chunk.getRecordOffsets();
114 currentPos = 0;
115 startOffset = 0;
116 }
117
118 protected boolean hasNext() {
119 return (currentPos < recordOffsets.length);
120 }
121
122 protected String nextLine() {
123 String log = new String(bytes, startOffset, (recordOffsets[currentPos]
124 - startOffset + 1));
125 startOffset = recordOffsets[currentPos] + 1;
126 currentPos++;
127 return RecordConstants.recoverRecordSeparators("\n", log);
128 }
129
130 public int getCurrentPos() {
131 return currentPos;
132 }
133
134 public void setCurrentPos(int currentPos) {
135 this.currentPos = currentPos;
136 }
137
138 public int getStartOffset() {
139 return startOffset;
140 }
141
142 public void setStartOffset(int startOffset) {
143 this.startOffset = startOffset;
144 }
145 }