This project has retired. For details please refer to its Attic page.
JobLogHistoryProcessor xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
20  
21  
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Locale;
import java.util.Map;
import java.util.Map.Entry;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Table;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
import org.apache.hadoop.chukwa.extraction.engine.Record;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.log4j.Logger;
36  
37  @Table(name="Mapreduce",columnFamily="JobLogHistory")
38  public class JobLogHistoryProcessor extends AbstractProcessor {
39    static Logger log = Logger.getLogger(JobLogHistoryProcessor.class);
40  
41    private static final String recordType = "JobLogHistory";
42    private static final String internalRegex = "(.*?)=\"(.*?)\"(.*)([\\n])?";
43    private Pattern ip = null;
44  
45    private Matcher internalMatcher = null;
46  
47    public JobLogHistoryProcessor() {
48      ip = Pattern.compile(internalRegex);
49      internalMatcher = ip.matcher("-");
50    }
51  
52    @Override
53    protected void parse(String recordEntry,
54        OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
55        throws Throwable {
56  
57      // log.info("JobLogHistoryProcessor record: [" + recordEntry + "] type["
58      // + chunk.getDataType() + "]");
59  
60      try {
61  
62        HashMap<String, String> keys = new HashMap<String, String>();
63        ChukwaRecord record = null;
64  
65        int firstSep = recordEntry.indexOf(" ");
66        keys.put("RECORD_TYPE", recordEntry.substring(0, firstSep));
67        // log.info("JobLogHistoryProcessor Add field: [RECORD_TYPE]["
68        // + keys.get("RECORD_TYPE") + "]");
69  
70        String body = recordEntry.substring(firstSep);
71  
72        internalMatcher.reset(body);
73  
74        // String fieldName = null;
75        // String fieldValue = null;
76  
77        while (internalMatcher.matches()) {
78  
79          keys.put(internalMatcher.group(1).trim(), internalMatcher.group(2)
80              .trim());
81  
82          // TODO Remove debug info before production
83          // fieldName = internalMatcher.group(1).trim();
84          // fieldValue = internalMatcher.group(2).trim();
85          // log.info("JobLogHistoryProcessor Add field: [" + fieldName +
86          // "][" + fieldValue +"]" );
87          // log.info("EOL : [" + internalMatcher.group(3) + "]" );
88          internalMatcher.reset(internalMatcher.group(3));
89        }
90  
91        if (!keys.containsKey("JOBID")) {
92          // Extract JobID from taskID
93          // JOBID = "job_200804210403_0005"
94          // TASKID = "tip_200804210403_0005_m_000018"
95          String jobId = keys.get("TASKID");
96          int idx1 = jobId.indexOf('_', 0);
97          int idx2 = jobId.indexOf('_', idx1 + 1);
98          idx2 = jobId.indexOf('_', idx2 + 1);
99          keys.put("JOBID", jobId.substring(idx1 + 1, idx2));
100         // log.info("JobLogHistoryProcessor Add field: [JOBID]["
101         // + keys.get("JOBID") + "]");
102       } else {
103         String jobId = keys.get("JOBID").replace("_", "").substring(3);
104         keys.put("JOBID", jobId);
105       }
106       // if (keys.get("RECORD_TYPE").equalsIgnoreCase("Job") &&
107       // keys.containsKey("SUBMIT_TIME"))
108       // {
109       // // Job JOBID="job_200804210403_0005" JOBNAME="MY_JOB"
110       // USER="userxxx"
111       // // SUBMIT_TIME="1208760436751"
112       // JOBCONF="/mapredsystem/xxx.yyy.com/job_200804210403_0005/job.xml"
113       //					
114       //					
115       // }
116       // else if (keys.get("RECORD_TYPE").equalsIgnoreCase("Job") &&
117       // keys.containsKey("LAUNCH_TIME"))
118       // {
119       // // Job JOBID="job_200804210403_0005" LAUNCH_TIME="1208760437110"
120       // TOTAL_MAPS="5912" TOTAL_REDUCES="739"
121       //					
122       // }
123       // else if (keys.get("RECORD_TYPE").equalsIgnoreCase("Job") &&
124       // keys.containsKey("FINISH_TIME"))
125       // {
126       // // Job JOBID="job_200804210403_0005" FINISH_TIME="1208760906816"
127       // JOB_STATUS="SUCCESS" FINISHED_MAPS="5912" FINISHED_REDUCES="739"
128       // FAILED_MAPS="0" FAILED_REDUCES="0"
129       // // COUNTERS="File Systems.Local bytes read:1735053407244,File
130       // Systems.Local bytes written:2610106384012,File Systems.HDFS bytes
131       // read:801605644910,File Systems.HDFS bytes written:44135800,
132       // // Job Counters .Launched map tasks:5912,Job Counters .Launched
133       // reduce tasks:739,Job Counters .Data-local map tasks:5573,Job
134       // Counters .Rack-local map tasks:316,Map-Reduce Framework.
135       // // Map input records:9410696067,Map-Reduce Framework.Map output
136       // records:9410696067,Map-Reduce Framework.Map input
137       // bytes:801599188816,Map-Reduce Framework.Map output
138       // bytes:784427968116,
139       // // Map-Reduce Framework.Combine input records:0,Map-Reduce
140       // Framework.Combine output records:0,Map-Reduce Framework.Reduce
141       // input groups:477265,Map-Reduce Framework.Reduce input
142       // records:739000,
143       // // Map-Reduce Framework.Reduce output records:739000"
144       //					
145       // }
146       // else
147       if (keys.get("RECORD_TYPE").equalsIgnoreCase("MapAttempt")
148           && keys.containsKey("START_TIME")) {
149         // MapAttempt TASK_TYPE="MAP"
150         // TASKID="tip_200804210403_0005_m_000018"
151         // TASK_ATTEMPT_ID="task_200804210403_0005_m_000018_0"
152         // START_TIME="1208760437531"
153         // HOSTNAME="tracker_xxx.yyy.com:xxx.yyy.com/xxx.xxx.xxx.xxx:53734"
154 
155         key = new ChukwaRecordKey();
156         key.setKey("JobLogHist/Map/" + keys.get("JOBID") + "/"
157             + keys.get("START_TIME"));
158         key.setReduceType("JobLogHistoryReduceProcessor");
159         record = new ChukwaRecord();
160         record.setTime(Long.parseLong(keys.get("START_TIME")));
161         record.add("JOBID", keys.get("JOBID"));
162         record.add("START_TIME", keys.get("START_TIME"));
163         record.add(Record.tagsField, chunk.getTags());
164         // log.info("JobLogHist/Map/S");
165         output.collect(key, record);
166 
167       } else if (keys.get("RECORD_TYPE").equalsIgnoreCase("MapAttempt")
168           && keys.containsKey("FINISH_TIME")) {
169         // MapAttempt TASK_TYPE="MAP"
170         // TASKID="tip_200804210403_0005_m_005494"
171         // TASK_ATTEMPT_ID="task_200804210403_0005_m_005494_0"
172         // TASK_STATUS="SUCCESS"
173         // FINISH_TIME="1208760624124"
174         // HOSTNAME="tracker_xxxx.yyyy.com:xxx.yyy.com/xxx.xxx.xxx.xxx:55491"
175 
176         key = new ChukwaRecordKey();
177         key.setKey("JobLogHist/Map/" + keys.get("JOBID") + "/"
178             + keys.get("FINISH_TIME"));
179         key.setReduceType("JobLogHistoryReduceProcessor");
180         record = new ChukwaRecord();
181         record.setTime(Long.parseLong(keys.get("FINISH_TIME")));
182         record.add("JOBID", keys.get("JOBID"));
183         record.add("FINISH_TIME", keys.get("FINISH_TIME"));
184         record.add(Record.tagsField, chunk.getTags());
185         // log.info("JobLogHist/Map/E");
186         output.collect(key, record);
187       }
188 
189       else if (keys.get("RECORD_TYPE").equalsIgnoreCase("ReduceAttempt")
190           && keys.containsKey("START_TIME")) {
191         // ReduceAttempt TASK_TYPE="REDUCE"
192         // TASKID="tip_200804210403_0005_r_000138"
193         // TASK_ATTEMPT_ID="task_200804210403_0005_r_000138_0"
194         // START_TIME="1208760454885"
195         // HOSTNAME="tracker_xxxx.yyyy.com:xxx.yyy.com/xxx.xxx.xxx.xxx:51947"
196 
197         key = new ChukwaRecordKey();
198         key.setKey("JobLogHist/SHUFFLE/" + keys.get("JOBID") + "/"
199             + keys.get("START_TIME"));
200         key.setReduceType("JobLogHistoryReduceProcessor");
201         record = new ChukwaRecord();
202         record.setTime(Long.parseLong(keys.get("START_TIME")));
203         record.add("JOBID", keys.get("JOBID"));
204         record.add("START_TIME", keys.get("START_TIME"));
205         record.add(Record.tagsField, chunk.getTags());
206         // log.info("JobLogHist/SHUFFLE/S");
207         output.collect(key, record);
208 
209       } else if (keys.get("RECORD_TYPE").equalsIgnoreCase("ReduceAttempt")
210           && keys.containsKey("FINISH_TIME")) {
211         // ReduceAttempt TASK_TYPE="REDUCE"
212         // TASKID="tip_200804210403_0005_r_000138"
213         // TASK_ATTEMPT_ID="task_200804210403_0005_r_000138_0"
214         // TASK_STATUS="SUCCESS" SHUFFLE_FINISHED="1208760787167"
215         // SORT_FINISHED="1208760787354" FINISH_TIME="1208760802395"
216         // HOSTNAME="tracker__xxxx.yyyy.com:xxx.yyy.com/xxx.xxx.xxx.xxx:51947"
217 
218         key = new ChukwaRecordKey();
219         key.setKey("JobLogHist/SHUFFLE/" + keys.get("JOBID") + "/"
220             + keys.get("SHUFFLE_FINISHED"));
221         key.setReduceType("JobLogHistoryReduceProcessor");
222         record = new ChukwaRecord();
223         record.setTime(Long.parseLong(keys.get("SHUFFLE_FINISHED")));
224         record.add("JOBID", keys.get("JOBID"));
225         record.add("SHUFFLE_FINISHED", keys.get("SHUFFLE_FINISHED"));
226         record.add(Record.tagsField, chunk.getTags());
227         // log.info("JobLogHist/SHUFFLE/E");
228         output.collect(key, record);
229 
230         // SORT
231         key = new ChukwaRecordKey();
232         key.setKey("JobLogHist/SORT/" + keys.get("JOBID") + "/"
233             + keys.get("SHUFFLE_FINISHED"));
234         key.setReduceType("JobLogHistoryReduceProcessor");
235         record = new ChukwaRecord();
236         record.setTime(Long.parseLong(keys.get("SHUFFLE_FINISHED")));
237         record.add("JOBID", keys.get("JOBID"));
238         record.add("START_TIME", keys.get("SHUFFLE_FINISHED"));
239         record.add(Record.tagsField, chunk.getTags());
240         // log.info("JobLogHist/SORT/S");
241         output.collect(key, record);
242 
243         key = new ChukwaRecordKey();
244         key.setKey("JobLogHist/SORT/" + keys.get("JOBID") + "/"
245             + keys.get("SORT_FINISHED"));
246         key.setReduceType("JobLogHistoryReduceProcessor");
247         record = new ChukwaRecord();
248         record.setTime(Long.parseLong(keys.get("SORT_FINISHED")));
249         record.add("JOBID", keys.get("JOBID"));
250         record.add("SORT_FINISHED", keys.get("SORT_FINISHED"));
251         record.add(Record.tagsField, chunk.getTags());
252         // log.info("JobLogHist/SORT/E");
253         output.collect(key, record);
254 
255         // Reduce
256         key = new ChukwaRecordKey();
257         key.setKey("JobLogHist/REDUCE/" + keys.get("JOBID") + "/"
258             + keys.get("SORT_FINISHED"));
259         key.setReduceType("JobLogHistoryReduceProcessor");
260         record = new ChukwaRecord();
261         record.setTime(Long.parseLong(keys.get("SORT_FINISHED")));
262         record.add("JOBID", keys.get("JOBID"));
263         record.add("START_TIME", keys.get("SORT_FINISHED"));
264         record.add(Record.tagsField, chunk.getTags());
265         // log.info("JobLogHist/REDUCE/S");
266         output.collect(key, record);
267 
268         key = new ChukwaRecordKey();
269         key.setKey("JobLogHist/REDUCE/" + keys.get("JOBID") + "/"
270             + keys.get("FINISH_TIME"));
271         key.setReduceType("JobLogHistoryReduceProcessor");
272         record = new ChukwaRecord();
273         record.setTime(Long.parseLong(keys.get("SORT_FINISHED")));
274         record.add("JOBID", keys.get("JOBID"));
275         record.add("FINISH_TIME", keys.get("SORT_FINISHED"));
276         record.add(Record.tagsField, chunk.getTags());
277         // log.info("JobLogHist/REDUCE/E");
278         output.collect(key, record);
279 
280       } else if (keys.get("RECORD_TYPE").equalsIgnoreCase("Job")) {
281         // 1
282         // Job JOBID="job_200809062051_0001" JOBNAME="wordcount" USER="xxx"
283         // SUBMIT_TIME="1208760906812"
284         // JOBCONF=
285         // "/user/xxx/mapredsystem/563976.yyy.zzz.com/job_200809062051_0001/job.xml"
286 
287         // 2
288         // Job JOBID="job_200809062051_0001" LAUNCH_TIME="1208760906816"
289         // TOTAL_MAPS="3" TOTAL_REDUCES="7"
290 
291         // 3
292         // Job JOBID="job_200804210403_0005" FINISH_TIME="1208760906826"
293         // JOB_STATUS="SUCCESS" FINISHED_MAPS="5912"
294         // FINISHED_REDUCES="739" FAILED_MAPS="0" FAILED_REDUCES="0"
295         // COUNTERS="File Systems.Local bytes read:1735053407244,File
296         // Systems.Local bytes written:2610106384012,File Systems.HDFS
297         // bytes read:801605644910,File Systems.HDFS bytes
298         // written:44135800,
299         // Job Counters .Launched map tasks:5912,Job Counters .Launched
300         // reduce tasks:739,Job Counters .Data-local map tasks:5573,Job
301         // Counters .Rack-local map tasks:316,Map-Reduce Framework.
302         // Map input records:9410696067,Map-Reduce Framework.Map output
303         // records:9410696067,Map-Reduce Framework.Map input
304         // bytes:801599188816,Map-Reduce Framework.Map output
305         // bytes:784427968116,
306         // Map-Reduce Framework.Combine input records:0,Map-Reduce
307         // Framework.Combine output records:0,Map-Reduce
308         // Framework.Reduce input groups:477265,Map-Reduce
309         // Framework.Reduce input records:739000,
310         // Map-Reduce Framework.Reduce output records:739000"
311 
312         record = new ChukwaRecord();
313         key = new ChukwaRecordKey();
314         buildGenericRecord(record, null, Long
315             .parseLong(keys.get("FINISH_TIME")), "MRJob");
316         if (keys.containsKey("COUNTERS")) {
317           extractCounters(record, keys.get("COUNTERS"));
318         }
319 
320         key = new ChukwaRecordKey();
321         key.setKey("MRJob/" + keys.get("JOBID"));
322         key.setReduceType("MRJobReduceProcessor");
323 
324         record = new ChukwaRecord();
325         record.add(Record.tagsField, chunk.getTags());
326         if (keys.containsKey("SUBMIT_TIME")) {
327           record.setTime(Long.parseLong(keys.get("SUBMIT_TIME")));
328         } else if (keys.containsKey("LAUNCH_TIME")) {
329           record.setTime(Long.parseLong(keys.get("LAUNCH_TIME")));
330         } else if (keys.containsKey("FINISH_TIME")) {
331           record.setTime(Long.parseLong(keys.get("FINISH_TIME")));
332         }
333 
334         for(Entry<String, String> entry : keys.entrySet()) {
335           record.add(entry.getKey(), entry.getValue());
336         }
337 
338         output.collect(key, record);
339       }
340 
341       if (keys.containsKey("TASK_TYPE")
342           && keys.containsKey("COUNTERS")
343           && (keys.get("TASK_TYPE").equalsIgnoreCase("REDUCE") || keys.get(
344               "TASK_TYPE").equalsIgnoreCase("MAP"))) {
345         // MAP
346         // Task TASKID="tip_200804210403_0005_m_000154" TASK_TYPE="MAP"
347         // TASK_STATUS="SUCCESS" FINISH_TIME="1208760463883"
348         // COUNTERS="File Systems.Local bytes read:159265655,File
349         // Systems.Local bytes written:318531310,
350         // File Systems.HDFS bytes read:145882417,Map-Reduce
351         // Framework.Map input records:1706604,
352         // Map-Reduce Framework.Map output records:1706604,Map-Reduce
353         // Framework.Map input bytes:145882057,
354         // Map-Reduce Framework.Map output bytes:142763253,Map-Reduce
355         // Framework.Combine input records:0,Map-Reduce
356         // Framework.Combine output records:0"
357 
358         // REDUCE
359         // Task TASKID="tip_200804210403_0005_r_000524"
360         // TASK_TYPE="REDUCE" TASK_STATUS="SUCCESS"
361         // FINISH_TIME="1208760877072"
362         // COUNTERS="File Systems.Local bytes read:1179319677,File
363         // Systems.Local bytes written:1184474889,File Systems.HDFS
364         // bytes written:59021,
365         // Map-Reduce Framework.Reduce input groups:684,Map-Reduce
366         // Framework.Reduce input records:1000,Map-Reduce
367         // Framework.Reduce output records:1000"
368 
369         record = new ChukwaRecord();
370         key = new ChukwaRecordKey();
371         buildGenericRecord(record, null, Long
372             .parseLong(keys.get("FINISH_TIME")), "SizeVsFinishTime");
373         extractCounters(record, keys.get("COUNTERS"));
374         record.add("JOBID", keys.get("JOBID"));
375         record.add("TASKID", keys.get("TASKID"));
376         record.add("TASK_TYPE", keys.get("TASK_TYPE"));
377         record.add(Record.tagsField, chunk.getTags());
378         // log.info("MR_Graph +1");
379         output.collect(key, record);
380 
381       }
382     } catch (IOException e) {
383       log.warn("Unable to collect output in JobLogHistoryProcessor ["
384           + recordEntry + "]", e);
385       e.printStackTrace();
386       throw e;
387     }
388 
389   }
390 
391   protected void extractCounters(ChukwaRecord record, String input) {
392 
393     String[] data = null;
394     String[] counters = input.split(",");
395 
396     for (String counter : counters) {
397       data = counter.split(":");
398       record.add(data[0].replaceAll(" ", "_").replaceAll("\\.", "_")
399           .toUpperCase(), data[1]);
400     }
401   }
402 
403   public String getDataType() {
404     return JobLogHistoryProcessor.recordType;
405   }
406 }