This project has been retired. For details, please refer to its
Attic page.
FSMBuilder xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.chukwa.analysis.salsa.fsm;
20
21 import java.io.IOException;
22 import java.util.Iterator;
23 import java.util.ArrayList;
24 import java.util.Set;
25
26 import org.apache.commons.logging.Log;
27 import org.apache.commons.logging.LogFactory;
28
29 import org.apache.hadoop.chukwa.extraction.demux.*;
30 import org.apache.hadoop.chukwa.extraction.engine.*;
31 import org.apache.hadoop.conf.*;
32 import org.apache.hadoop.mapred.*;
33 import org.apache.hadoop.util.*;
34 import org.apache.hadoop.fs.Path;
35 import org.apache.hadoop.chukwa.extraction.demux.processor.ChukwaOutputCollector;
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60 public class FSMBuilder extends Configured implements Tool {
61
62 private static Log log = LogFactory.getLog(FSMBuilder.class);
63
/**
 * Names of the kinds of additional information that can accompany a state.
 * NOTE(review): this enum is not referenced anywhere else in this file;
 * confirm how callers use it before reordering or renaming constants.
 */
public enum AddInfoTypes {
  HOST_OTHER,
  INPUT_BYTES,
  INPUT_RECORDS,
  INPUT_GROUPS,
  OUTPUT_BYTES,
  OUTPUT_RECORDS,
  SHUFFLE_BYTES,
  RECORDS_SPILLED,
  COMBINE_INPUT_RECORDS,
  COMBINE_OUTPUT_RECORDS
}
67
68 protected static final String SEP = "/";
69
70 public static class FSMReducer
71 extends MapReduceBase
72 implements Reducer<ChukwaRecordKey, FSMIntermedEntry, ChukwaRecordKey, ChukwaRecord> {
73
74
75
76
77
78 protected static String NON_COUNTER_KEYS [] = {"csource","ctags","STATE_STRING"};
79
80 protected static String JCDF_ID1 = "JCDF_ID1";
81 protected static String JCDF_ID2 = "JCDF_ID2";
82 protected static String JCDF_EDGE_TIME = "JCDF_E_TIME";
83 protected static String JCDF_EDGE_VOL = "JCDF_E_VOL";
84 protected static String JCDF_SEP = "@";
85
86
87
88
89
90 protected void addStitchingFields_blockread
91 (ChukwaRecord cr, ArrayList<String> fnl)
92 {
93 assert(fnl.contains("JOB_ID"));
94 assert(fnl.contains("TASK_ID"));
95 assert(fnl.contains("TIME_END"));
96 assert(fnl.contains("TIME_START"));
97 assert(fnl.contains("COUNTER_BYTES"));
98
99 String id1 = new String(cr.getValue("TASK_ID")+JCDF_SEP+cr.getValue("TIME_START"));
100 String id2 = new String("map"+JCDF_SEP+cr.getValue("JOB_ID"));
101 String et = new String(
102 (new Long(Long.parseLong(cr.getValue("TIME_END")) -
103 Long.parseLong(cr.getValue("TIME_START")))).toString()
104 );
105 String ev = new String(cr.getValue("COUNTER_BYTES"));
106 cr.add(JCDF_ID1, id1);
107 cr.add(JCDF_ID2, id2);
108 cr.add(JCDF_EDGE_TIME, et);
109 cr.add(JCDF_EDGE_VOL, ev);
110 }
111
112
113
114
115 protected void addStitchingFields_map
116 (ChukwaRecord cr, ArrayList<String> fnl)
117 {
118 assert(fnl.contains("TASK_ID"));
119 assert(fnl.contains("TIME_END"));
120 assert(fnl.contains("TIME_START"));
121 assert(fnl.contains("COUNTER_INPUT_BYTES"));
122
123 String id1 = new String("map"+JCDF_SEP+cr.getValue("TASK_ID"));
124 String id2 = new String("shuf"+JCDF_SEP+cr.getValue("TASK_ID"));
125 String et = new String(
126 (new Long(Long.parseLong(cr.getValue("TIME_END")) -
127 Long.parseLong(cr.getValue("TIME_START")))).toString()
128 );
129 String ev = new String(cr.getValue("COUNTER_INPUT_BYTES"));
130 cr.add(JCDF_ID1, id1);
131 cr.add(JCDF_ID2, id2);
132 cr.add(JCDF_EDGE_TIME, et);
133 cr.add(JCDF_EDGE_VOL, ev);
134 }
135
136
137
138
139 protected void addStitchingFields_shuffle
140 (ChukwaRecord cr, ArrayList<String> fnl)
141 {
142 assert(fnl.contains("TASK_ID"));
143 assert(fnl.contains("TIME_END"));
144 assert(fnl.contains("TIME_START"));
145 assert(fnl.contains("COUNTER_BYTES"));
146
147 String mapid, redid;
148 String id_parts[];
149
150 id_parts = (cr.getValue("TASK_ID")).split("@");
151 if (id_parts.length != 2) {
152 log.warn("Could not split [" + cr.getValue("TASK_ID") + "]; had length " + id_parts.length);
153 }
154 redid = id_parts[0];
155 mapid = id_parts[1];
156
157 String id1 = new String("shuf"+JCDF_SEP+mapid);
158 String id2 = new String("shufred"+JCDF_SEP+redid);
159 String et = new String(
160 (new Long(Long.parseLong(cr.getValue("TIME_END")) -
161 Long.parseLong(cr.getValue("TIME_START")))).toString()
162 );
163 String ev = new String(cr.getValue("COUNTER_BYTES"));
164 cr.add(JCDF_ID1, id1);
165 cr.add(JCDF_ID2, id2);
166 cr.add(JCDF_EDGE_TIME, et);
167 cr.add(JCDF_EDGE_VOL, ev);
168 }
169
170
171
172
173 protected void addStitchingFields_redshufwait
174 (ChukwaRecord cr, ArrayList<String> fnl)
175 {
176 assert(fnl.contains("TASK_ID"));
177 assert(fnl.contains("TIME_END"));
178 assert(fnl.contains("TIME_START"));
179 assert(fnl.contains("COUNTER_INPUT_BYTES"));
180
181 String id1 = new String("shufred"+JCDF_SEP+cr.getValue("TASK_ID"));
182 String id2 = new String("redsort"+JCDF_SEP+cr.getValue("TASK_ID"));
183 String et = new String(
184 (new Long(Long.parseLong(cr.getValue("TIME_END")) -
185 Long.parseLong(cr.getValue("TIME_START")))).toString()
186 );
187 String ev = new String(cr.getValue("COUNTER_INPUT_BYTES"));
188 cr.add(JCDF_ID1, id1);
189 cr.add(JCDF_ID2, id2);
190 cr.add(JCDF_EDGE_TIME, et);
191 cr.add(JCDF_EDGE_VOL, ev);
192 }
193
194
195
196
197 protected void addStitchingFields_redsort
198 (ChukwaRecord cr, ArrayList<String> fnl)
199 {
200 assert(fnl.contains("TASK_ID"));
201 assert(fnl.contains("TIME_END"));
202 assert(fnl.contains("TIME_START"));
203 assert(fnl.contains("COUNTER_INPUT_BYTES"));
204
205 String id1 = new String("redsort"+JCDF_SEP+cr.getValue("TASK_ID"));
206 String id2 = new String("red"+JCDF_SEP+cr.getValue("TASK_ID"));
207 String et = new String(
208 (new Long(Long.parseLong(cr.getValue("TIME_END")) -
209 Long.parseLong(cr.getValue("TIME_START")))).toString()
210 );
211 String ev = new String(cr.getValue("COUNTER_INPUT_BYTES"));
212 cr.add(JCDF_ID1, id1);
213 cr.add(JCDF_ID2, id2);
214 cr.add(JCDF_EDGE_TIME, et);
215 cr.add(JCDF_EDGE_VOL, ev);
216 }
217
218
219
220
221 protected void addStitchingFields_redreducer
222 (ChukwaRecord cr, ArrayList<String> fnl)
223 {
224 assert(fnl.contains("TASK_ID"));
225 assert(fnl.contains("TIME_END"));
226 assert(fnl.contains("TIME_START"));
227 assert(fnl.contains("COUNTER_INPUT_BYTES"));
228
229 String id1 = new String("red"+JCDF_SEP+cr.getValue("TASK_ID"));
230 String id2 = new String("redout"+JCDF_SEP+cr.getValue("TASK_ID"));
231 String et = new String(
232 (new Long(Long.parseLong(cr.getValue("TIME_END")) -
233 Long.parseLong(cr.getValue("TIME_START")))).toString()
234 );
235 String ev = new String(cr.getValue("COUNTER_INPUT_BYTES"));
236 cr.add(JCDF_ID1, id1);
237 cr.add(JCDF_ID2, id2);
238 cr.add(JCDF_EDGE_TIME, et);
239 cr.add(JCDF_EDGE_VOL, ev);
240 }
241
242 protected void addStitchingFields_blockwrite
243 (ChukwaRecord cr, ArrayList<String> fnl)
244 {
245 assert(fnl.contains("JOB_ID"));
246 assert(fnl.contains("TASK_ID"));
247 assert(fnl.contains("TIME_END"));
248 assert(fnl.contains("TIME_START"));
249 assert(fnl.contains("COUNTER_BYTES"));
250
251 String id1 = new String("redout"+JCDF_SEP+cr.getValue("JOB_ID"));
252 String id2 = new String(cr.getValue("TASK_ID")+JCDF_SEP+cr.getValue("TIME_START"));
253 String et = new String(
254 (new Long(Long.parseLong(cr.getValue("TIME_END")) -
255 Long.parseLong(cr.getValue("TIME_START")))).toString()
256 );
257 String ev = new String(cr.getValue("COUNTER_BYTES"));
258 cr.add(JCDF_ID1, id1);
259 cr.add(JCDF_ID2, id2);
260 cr.add(JCDF_EDGE_TIME, et);
261 cr.add(JCDF_EDGE_VOL, ev);
262 }
263
264 public void addStitchingFields
265 (ChukwaRecord cr)
266 {
267 String state_name = null;
268 String [] fieldNames = cr.getFields();
269
270
271 ArrayList<String> fieldNamesList = new ArrayList<String>(fieldNames.length);
272 for (int i = 0; i < fieldNames.length; i++) fieldNamesList.add(fieldNames[i]);
273
274
275 assert(fieldNamesList.contains("STATE_NAME"));
276
277 state_name = cr.getValue("STATE_NAME");
278 if (state_name.equals("MAP")) {
279 addStitchingFields_map(cr, fieldNamesList);
280 } else if (state_name.equals("REDUCE_SHUFFLEWAIT")) {
281 addStitchingFields_redshufwait(cr, fieldNamesList);
282 } else if (state_name.equals("REDUCE_SORT")) {
283 addStitchingFields_redsort(cr, fieldNamesList);
284 } else if (state_name.equals("REDUCE_REDUCER")) {
285 addStitchingFields_redreducer(cr, fieldNamesList);
286 } else if (state_name.equals("SHUFFLE_LOCAL") || state_name.equals("SHUFFLE_REMOTE")) {
287 addStitchingFields_shuffle(cr, fieldNamesList);
288 } else if (state_name.equals("READ_LOCAL") || state_name.equals("READ_REMOTE")) {
289 addStitchingFields_blockread(cr, fieldNamesList);
290 } else if (state_name.equals("WRITE_LOCAL") || state_name.equals("WRITE_REMOTE")) {
291 addStitchingFields_blockwrite(cr, fieldNamesList);
292 }
293
294 }
295
296 public void reduce
297 (ChukwaRecordKey key, Iterator<FSMIntermedEntry> values,
298 OutputCollector<ChukwaRecordKey, ChukwaRecord> output,
299 Reporter reporter)
300 throws IOException
301 {
302 FSMIntermedEntry start_rec = null, end_rec = null;
303 FSMIntermedEntry tmpent;
304 String keystr = key.getKey();
305 String newkey;
306 ArrayList<FSMIntermedEntry> ents = new ArrayList<FSMIntermedEntry>();
307 ArrayList<String> noncounters = new ArrayList<String>();
308 keystr = keystr.trim();
309 ChukwaRecord cr = new ChukwaRecord();
310
311 for (int i = 0; i < NON_COUNTER_KEYS.length; i++) noncounters.add(NON_COUNTER_KEYS[i]);
312
313 ChukwaOutputCollector coc = new ChukwaOutputCollector("SALSA_COMPLETE", output, reporter);
314
315 int itemcount = 0;
316 try {
317 while (values.hasNext()) {
318 itemcount++;
319 tmpent = values.next();
320 ents.add(tmpent.clone());
321 }
322 } catch (CloneNotSupportedException e) {
323
324 }
325
326 log.debug("In reduce [Key " + keystr + "] (" + itemcount + " vals)");
327
328 if (itemcount == 2) {
329
330 if (ents.get(0).state_type.val == StateType.STATE_START &&
331 ents.get(1).state_type.val == StateType.STATE_END)
332 {
333 start_rec = ents.get(0); end_rec = ents.get(1);
334 } else if (ents.get(1).state_type.val == StateType.STATE_START &&
335 ents.get(0).state_type.val == StateType.STATE_END)
336 {
337 start_rec = ents.get(1); end_rec = ents.get(0);
338 } else {
339 log.warn("In reduce [Key " + keystr + "] Invalid combination of state types: number of states: "+itemcount+".");
340
341 }
342
343 cr.add(new String("STATE_NAME"),start_rec.state_name);
344 cr.add(new String("STATE_UNIQ_ID"),start_rec.getUniqueID());
345 cr.add(new String("TIMESTAMP"),start_rec.timestamp);
346 cr.add(new String("TIME_START"),start_rec.time_start);
347 cr.add(new String("TIME_END"),end_rec.time_end);
348 cr.add(new String("TIME_START_MILLIS"),start_rec.time_start.substring(start_rec.time_start.length()-3));
349 cr.add(new String("TIME_END_MILLIS"),end_rec.time_end.substring(end_rec.time_end.length()-3));
350 cr.add(new String("HOST"),start_rec.host_exec);
351 cr.add(new String("HOST_OTHER"),start_rec.host_other);
352 cr.add(new String("JOB_ID"),start_rec.job_id);
353 cr.add(new String("TASK_ID"),start_rec.getFriendlyID());
354
355 Set<String> treemapkeys = end_rec.add_info.keySet();
356 Iterator<String> keyIter = treemapkeys.iterator();
357
358 for (int i = 0; i < treemapkeys.size(); i++) {
359 assert(keyIter.hasNext());
360 String currkey = keyIter.next();
361 if (currkey != null &&
362 !noncounters.contains(currkey)) {
363 cr.add(new String("COUNTER_" + currkey), end_rec.add_info.get(currkey));
364 } else if (currkey != null && noncounters.contains(currkey)) {
365 cr.add(new String(currkey), end_rec.add_info.get(currkey));
366 }
367 }
368 assert(!keyIter.hasNext());
369 cr.setTime(Long.parseLong(start_rec.timestamp));
370
371 newkey = null;
372 newkey = new String(start_rec.time_orig_epoch +
373 SEP + start_rec.getUniqueID() + SEP + start_rec.time_orig);
374
375 log.info("Key ["+newkey+"] Task ["+start_rec.getUniqueID()+"] Job ["+start_rec.job_id+"] Friendly ["+start_rec.getFriendlyID()+"]");
376
377 addStitchingFields(cr);
378 log.debug(cr);
379 coc.collect(new ChukwaRecordKey(key.getReduceType(), newkey), cr);
380
381 } else if (itemcount == 1) {
382
383
384
385
386 log.warn("Key ["+keystr+"] Too few state entries: "+itemcount+" (intermediate processing not implemented yet).");
387
388 } else {
389
390
391 log.warn("Key ["+keystr+"] Malformed data: unexpected number of state entries: "+itemcount+".");
392
393 }
394 }
395 }
396
397
398 public int run (String args[]) throws Exception {
399 int num_inputs;
400 JobConf conf = new JobConf(getConf(), FSMBuilder.class);
401 String [] args2 = args;
402
403 if (args2.length < 4 || !"-in".equals(args2[0]))
404 {
405 System.err.println("Specifying mapper (full Java class): -D chukwa.salsa.fsm.mapclass=");
406 System.err.println("Application-specific arguments: -in <# inputs> [input dir 1] ... [input dir n] [output dir]");
407 return(1);
408 }
409
410 conf.setJobName("Salsa_FSMBuilder");
411
412
413 String mapclassname = conf.get("chukwa.salsa.fsm.mapclass");
414 log.info("Mapper class: " + mapclassname);
415 Class mapperClass = null;
416 try {
417 mapperClass = Class.forName(mapclassname);
418 } catch (ClassNotFoundException c) {
419 System.err.println("Mapper " + mapclassname + " not found: " + c.toString());
420 }
421
422
423 conf.setMapperClass(mapperClass);
424 conf.setReducerClass(FSMReducer.class);
425 conf.setOutputKeyClass(ChukwaRecordKey.class);
426 conf.setOutputValueClass(ChukwaRecord.class);
427 conf.setInputFormat(SequenceFileInputFormat.class);
428 conf.setOutputFormat(ChukwaRecordOutputFormat.class);
429 conf.setPartitionerClass(FSMIntermedEntryPartitioner.class);
430 conf.setMapOutputValueClass(FSMIntermedEntry.class);
431 conf.setMapOutputKeyClass(ChukwaRecordKey.class);
432 conf.setNumReduceTasks(1);
433
434
435 try {
436 num_inputs = Integer.parseInt(args2[1]);
437 } catch (NumberFormatException e) {
438 System.err.println("Specifying mapper: -D chukwa.salsa.fsm.mapper=");
439 System.err.println("Application-specific arguments: -in <# inputs> -out <#outputs> [input dir] [output dir]");
440 return(1);
441 }
442
443 if (num_inputs <= 0) {
444 System.err.println("Must have at least 1 input.");
445 return(1);
446 }
447
448 for (int i = 2; i < 2+num_inputs; i++) {
449 Path in = new Path(args2[i]);
450 FileInputFormat.addInputPath(conf, in);
451 }
452
453 Path out = new Path(args2[2+num_inputs]);
454 FileOutputFormat.setOutputPath(conf, out);
455
456 JobClient.runJob(conf);
457
458 return(0);
459 }
460
461 public static void main (String [] args) throws Exception {
462
463 int res = ToolRunner.run(new Configuration(), new FSMBuilder(), args);
464
465 System.exit(res);
466 }
467
468
469
470 }
471