This project has retired. For details please refer to its Attic page.
FSMBuilder xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.chukwa.analysis.salsa.fsm;
20  
21  import java.io.IOException;
22  import java.util.Iterator;
23  import java.util.ArrayList;
24  import java.util.Set;
25  
26  import org.apache.commons.logging.Log;
27  import org.apache.commons.logging.LogFactory;
28  
29  import org.apache.hadoop.chukwa.extraction.demux.*;
30  import org.apache.hadoop.chukwa.extraction.engine.*;
31  import org.apache.hadoop.conf.*;
32  import org.apache.hadoop.mapred.*;
33  import org.apache.hadoop.util.*;
34  import org.apache.hadoop.fs.Path;
35  import org.apache.hadoop.chukwa.extraction.demux.processor.ChukwaOutputCollector;
36  
37  /**
38   * FSM Builder
39   * 
40   * Input: start/end pairs i.e. JobHistory data
41   * 
42   * Input handling is controlled by choosing a custom mapper that 
43   * is able to parse the desired input format (e.g. JobHistory lines)
44   * One map class is provided for each type of input data provided
45   * Each map class "standardizes" the different input log types
46   * to the standardized internal FSMIntermedEntry representation
47   *
48   * Currently available mapper classes:
49   * DataNodeClientTraceMapper
50   * TaskTrackerClientTraceMapper
51   * JobHistoryTaskDataMapper
52   *
53   * Parameterizing choice of mapper class - read in as config parameter
54   *
55   * Output is standardized, regardless of input, and is generated by
56   * the common reducer
57   * 
58   */
59  
60  public class FSMBuilder extends Configured implements Tool {
61  
62    private static Log log = LogFactory.getLog(FSMBuilder.class);
63  
64  	public enum AddInfoTypes {HOST_OTHER, INPUT_BYTES, INPUT_RECORDS, INPUT_GROUPS, 
65  		OUTPUT_BYTES, OUTPUT_RECORDS, SHUFFLE_BYTES, RECORDS_SPILLED, 
66  		COMBINE_INPUT_RECORDS, COMBINE_OUTPUT_RECORDS}
67  	
68  	protected static final String SEP = "/";
69  
70    public static class FSMReducer
71      extends MapReduceBase
72      implements Reducer<ChukwaRecordKey, FSMIntermedEntry, ChukwaRecordKey, ChukwaRecord> {
73    
74  		/**
75  		 * These are used for the add_info TreeMap; keys not listed here are automatically
76  		 * prepended with "COUNTER_"
77  		 */
78      final static String NON_COUNTER_KEYS [] = {"csource","ctags","STATE_STRING"};
79  
80      protected final static String JCDF_ID1 = "JCDF_ID1";
81      protected final static String JCDF_ID2 = "JCDF_ID2";
82      protected final static String JCDF_EDGE_TIME = "JCDF_E_TIME";
83      protected final static String JCDF_EDGE_VOL = "JCDF_E_VOL";
84      protected final static String JCDF_SEP = "@";
85  
86  
87      /**
88       * Populates fields used by Pig script for stitching together causal flows
89       */
90      protected void addStitchingFields_blockread
91        (ChukwaRecord cr, ArrayList<String> fnl)
92      {
93        assert(fnl.contains("JOB_ID"));
94        assert(fnl.contains("TASK_ID"));
95        assert(fnl.contains("TIME_END"));
96        assert(fnl.contains("TIME_START"));
97        assert(fnl.contains("COUNTER_BYTES"));
98  
99        String id1 = new StringBuilder().append(cr.getValue("TASK_ID")).append(JCDF_SEP).append(cr.getValue("TIME_START")).toString();
100       String id2 = new StringBuilder().append("map").append(JCDF_SEP).append(cr.getValue("JOB_ID")).toString();
101       String et = Long.toString((Long.parseLong(cr.getValue("TIME_END")) - Long.parseLong(cr.getValue("TIME_START"))));
102       String ev = new StringBuilder().append(cr.getValue("COUNTER_BYTES")).toString();
103       cr.add(JCDF_ID1, id1);
104       cr.add(JCDF_ID2, id2);
105       cr.add(JCDF_EDGE_TIME, et);
106       cr.add(JCDF_EDGE_VOL, ev);
107     }
108 
109     /**
110      * Populates fields used by Pig script for stitching together causal flows
111      */
112     protected void addStitchingFields_map
113       (ChukwaRecord cr, ArrayList<String> fnl)
114     {
115       assert(fnl.contains("TASK_ID"));
116       assert(fnl.contains("TIME_END"));
117       assert(fnl.contains("TIME_START"));
118       assert(fnl.contains("COUNTER_INPUT_BYTES"));
119 
120       String id1 = new StringBuilder().append("map").append(JCDF_SEP).append(cr.getValue("TASK_ID")).toString();
121       String id2 = new StringBuilder().append("shuf").append(JCDF_SEP).append(cr.getValue("TASK_ID")).toString();
122       String et = Long.toString((Long.parseLong(cr.getValue("TIME_END")) - Long.parseLong(cr.getValue("TIME_START"))));
123       String ev = cr.getValue("COUNTER_INPUT_BYTES");
124       cr.add(JCDF_ID1, id1);
125       cr.add(JCDF_ID2, id2);
126       cr.add(JCDF_EDGE_TIME, et);
127       cr.add(JCDF_EDGE_VOL, ev);
128     }
129 
130     /**
131      * Populates fields used by Pig script for stitching together causal flows
132      */
133     protected void addStitchingFields_shuffle
134       (ChukwaRecord cr, ArrayList<String> fnl)
135     {
136       assert(fnl.contains("TASK_ID"));
137       assert(fnl.contains("TIME_END"));
138       assert(fnl.contains("TIME_START"));
139       assert(fnl.contains("COUNTER_BYTES"));
140 
141       String mapid, redid;
142       String id_parts[];
143       
144       id_parts = (cr.getValue("TASK_ID")).split("@");
145       if (id_parts.length != 2) {
146         log.warn("Could not split [" + cr.getValue("TASK_ID") + "]; had length " + id_parts.length);
147       }
148       redid = id_parts[0];
149       mapid = id_parts[1];
150 
151       String id1 = new StringBuilder().append("shuf").append(JCDF_SEP).append(mapid).toString();
152       String id2 = new StringBuilder().append("shufred").append(JCDF_SEP).append(redid).toString();
153       String et = Long.toString(
154         Long.parseLong(cr.getValue("TIME_END")) - 
155         Long.parseLong(cr.getValue("TIME_START"))
156       );
157       String ev = cr.getValue("COUNTER_BYTES");
158       cr.add(JCDF_ID1, id1);
159       cr.add(JCDF_ID2, id2);
160       cr.add(JCDF_EDGE_TIME, et);
161       cr.add(JCDF_EDGE_VOL, ev);
162     }    
163 
164     /**
165      * Populates fields used by Pig script for stitching together causal flows
166      */
167     protected void addStitchingFields_redshufwait
168       (ChukwaRecord cr, ArrayList<String> fnl)
169     {
170       assert(fnl.contains("TASK_ID"));
171       assert(fnl.contains("TIME_END"));
172       assert(fnl.contains("TIME_START"));
173       assert(fnl.contains("COUNTER_INPUT_BYTES"));
174 
175       String id1 = new StringBuilder().append("shufred").append(JCDF_SEP).append(cr.getValue("TASK_ID")).toString();
176       String id2 = new StringBuilder().append("redsort").append(JCDF_SEP).append(cr.getValue("TASK_ID")).toString();
177       String et = Long.toString(
178         (Long.parseLong(cr.getValue("TIME_END")) - 
179         Long.parseLong(cr.getValue("TIME_START")))
180       );
181       String ev = new StringBuilder().append(cr.getValue("COUNTER_INPUT_BYTES")).toString();
182       cr.add(JCDF_ID1, id1);
183       cr.add(JCDF_ID2, id2);
184       cr.add(JCDF_EDGE_TIME, et);
185       cr.add(JCDF_EDGE_VOL, ev);
186     }
187 
188     /**
189      * Populates fields used by Pig script for stitching together causal flows
190      */
191     protected void addStitchingFields_redsort
192       (ChukwaRecord cr, ArrayList<String> fnl)
193     {
194       assert(fnl.contains("TASK_ID"));
195       assert(fnl.contains("TIME_END"));
196       assert(fnl.contains("TIME_START"));
197       assert(fnl.contains("COUNTER_INPUT_BYTES"));
198 
199       String id1 = new StringBuilder().append("redsort").append(JCDF_SEP).append(cr.getValue("TASK_ID")).toString();
200       String id2 = new StringBuilder().append("red").append(JCDF_SEP).append(cr.getValue("TASK_ID")).toString();
201       String et = Long.toString(
202         Long.parseLong(cr.getValue("TIME_END")) - 
203         Long.parseLong(cr.getValue("TIME_START"))
204       );
205       String ev = new StringBuilder().append(cr.getValue("COUNTER_INPUT_BYTES")).toString();
206       cr.add(JCDF_ID1, id1);
207       cr.add(JCDF_ID2, id2);
208       cr.add(JCDF_EDGE_TIME, et);
209       cr.add(JCDF_EDGE_VOL, ev);
210     }    
211     
212     /**
213      * Populates fields used by Pig script for stitching together causal flows
214      */
215     protected void addStitchingFields_redreducer
216       (ChukwaRecord cr, ArrayList<String> fnl)
217     {
218       assert(fnl.contains("TASK_ID"));
219       assert(fnl.contains("TIME_END"));
220       assert(fnl.contains("TIME_START"));
221       assert(fnl.contains("COUNTER_INPUT_BYTES"));
222 
223       String id1 = new StringBuilder().append("red").append(JCDF_SEP).append(cr.getValue("TASK_ID")).toString();
224       String id2 = new StringBuilder().append("redout").append(JCDF_SEP).append(cr.getValue("TASK_ID")).toString();
225       String et = Long.toString(Long.parseLong(cr.getValue("TIME_END")) - Long.parseLong(cr.getValue("TIME_START")));
226       String ev = cr.getValue("COUNTER_INPUT_BYTES");
227       cr.add(JCDF_ID1, id1);
228       cr.add(JCDF_ID2, id2);
229       cr.add(JCDF_EDGE_TIME, et);
230       cr.add(JCDF_EDGE_VOL, ev);
231     }
232 
233     protected void addStitchingFields_blockwrite
234       (ChukwaRecord cr, ArrayList<String> fnl)
235     {
236       assert(fnl.contains("JOB_ID"));
237       assert(fnl.contains("TASK_ID"));
238       assert(fnl.contains("TIME_END"));
239       assert(fnl.contains("TIME_START"));
240       assert(fnl.contains("COUNTER_BYTES"));
241 
242       String id1 = new StringBuilder().append("redout").append(JCDF_SEP).append(cr.getValue("JOB_ID")).toString();
243       String id2 = new StringBuilder().append(cr.getValue("TASK_ID")).append(JCDF_SEP).append(cr.getValue("TIME_START")).toString();
244       String et = new StringBuilder().append(Long.toString(Long.parseLong(cr.getValue("TIME_END")) - Long.parseLong(cr.getValue("TIME_START")))).toString();
245       String ev = cr.getValue("COUNTER_BYTES");
246       cr.add(JCDF_ID1, id1);
247       cr.add(JCDF_ID2, id2);
248       cr.add(JCDF_EDGE_TIME, et);
249       cr.add(JCDF_EDGE_VOL, ev);
250     }
251 
252     public void addStitchingFields
253      (ChukwaRecord cr)
254     {
255       String state_name = null;
256   		String [] fieldNames = cr.getFields();
257   		
258   		// get field name list
259   		ArrayList<String> fieldNamesList = new ArrayList<String>(fieldNames.length);
260   		for (int i = 0; i < fieldNames.length; i++) fieldNamesList.add(fieldNames[i]);
261 
262       // safety checks
263       assert(fieldNamesList.contains("STATE_NAME"));
264       
265       state_name = cr.getValue("STATE_NAME");
266       if (state_name.equals("MAP")) {
267         addStitchingFields_map(cr, fieldNamesList);
268       } else if (state_name.equals("REDUCE_SHUFFLEWAIT")) { 
269         addStitchingFields_redshufwait(cr, fieldNamesList);
270       } else if (state_name.equals("REDUCE_SORT")) { 
271         addStitchingFields_redsort(cr, fieldNamesList);        
272       } else if (state_name.equals("REDUCE_REDUCER")) { 
273         addStitchingFields_redreducer(cr, fieldNamesList);
274       } else if (state_name.equals("SHUFFLE_LOCAL") || state_name.equals("SHUFFLE_REMOTE")) { 
275         addStitchingFields_shuffle(cr, fieldNamesList);
276       } else if (state_name.equals("READ_LOCAL") || state_name.equals("READ_REMOTE")) { 
277         addStitchingFields_blockread(cr, fieldNamesList);        
278       } else if (state_name.equals("WRITE_LOCAL") || state_name.equals("WRITE_REMOTE")) {
279         addStitchingFields_blockwrite(cr, fieldNamesList);
280       } 
281       // else add nothing
282     }
283 
284     public void reduce
285       (ChukwaRecordKey key, Iterator<FSMIntermedEntry> values,
286        OutputCollector<ChukwaRecordKey, ChukwaRecord> output, 
287        Reporter reporter) 
288       throws IOException
289     {
290 			FSMIntermedEntry start_rec = null, end_rec = null;
291 			FSMIntermedEntry tmpent;
292       String keystr = key.getKey();
293 			String newkey;
294 			ArrayList<FSMIntermedEntry> ents = new ArrayList<FSMIntermedEntry>();
295 			ArrayList<String> noncounters = new ArrayList<String>();
296 			keystr = keystr.trim();
297 			ChukwaRecord cr = new ChukwaRecord();
298 			
299 			for (int i = 0; i < NON_COUNTER_KEYS.length; i++) noncounters.add(NON_COUNTER_KEYS[i]);
300 			
301 			ChukwaOutputCollector coc = new ChukwaOutputCollector("SALSA_COMPLETE", output, reporter);
302 
303       int itemcount = 0;
304 			try {
305       	while (values.hasNext()) { 
306 					itemcount++; 
307 					tmpent = values.next(); 
308 					ents.add(tmpent.clone()); 
309 				}
310 			} catch (CloneNotSupportedException e) {
311 				// do nothing
312 			}
313 
314       log.debug("In reduce [Key " + keystr + "] (" + itemcount + " vals)");
315       
316 			if (itemcount == 2) { // i.e. we have both start and end events
317 
318 				if (ents.get(0).state_type.val == StateType.STATE_START && 
319 						ents.get(1).state_type.val == StateType.STATE_END) 
320 				{
321 					start_rec = ents.get(0); end_rec = ents.get(1);
322 				} else if	(ents.get(1).state_type.val == StateType.STATE_START && 
323 									 ents.get(0).state_type.val == StateType.STATE_END)
324 				{
325 					start_rec = ents.get(1); end_rec = ents.get(0);
326 				} else {
327 					log.warn("In reduce [Key " + keystr + "] Invalid combination of state types: number of states: "+itemcount+".");
328 					// error handling?
329 				}
330 						
331 				cr.add("STATE_NAME",start_rec.state_name);
332 				cr.add("STATE_UNIQ_ID",start_rec.getUniqueID());
333 				cr.add("TIMESTAMP",start_rec.timestamp);
334 				cr.add("TIME_START",start_rec.time_start);
335 				cr.add("TIME_END",end_rec.time_end);
336 				cr.add("TIME_START_MILLIS",start_rec.time_start.substring(start_rec.time_start.length()-3));
337 				cr.add("TIME_END_MILLIS",end_rec.time_end.substring(end_rec.time_end.length()-3));
338 				cr.add("HOST",start_rec.host_exec);
339 				cr.add("HOST_OTHER",start_rec.host_other);
340 				cr.add("JOB_ID",start_rec.job_id); 
341 				cr.add("TASK_ID",start_rec.getFriendlyID());
342 
343 				Set<String> treemapkeys = end_rec.add_info.keySet();
344 				Iterator<String> keyIter = treemapkeys.iterator();
345 				
346 				for (int i = 0; i < treemapkeys.size(); i++) {
347 					assert(keyIter.hasNext());
348 					String currkey = keyIter.next();
349 					if (currkey != null && 
350 					    !noncounters.contains(currkey)) {
351 						cr.add("COUNTER_" + currkey, end_rec.add_info.get(currkey));	
352 					} else if (currkey != null && noncounters.contains(currkey)) {
353 						cr.add(currkey, end_rec.add_info.get(currkey));				
354 					} 
355 				}
356 				assert(!keyIter.hasNext());
357 				cr.setTime(Long.parseLong(start_rec.timestamp));
358 				
359 				newkey = null;
360 				newkey = new StringBuilder().append(start_rec.time_orig_epoch).append(SEP).append(start_rec.getUniqueID()).
361 				    append(SEP).append(start_rec.time_orig).toString();
362 
363 				log.info("Key ["+newkey+"] Task ["+start_rec.getUniqueID()+"] Job ["+start_rec.job_id+"] Friendly ["+start_rec.getFriendlyID()+"]");
364 
365         addStitchingFields(cr);
366         log.debug(cr);
367 				coc.collect(new ChukwaRecordKey(key.getReduceType(), newkey), cr);
368 				
369 			} else if (itemcount == 1) { 
370 				// check that we have only the start; if we have only the end, dump it
371 				// otherwise change the reducetype to get record written to file for
372 				// incomplete entries
373 				
374 				log.warn("Key ["+keystr+"] Too few state entries: "+itemcount+" (intermediate processing not implemented yet).");
375 				
376 			} else { // any other value is invalid
377 				// malformed data; print debug info?
378 
379 				log.warn("Key ["+keystr+"] Malformed data: unexpected number of state entries: "+itemcount+".");
380 
381 			}
382     }
383   }
384   
385   
386   public int run (String args[]) throws Exception {
387 		int num_inputs;
388     JobConf conf = new JobConf(getConf(), FSMBuilder.class);
389 		String [] args2 = args;
390 
391 		if (args2.length < 4 || !"-in".equals(args2[0]))
392 		{
393 			System.err.println("Specifying mapper (full Java class): -D chukwa.salsa.fsm.mapclass=");
394 			System.err.println("Application-specific arguments: -in <# inputs> [input dir 1] ... [input dir n] [output dir]");
395 			return(1);
396 		} 
397 
398     conf.setJobName("Salsa_FSMBuilder");
399     
400 		/* Get name of Mapper class to use */
401 		String mapclassname = conf.get("chukwa.salsa.fsm.mapclass");
402 		log.info("Mapper class: " + mapclassname);
403 		Class mapperClass = null;
404 		try {
405 			mapperClass = Class.forName(mapclassname);
406 		} catch (ClassNotFoundException c) {
407 			System.err.println("Mapper " + mapclassname + " not found: " + c.toString());
408 		}
409 
410     /* Get on with usual job setup */
411     conf.setMapperClass(mapperClass);
412     conf.setReducerClass(FSMReducer.class);
413     conf.setOutputKeyClass(ChukwaRecordKey.class);
414     conf.setOutputValueClass(ChukwaRecord.class);
415     conf.setInputFormat(SequenceFileInputFormat.class);
416     conf.setOutputFormat(ChukwaRecordOutputFormat.class);
417     conf.setPartitionerClass(FSMIntermedEntryPartitioner.class);
418 		conf.setMapOutputValueClass(FSMIntermedEntry.class);
419 		conf.setMapOutputKeyClass(ChukwaRecordKey.class);
420 		conf.setNumReduceTasks(1); // fixed at 1 to ensure that all records are grouped together
421                              
422 		/* Setup inputs/outputs */
423 		try {
424 			num_inputs = Integer.parseInt(args2[1]);
425 		} catch (NumberFormatException e) {
426 			System.err.println("Specifying mapper: -D chukwa.salsa.fsm.mapper=");
427 			System.err.println("Application-specific arguments: -in <# inputs> -out <#outputs> [input dir] [output dir]");
428 			return(1);
429 		}
430 		
431 		if (num_inputs <= 0) {
432 			System.err.println("Must have at least 1 input.");
433 			return(1);
434 		}
435 		
436 		for (int i = 2; i < 2+num_inputs; i++) {
437 	    Path in = new Path(args2[i]);
438 	    FileInputFormat.addInputPath(conf, in);			
439 		}
440 
441     Path out = new Path(args2[2+num_inputs]);
442     FileOutputFormat.setOutputPath(conf, out);
443     
444     JobClient.runJob(conf);
445     
446     return(0);
447   }
448   
449   public static void main (String [] args) throws Exception {
450         
451     int res = ToolRunner.run(new Configuration(), new FSMBuilder(), args);
452     
453     return;
454   }
455 
456 
457   
458 }
459