/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;

import java.io.IOException;
import java.util.Hashtable;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
import org.apache.hadoop.chukwa.extraction.engine.Record;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.log4j.Logger;

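/**
 * Demux processor for Hadoop job-history log lines (data type
 * "JobLogHistory"). Each entry names a record type followed by KEY="value"
 * pairs; the processor parses those pairs and emits ChukwaRecords for the
 * map and reduce attempt phases (Map, Shuffle, Sort, Reduce) keyed for
 * JobLogHistoryReduceProcessor, plus per-job counter records
 * ("MRJobCounters") and per-task counter records ("SizeVsFinishTime").
 */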
public class Log4jJobHistoryProcessor extends AbstractProcessor {
  static Logger log = Logger.getLogger(Log4jJobHistoryProcessor.class);

  private static final String recordType = "JobLogHistory";
  // Matches one KEY="value" pair; group(3) captures the unparsed tail of the
  // line so the matcher can be reset on it to consume the pairs one by one.
  private static final String internalRegex = "(.*?)=\"(.*?)\"(.*)([\\n])?";
  private static final Pattern ip = Pattern.compile(internalRegex);

  private final Matcher internalMatcher = ip.matcher("-");

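  /**
   * Parses one job-history log line. The leading timestamp, log level, and
   * class name are skipped; the remainder (a record type followed by
   * KEY="value" pairs) is split into a key table and dispatched on
   * RECORD_TYPE.
   */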
  @Override
  protected void parse(String recordEntry,
      OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
      throws Throwable {

    try {

      // Skip the leading timestamp (first 23 characters), then the log level
      // and class name tokens; the remainder is the job-history body.
      int start = 24;
      int idx = recordEntry.indexOf(' ', start);
      start = idx + 1;
      idx = recordEntry.indexOf(' ', start);
      String body = recordEntry.substring(idx + 1);

      Hashtable<String, String> keys = new Hashtable<String, String>();
      ChukwaRecord record = null;

      // The first token is the record type (Job, Task, MapAttempt,
      // ReduceAttempt, ...); everything after it is KEY="value" pairs.
      int firstSep = body.indexOf(" ");
      keys.put("RECORD_TYPE", body.substring(0, firstSep));

      body = body.substring(firstSep);

      internalMatcher.reset(body);

      while (internalMatcher.matches()) {

        keys.put(internalMatcher.group(1).trim(), internalMatcher.group(2)
            .trim());

        // Re-run the matcher on the unconsumed tail of the line.
        internalMatcher.reset(internalMatcher.group(3));
      }

      if (!keys.containsKey("JOBID")) {
        // Extract JOBID from TASKID, e.g.
        // JOBID = "job_200804210403_0005"
        // TASKID = "tip_200804210403_0005_m_000018"
        String jobId = keys.get("TASKID");
        int idx1 = jobId.indexOf('_', 0);
        int idx2 = jobId.indexOf('_', idx1 + 1);
        idx2 = jobId.indexOf('_', idx2 + 1);
        keys.put("JOBID", "job" + jobId.substring(idx1, idx2));
      }
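
      // Each branch below emits records keyed
      // "JobLogHist/<phase>/<JOBID>/<timestamp>" with reduce type
      // "JobLogHistoryReduceProcessor", which pairs phase start/end events.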

      // Job-level SUBMIT_TIME / LAUNCH_TIME / FINISH_TIME records carry no
      // per-phase timing; only the FINISH_TIME record is used, via the
      // "Job" + COUNTERS branch below.
      if (keys.get("RECORD_TYPE").equalsIgnoreCase("MapAttempt")
          && keys.containsKey("START_TIME")) {
        // MapAttempt TASK_TYPE="MAP"
        // TASKID="tip_200804210403_0005_m_000018"
        // TASK_ATTEMPT_ID="task_200804210403_0005_m_000018_0"
        // START_TIME="1208760437531"
        // HOSTNAME="tracker_xxx.yyy.com:xxx.yyy.com/xxx.xxx.xxx.xxx:53734"

        key = new ChukwaRecordKey();
        key.setKey("JobLogHist/Map/" + keys.get("JOBID") + "/"
            + keys.get("START_TIME"));
        key.setReduceType("JobLogHistoryReduceProcessor");
        record = new ChukwaRecord();
        record.setTime(Long.parseLong(keys.get("START_TIME")));
        record.add("JOBID", keys.get("JOBID"));
        record.add("START_TIME", keys.get("START_TIME"));
        record.add(Record.tagsField, chunk.getTags());
        output.collect(key, record);

      } else if (keys.get("RECORD_TYPE").equalsIgnoreCase("MapAttempt")
          && keys.containsKey("FINISH_TIME")) {
        // MapAttempt TASK_TYPE="MAP"
        // TASKID="tip_200804210403_0005_m_005494"
        // TASK_ATTEMPT_ID="task_200804210403_0005_m_005494_0"
        // TASK_STATUS="SUCCESS"
        // FINISH_TIME="1208760624124"
        // HOSTNAME="tracker_xxxx.yyyy.com:xxx.yyy.com/xxx.xxx.xxx.xxx:55491"

        key = new ChukwaRecordKey();
        key.setKey("JobLogHist/Map/" + keys.get("JOBID") + "/"
            + keys.get("FINISH_TIME"));
        key.setReduceType("JobLogHistoryReduceProcessor");
        record = new ChukwaRecord();
        record.setTime(Long.parseLong(keys.get("FINISH_TIME")));
        record.add("JOBID", keys.get("JOBID"));
        record.add("FINISH_TIME", keys.get("FINISH_TIME"));
        record.add(Record.tagsField, chunk.getTags());
        output.collect(key, record);

      } else if (keys.get("RECORD_TYPE").equalsIgnoreCase("ReduceAttempt")
          && keys.containsKey("START_TIME")) {
        // ReduceAttempt TASK_TYPE="REDUCE"
        // TASKID="tip_200804210403_0005_r_000138"
        // TASK_ATTEMPT_ID="task_200804210403_0005_r_000138_0"
        // START_TIME="1208760454885"
        // HOSTNAME="tracker_xxxx.yyyy.com:xxx.yyy.com/xxx.xxx.xxx.xxx:51947"

        key = new ChukwaRecordKey();
        key.setKey("JobLogHist/SHUFFLE/" + keys.get("JOBID") + "/"
            + keys.get("START_TIME"));
        key.setReduceType("JobLogHistoryReduceProcessor");
        record = new ChukwaRecord();
        record.setTime(Long.parseLong(keys.get("START_TIME")));
        record.add("JOBID", keys.get("JOBID"));
        record.add("START_TIME", keys.get("START_TIME"));
        record.add(Record.tagsField, chunk.getTags());
        output.collect(key, record);

      } else if (keys.get("RECORD_TYPE").equalsIgnoreCase("ReduceAttempt")
          && keys.containsKey("FINISH_TIME")) {
        // ReduceAttempt TASK_TYPE="REDUCE"
        // TASKID="tip_200804210403_0005_r_000138"
        // TASK_ATTEMPT_ID="task_200804210403_0005_r_000138_0"
        // TASK_STATUS="SUCCESS" SHUFFLE_FINISHED="1208760787167"
        // SORT_FINISHED="1208760787354" FINISH_TIME="1208760802395"
        // HOSTNAME="tracker__xxxx.yyyy.com:xxx.yyy.com/xxx.xxx.xxx.xxx:51947"

        // Shuffle phase end (its start was emitted from the ReduceAttempt
        // START_TIME record above).
        key = new ChukwaRecordKey();
        key.setKey("JobLogHist/SHUFFLE/" + keys.get("JOBID") + "/"
            + keys.get("SHUFFLE_FINISHED"));
        key.setReduceType("JobLogHistoryReduceProcessor");
        record = new ChukwaRecord();
        record.setTime(Long.parseLong(keys.get("SHUFFLE_FINISHED")));
        record.add("JOBID", keys.get("JOBID"));
        record.add("SHUFFLE_FINISHED", keys.get("SHUFFLE_FINISHED"));
        record.add(Record.tagsField, chunk.getTags());
        output.collect(key, record);

        // Sort phase: runs from SHUFFLE_FINISHED to SORT_FINISHED.
        key = new ChukwaRecordKey();
        key.setKey("JobLogHist/SORT/" + keys.get("JOBID") + "/"
            + keys.get("SHUFFLE_FINISHED"));
        key.setReduceType("JobLogHistoryReduceProcessor");
        record = new ChukwaRecord();
        record.setTime(Long.parseLong(keys.get("SHUFFLE_FINISHED")));
        record.add("JOBID", keys.get("JOBID"));
        record.add("START_TIME", keys.get("SHUFFLE_FINISHED"));
        record.add(Record.tagsField, chunk.getTags());
        output.collect(key, record);

        key = new ChukwaRecordKey();
        key.setKey("JobLogHist/SORT/" + keys.get("JOBID") + "/"
            + keys.get("SORT_FINISHED"));
        key.setReduceType("JobLogHistoryReduceProcessor");
        record = new ChukwaRecord();
        record.setTime(Long.parseLong(keys.get("SORT_FINISHED")));
        record.add("JOBID", keys.get("JOBID"));
        record.add("SORT_FINISHED", keys.get("SORT_FINISHED"));
        record.add(Record.tagsField, chunk.getTags());
        output.collect(key, record);

        // Reduce phase: runs from SORT_FINISHED to FINISH_TIME.
        key = new ChukwaRecordKey();
        key.setKey("JobLogHist/REDUCE/" + keys.get("JOBID") + "/"
            + keys.get("SORT_FINISHED"));
        key.setReduceType("JobLogHistoryReduceProcessor");
        record = new ChukwaRecord();
        record.setTime(Long.parseLong(keys.get("SORT_FINISHED")));
        record.add("JOBID", keys.get("JOBID"));
        record.add("START_TIME", keys.get("SORT_FINISHED"));
        record.add(Record.tagsField, chunk.getTags());
        output.collect(key, record);

        key = new ChukwaRecordKey();
        key.setKey("JobLogHist/REDUCE/" + keys.get("JOBID") + "/"
            + keys.get("FINISH_TIME"));
        key.setReduceType("JobLogHistoryReduceProcessor");
        record = new ChukwaRecord();
        record.setTime(Long.parseLong(keys.get("FINISH_TIME")));
        record.add("JOBID", keys.get("JOBID"));
        record.add("FINISH_TIME", keys.get("FINISH_TIME"));
        record.add(Record.tagsField, chunk.getTags());
        output.collect(key, record);

      } else if (keys.get("RECORD_TYPE").equalsIgnoreCase("Job")
          && keys.containsKey("COUNTERS")) {
        // Job JOBID="job_200804210403_0005" FINISH_TIME="1208760906816"
        // JOB_STATUS="SUCCESS" FINISHED_MAPS="5912"
        // FINISHED_REDUCES="739" FAILED_MAPS="0" FAILED_REDUCES="0"
        // COUNTERS="File Systems.Local bytes read:1735053407244,File
        // Systems.Local bytes written:2610106384012,File Systems.HDFS
        // bytes read:801605644910,File Systems.HDFS bytes
        // written:44135800,
        // Job Counters .Launched map tasks:5912,Job Counters .Launched
        // reduce tasks:739,Job Counters .Data-local map tasks:5573,Job
        // Counters .Rack-local map tasks:316,Map-Reduce Framework.
        // Map input records:9410696067,Map-Reduce Framework.Map output
        // records:9410696067,Map-Reduce Framework.Map input
        // bytes:801599188816,Map-Reduce Framework.Map output
        // bytes:784427968116,
        // Map-Reduce Framework.Combine input records:0,Map-Reduce
        // Framework.Combine output records:0,Map-Reduce
        // Framework.Reduce input groups:477265,Map-Reduce
        // Framework.Reduce input records:739000,
        // Map-Reduce Framework.Reduce output records:739000"

        record = new ChukwaRecord();
        key = new ChukwaRecordKey();
        buildGenericRecord(record, null, Long
            .parseLong(keys.get("FINISH_TIME")), "MRJobCounters");
        extractCounters(record, keys.get("COUNTERS"));

        // "job_200804210403_0005" -> "2008042104030005"
        String jobId = keys.get("JOBID").replace("_", "").substring(3);
        record.add("JobId", jobId);

        // FIXME: validate this once HodId is available
        if (keys.containsKey("HODID")) {
          record.add("HodId", keys.get("HODID"));
        }

        output.collect(key, record);
      }

      if (keys.containsKey("TASK_TYPE")
          && keys.containsKey("COUNTERS")
          && (keys.get("TASK_TYPE").equalsIgnoreCase("REDUCE") || keys.get(
              "TASK_TYPE").equalsIgnoreCase("MAP"))) {
        // MAP
        // Task TASKID="tip_200804210403_0005_m_000154" TASK_TYPE="MAP"
        // TASK_STATUS="SUCCESS" FINISH_TIME="1208760463883"
        // COUNTERS="File Systems.Local bytes read:159265655,File
        // Systems.Local bytes written:318531310,
        // File Systems.HDFS bytes read:145882417,Map-Reduce
        // Framework.Map input records:1706604,
        // Map-Reduce Framework.Map output records:1706604,Map-Reduce
        // Framework.Map input bytes:145882057,
        // Map-Reduce Framework.Map output bytes:142763253,Map-Reduce
        // Framework.Combine input records:0,Map-Reduce
        // Framework.Combine output records:0"

        // REDUCE
        // Task TASKID="tip_200804210403_0005_r_000524"
        // TASK_TYPE="REDUCE" TASK_STATUS="SUCCESS"
        // FINISH_TIME="1208760877072"
        // COUNTERS="File Systems.Local bytes read:1179319677,File
        // Systems.Local bytes written:1184474889,File Systems.HDFS
        // bytes written:59021,
        // Map-Reduce Framework.Reduce input groups:684,Map-Reduce
        // Framework.Reduce input records:1000,Map-Reduce
        // Framework.Reduce output records:1000"

        record = new ChukwaRecord();
        key = new ChukwaRecordKey();
        buildGenericRecord(record, null, Long
            .parseLong(keys.get("FINISH_TIME")), "SizeVsFinishTime");
        extractCounters(record, keys.get("COUNTERS"));
        record.add("JOBID", keys.get("JOBID"));
        record.add("TASKID", keys.get("TASKID"));
        record.add("TASK_TYPE", keys.get("TASK_TYPE"));

        output.collect(key, record);
      }
    } catch (IOException e) {
      log.warn("Unable to collect output in JobLogHistoryProcessor ["
          + recordEntry + "]", e);
      throw e;
    }

  }

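  /**
   * Splits a job-history COUNTERS value into individual record fields.
   * Counters arrive as comma-separated "Group.Counter name:value" entries;
   * each field name is upper-cased with spaces and dots replaced by
   * underscores. For example, "Map-Reduce Framework.Map input records:1706604"
   * becomes the field MAP-REDUCE_FRAMEWORK_MAP_INPUT_RECORDS = 1706604.
   */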
  protected void extractCounters(ChukwaRecord record, String input) {

    String[] counters = input.split(",");

    for (String counter : counters) {
      String[] data = counter.split(":");
      record.add(data[0].replaceAll(" ", "_").replaceAll("\\.", "_")
          .toUpperCase(), data[1]);
    }
  }

  public String getDataType() {
    return Log4jJobHistoryProcessor.recordType;
  }
}