This project has been retired. For details, please refer to its
Attic page.
JobLogHistoryProcessor xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
20
21
22 import java.io.IOException;
23 import java.util.HashMap;
24 import java.util.Iterator;
25 import java.util.regex.Matcher;
26 import java.util.regex.Pattern;
27
28 import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Table;
29 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
30 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
31 import org.apache.hadoop.chukwa.extraction.engine.Record;
32 import org.apache.hadoop.mapred.OutputCollector;
33 import org.apache.hadoop.mapred.Reporter;
34 import org.apache.log4j.Logger;
35
36 @Table(name="Mapreduce",columnFamily="JobLogHistory")
37 public class JobLogHistoryProcessor extends AbstractProcessor {
38 static Logger log = Logger.getLogger(JobLogHistoryProcessor.class);
39
40 private static final String recordType = "JobLogHistory";
41 private static String internalRegex = null;
42 private static Pattern ip = null;
43
44 private Matcher internalMatcher = null;
45
46 public JobLogHistoryProcessor() {
47 internalRegex = "(.*?)=\"(.*?)\"(.*)([\\n])?";
48 ip = Pattern.compile(internalRegex);
49 internalMatcher = ip.matcher("-");
50 }
51
52 @Override
53 protected void parse(String recordEntry,
54 OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
55 throws Throwable {
56
57
58
59
60 try {
61
62 HashMap<String, String> keys = new HashMap<String, String>();
63 ChukwaRecord record = null;
64
65 int firstSep = recordEntry.indexOf(" ");
66 keys.put("RECORD_TYPE", recordEntry.substring(0, firstSep));
67
68
69
70 String body = recordEntry.substring(firstSep);
71
72 internalMatcher.reset(body);
73
74
75
76
77 while (internalMatcher.matches()) {
78
79 keys.put(internalMatcher.group(1).trim(), internalMatcher.group(2)
80 .trim());
81
82
83
84
85
86
87
88 internalMatcher.reset(internalMatcher.group(3));
89 }
90
91 if (!keys.containsKey("JOBID")) {
92
93
94
95 String jobId = keys.get("TASKID");
96 int idx1 = jobId.indexOf('_', 0);
97 int idx2 = jobId.indexOf('_', idx1 + 1);
98 idx2 = jobId.indexOf('_', idx2 + 1);
99 keys.put("JOBID", jobId.substring(idx1 + 1, idx2));
100
101
102 } else {
103 String jobId = keys.get("JOBID").replace("_", "").substring(3);
104 keys.put("JOBID", jobId);
105 }
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147 if (keys.get("RECORD_TYPE").equalsIgnoreCase("MapAttempt")
148 && keys.containsKey("START_TIME")) {
149
150
151
152
153
154
155 key = new ChukwaRecordKey();
156 key.setKey("JobLogHist/Map/" + keys.get("JOBID") + "/"
157 + keys.get("START_TIME"));
158 key.setReduceType("JobLogHistoryReduceProcessor");
159 record = new ChukwaRecord();
160 record.setTime(Long.parseLong(keys.get("START_TIME")));
161 record.add("JOBID", keys.get("JOBID"));
162 record.add("START_TIME", keys.get("START_TIME"));
163 record.add(Record.tagsField, chunk.getTags());
164
165 output.collect(key, record);
166
167 } else if (keys.get("RECORD_TYPE").equalsIgnoreCase("MapAttempt")
168 && keys.containsKey("FINISH_TIME")) {
169
170
171
172
173
174
175
176 key = new ChukwaRecordKey();
177 key.setKey("JobLogHist/Map/" + keys.get("JOBID") + "/"
178 + keys.get("FINISH_TIME"));
179 key.setReduceType("JobLogHistoryReduceProcessor");
180 record = new ChukwaRecord();
181 record.setTime(Long.parseLong(keys.get("FINISH_TIME")));
182 record.add("JOBID", keys.get("JOBID"));
183 record.add("FINISH_TIME", keys.get("FINISH_TIME"));
184 record.add(Record.tagsField, chunk.getTags());
185
186 output.collect(key, record);
187 }
188
189 else if (keys.get("RECORD_TYPE").equalsIgnoreCase("ReduceAttempt")
190 && keys.containsKey("START_TIME")) {
191
192
193
194
195
196
197 key = new ChukwaRecordKey();
198 key.setKey("JobLogHist/SHUFFLE/" + keys.get("JOBID") + "/"
199 + keys.get("START_TIME"));
200 key.setReduceType("JobLogHistoryReduceProcessor");
201 record = new ChukwaRecord();
202 record.setTime(Long.parseLong(keys.get("START_TIME")));
203 record.add("JOBID", keys.get("JOBID"));
204 record.add("START_TIME", keys.get("START_TIME"));
205 record.add(Record.tagsField, chunk.getTags());
206
207 output.collect(key, record);
208
209 } else if (keys.get("RECORD_TYPE").equalsIgnoreCase("ReduceAttempt")
210 && keys.containsKey("FINISH_TIME")) {
211
212
213
214
215
216
217
218 key = new ChukwaRecordKey();
219 key.setKey("JobLogHist/SHUFFLE/" + keys.get("JOBID") + "/"
220 + keys.get("SHUFFLE_FINISHED"));
221 key.setReduceType("JobLogHistoryReduceProcessor");
222 record = new ChukwaRecord();
223 record.setTime(Long.parseLong(keys.get("SHUFFLE_FINISHED")));
224 record.add("JOBID", keys.get("JOBID"));
225 record.add("SHUFFLE_FINISHED", keys.get("SHUFFLE_FINISHED"));
226 record.add(Record.tagsField, chunk.getTags());
227
228 output.collect(key, record);
229
230
231 key = new ChukwaRecordKey();
232 key.setKey("JobLogHist/SORT/" + keys.get("JOBID") + "/"
233 + keys.get("SHUFFLE_FINISHED"));
234 key.setReduceType("JobLogHistoryReduceProcessor");
235 record = new ChukwaRecord();
236 record.setTime(Long.parseLong(keys.get("SHUFFLE_FINISHED")));
237 record.add("JOBID", keys.get("JOBID"));
238 record.add("START_TIME", keys.get("SHUFFLE_FINISHED"));
239 record.add(Record.tagsField, chunk.getTags());
240
241 output.collect(key, record);
242
243 key = new ChukwaRecordKey();
244 key.setKey("JobLogHist/SORT/" + keys.get("JOBID") + "/"
245 + keys.get("SORT_FINISHED"));
246 key.setReduceType("JobLogHistoryReduceProcessor");
247 record = new ChukwaRecord();
248 record.setTime(Long.parseLong(keys.get("SORT_FINISHED")));
249 record.add("JOBID", keys.get("JOBID"));
250 record.add("SORT_FINISHED", keys.get("SORT_FINISHED"));
251 record.add(Record.tagsField, chunk.getTags());
252
253 output.collect(key, record);
254
255
256 key = new ChukwaRecordKey();
257 key.setKey("JobLogHist/REDUCE/" + keys.get("JOBID") + "/"
258 + keys.get("SORT_FINISHED"));
259 key.setReduceType("JobLogHistoryReduceProcessor");
260 record = new ChukwaRecord();
261 record.setTime(Long.parseLong(keys.get("SORT_FINISHED")));
262 record.add("JOBID", keys.get("JOBID"));
263 record.add("START_TIME", keys.get("SORT_FINISHED"));
264 record.add(Record.tagsField, chunk.getTags());
265
266 output.collect(key, record);
267
268 key = new ChukwaRecordKey();
269 key.setKey("JobLogHist/REDUCE/" + keys.get("JOBID") + "/"
270 + keys.get("FINISH_TIME"));
271 key.setReduceType("JobLogHistoryReduceProcessor");
272 record = new ChukwaRecord();
273 record.setTime(Long.parseLong(keys.get("SORT_FINISHED")));
274 record.add("JOBID", keys.get("JOBID"));
275 record.add("FINISH_TIME", keys.get("SORT_FINISHED"));
276 record.add(Record.tagsField, chunk.getTags());
277
278 output.collect(key, record);
279
280 } else if (keys.get("RECORD_TYPE").equalsIgnoreCase("Job")) {
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312 record = new ChukwaRecord();
313 key = new ChukwaRecordKey();
314 buildGenericRecord(record, null, Long
315 .parseLong(keys.get("FINISH_TIME")), "MRJob");
316 if (keys.containsKey("COUNTERS")) {
317 extractCounters(record, keys.get("COUNTERS"));
318 }
319
320 key = new ChukwaRecordKey();
321 key.setKey("MRJob/" + keys.get("JOBID"));
322 key.setReduceType("MRJobReduceProcessor");
323
324 record = new ChukwaRecord();
325 record.add(Record.tagsField, chunk.getTags());
326 if (keys.containsKey("SUBMIT_TIME")) {
327 record.setTime(Long.parseLong(keys.get("SUBMIT_TIME")));
328 } else if (keys.containsKey("LAUNCH_TIME")) {
329 record.setTime(Long.parseLong(keys.get("LAUNCH_TIME")));
330 } else if (keys.containsKey("FINISH_TIME")) {
331 record.setTime(Long.parseLong(keys.get("FINISH_TIME")));
332 }
333
334 Iterator<String> it = keys.keySet().iterator();
335 while (it.hasNext()) {
336 String field = it.next();
337 record.add(field, keys.get(field));
338 }
339
340 output.collect(key, record);
341 }
342
343 if (keys.containsKey("TASK_TYPE")
344 && keys.containsKey("COUNTERS")
345 && (keys.get("TASK_TYPE").equalsIgnoreCase("REDUCE") || keys.get(
346 "TASK_TYPE").equalsIgnoreCase("MAP"))) {
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371 record = new ChukwaRecord();
372 key = new ChukwaRecordKey();
373 buildGenericRecord(record, null, Long
374 .parseLong(keys.get("FINISH_TIME")), "SizeVsFinishTime");
375 extractCounters(record, keys.get("COUNTERS"));
376 record.add("JOBID", keys.get("JOBID"));
377 record.add("TASKID", keys.get("TASKID"));
378 record.add("TASK_TYPE", keys.get("TASK_TYPE"));
379 record.add(Record.tagsField, chunk.getTags());
380
381 output.collect(key, record);
382
383 }
384 } catch (IOException e) {
385 log.warn("Unable to collect output in JobLogHistoryProcessor ["
386 + recordEntry + "]", e);
387 e.printStackTrace();
388 throw e;
389 }
390
391 }
392
393 protected void extractCounters(ChukwaRecord record, String input) {
394
395 String[] data = null;
396 String[] counters = input.split(",");
397
398 for (String counter : counters) {
399 data = counter.split(":");
400 record.add(data[0].replaceAll(" ", "_").replaceAll("\\.", "_")
401 .toUpperCase(), data[1]);
402 }
403 }
404
405 public String getDataType() {
406 return JobLogHistoryProcessor.recordType;
407 }
408 }