This project has been retired. For details, please refer to its Attic page.
      
1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  
16  
17  
18  
19  package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
20  
21  
22  import java.io.IOException;
23  import java.util.HashMap;
24  import java.util.Iterator;
25  import java.util.Map.Entry;
26  import java.util.regex.Matcher;
27  import java.util.regex.Pattern;
28  
29  import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Table;
30  import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
31  import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
32  import org.apache.hadoop.chukwa.extraction.engine.Record;
33  import org.apache.hadoop.mapred.OutputCollector;
34  import org.apache.hadoop.mapred.Reporter;
35  import org.apache.log4j.Logger;
36  
37  @Table(name="Mapreduce",columnFamily="JobLogHistory")
38  public class JobLogHistoryProcessor extends AbstractProcessor {
39    static Logger log = Logger.getLogger(JobLogHistoryProcessor.class);
40  
41    private static final String recordType = "JobLogHistory";
42    private static final String internalRegex = "(.*?)=\"(.*?)\"(.*)([\\n])?";
43    private Pattern ip = null;
44  
45    private Matcher internalMatcher = null;
46  
47    public JobLogHistoryProcessor() {
48      ip = Pattern.compile(internalRegex);
49      internalMatcher = ip.matcher("-");
50    }
51  
52    @Override
53    protected void parse(String recordEntry,
54        OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
55        throws Throwable {
56  
57      
58      
59  
60      try {
61  
62        HashMap<String, String> keys = new HashMap<String, String>();
63        ChukwaRecord record = null;
64  
65        int firstSep = recordEntry.indexOf(" ");
66        keys.put("RECORD_TYPE", recordEntry.substring(0, firstSep));
67        
68        
69  
70        String body = recordEntry.substring(firstSep);
71  
72        internalMatcher.reset(body);
73  
74        
75        
76  
77        while (internalMatcher.matches()) {
78  
79          keys.put(internalMatcher.group(1).trim(), internalMatcher.group(2)
80              .trim());
81  
82          
83          
84          
85          
86          
87          
88          internalMatcher.reset(internalMatcher.group(3));
89        }
90  
91        if (!keys.containsKey("JOBID")) {
92          
93          
94          
95          String jobId = keys.get("TASKID");
96          int idx1 = jobId.indexOf('_', 0);
97          int idx2 = jobId.indexOf('_', idx1 + 1);
98          idx2 = jobId.indexOf('_', idx2 + 1);
99          keys.put("JOBID", jobId.substring(idx1 + 1, idx2));
100         
101         
102       } else {
103         String jobId = keys.get("JOBID").replace("_", "").substring(3);
104         keys.put("JOBID", jobId);
105       }
106       
107       
108       
109       
110       
111       
112       
113       
114       
115       
116       
117       
118       
119       
120       
121       
122       
123       
124       
125       
126       
127       
128       
129       
130       
131       
132       
133       
134       
135       
136       
137       
138       
139       
140       
141       
142       
143       
144       
145       
146       
147       if (keys.get("RECORD_TYPE").equalsIgnoreCase("MapAttempt")
148           && keys.containsKey("START_TIME")) {
149         
150         
151         
152         
153         
154 
155         key = new ChukwaRecordKey();
156         key.setKey("JobLogHist/Map/" + keys.get("JOBID") + "/"
157             + keys.get("START_TIME"));
158         key.setReduceType("JobLogHistoryReduceProcessor");
159         record = new ChukwaRecord();
160         record.setTime(Long.parseLong(keys.get("START_TIME")));
161         record.add("JOBID", keys.get("JOBID"));
162         record.add("START_TIME", keys.get("START_TIME"));
163         record.add(Record.tagsField, chunk.getTags());
164         
165         output.collect(key, record);
166 
167       } else if (keys.get("RECORD_TYPE").equalsIgnoreCase("MapAttempt")
168           && keys.containsKey("FINISH_TIME")) {
169         
170         
171         
172         
173         
174         
175 
176         key = new ChukwaRecordKey();
177         key.setKey("JobLogHist/Map/" + keys.get("JOBID") + "/"
178             + keys.get("FINISH_TIME"));
179         key.setReduceType("JobLogHistoryReduceProcessor");
180         record = new ChukwaRecord();
181         record.setTime(Long.parseLong(keys.get("FINISH_TIME")));
182         record.add("JOBID", keys.get("JOBID"));
183         record.add("FINISH_TIME", keys.get("FINISH_TIME"));
184         record.add(Record.tagsField, chunk.getTags());
185         
186         output.collect(key, record);
187       }
188 
189       else if (keys.get("RECORD_TYPE").equalsIgnoreCase("ReduceAttempt")
190           && keys.containsKey("START_TIME")) {
191         
192         
193         
194         
195         
196 
197         key = new ChukwaRecordKey();
198         key.setKey("JobLogHist/SHUFFLE/" + keys.get("JOBID") + "/"
199             + keys.get("START_TIME"));
200         key.setReduceType("JobLogHistoryReduceProcessor");
201         record = new ChukwaRecord();
202         record.setTime(Long.parseLong(keys.get("START_TIME")));
203         record.add("JOBID", keys.get("JOBID"));
204         record.add("START_TIME", keys.get("START_TIME"));
205         record.add(Record.tagsField, chunk.getTags());
206         
207         output.collect(key, record);
208 
209       } else if (keys.get("RECORD_TYPE").equalsIgnoreCase("ReduceAttempt")
210           && keys.containsKey("FINISH_TIME")) {
211         
212         
213         
214         
215         
216         
217 
218         key = new ChukwaRecordKey();
219         key.setKey("JobLogHist/SHUFFLE/" + keys.get("JOBID") + "/"
220             + keys.get("SHUFFLE_FINISHED"));
221         key.setReduceType("JobLogHistoryReduceProcessor");
222         record = new ChukwaRecord();
223         record.setTime(Long.parseLong(keys.get("SHUFFLE_FINISHED")));
224         record.add("JOBID", keys.get("JOBID"));
225         record.add("SHUFFLE_FINISHED", keys.get("SHUFFLE_FINISHED"));
226         record.add(Record.tagsField, chunk.getTags());
227         
228         output.collect(key, record);
229 
230         
231         key = new ChukwaRecordKey();
232         key.setKey("JobLogHist/SORT/" + keys.get("JOBID") + "/"
233             + keys.get("SHUFFLE_FINISHED"));
234         key.setReduceType("JobLogHistoryReduceProcessor");
235         record = new ChukwaRecord();
236         record.setTime(Long.parseLong(keys.get("SHUFFLE_FINISHED")));
237         record.add("JOBID", keys.get("JOBID"));
238         record.add("START_TIME", keys.get("SHUFFLE_FINISHED"));
239         record.add(Record.tagsField, chunk.getTags());
240         
241         output.collect(key, record);
242 
243         key = new ChukwaRecordKey();
244         key.setKey("JobLogHist/SORT/" + keys.get("JOBID") + "/"
245             + keys.get("SORT_FINISHED"));
246         key.setReduceType("JobLogHistoryReduceProcessor");
247         record = new ChukwaRecord();
248         record.setTime(Long.parseLong(keys.get("SORT_FINISHED")));
249         record.add("JOBID", keys.get("JOBID"));
250         record.add("SORT_FINISHED", keys.get("SORT_FINISHED"));
251         record.add(Record.tagsField, chunk.getTags());
252         
253         output.collect(key, record);
254 
255         
256         key = new ChukwaRecordKey();
257         key.setKey("JobLogHist/REDUCE/" + keys.get("JOBID") + "/"
258             + keys.get("SORT_FINISHED"));
259         key.setReduceType("JobLogHistoryReduceProcessor");
260         record = new ChukwaRecord();
261         record.setTime(Long.parseLong(keys.get("SORT_FINISHED")));
262         record.add("JOBID", keys.get("JOBID"));
263         record.add("START_TIME", keys.get("SORT_FINISHED"));
264         record.add(Record.tagsField, chunk.getTags());
265         
266         output.collect(key, record);
267 
268         key = new ChukwaRecordKey();
269         key.setKey("JobLogHist/REDUCE/" + keys.get("JOBID") + "/"
270             + keys.get("FINISH_TIME"));
271         key.setReduceType("JobLogHistoryReduceProcessor");
272         record = new ChukwaRecord();
273         record.setTime(Long.parseLong(keys.get("SORT_FINISHED")));
274         record.add("JOBID", keys.get("JOBID"));
275         record.add("FINISH_TIME", keys.get("SORT_FINISHED"));
276         record.add(Record.tagsField, chunk.getTags());
277         
278         output.collect(key, record);
279 
280       } else if (keys.get("RECORD_TYPE").equalsIgnoreCase("Job")) {
281         
282         
283         
284         
285         
286 
287         
288         
289         
290 
291         
292         
293         
294         
295         
296         
297         
298         
299         
300         
301         
302         
303         
304         
305         
306         
307         
308         
309         
310         
311 
312         record = new ChukwaRecord();
313         key = new ChukwaRecordKey();
314         buildGenericRecord(record, null, Long
315             .parseLong(keys.get("FINISH_TIME")), "MRJob");
316         if (keys.containsKey("COUNTERS")) {
317           extractCounters(record, keys.get("COUNTERS"));
318         }
319 
320         key = new ChukwaRecordKey();
321         key.setKey("MRJob/" + keys.get("JOBID"));
322         key.setReduceType("MRJobReduceProcessor");
323 
324         record = new ChukwaRecord();
325         record.add(Record.tagsField, chunk.getTags());
326         if (keys.containsKey("SUBMIT_TIME")) {
327           record.setTime(Long.parseLong(keys.get("SUBMIT_TIME")));
328         } else if (keys.containsKey("LAUNCH_TIME")) {
329           record.setTime(Long.parseLong(keys.get("LAUNCH_TIME")));
330         } else if (keys.containsKey("FINISH_TIME")) {
331           record.setTime(Long.parseLong(keys.get("FINISH_TIME")));
332         }
333 
334         for(Entry<String, String> entry : keys.entrySet()) {
335           record.add(entry.getKey(), entry.getValue());
336         }
337 
338         output.collect(key, record);
339       }
340 
341       if (keys.containsKey("TASK_TYPE")
342           && keys.containsKey("COUNTERS")
343           && (keys.get("TASK_TYPE").equalsIgnoreCase("REDUCE") || keys.get(
344               "TASK_TYPE").equalsIgnoreCase("MAP"))) {
345         
346         
347         
348         
349         
350         
351         
352         
353         
354         
355         
356         
357 
358         
359         
360         
361         
362         
363         
364         
365         
366         
367         
368 
369         record = new ChukwaRecord();
370         key = new ChukwaRecordKey();
371         buildGenericRecord(record, null, Long
372             .parseLong(keys.get("FINISH_TIME")), "SizeVsFinishTime");
373         extractCounters(record, keys.get("COUNTERS"));
374         record.add("JOBID", keys.get("JOBID"));
375         record.add("TASKID", keys.get("TASKID"));
376         record.add("TASK_TYPE", keys.get("TASK_TYPE"));
377         record.add(Record.tagsField, chunk.getTags());
378         
379         output.collect(key, record);
380 
381       }
382     } catch (IOException e) {
383       log.warn("Unable to collect output in JobLogHistoryProcessor ["
384           + recordEntry + "]", e);
385       e.printStackTrace();
386       throw e;
387     }
388 
389   }
390 
391   protected void extractCounters(ChukwaRecord record, String input) {
392 
393     String[] data = null;
394     String[] counters = input.split(",");
395 
396     for (String counter : counters) {
397       data = counter.split(":");
398       record.add(data[0].replaceAll(" ", "_").replaceAll("\\.", "_")
399           .toUpperCase(), data[1]);
400     }
401   }
402 
403   public String getDataType() {
404     return JobLogHistoryProcessor.recordType;
405   }
406 }