Log4jJobHistoryProcessor.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;


import java.io.IOException;
import java.util.Hashtable;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
import org.apache.hadoop.chukwa.extraction.engine.Record;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.log4j.Logger;

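/**
 * Demux mapper-side processor that turns Hadoop JobHistory log lines
 * (data type "JobLogHistory") into ChukwaRecords. Phase-boundary events
 * (map/reduce attempt start and finish times) are keyed for the
 * JobLogHistoryReduceProcessor, while job- and task-level counters are
 * emitted directly as "MRJobCounters" and "SizeVsFinishTime" records.
 */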
public class Log4jJobHistoryProcessor extends AbstractProcessor {
  static Logger log = Logger.getLogger(Log4jJobHistoryProcessor.class);

  private static final String recordType = "JobLogHistory";
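  // Matches one NAME="value" attribute at the head of the text; group(3)
  // captures the unconsumed tail, which is re-scanned until no pairs remain.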
  private static final String internalRegex = "(.*?)=\"(.*?)\"(.*)([\\n])?";
  private static final Pattern ip = Pattern.compile(internalRegex);

  private Matcher internalMatcher = null;

  public Log4jJobHistoryProcessor() {
    internalMatcher = ip.matcher("-");
  }

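  /**
   * Parses one JobHistory log line. The line is expected to begin with a
   * 23-character log4j timestamp (e.g. "2008-04-21 04:03:57,531"), followed
   * by the log level and the class name; the remainder is a JobHistory
   * record of the form: TYPE NAME="value" NAME="value" ...
   */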
  @Override
  protected void parse(String recordEntry,
      OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
      throws Throwable {

    // log.info("JobLogHistoryProcessor record: [" + recordEntry + "] type["
    // + chunk.getDataType() + "]");

    try {

      // String dStr = recordEntry.substring(0, 23);
      int start = 24;
      int idx = recordEntry.indexOf(' ', start);
      // String level = recordEntry.substring(start, idx);
      start = idx + 1;
      idx = recordEntry.indexOf(' ', start);
      // String className = recordEntry.substring(start, idx-1);
      String body = recordEntry.substring(idx + 1);

      Hashtable<String, String> keys = new Hashtable<String, String>();
      ChukwaRecord record = null;

      int firstSep = body.indexOf(" ");
      keys.put("RECORD_TYPE", body.substring(0, firstSep));
      // log.info("JobLogHistoryProcessor Add field: [RECORD_TYPE]["
      // + keys.get("RECORD_TYPE") + "]");

      body = body.substring(firstSep);

      internalMatcher.reset(body);

      // String fieldName = null;
      // String fieldValue = null;

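      // Peel off NAME="value" pairs one at a time, re-matching on the tail
      // (group 3) until the remaining text no longer fits the pattern.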
      while (internalMatcher.matches()) {

        keys.put(internalMatcher.group(1).trim(), internalMatcher.group(2)
            .trim());

        // TODO Remove debug info before production
        // fieldName = internalMatcher.group(1).trim();
        // fieldValue = internalMatcher.group(2).trim();
        // log.info("JobLogHistoryProcessor Add field: [" + fieldName +
        // "][" + fieldValue +"]" );
        // log.info("EOL : [" + internalMatcher.group(3) + "]" );
        internalMatcher.reset(internalMatcher.group(3));
      }

      if (!keys.containsKey("JOBID")) {
        // Extract JobID from taskID
        // JOBID = "job_200804210403_0005"
        // TASKID = "tip_200804210403_0005_m_000018"
        String jobId = keys.get("TASKID");
        int idx1 = jobId.indexOf('_', 0);
        int idx2 = jobId.indexOf('_', idx1 + 1);
        idx2 = jobId.indexOf('_', idx2 + 1);
        keys.put("JOBID", "job" + jobId.substring(idx1, idx2));
        // log.info("JobLogHistoryProcessor Add field: [JOBID]["
        // + keys.get("JOBID") + "]");
      }

      // if (keys.get("RECORD_TYPE").equalsIgnoreCase("Job") &&
      // keys.containsKey("SUBMIT_TIME"))
      // {
      // // Job JOBID="job_200804210403_0005" JOBNAME="MY_JOB"
      // USER="userxxx"
      // // SUBMIT_TIME="1208760436751"
      // JOBCONF="/mapredsystem/xxx.yyy.com/job_200804210403_0005/job.xml"
      //
      //
      // }
      // else if (keys.get("RECORD_TYPE").equalsIgnoreCase("Job") &&
      // keys.containsKey("LAUNCH_TIME"))
      // {
      // // Job JOBID="job_200804210403_0005" LAUNCH_TIME="1208760437110"
      // TOTAL_MAPS="5912" TOTAL_REDUCES="739"
      //
      // }
      // else if (keys.get("RECORD_TYPE").equalsIgnoreCase("Job") &&
      // keys.containsKey("FINISH_TIME"))
      // {
      // // Job JOBID="job_200804210403_0005" FINISH_TIME="1208760906816"
      // JOB_STATUS="SUCCESS" FINISHED_MAPS="5912" FINISHED_REDUCES="739"
      // FAILED_MAPS="0" FAILED_REDUCES="0"
      // // COUNTERS="File Systems.Local bytes read:1735053407244,File
      // Systems.Local bytes written:2610106384012,File Systems.HDFS bytes
      // read:801605644910,File Systems.HDFS bytes written:44135800,
      // // Job Counters .Launched map tasks:5912,Job Counters .Launched
      // reduce tasks:739,Job Counters .Data-local map tasks:5573,Job
      // Counters .Rack-local map tasks:316,Map-Reduce Framework.
      // // Map input records:9410696067,Map-Reduce Framework.Map output
      // records:9410696067,Map-Reduce Framework.Map input
      // bytes:801599188816,Map-Reduce Framework.Map output
      // bytes:784427968116,
      // // Map-Reduce Framework.Combine input records:0,Map-Reduce
      // Framework.Combine output records:0,Map-Reduce Framework.Reduce
      // input groups:477265,Map-Reduce Framework.Reduce input
      // records:739000,
      // // Map-Reduce Framework.Reduce output records:739000"
      //
      // }
      // else
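      // One record per phase boundary, keyed as JobLogHist/<PHASE>/<job>/<time>
      // so that JobLogHistoryReduceProcessor can pair start and end events.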
      if (keys.get("RECORD_TYPE").equalsIgnoreCase("MapAttempt")
          && keys.containsKey("START_TIME")) {
        // MapAttempt TASK_TYPE="MAP"
        // TASKID="tip_200804210403_0005_m_000018"
        // TASK_ATTEMPT_ID="task_200804210403_0005_m_000018_0"
        // START_TIME="1208760437531"
        // HOSTNAME="tracker_xxx.yyy.com:xxx.yyy.com/xxx.xxx.xxx.xxx:53734"

        key = new ChukwaRecordKey();
        key.setKey("JobLogHist/Map/" + keys.get("JOBID") + "/"
            + keys.get("START_TIME"));
        key.setReduceType("JobLogHistoryReduceProcessor");
        record = new ChukwaRecord();
        record.setTime(Long.parseLong(keys.get("START_TIME")));
        record.add("JOBID", keys.get("JOBID"));
        record.add("START_TIME", keys.get("START_TIME"));
        record.add(Record.tagsField, chunk.getTags());
        // log.info("JobLogHist/Map/S");
        output.collect(key, record);

      } else if (keys.get("RECORD_TYPE").equalsIgnoreCase("MapAttempt")
          && keys.containsKey("FINISH_TIME")) {
        // MapAttempt TASK_TYPE="MAP"
        // TASKID="tip_200804210403_0005_m_005494"
        // TASK_ATTEMPT_ID="task_200804210403_0005_m_005494_0"
        // TASK_STATUS="SUCCESS"
        // FINISH_TIME="1208760624124"
        // HOSTNAME="tracker_xxxx.yyyy.com:xxx.yyy.com/xxx.xxx.xxx.xxx:55491"

        key = new ChukwaRecordKey();
        key.setKey("JobLogHist/Map/" + keys.get("JOBID") + "/"
            + keys.get("FINISH_TIME"));
        key.setReduceType("JobLogHistoryReduceProcessor");
        record = new ChukwaRecord();
        record.setTime(Long.parseLong(keys.get("FINISH_TIME")));
        record.add("JOBID", keys.get("JOBID"));
        record.add("FINISH_TIME", keys.get("FINISH_TIME"));
        record.add(Record.tagsField, chunk.getTags());
        // log.info("JobLogHist/Map/E");
        output.collect(key, record);
      }

      else if (keys.get("RECORD_TYPE").equalsIgnoreCase("ReduceAttempt")
          && keys.containsKey("START_TIME")) {
        // ReduceAttempt TASK_TYPE="REDUCE"
        // TASKID="tip_200804210403_0005_r_000138"
        // TASK_ATTEMPT_ID="task_200804210403_0005_r_000138_0"
        // START_TIME="1208760454885"
        // HOSTNAME="tracker_xxxx.yyyy.com:xxx.yyy.com/xxx.xxx.xxx.xxx:51947"

        key = new ChukwaRecordKey();
        key.setKey("JobLogHist/SHUFFLE/" + keys.get("JOBID") + "/"
            + keys.get("START_TIME"));
        key.setReduceType("JobLogHistoryReduceProcessor");
        record = new ChukwaRecord();
        record.setTime(Long.parseLong(keys.get("START_TIME")));
        record.add("JOBID", keys.get("JOBID"));
        record.add("START_TIME", keys.get("START_TIME"));
        record.add(Record.tagsField, chunk.getTags());
        // log.info("JobLogHist/SHUFFLE/S");
        output.collect(key, record);

      } else if (keys.get("RECORD_TYPE").equalsIgnoreCase("ReduceAttempt")
          && keys.containsKey("FINISH_TIME")) {
        // ReduceAttempt TASK_TYPE="REDUCE"
        // TASKID="tip_200804210403_0005_r_000138"
        // TASK_ATTEMPT_ID="task_200804210403_0005_r_000138_0"
        // TASK_STATUS="SUCCESS" SHUFFLE_FINISHED="1208760787167"
        // SORT_FINISHED="1208760787354" FINISH_TIME="1208760802395"
        // HOSTNAME="tracker__xxxx.yyyy.com:xxx.yyy.com/xxx.xxx.xxx.xxx:51947"

        key = new ChukwaRecordKey();
        key.setKey("JobLogHist/SHUFFLE/" + keys.get("JOBID") + "/"
            + keys.get("SHUFFLE_FINISHED"));
        key.setReduceType("JobLogHistoryReduceProcessor");
        record = new ChukwaRecord();
        record.setTime(Long.parseLong(keys.get("SHUFFLE_FINISHED")));
        record.add("JOBID", keys.get("JOBID"));
        record.add("SHUFFLE_FINISHED", keys.get("SHUFFLE_FINISHED"));
        record.add(Record.tagsField, chunk.getTags());
        // log.info("JobLogHist/SHUFFLE/E");
        output.collect(key, record);

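        // The sort phase runs from SHUFFLE_FINISHED to SORT_FINISHED, so the
        // shuffle end time doubles as the sort start time.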
        // SORT
        key = new ChukwaRecordKey();
        key.setKey("JobLogHist/SORT/" + keys.get("JOBID") + "/"
            + keys.get("SHUFFLE_FINISHED"));
        key.setReduceType("JobLogHistoryReduceProcessor");
        record = new ChukwaRecord();
        record.setTime(Long.parseLong(keys.get("SHUFFLE_FINISHED")));
        record.add("JOBID", keys.get("JOBID"));
        record.add("START_TIME", keys.get("SHUFFLE_FINISHED"));
        record.add(Record.tagsField, chunk.getTags());
        // log.info("JobLogHist/SORT/S");
        output.collect(key, record);

        key = new ChukwaRecordKey();
        key.setKey("JobLogHist/SORT/" + keys.get("JOBID") + "/"
            + keys.get("SORT_FINISHED"));
        key.setReduceType("JobLogHistoryReduceProcessor");
        record = new ChukwaRecord();
        record.setTime(Long.parseLong(keys.get("SORT_FINISHED")));
        record.add("JOBID", keys.get("JOBID"));
        record.add("SORT_FINISHED", keys.get("SORT_FINISHED"));
        record.add(Record.tagsField, chunk.getTags());
        // log.info("JobLogHist/SORT/E");
        output.collect(key, record);

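        // Likewise, the reduce phase proper starts when the sort finishes.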
        // Reduce
        key = new ChukwaRecordKey();
        key.setKey("JobLogHist/REDUCE/" + keys.get("JOBID") + "/"
            + keys.get("SORT_FINISHED"));
        key.setReduceType("JobLogHistoryReduceProcessor");
        record = new ChukwaRecord();
        record.setTime(Long.parseLong(keys.get("SORT_FINISHED")));
        record.add("JOBID", keys.get("JOBID"));
        record.add("START_TIME", keys.get("SORT_FINISHED"));
        record.add(Record.tagsField, chunk.getTags());
        // log.info("JobLogHist/REDUCE/S");
        output.collect(key, record);

        key = new ChukwaRecordKey();
        key.setKey("JobLogHist/REDUCE/" + keys.get("JOBID") + "/"
            + keys.get("FINISH_TIME"));
        key.setReduceType("JobLogHistoryReduceProcessor");
        record = new ChukwaRecord();
        // FINISH_TIME marks the reduce end event (the key above is built
        // from it), so use it for the record time and field as well.
        record.setTime(Long.parseLong(keys.get("FINISH_TIME")));
        record.add("JOBID", keys.get("JOBID"));
        record.add("FINISH_TIME", keys.get("FINISH_TIME"));
        record.add(Record.tagsField, chunk.getTags());
        // log.info("JobLogHist/REDUCE/E");
        output.collect(key, record);

      } else if (keys.get("RECORD_TYPE").equalsIgnoreCase("Job")
          && keys.containsKey("COUNTERS")) {
        // Job JOBID="job_200804210403_0005" FINISH_TIME="1208760906816"
        // JOB_STATUS="SUCCESS" FINISHED_MAPS="5912"
        // FINISHED_REDUCES="739" FAILED_MAPS="0" FAILED_REDUCES="0"
        // COUNTERS="File Systems.Local bytes read:1735053407244,File
        // Systems.Local bytes written:2610106384012,File Systems.HDFS
        // bytes read:801605644910,File Systems.HDFS bytes
        // written:44135800,
        // Job Counters .Launched map tasks:5912,Job Counters .Launched
        // reduce tasks:739,Job Counters .Data-local map tasks:5573,Job
        // Counters .Rack-local map tasks:316,Map-Reduce Framework.
        // Map input records:9410696067,Map-Reduce Framework.Map output
        // records:9410696067,Map-Reduce Framework.Map input
        // bytes:801599188816,Map-Reduce Framework.Map output
        // bytes:784427968116,
        // Map-Reduce Framework.Combine input records:0,Map-Reduce
        // Framework.Combine output records:0,Map-Reduce
        // Framework.Reduce input groups:477265,Map-Reduce
        // Framework.Reduce input records:739000,
        // Map-Reduce Framework.Reduce output records:739000"

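        // Roll all job-level counters into a single MRJobCounters record,
        // stamped with the job finish time.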
        record = new ChukwaRecord();
        key = new ChukwaRecordKey();
        buildGenericRecord(record, null, Long
            .parseLong(keys.get("FINISH_TIME")), "MRJobCounters");
        extractCounters(record, keys.get("COUNTERS"));

        String jobId = keys.get("JOBID").replace("_", "").substring(3);
        record.add("JobId", jobId);

        // FIXME validate this once HodId becomes available
        if (keys.containsKey("HODID")) {
          record.add("HodId", keys.get("HODID"));
        }

        // log.info("MRJobCounters +1");
        output.collect(key, record);
      }

      if (keys.containsKey("TASK_TYPE")
          && keys.containsKey("COUNTERS")
          && (keys.get("TASK_TYPE").equalsIgnoreCase("REDUCE") || keys.get(
              "TASK_TYPE").equalsIgnoreCase("MAP"))) {
        // MAP
        // Task TASKID="tip_200804210403_0005_m_000154" TASK_TYPE="MAP"
        // TASK_STATUS="SUCCESS" FINISH_TIME="1208760463883"
        // COUNTERS="File Systems.Local bytes read:159265655,File
        // Systems.Local bytes written:318531310,
        // File Systems.HDFS bytes read:145882417,Map-Reduce
        // Framework.Map input records:1706604,
        // Map-Reduce Framework.Map output records:1706604,Map-Reduce
        // Framework.Map input bytes:145882057,
        // Map-Reduce Framework.Map output bytes:142763253,Map-Reduce
        // Framework.Combine input records:0,Map-Reduce
        // Framework.Combine output records:0"

        // REDUCE
        // Task TASKID="tip_200804210403_0005_r_000524"
        // TASK_TYPE="REDUCE" TASK_STATUS="SUCCESS"
        // FINISH_TIME="1208760877072"
        // COUNTERS="File Systems.Local bytes read:1179319677,File
        // Systems.Local bytes written:1184474889,File Systems.HDFS
        // bytes written:59021,
        // Map-Reduce Framework.Reduce input groups:684,Map-Reduce
        // Framework.Reduce input records:1000,Map-Reduce
        // Framework.Reduce output records:1000"

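        // Per-task counters become one SizeVsFinishTime record per map or
        // reduce task, stamped with the task finish time.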
        record = new ChukwaRecord();
        key = new ChukwaRecordKey();
        buildGenericRecord(record, null, Long
            .parseLong(keys.get("FINISH_TIME")), "SizeVsFinishTime");
        extractCounters(record, keys.get("COUNTERS"));
        record.add("JOBID", keys.get("JOBID"));
        record.add("TASKID", keys.get("TASKID"));
        record.add("TASK_TYPE", keys.get("TASK_TYPE"));

        // log.info("MR_Graph +1");
        output.collect(key, record);

      }
    } catch (IOException e) {
      // Log and rethrow; log.warn already records the stack trace.
      log.warn("Unable to collect output in Log4jJobHistoryProcessor ["
          + recordEntry + "]", e);
      throw e;
    }

  }

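  /**
   * Splits the COUNTERS attribute, a comma-separated list of
   * "Group.Counter:value" entries, and adds each value to the record under a
   * field name normalized to upper case with spaces and dots replaced by
   * underscores (e.g. "Map-Reduce Framework.Map input records" becomes
   * "MAP-REDUCE_FRAMEWORK_MAP_INPUT_RECORDS").
   */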
  protected void extractCounters(ChukwaRecord record, String input) {

    String[] data = null;
    String[] counters = input.split(",");

    for (String counter : counters) {
      data = counter.split(":");
      record.add(data[0].replaceAll(" ", "_").replaceAll("\\.", "_")
          .toUpperCase(), data[1]);
    }
  }

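  /** Returns the data type this processor handles ("JobLogHistory"). */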
  public String getDataType() {
    return Log4jJobHistoryProcessor.recordType;
  }
}