This project has retired. For details please refer to its Attic page.
FSMBuilder xref
View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.chukwa.analysis.salsa.fsm;
20  
21  import java.io.IOException;
22  import java.util.Iterator;
23  import java.util.ArrayList;
24  import java.util.Set;
25  
26  import org.apache.commons.logging.Log;
27  import org.apache.commons.logging.LogFactory;
28  
29  import org.apache.hadoop.chukwa.extraction.demux.*;
30  import org.apache.hadoop.chukwa.extraction.engine.*;
31  import org.apache.hadoop.conf.*;
32  import org.apache.hadoop.mapred.*;
33  import org.apache.hadoop.util.*;
34  import org.apache.hadoop.fs.Path;
35  import org.apache.hadoop.chukwa.extraction.demux.processor.ChukwaOutputCollector;
36  
37  /**
38   * FSM Builder
39   * 
40   * Input: start/end pairs i.e. JobHistory data
41   * 
42   * Input handling is controlled by choosing a custom mapper that 
43   * is able to parse the desired input format (e.g. JobHistory lines)
44   * One map class is provided for each type of input data provided
45   * Each map class "standardizes" the different input log types
46   * to the standardized internal FSMIntermedEntry representation
47   *
48   * Currently available mapper classes:
49   * DataNodeClientTraceMapper
50   * TaskTrackerClientTraceMapper
51   * JobHistoryTaskDataMapper
52   *
53   * Parameterizing choice of mapper class - read in as config parameter
54   *
55   * Output is standardized, regardless of input, and is generated by
56   * the common reducer
57   * 
58   */
59  
60  public class FSMBuilder extends Configured implements Tool {
61  
62    private static Log log = LogFactory.getLog(FSMBuilder.class);
63  
64  	public enum AddInfoTypes {HOST_OTHER, INPUT_BYTES, INPUT_RECORDS, INPUT_GROUPS, 
65  		OUTPUT_BYTES, OUTPUT_RECORDS, SHUFFLE_BYTES, RECORDS_SPILLED, 
66  		COMBINE_INPUT_RECORDS, COMBINE_OUTPUT_RECORDS}
67  	
68  	protected static final String SEP = "/";
69  
70    public static class FSMReducer
71      extends MapReduceBase
72      implements Reducer<ChukwaRecordKey, FSMIntermedEntry, ChukwaRecordKey, ChukwaRecord> {
73    
74  		/**
75  		 * These are used for the add_info TreeMap; keys not listed here are automatically
76  		 * prepended with "COUNTER_"
77  		 */
78      protected static String NON_COUNTER_KEYS [] = {"csource","ctags","STATE_STRING"};
79  
80      protected static String JCDF_ID1 = "JCDF_ID1";
81      protected static String JCDF_ID2 = "JCDF_ID2";
82      protected static String JCDF_EDGE_TIME = "JCDF_E_TIME";
83      protected static String JCDF_EDGE_VOL = "JCDF_E_VOL";
84      protected static String JCDF_SEP = "@";
85  
86  
87      /**
88       * Populates fields used by Pig script for stitching together causal flows
89       */
90      protected void addStitchingFields_blockread
91        (ChukwaRecord cr, ArrayList<String> fnl)
92      {
93        assert(fnl.contains("JOB_ID"));
94        assert(fnl.contains("TASK_ID"));
95        assert(fnl.contains("TIME_END"));
96        assert(fnl.contains("TIME_START"));
97        assert(fnl.contains("COUNTER_BYTES"));
98  
99        String id1 = new String(cr.getValue("TASK_ID")+JCDF_SEP+cr.getValue("TIME_START"));
100       String id2 = new String("map"+JCDF_SEP+cr.getValue("JOB_ID"));
101       String et = new String(
102         (new Long(Long.parseLong(cr.getValue("TIME_END")) - 
103         Long.parseLong(cr.getValue("TIME_START")))).toString()
104       );
105       String ev = new String(cr.getValue("COUNTER_BYTES"));      
106       cr.add(JCDF_ID1, id1);
107       cr.add(JCDF_ID2, id2);
108       cr.add(JCDF_EDGE_TIME, et);
109       cr.add(JCDF_EDGE_VOL, ev);
110     }
111 
112     /**
113      * Populates fields used by Pig script for stitching together causal flows
114      */
115     protected void addStitchingFields_map
116       (ChukwaRecord cr, ArrayList<String> fnl)
117     {
118       assert(fnl.contains("TASK_ID"));
119       assert(fnl.contains("TIME_END"));
120       assert(fnl.contains("TIME_START"));
121       assert(fnl.contains("COUNTER_INPUT_BYTES"));
122 
123       String id1 = new String("map"+JCDF_SEP+cr.getValue("TASK_ID"));
124       String id2 = new String("shuf"+JCDF_SEP+cr.getValue("TASK_ID"));
125       String et = new String(
126         (new Long(Long.parseLong(cr.getValue("TIME_END")) - 
127         Long.parseLong(cr.getValue("TIME_START")))).toString()
128       );
129       String ev = new String(cr.getValue("COUNTER_INPUT_BYTES"));      
130       cr.add(JCDF_ID1, id1);
131       cr.add(JCDF_ID2, id2);
132       cr.add(JCDF_EDGE_TIME, et);
133       cr.add(JCDF_EDGE_VOL, ev);
134     }
135 
136     /**
137      * Populates fields used by Pig script for stitching together causal flows
138      */
139     protected void addStitchingFields_shuffle
140       (ChukwaRecord cr, ArrayList<String> fnl)
141     {
142       assert(fnl.contains("TASK_ID"));
143       assert(fnl.contains("TIME_END"));
144       assert(fnl.contains("TIME_START"));
145       assert(fnl.contains("COUNTER_BYTES"));
146 
147       String mapid, redid;
148       String id_parts[];
149       
150       id_parts = (cr.getValue("TASK_ID")).split("@");
151       if (id_parts.length != 2) {
152         log.warn("Could not split [" + cr.getValue("TASK_ID") + "]; had length " + id_parts.length);
153       }
154       redid = id_parts[0];
155       mapid = id_parts[1];
156 
157       String id1 = new String("shuf"+JCDF_SEP+mapid);
158       String id2 = new String("shufred"+JCDF_SEP+redid);
159       String et = new String(
160         (new Long(Long.parseLong(cr.getValue("TIME_END")) - 
161         Long.parseLong(cr.getValue("TIME_START")))).toString()
162       );
163       String ev = new String(cr.getValue("COUNTER_BYTES"));      
164       cr.add(JCDF_ID1, id1);
165       cr.add(JCDF_ID2, id2);
166       cr.add(JCDF_EDGE_TIME, et);
167       cr.add(JCDF_EDGE_VOL, ev);
168     }    
169 
170     /**
171      * Populates fields used by Pig script for stitching together causal flows
172      */
173     protected void addStitchingFields_redshufwait
174       (ChukwaRecord cr, ArrayList<String> fnl)
175     {
176       assert(fnl.contains("TASK_ID"));
177       assert(fnl.contains("TIME_END"));
178       assert(fnl.contains("TIME_START"));
179       assert(fnl.contains("COUNTER_INPUT_BYTES"));
180 
181       String id1 = new String("shufred"+JCDF_SEP+cr.getValue("TASK_ID"));
182       String id2 = new String("redsort"+JCDF_SEP+cr.getValue("TASK_ID"));
183       String et = new String(
184         (new Long(Long.parseLong(cr.getValue("TIME_END")) - 
185         Long.parseLong(cr.getValue("TIME_START")))).toString()
186       );
187       String ev = new String(cr.getValue("COUNTER_INPUT_BYTES"));      
188       cr.add(JCDF_ID1, id1);
189       cr.add(JCDF_ID2, id2);
190       cr.add(JCDF_EDGE_TIME, et);
191       cr.add(JCDF_EDGE_VOL, ev);
192     }
193 
194     /**
195      * Populates fields used by Pig script for stitching together causal flows
196      */
197     protected void addStitchingFields_redsort
198       (ChukwaRecord cr, ArrayList<String> fnl)
199     {
200       assert(fnl.contains("TASK_ID"));
201       assert(fnl.contains("TIME_END"));
202       assert(fnl.contains("TIME_START"));
203       assert(fnl.contains("COUNTER_INPUT_BYTES"));
204 
205       String id1 = new String("redsort"+JCDF_SEP+cr.getValue("TASK_ID"));
206       String id2 = new String("red"+JCDF_SEP+cr.getValue("TASK_ID"));
207       String et = new String(
208         (new Long(Long.parseLong(cr.getValue("TIME_END")) - 
209         Long.parseLong(cr.getValue("TIME_START")))).toString()
210       );
211       String ev = new String(cr.getValue("COUNTER_INPUT_BYTES"));      
212       cr.add(JCDF_ID1, id1);
213       cr.add(JCDF_ID2, id2);
214       cr.add(JCDF_EDGE_TIME, et);
215       cr.add(JCDF_EDGE_VOL, ev);
216     }    
217     
218     /**
219      * Populates fields used by Pig script for stitching together causal flows
220      */
221     protected void addStitchingFields_redreducer
222       (ChukwaRecord cr, ArrayList<String> fnl)
223     {
224       assert(fnl.contains("TASK_ID"));
225       assert(fnl.contains("TIME_END"));
226       assert(fnl.contains("TIME_START"));
227       assert(fnl.contains("COUNTER_INPUT_BYTES"));
228 
229       String id1 = new String("red"+JCDF_SEP+cr.getValue("TASK_ID"));
230       String id2 = new String("redout"+JCDF_SEP+cr.getValue("TASK_ID"));
231       String et = new String(
232         (new Long(Long.parseLong(cr.getValue("TIME_END")) - 
233         Long.parseLong(cr.getValue("TIME_START")))).toString()
234       );
235       String ev = new String(cr.getValue("COUNTER_INPUT_BYTES"));      
236       cr.add(JCDF_ID1, id1);
237       cr.add(JCDF_ID2, id2);
238       cr.add(JCDF_EDGE_TIME, et);
239       cr.add(JCDF_EDGE_VOL, ev);
240     }    
241     
242     protected void addStitchingFields_blockwrite
243       (ChukwaRecord cr, ArrayList<String> fnl)
244     {
245       assert(fnl.contains("JOB_ID"));
246       assert(fnl.contains("TASK_ID"));
247       assert(fnl.contains("TIME_END"));
248       assert(fnl.contains("TIME_START"));
249       assert(fnl.contains("COUNTER_BYTES"));
250 
251       String id1 = new String("redout"+JCDF_SEP+cr.getValue("JOB_ID"));
252       String id2 = new String(cr.getValue("TASK_ID")+JCDF_SEP+cr.getValue("TIME_START"));
253       String et = new String(
254         (new Long(Long.parseLong(cr.getValue("TIME_END")) - 
255         Long.parseLong(cr.getValue("TIME_START")))).toString()
256       );
257       String ev = new String(cr.getValue("COUNTER_BYTES"));      
258       cr.add(JCDF_ID1, id1);
259       cr.add(JCDF_ID2, id2);
260       cr.add(JCDF_EDGE_TIME, et);
261       cr.add(JCDF_EDGE_VOL, ev);
262     }
263 
264     public void addStitchingFields
265      (ChukwaRecord cr)
266     {
267       String state_name = null;
268   		String [] fieldNames = cr.getFields();
269   		
270   		// get field name list
271   		ArrayList<String> fieldNamesList = new ArrayList<String>(fieldNames.length);
272   		for (int i = 0; i < fieldNames.length; i++) fieldNamesList.add(fieldNames[i]);
273 
274       // safety checks
275       assert(fieldNamesList.contains("STATE_NAME"));
276       
277       state_name = cr.getValue("STATE_NAME");
278       if (state_name.equals("MAP")) {
279         addStitchingFields_map(cr, fieldNamesList);
280       } else if (state_name.equals("REDUCE_SHUFFLEWAIT")) { 
281         addStitchingFields_redshufwait(cr, fieldNamesList);
282       } else if (state_name.equals("REDUCE_SORT")) { 
283         addStitchingFields_redsort(cr, fieldNamesList);        
284       } else if (state_name.equals("REDUCE_REDUCER")) { 
285         addStitchingFields_redreducer(cr, fieldNamesList);
286       } else if (state_name.equals("SHUFFLE_LOCAL") || state_name.equals("SHUFFLE_REMOTE")) { 
287         addStitchingFields_shuffle(cr, fieldNamesList);
288       } else if (state_name.equals("READ_LOCAL") || state_name.equals("READ_REMOTE")) { 
289         addStitchingFields_blockread(cr, fieldNamesList);        
290       } else if (state_name.equals("WRITE_LOCAL") || state_name.equals("WRITE_REMOTE")) {
291         addStitchingFields_blockwrite(cr, fieldNamesList);
292       } 
293       // else add nothing
294     }
295 
296     public void reduce
297       (ChukwaRecordKey key, Iterator<FSMIntermedEntry> values,
298        OutputCollector<ChukwaRecordKey, ChukwaRecord> output, 
299        Reporter reporter) 
300       throws IOException
301     {
302 			FSMIntermedEntry start_rec = null, end_rec = null;
303 			FSMIntermedEntry tmpent;
304       String keystr = key.getKey();
305 			String newkey;
306 			ArrayList<FSMIntermedEntry> ents = new ArrayList<FSMIntermedEntry>();
307 			ArrayList<String> noncounters = new ArrayList<String>();
308 			keystr = keystr.trim();
309 			ChukwaRecord cr = new ChukwaRecord();
310 			
311 			for (int i = 0; i < NON_COUNTER_KEYS.length; i++) noncounters.add(NON_COUNTER_KEYS[i]);
312 			
313 			ChukwaOutputCollector coc = new ChukwaOutputCollector("SALSA_COMPLETE", output, reporter);
314 
315       int itemcount = 0;
316 			try {
317       	while (values.hasNext()) { 
318 					itemcount++; 
319 					tmpent = values.next(); 
320 					ents.add(tmpent.clone()); 
321 				}
322 			} catch (CloneNotSupportedException e) {
323 				// do nothing
324 			}
325 
326       log.debug("In reduce [Key " + keystr + "] (" + itemcount + " vals)");
327       
328 			if (itemcount == 2) { // i.e. we have both start and end events
329 
330 				if (ents.get(0).state_type.val == StateType.STATE_START && 
331 						ents.get(1).state_type.val == StateType.STATE_END) 
332 				{
333 					start_rec = ents.get(0); end_rec = ents.get(1);
334 				} else if	(ents.get(1).state_type.val == StateType.STATE_START && 
335 									 ents.get(0).state_type.val == StateType.STATE_END)
336 				{
337 					start_rec = ents.get(1); end_rec = ents.get(0);
338 				} else {
339 					log.warn("In reduce [Key " + keystr + "] Invalid combination of state types: number of states: "+itemcount+".");
340 					// error handling?
341 				}
342 						
343 				cr.add(new String("STATE_NAME"),start_rec.state_name);
344 				cr.add(new String("STATE_UNIQ_ID"),start_rec.getUniqueID());
345 				cr.add(new String("TIMESTAMP"),start_rec.timestamp);
346 				cr.add(new String("TIME_START"),start_rec.time_start);
347 				cr.add(new String("TIME_END"),end_rec.time_end);
348 				cr.add(new String("TIME_START_MILLIS"),start_rec.time_start.substring(start_rec.time_start.length()-3));
349 				cr.add(new String("TIME_END_MILLIS"),end_rec.time_end.substring(end_rec.time_end.length()-3));
350 				cr.add(new String("HOST"),start_rec.host_exec);
351 				cr.add(new String("HOST_OTHER"),start_rec.host_other);
352 				cr.add(new String("JOB_ID"),start_rec.job_id); 
353 				cr.add(new String("TASK_ID"),start_rec.getFriendlyID());
354 
355 				Set<String> treemapkeys = end_rec.add_info.keySet();
356 				Iterator<String> keyIter = treemapkeys.iterator();
357 				
358 				for (int i = 0; i < treemapkeys.size(); i++) {
359 					assert(keyIter.hasNext());
360 					String currkey = keyIter.next();
361 					if (currkey != null && 
362 					    !noncounters.contains(currkey)) {
363 						cr.add(new String("COUNTER_" + currkey), end_rec.add_info.get(currkey));	
364 					} else if (currkey != null && noncounters.contains(currkey)) {
365 						cr.add(new String(currkey), end_rec.add_info.get(currkey));				
366 					} 
367 				}
368 				assert(!keyIter.hasNext());
369 				cr.setTime(Long.parseLong(start_rec.timestamp));
370 				
371 				newkey = null;
372 				newkey = new String(start_rec.time_orig_epoch + 
373 					SEP + start_rec.getUniqueID() + SEP + start_rec.time_orig);
374 
375 				log.info("Key ["+newkey+"] Task ["+start_rec.getUniqueID()+"] Job ["+start_rec.job_id+"] Friendly ["+start_rec.getFriendlyID()+"]");
376 
377         addStitchingFields(cr);
378         log.debug(cr);
379 				coc.collect(new ChukwaRecordKey(key.getReduceType(), newkey), cr);
380 				
381 			} else if (itemcount == 1) { 
382 				// check that we have only the start; if we have only the end, dump it
383 				// otherwise change the reducetype to get record written to file for
384 				// incomplete entries
385 				
386 				log.warn("Key ["+keystr+"] Too few state entries: "+itemcount+" (intermediate processing not implemented yet).");
387 				
388 			} else { // any other value is invalid
389 				// malformed data; print debug info?
390 
391 				log.warn("Key ["+keystr+"] Malformed data: unexpected number of state entries: "+itemcount+".");
392 
393 			}
394     }
395   }
396   
397   
398   public int run (String args[]) throws Exception {
399 		int num_inputs;
400     JobConf conf = new JobConf(getConf(), FSMBuilder.class);
401 		String [] args2 = args;
402 
403 		if (args2.length < 4 || !"-in".equals(args2[0]))
404 		{
405 			System.err.println("Specifying mapper (full Java class): -D chukwa.salsa.fsm.mapclass=");
406 			System.err.println("Application-specific arguments: -in <# inputs> [input dir 1] ... [input dir n] [output dir]");
407 			return(1);
408 		} 
409 
410     conf.setJobName("Salsa_FSMBuilder");
411     
412 		/* Get name of Mapper class to use */
413 		String mapclassname = conf.get("chukwa.salsa.fsm.mapclass");
414 		log.info("Mapper class: " + mapclassname);
415 		Class mapperClass = null;
416 		try {
417 			mapperClass = Class.forName(mapclassname);
418 		} catch (ClassNotFoundException c) {
419 			System.err.println("Mapper " + mapclassname + " not found: " + c.toString());
420 		}
421 
422     /* Get on with usual job setup */
423     conf.setMapperClass(mapperClass);
424     conf.setReducerClass(FSMReducer.class);
425     conf.setOutputKeyClass(ChukwaRecordKey.class);
426     conf.setOutputValueClass(ChukwaRecord.class);
427     conf.setInputFormat(SequenceFileInputFormat.class);
428     conf.setOutputFormat(ChukwaRecordOutputFormat.class);
429     conf.setPartitionerClass(FSMIntermedEntryPartitioner.class);
430 		conf.setMapOutputValueClass(FSMIntermedEntry.class);
431 		conf.setMapOutputKeyClass(ChukwaRecordKey.class);
432 		conf.setNumReduceTasks(1); // fixed at 1 to ensure that all records are grouped together
433                              
434 		/* Setup inputs/outputs */
435 		try {
436 			num_inputs = Integer.parseInt(args2[1]);
437 		} catch (NumberFormatException e) {
438 			System.err.println("Specifying mapper: -D chukwa.salsa.fsm.mapper=");
439 			System.err.println("Application-specific arguments: -in <# inputs> -out <#outputs> [input dir] [output dir]");
440 			return(1);
441 		}
442 		
443 		if (num_inputs <= 0) {
444 			System.err.println("Must have at least 1 input.");
445 			return(1);
446 		}
447 		
448 		for (int i = 2; i < 2+num_inputs; i++) {
449 	    Path in = new Path(args2[i]);
450 	    FileInputFormat.addInputPath(conf, in);			
451 		}
452 
453     Path out = new Path(args2[2+num_inputs]);
454     FileOutputFormat.setOutputPath(conf, out);
455     
456     JobClient.runJob(conf);
457     
458     return(0);
459   }
460   
461   public static void main (String [] args) throws Exception {
462         
463     int res = ToolRunner.run(new Configuration(), new FSMBuilder(), args);
464     
465     System.exit(res);
466   }
467 
468 
469   
470 }
471