This project has retired. For details please refer to its
Attic page.
Log4jJobHistoryProcessor xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
20
21
import java.io.IOException;
import java.util.HashMap;
import java.util.Hashtable;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
import org.apache.hadoop.chukwa.extraction.engine.Record;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.log4j.Logger;
32
33 public class Log4jJobHistoryProcessor extends AbstractProcessor {
34 static Logger log = Logger.getLogger(Log4jJobHistoryProcessor.class);
35
36 private static final String recordType = "JobLogHistory";
37 private static String internalRegex = null;
38 private static Pattern ip = null;
39
40 private Matcher internalMatcher = null;
41
42 public Log4jJobHistoryProcessor() {
43 internalRegex = "(.*?)=\"(.*?)\"(.*)([\\n])?";
44 ip = Pattern.compile(internalRegex);
45 internalMatcher = ip.matcher("-");
46 }
47
48 @Override
49 protected void parse(String recordEntry,
50 OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
51 throws Throwable {
52
53
54
55
56 try {
57
58
59 int start = 24;
60 int idx = recordEntry.indexOf(' ', start);
61
62 start = idx + 1;
63 idx = recordEntry.indexOf(' ', start);
64
65 String body = recordEntry.substring(idx + 1);
66
67 Hashtable<String, String> keys = new Hashtable<String, String>();
68 ChukwaRecord record = null;
69
70 int firstSep = body.indexOf(" ");
71 keys.put("RECORD_TYPE", body.substring(0, firstSep));
72
73
74
75 body = body.substring(firstSep);
76
77 internalMatcher.reset(body);
78
79
80
81
82 while (internalMatcher.matches()) {
83
84 keys.put(internalMatcher.group(1).trim(), internalMatcher.group(2)
85 .trim());
86
87
88
89
90
91
92
93 internalMatcher.reset(internalMatcher.group(3));
94 }
95
96 if (!keys.containsKey("JOBID")) {
97
98
99
100 String jobId = keys.get("TASKID");
101 int idx1 = jobId.indexOf('_', 0);
102 int idx2 = jobId.indexOf('_', idx1 + 1);
103 idx2 = jobId.indexOf('_', idx2 + 1);
104 keys.put("JOBID", "job" + jobId.substring(idx1, idx2));
105
106
107 }
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150 if (keys.get("RECORD_TYPE").equalsIgnoreCase("MapAttempt")
151 && keys.containsKey("START_TIME")) {
152
153
154
155
156
157
158 key = new ChukwaRecordKey();
159 key.setKey("JobLogHist/Map/" + keys.get("JOBID") + "/"
160 + keys.get("START_TIME"));
161 key.setReduceType("JobLogHistoryReduceProcessor");
162 record = new ChukwaRecord();
163 record.setTime(Long.parseLong(keys.get("START_TIME")));
164 record.add("JOBID", keys.get("JOBID"));
165 record.add("START_TIME", keys.get("START_TIME"));
166 record.add(Record.tagsField, chunk.getTags());
167
168 output.collect(key, record);
169
170 } else if (keys.get("RECORD_TYPE").equalsIgnoreCase("MapAttempt")
171 && keys.containsKey("FINISH_TIME")) {
172
173
174
175
176
177
178
179 key = new ChukwaRecordKey();
180 key.setKey("JobLogHist/Map/" + keys.get("JOBID") + "/"
181 + keys.get("FINISH_TIME"));
182 key.setReduceType("JobLogHistoryReduceProcessor");
183 record = new ChukwaRecord();
184 record.setTime(Long.parseLong(keys.get("FINISH_TIME")));
185 record.add("JOBID", keys.get("JOBID"));
186 record.add("FINISH_TIME", keys.get("FINISH_TIME"));
187 record.add(Record.tagsField, chunk.getTags());
188
189 output.collect(key, record);
190 }
191
192 else if (keys.get("RECORD_TYPE").equalsIgnoreCase("ReduceAttempt")
193 && keys.containsKey("START_TIME")) {
194
195
196
197
198
199
200 key = new ChukwaRecordKey();
201 key.setKey("JobLogHist/SHUFFLE/" + keys.get("JOBID") + "/"
202 + keys.get("START_TIME"));
203 key.setReduceType("JobLogHistoryReduceProcessor");
204 record = new ChukwaRecord();
205 record.setTime(Long.parseLong(keys.get("START_TIME")));
206 record.add("JOBID", keys.get("JOBID"));
207 record.add("START_TIME", keys.get("START_TIME"));
208 record.add(Record.tagsField, chunk.getTags());
209
210 output.collect(key, record);
211
212 } else if (keys.get("RECORD_TYPE").equalsIgnoreCase("ReduceAttempt")
213 && keys.containsKey("FINISH_TIME")) {
214
215
216
217
218
219
220
221 key = new ChukwaRecordKey();
222 key.setKey("JobLogHist/SHUFFLE/" + keys.get("JOBID") + "/"
223 + keys.get("SHUFFLE_FINISHED"));
224 key.setReduceType("JobLogHistoryReduceProcessor");
225 record = new ChukwaRecord();
226 record.setTime(Long.parseLong(keys.get("SHUFFLE_FINISHED")));
227 record.add("JOBID", keys.get("JOBID"));
228 record.add("SHUFFLE_FINISHED", keys.get("SHUFFLE_FINISHED"));
229 record.add(Record.tagsField, chunk.getTags());
230
231 output.collect(key, record);
232
233
234 key = new ChukwaRecordKey();
235 key.setKey("JobLogHist/SORT/" + keys.get("JOBID") + "/"
236 + keys.get("SHUFFLE_FINISHED"));
237 key.setReduceType("JobLogHistoryReduceProcessor");
238 record = new ChukwaRecord();
239 record.setTime(Long.parseLong(keys.get("SHUFFLE_FINISHED")));
240 record.add("JOBID", keys.get("JOBID"));
241 record.add("START_TIME", keys.get("SHUFFLE_FINISHED"));
242 record.add(Record.tagsField, chunk.getTags());
243
244 output.collect(key, record);
245
246 key = new ChukwaRecordKey();
247 key.setKey("JobLogHist/SORT/" + keys.get("JOBID") + "/"
248 + keys.get("SORT_FINISHED"));
249 key.setReduceType("JobLogHistoryReduceProcessor");
250 record = new ChukwaRecord();
251 record.setTime(Long.parseLong(keys.get("SORT_FINISHED")));
252 record.add("JOBID", keys.get("JOBID"));
253 record.add("SORT_FINISHED", keys.get("SORT_FINISHED"));
254 record.add(Record.tagsField, chunk.getTags());
255
256 output.collect(key, record);
257
258
259 key = new ChukwaRecordKey();
260 key.setKey("JobLogHist/REDUCE/" + keys.get("JOBID") + "/"
261 + keys.get("SORT_FINISHED"));
262 key.setReduceType("JobLogHistoryReduceProcessor");
263 record = new ChukwaRecord();
264 record.setTime(Long.parseLong(keys.get("SORT_FINISHED")));
265 record.add("JOBID", keys.get("JOBID"));
266 record.add("START_TIME", keys.get("SORT_FINISHED"));
267 record.add(Record.tagsField, chunk.getTags());
268
269 output.collect(key, record);
270
271 key = new ChukwaRecordKey();
272 key.setKey("JobLogHist/REDUCE/" + keys.get("JOBID") + "/"
273 + keys.get("FINISH_TIME"));
274 key.setReduceType("JobLogHistoryReduceProcessor");
275 record = new ChukwaRecord();
276 record.setTime(Long.parseLong(keys.get("SORT_FINISHED")));
277 record.add("JOBID", keys.get("JOBID"));
278 record.add("FINISH_TIME", keys.get("SORT_FINISHED"));
279 record.add(Record.tagsField, chunk.getTags());
280
281 output.collect(key, record);
282
283 } else if (keys.get("RECORD_TYPE").equalsIgnoreCase("Job")
284 && keys.containsKey("COUNTERS")) {
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305 record = new ChukwaRecord();
306 key = new ChukwaRecordKey();
307 buildGenericRecord(record, null, Long
308 .parseLong(keys.get("FINISH_TIME")), "MRJobCounters");
309 extractCounters(record, keys.get("COUNTERS"));
310
311 String jobId = keys.get("JOBID").replace("_", "").substring(3);
312 record.add("JobId", "" + jobId);
313
314
315 if (keys.containsKey("HODID")) {
316 record.add("HodId", keys.get("HODID"));
317 }
318
319
320 output.collect(key, record);
321 }
322
323 if (keys.containsKey("TASK_TYPE")
324 && keys.containsKey("COUNTERS")
325 && (keys.get("TASK_TYPE").equalsIgnoreCase("REDUCE") || keys.get(
326 "TASK_TYPE").equalsIgnoreCase("MAP"))) {
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351 record = new ChukwaRecord();
352 key = new ChukwaRecordKey();
353 buildGenericRecord(record, null, Long
354 .parseLong(keys.get("FINISH_TIME")), "SizeVsFinishTime");
355 extractCounters(record, keys.get("COUNTERS"));
356 record.add("JOBID", keys.get("JOBID"));
357 record.add("TASKID", keys.get("TASKID"));
358 record.add("TASK_TYPE", keys.get("TASK_TYPE"));
359
360
361 output.collect(key, record);
362
363 }
364 } catch (IOException e) {
365 log.warn("Unable to collect output in JobLogHistoryProcessor ["
366 + recordEntry + "]", e);
367 e.printStackTrace();
368 throw e;
369 }
370
371 }
372
373 protected void extractCounters(ChukwaRecord record, String input) {
374
375 String[] data = null;
376 String[] counters = input.split(",");
377
378 for (String counter : counters) {
379 data = counter.split(":");
380 record.add(data[0].replaceAll(" ", "_").replaceAll("\\.", "_")
381 .toUpperCase(), data[1]);
382 }
383 }
384
385 public String getDataType() {
386 return Log4jJobHistoryProcessor.recordType;
387 }
388 }