This project has been retired. For details, please refer to its
Attic page.
FSMBuilder xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.chukwa.analysis.salsa.fsm;
20
21 import java.io.IOException;
22 import java.util.Iterator;
23 import java.util.ArrayList;
24 import java.util.Set;
25
26 import org.apache.commons.logging.Log;
27 import org.apache.commons.logging.LogFactory;
28
29 import org.apache.hadoop.chukwa.extraction.demux.*;
30 import org.apache.hadoop.chukwa.extraction.engine.*;
31 import org.apache.hadoop.conf.*;
32 import org.apache.hadoop.mapred.*;
33 import org.apache.hadoop.util.*;
34 import org.apache.hadoop.fs.Path;
35 import org.apache.hadoop.chukwa.extraction.demux.processor.ChukwaOutputCollector;
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60 public class FSMBuilder extends Configured implements Tool {
61
62 private static Log log = LogFactory.getLog(FSMBuilder.class);
63
64 public enum AddInfoTypes {HOST_OTHER, INPUT_BYTES, INPUT_RECORDS, INPUT_GROUPS,
65 OUTPUT_BYTES, OUTPUT_RECORDS, SHUFFLE_BYTES, RECORDS_SPILLED,
66 COMBINE_INPUT_RECORDS, COMBINE_OUTPUT_RECORDS}
67
68 protected static final String SEP = "/";
69
70 public static class FSMReducer
71 extends MapReduceBase
72 implements Reducer<ChukwaRecordKey, FSMIntermedEntry, ChukwaRecordKey, ChukwaRecord> {
73
74
75
76
77
78 final static String NON_COUNTER_KEYS [] = {"csource","ctags","STATE_STRING"};
79
80 protected final static String JCDF_ID1 = "JCDF_ID1";
81 protected final static String JCDF_ID2 = "JCDF_ID2";
82 protected final static String JCDF_EDGE_TIME = "JCDF_E_TIME";
83 protected final static String JCDF_EDGE_VOL = "JCDF_E_VOL";
84 protected final static String JCDF_SEP = "@";
85
86
87
88
89
90 protected void addStitchingFields_blockread
91 (ChukwaRecord cr, ArrayList<String> fnl)
92 {
93 assert(fnl.contains("JOB_ID"));
94 assert(fnl.contains("TASK_ID"));
95 assert(fnl.contains("TIME_END"));
96 assert(fnl.contains("TIME_START"));
97 assert(fnl.contains("COUNTER_BYTES"));
98
99 String id1 = new StringBuilder().append(cr.getValue("TASK_ID")).append(JCDF_SEP).append(cr.getValue("TIME_START")).toString();
100 String id2 = new StringBuilder().append("map").append(JCDF_SEP).append(cr.getValue("JOB_ID")).toString();
101 String et = Long.toString((Long.parseLong(cr.getValue("TIME_END")) - Long.parseLong(cr.getValue("TIME_START"))));
102 String ev = new StringBuilder().append(cr.getValue("COUNTER_BYTES")).toString();
103 cr.add(JCDF_ID1, id1);
104 cr.add(JCDF_ID2, id2);
105 cr.add(JCDF_EDGE_TIME, et);
106 cr.add(JCDF_EDGE_VOL, ev);
107 }
108
109
110
111
112 protected void addStitchingFields_map
113 (ChukwaRecord cr, ArrayList<String> fnl)
114 {
115 assert(fnl.contains("TASK_ID"));
116 assert(fnl.contains("TIME_END"));
117 assert(fnl.contains("TIME_START"));
118 assert(fnl.contains("COUNTER_INPUT_BYTES"));
119
120 String id1 = new StringBuilder().append("map").append(JCDF_SEP).append(cr.getValue("TASK_ID")).toString();
121 String id2 = new StringBuilder().append("shuf").append(JCDF_SEP).append(cr.getValue("TASK_ID")).toString();
122 String et = Long.toString((Long.parseLong(cr.getValue("TIME_END")) - Long.parseLong(cr.getValue("TIME_START"))));
123 String ev = cr.getValue("COUNTER_INPUT_BYTES");
124 cr.add(JCDF_ID1, id1);
125 cr.add(JCDF_ID2, id2);
126 cr.add(JCDF_EDGE_TIME, et);
127 cr.add(JCDF_EDGE_VOL, ev);
128 }
129
130
131
132
133 protected void addStitchingFields_shuffle
134 (ChukwaRecord cr, ArrayList<String> fnl)
135 {
136 assert(fnl.contains("TASK_ID"));
137 assert(fnl.contains("TIME_END"));
138 assert(fnl.contains("TIME_START"));
139 assert(fnl.contains("COUNTER_BYTES"));
140
141 String mapid, redid;
142 String id_parts[];
143
144 id_parts = (cr.getValue("TASK_ID")).split("@");
145 if (id_parts.length != 2) {
146 log.warn("Could not split [" + cr.getValue("TASK_ID") + "]; had length " + id_parts.length);
147 }
148 redid = id_parts[0];
149 mapid = id_parts[1];
150
151 String id1 = new StringBuilder().append("shuf").append(JCDF_SEP).append(mapid).toString();
152 String id2 = new StringBuilder().append("shufred").append(JCDF_SEP).append(redid).toString();
153 String et = Long.toString(
154 Long.parseLong(cr.getValue("TIME_END")) -
155 Long.parseLong(cr.getValue("TIME_START"))
156 );
157 String ev = cr.getValue("COUNTER_BYTES");
158 cr.add(JCDF_ID1, id1);
159 cr.add(JCDF_ID2, id2);
160 cr.add(JCDF_EDGE_TIME, et);
161 cr.add(JCDF_EDGE_VOL, ev);
162 }
163
164
165
166
167 protected void addStitchingFields_redshufwait
168 (ChukwaRecord cr, ArrayList<String> fnl)
169 {
170 assert(fnl.contains("TASK_ID"));
171 assert(fnl.contains("TIME_END"));
172 assert(fnl.contains("TIME_START"));
173 assert(fnl.contains("COUNTER_INPUT_BYTES"));
174
175 String id1 = new StringBuilder().append("shufred").append(JCDF_SEP).append(cr.getValue("TASK_ID")).toString();
176 String id2 = new StringBuilder().append("redsort").append(JCDF_SEP).append(cr.getValue("TASK_ID")).toString();
177 String et = Long.toString(
178 (Long.parseLong(cr.getValue("TIME_END")) -
179 Long.parseLong(cr.getValue("TIME_START")))
180 );
181 String ev = new StringBuilder().append(cr.getValue("COUNTER_INPUT_BYTES")).toString();
182 cr.add(JCDF_ID1, id1);
183 cr.add(JCDF_ID2, id2);
184 cr.add(JCDF_EDGE_TIME, et);
185 cr.add(JCDF_EDGE_VOL, ev);
186 }
187
188
189
190
191 protected void addStitchingFields_redsort
192 (ChukwaRecord cr, ArrayList<String> fnl)
193 {
194 assert(fnl.contains("TASK_ID"));
195 assert(fnl.contains("TIME_END"));
196 assert(fnl.contains("TIME_START"));
197 assert(fnl.contains("COUNTER_INPUT_BYTES"));
198
199 String id1 = new StringBuilder().append("redsort").append(JCDF_SEP).append(cr.getValue("TASK_ID")).toString();
200 String id2 = new StringBuilder().append("red").append(JCDF_SEP).append(cr.getValue("TASK_ID")).toString();
201 String et = Long.toString(
202 Long.parseLong(cr.getValue("TIME_END")) -
203 Long.parseLong(cr.getValue("TIME_START"))
204 );
205 String ev = new StringBuilder().append(cr.getValue("COUNTER_INPUT_BYTES")).toString();
206 cr.add(JCDF_ID1, id1);
207 cr.add(JCDF_ID2, id2);
208 cr.add(JCDF_EDGE_TIME, et);
209 cr.add(JCDF_EDGE_VOL, ev);
210 }
211
212
213
214
215 protected void addStitchingFields_redreducer
216 (ChukwaRecord cr, ArrayList<String> fnl)
217 {
218 assert(fnl.contains("TASK_ID"));
219 assert(fnl.contains("TIME_END"));
220 assert(fnl.contains("TIME_START"));
221 assert(fnl.contains("COUNTER_INPUT_BYTES"));
222
223 String id1 = new StringBuilder().append("red").append(JCDF_SEP).append(cr.getValue("TASK_ID")).toString();
224 String id2 = new StringBuilder().append("redout").append(JCDF_SEP).append(cr.getValue("TASK_ID")).toString();
225 String et = Long.toString(Long.parseLong(cr.getValue("TIME_END")) - Long.parseLong(cr.getValue("TIME_START")));
226 String ev = cr.getValue("COUNTER_INPUT_BYTES");
227 cr.add(JCDF_ID1, id1);
228 cr.add(JCDF_ID2, id2);
229 cr.add(JCDF_EDGE_TIME, et);
230 cr.add(JCDF_EDGE_VOL, ev);
231 }
232
233 protected void addStitchingFields_blockwrite
234 (ChukwaRecord cr, ArrayList<String> fnl)
235 {
236 assert(fnl.contains("JOB_ID"));
237 assert(fnl.contains("TASK_ID"));
238 assert(fnl.contains("TIME_END"));
239 assert(fnl.contains("TIME_START"));
240 assert(fnl.contains("COUNTER_BYTES"));
241
242 String id1 = new StringBuilder().append("redout").append(JCDF_SEP).append(cr.getValue("JOB_ID")).toString();
243 String id2 = new StringBuilder().append(cr.getValue("TASK_ID")).append(JCDF_SEP).append(cr.getValue("TIME_START")).toString();
244 String et = new StringBuilder().append(Long.toString(Long.parseLong(cr.getValue("TIME_END")) - Long.parseLong(cr.getValue("TIME_START")))).toString();
245 String ev = cr.getValue("COUNTER_BYTES");
246 cr.add(JCDF_ID1, id1);
247 cr.add(JCDF_ID2, id2);
248 cr.add(JCDF_EDGE_TIME, et);
249 cr.add(JCDF_EDGE_VOL, ev);
250 }
251
252 public void addStitchingFields
253 (ChukwaRecord cr)
254 {
255 String state_name = null;
256 String [] fieldNames = cr.getFields();
257
258
259 ArrayList<String> fieldNamesList = new ArrayList<String>(fieldNames.length);
260 for (int i = 0; i < fieldNames.length; i++) fieldNamesList.add(fieldNames[i]);
261
262
263 assert(fieldNamesList.contains("STATE_NAME"));
264
265 state_name = cr.getValue("STATE_NAME");
266 if (state_name.equals("MAP")) {
267 addStitchingFields_map(cr, fieldNamesList);
268 } else if (state_name.equals("REDUCE_SHUFFLEWAIT")) {
269 addStitchingFields_redshufwait(cr, fieldNamesList);
270 } else if (state_name.equals("REDUCE_SORT")) {
271 addStitchingFields_redsort(cr, fieldNamesList);
272 } else if (state_name.equals("REDUCE_REDUCER")) {
273 addStitchingFields_redreducer(cr, fieldNamesList);
274 } else if (state_name.equals("SHUFFLE_LOCAL") || state_name.equals("SHUFFLE_REMOTE")) {
275 addStitchingFields_shuffle(cr, fieldNamesList);
276 } else if (state_name.equals("READ_LOCAL") || state_name.equals("READ_REMOTE")) {
277 addStitchingFields_blockread(cr, fieldNamesList);
278 } else if (state_name.equals("WRITE_LOCAL") || state_name.equals("WRITE_REMOTE")) {
279 addStitchingFields_blockwrite(cr, fieldNamesList);
280 }
281
282 }
283
284 public void reduce
285 (ChukwaRecordKey key, Iterator<FSMIntermedEntry> values,
286 OutputCollector<ChukwaRecordKey, ChukwaRecord> output,
287 Reporter reporter)
288 throws IOException
289 {
290 FSMIntermedEntry start_rec = null, end_rec = null;
291 FSMIntermedEntry tmpent;
292 String keystr = key.getKey();
293 String newkey;
294 ArrayList<FSMIntermedEntry> ents = new ArrayList<FSMIntermedEntry>();
295 ArrayList<String> noncounters = new ArrayList<String>();
296 keystr = keystr.trim();
297 ChukwaRecord cr = new ChukwaRecord();
298
299 for (int i = 0; i < NON_COUNTER_KEYS.length; i++) noncounters.add(NON_COUNTER_KEYS[i]);
300
301 ChukwaOutputCollector coc = new ChukwaOutputCollector("SALSA_COMPLETE", output, reporter);
302
303 int itemcount = 0;
304 try {
305 while (values.hasNext()) {
306 itemcount++;
307 tmpent = values.next();
308 ents.add(tmpent.clone());
309 }
310 } catch (CloneNotSupportedException e) {
311
312 }
313
314 log.debug("In reduce [Key " + keystr + "] (" + itemcount + " vals)");
315
316 if (itemcount == 2) {
317
318 if (ents.get(0).state_type.val == StateType.STATE_START &&
319 ents.get(1).state_type.val == StateType.STATE_END)
320 {
321 start_rec = ents.get(0); end_rec = ents.get(1);
322 } else if (ents.get(1).state_type.val == StateType.STATE_START &&
323 ents.get(0).state_type.val == StateType.STATE_END)
324 {
325 start_rec = ents.get(1); end_rec = ents.get(0);
326 } else {
327 log.warn("In reduce [Key " + keystr + "] Invalid combination of state types: number of states: "+itemcount+".");
328
329 }
330
331 cr.add("STATE_NAME",start_rec.state_name);
332 cr.add("STATE_UNIQ_ID",start_rec.getUniqueID());
333 cr.add("TIMESTAMP",start_rec.timestamp);
334 cr.add("TIME_START",start_rec.time_start);
335 cr.add("TIME_END",end_rec.time_end);
336 cr.add("TIME_START_MILLIS",start_rec.time_start.substring(start_rec.time_start.length()-3));
337 cr.add("TIME_END_MILLIS",end_rec.time_end.substring(end_rec.time_end.length()-3));
338 cr.add("HOST",start_rec.host_exec);
339 cr.add("HOST_OTHER",start_rec.host_other);
340 cr.add("JOB_ID",start_rec.job_id);
341 cr.add("TASK_ID",start_rec.getFriendlyID());
342
343 Set<String> treemapkeys = end_rec.add_info.keySet();
344 Iterator<String> keyIter = treemapkeys.iterator();
345
346 for (int i = 0; i < treemapkeys.size(); i++) {
347 assert(keyIter.hasNext());
348 String currkey = keyIter.next();
349 if (currkey != null &&
350 !noncounters.contains(currkey)) {
351 cr.add("COUNTER_" + currkey, end_rec.add_info.get(currkey));
352 } else if (currkey != null && noncounters.contains(currkey)) {
353 cr.add(currkey, end_rec.add_info.get(currkey));
354 }
355 }
356 assert(!keyIter.hasNext());
357 cr.setTime(Long.parseLong(start_rec.timestamp));
358
359 newkey = null;
360 newkey = new StringBuilder().append(start_rec.time_orig_epoch).append(SEP).append(start_rec.getUniqueID()).
361 append(SEP).append(start_rec.time_orig).toString();
362
363 log.info("Key ["+newkey+"] Task ["+start_rec.getUniqueID()+"] Job ["+start_rec.job_id+"] Friendly ["+start_rec.getFriendlyID()+"]");
364
365 addStitchingFields(cr);
366 log.debug(cr);
367 coc.collect(new ChukwaRecordKey(key.getReduceType(), newkey), cr);
368
369 } else if (itemcount == 1) {
370
371
372
373
374 log.warn("Key ["+keystr+"] Too few state entries: "+itemcount+" (intermediate processing not implemented yet).");
375
376 } else {
377
378
379 log.warn("Key ["+keystr+"] Malformed data: unexpected number of state entries: "+itemcount+".");
380
381 }
382 }
383 }
384
385
386 public int run (String args[]) throws Exception {
387 int num_inputs;
388 JobConf conf = new JobConf(getConf(), FSMBuilder.class);
389 String [] args2 = args;
390
391 if (args2.length < 4 || !"-in".equals(args2[0]))
392 {
393 System.err.println("Specifying mapper (full Java class): -D chukwa.salsa.fsm.mapclass=");
394 System.err.println("Application-specific arguments: -in <# inputs> [input dir 1] ... [input dir n] [output dir]");
395 return(1);
396 }
397
398 conf.setJobName("Salsa_FSMBuilder");
399
400
401 String mapclassname = conf.get("chukwa.salsa.fsm.mapclass");
402 log.info("Mapper class: " + mapclassname);
403 Class mapperClass = null;
404 try {
405 mapperClass = Class.forName(mapclassname);
406 } catch (ClassNotFoundException c) {
407 System.err.println("Mapper " + mapclassname + " not found: " + c.toString());
408 }
409
410
411 conf.setMapperClass(mapperClass);
412 conf.setReducerClass(FSMReducer.class);
413 conf.setOutputKeyClass(ChukwaRecordKey.class);
414 conf.setOutputValueClass(ChukwaRecord.class);
415 conf.setInputFormat(SequenceFileInputFormat.class);
416 conf.setOutputFormat(ChukwaRecordOutputFormat.class);
417 conf.setPartitionerClass(FSMIntermedEntryPartitioner.class);
418 conf.setMapOutputValueClass(FSMIntermedEntry.class);
419 conf.setMapOutputKeyClass(ChukwaRecordKey.class);
420 conf.setNumReduceTasks(1);
421
422
423 try {
424 num_inputs = Integer.parseInt(args2[1]);
425 } catch (NumberFormatException e) {
426 System.err.println("Specifying mapper: -D chukwa.salsa.fsm.mapper=");
427 System.err.println("Application-specific arguments: -in <# inputs> -out <#outputs> [input dir] [output dir]");
428 return(1);
429 }
430
431 if (num_inputs <= 0) {
432 System.err.println("Must have at least 1 input.");
433 return(1);
434 }
435
436 for (int i = 2; i < 2+num_inputs; i++) {
437 Path in = new Path(args2[i]);
438 FileInputFormat.addInputPath(conf, in);
439 }
440
441 Path out = new Path(args2[2+num_inputs]);
442 FileOutputFormat.setOutputPath(conf, out);
443
444 JobClient.runJob(conf);
445
446 return(0);
447 }
448
449 public static void main (String [] args) throws Exception {
450
451 int res = ToolRunner.run(new Configuration(), new FSMBuilder(), args);
452
453 return;
454 }
455
456
457
458 }
459