This project has retired. For details, please refer to its Attic page.
JobHistoryTaskDataMapper xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.chukwa.analysis.salsa.fsm;
20
21 import java.io.IOException;
22 import java.util.ArrayList;
23
24 import org.apache.commons.logging.Log;
25 import org.apache.commons.logging.LogFactory;
26
27 import org.apache.hadoop.chukwa.extraction.engine.*;
28 import org.apache.hadoop.mapred.*;
29
30
31
32
33
34
35
36
37
38
39
40 public class JobHistoryTaskDataMapper
41 extends MapReduceBase
42 implements Mapper<ChukwaRecordKey, ChukwaRecord, ChukwaRecordKey, FSMIntermedEntry>
43 {
44 private static Log log = LogFactory.getLog(FSMBuilder.class);
45 protected static final String SEP = "/";
46
47 protected final static String FSM_CRK_ReduceType = FSMType.NAMES[FSMType.MAPREDUCE_FSM];
48
49
50
51
52
53 protected FSMIntermedEntry populateRecord_MapCounters
54 (FSMIntermedEntry this_rec, ChukwaRecord val, ArrayList<String> fieldNamesList)
55 {
56 String mapCounterNames [] = {
57 "Counter:FileSystemCounters:FILE_BYTES_WRITTEN",
58 "Counter:org.apache.hadoop.mapred.Task$Counter:COMBINE_INPUT_RECORDS",
59 "Counter:org.apache.hadoop.mapred.Task$Counter:COMBINE_OUTPUT_RECORDS",
60 "Counter:org.apache.hadoop.mapred.Task$Counter:MAP_INPUT_BYTES",
61 "Counter:org.apache.hadoop.mapred.Task$Counter:MAP_INPUT_RECORDS",
62 "Counter:org.apache.hadoop.mapred.Task$Counter:MAP_OUTPUT_BYTES",
63 "Counter:org.apache.hadoop.mapred.Task$Counter:MAP_OUTPUT_RECORDS",
64 "Counter:org.apache.hadoop.mapred.Task$Counter:SPILLED_RECORDS"
65 };
66 String mapCounterDestNames[] = {
67 "FILE_BYTES_WRITTEN",
68 "COMBINE_INPUT_RECORDS",
69 "COMBINE_OUTPUT_RECORDS",
70 "INPUT_BYTES",
71 "INPUT_RECORDS",
72 "OUTPUT_BYTES",
73 "OUTPUT_RECORDS",
74 "SPILLED_RECORDS"
75 };
76
77 assert(mapCounterDestNames.length == mapCounterNames.length);
78
79 for (int i = 0; i < mapCounterDestNames.length; i++) {
80 if (fieldNamesList.contains(mapCounterNames[i])) {
81 this_rec.add_info.put(mapCounterDestNames[i], val.getValue(mapCounterNames[i]));
82 }
83 }
84 this_rec.add_info.put("FILE_BYTES_READ","0");
85 this_rec.add_info.put("INPUT_GROUPS","0");
86
87 return this_rec;
88 }
89
90
91
92
93
94 protected FSMIntermedEntry populateRecord_ReduceCounters
95 (FSMIntermedEntry this_rec, ChukwaRecord val, ArrayList<String> fieldNamesList)
96 {
97 String redCounterNames [] = {
98 "Counter:FileSystemCounters:FILE_BYTES_READ",
99 "Counter:FileSystemCounters:FILE_BYTES_WRITTEN",
100 "Counter:org.apache.hadoop.mapred.Task$Counter:COMBINE_INPUT_RECORDS",
101 "Counter:org.apache.hadoop.mapred.Task$Counter:COMBINE_OUTPUT_RECORDS",
102 "Counter:org.apache.hadoop.mapred.Task$Counter:REDUCE_INPUT_GROUPS",
103 "Counter:org.apache.hadoop.mapred.Task$Counter:REDUCE_INPUT_RECORDS",
104 "Counter:org.apache.hadoop.mapred.Task$Counter:REDUCE_OUTPUT_RECORDS",
105 "Counter:org.apache.hadoop.mapred.Task$Counter:REDUCE_SHUFFLE_BYTES",
106 "Counter:org.apache.hadoop.mapred.Task$Counter:SPILLED_RECORDS"
107 };
108 String redCounterDestNames[] = {
109 "FILE_BYTES_READ",
110 "FILE_BYTES_WRITTEN",
111 "COMBINE_INPUT_RECORDS",
112 "COMBINE_OUTPUT_RECORDS",
113 "INPUT_GROUPS",
114 "INPUT_RECORDS",
115 "OUTPUT_RECORDS",
116 "INPUT_BYTES",
117 "SPILLED_RECORDS"
118 };
119
120 assert(redCounterDestNames.length == redCounterNames.length);
121
122 for (int i = 0; i < redCounterDestNames.length; i++) {
123 if (fieldNamesList.contains(redCounterNames[i])) {
124 this_rec.add_info.put(redCounterDestNames[i], val.getValue(redCounterNames[i]));
125 }
126 }
127
128 this_rec.add_info.put("OUTPUT_BYTES","0");
129
130 return this_rec;
131 }
132
133 public void map
134 (ChukwaRecordKey key, ChukwaRecord val,
135 OutputCollector<ChukwaRecordKey, FSMIntermedEntry> output,
136 Reporter reporter)
137 throws IOException
138 {
139 String task_type;
140 FSMIntermedEntry this_rec = new FSMIntermedEntry();
141 boolean add_record = true;
142
143
144 String [] fieldNames = val.getFields();
145 ArrayList<String> fieldNamesList = new ArrayList<String>(fieldNames.length);
146 for (int i = 0; i < fieldNames.length; i++) fieldNamesList.add(fieldNames[i]);
147
148
149 if (!fieldNamesList.contains("TASK_ATTEMPT_ID")) return;
150 if (!fieldNamesList.contains("TASK_TYPE")) {
151 return;
152 } else {
153 task_type = val.getValue("TASK_TYPE");
154 if (!task_type.equals("MAP") && !task_type.equals("REDUCE")) {
155 return;
156 }
157 }
158
159
160 if (fieldNamesList.contains("START_TIME")) {
161 this_rec.state_type.val = StateType.STATE_START;
162 this_rec.timestamp = val.getValue("START_TIME");
163 this_rec.time_start = val.getValue("START_TIME");
164 this_rec.time_end = "";
165 if (val.getValue("START_TIME").length() < 4+2) {
166 add_record = add_record & false;
167 }
168 } else if (fieldNamesList.contains("FINISH_TIME")) {
169 this_rec.state_type.val = StateType.STATE_END;
170 this_rec.timestamp = val.getValue("FINISH_TIME");
171 this_rec.time_start = "";
172 this_rec.time_end = val.getValue("FINISH_TIME");
173 if (val.getValue("FINISH_TIME").length() < 4+2) {
174 add_record = add_record & false;
175 }
176 } else {
177 this_rec.state_type.val = StateType.STATE_NOOP;
178 }
179
180
181
182
183 try {
184 this_rec = ParseUtilities.splitChukwaRecordKey(key.getKey().trim(),this_rec,SEP);
185 } catch (Exception e) {
186 log.warn("Error occurred splitting ChukwaRecordKey ["+key.getKey().trim()+"]: " + e.toString());
187 return;
188 }
189
190
191 this_rec.fsm_type = new FSMType(FSMType.MAPREDUCE_FSM);
192 if (task_type.equals("MAP")) {
193 this_rec.state_mapred = new MapRedState(MapRedState.MAP);
194 } else if (task_type.equals("REDUCE")) {
195 this_rec.state_mapred = new MapRedState(MapRedState.REDUCE);
196 } else {
197 this_rec.state_mapred = new MapRedState(MapRedState.NONE);
198 }
199
200
201 this_rec.state_name = this_rec.state_mapred.toString();
202 this_rec.identifier = val.getValue("TASK_ATTEMPT_ID");
203 this_rec.generateUniqueID();
204
205
206 if (fieldNamesList.contains("HOSTNAME")) {
207 this_rec.host_exec = val.getValue("HOSTNAME");
208 this_rec.host_exec = ParseUtilities.removeRackFromHostname(this_rec.host_exec);
209 } else if (fieldNamesList.contains("TRACKER_NAME")) {
210 this_rec.host_exec = ParseUtilities.extractHostnameFromTrackerName(val.getValue("TRACKER_NAME"));
211 } else {
212 this_rec.host_exec = "";
213 }
214
215 if (this_rec.state_type.val == StateType.STATE_END) {
216 assert(fieldNamesList.contains("TASK_STATUS"));
217 String tmpstring = null;
218 tmpstring = val.getValue("TASK_STATUS");
219 if (tmpstring != null && (tmpstring.equals("KILLED") || tmpstring.equals("FAILED"))) {
220 add_record = add_record & false;
221 }
222 if (tmpstring != null && tmpstring.length() > 0) {
223 this_rec.add_info.put("STATE_STRING",tmpstring);
224 } else {
225 this_rec.add_info.put("STATE_STRING","");
226 }
227
228 switch(this_rec.state_mapred.val) {
229 case MapRedState.MAP:
230 this_rec = populateRecord_MapCounters(this_rec, val, fieldNamesList);
231 break;
232 case MapRedState.REDUCE:
233 this_rec = populateRecord_ReduceCounters(this_rec, val, fieldNamesList);
234 break;
235 default:
236
237 break;
238 }
239 }
240
241 assert(fieldNamesList.contains(Record.tagsField));
242 assert(fieldNamesList.contains("csource"));
243 this_rec.add_info.put(Record.tagsField,val.getValue(Record.tagsField));
244 this_rec.add_info.put("csource",val.getValue("csource"));
245
246
247 if (task_type.equals("REDUCE")) {
248 if (this_rec.state_type.val == StateType.STATE_END) {
249 add_record = add_record & expandReduceEnd(key,val,output,reporter,this_rec);
250 } else if (this_rec.state_type.val == StateType.STATE_START) {
251 add_record = add_record & expandReduceStart(key,val,output,reporter,this_rec);
252 }
253 } else if (task_type.equals("MAP")) {
254 add_record = add_record & true;
255 }
256
257 if (add_record) {
258 log.debug("Collecting record " + this_rec + "("+this_rec.state_type+") (ReduceType "+FSM_CRK_ReduceType+")");
259 output.collect(new ChukwaRecordKey(FSM_CRK_ReduceType,this_rec.getUniqueID()),this_rec);
260 }
261
262 }
263
264 protected boolean expandReduceStart
265 (ChukwaRecordKey key, ChukwaRecord val,
266 OutputCollector<ChukwaRecordKey, FSMIntermedEntry> output,
267 Reporter reporter, FSMIntermedEntry this_rec)
268 throws IOException
269 {
270 FSMIntermedEntry redshuf_start_rec = null;
271
272 try {
273 redshuf_start_rec = this_rec.clone();
274 } catch (CloneNotSupportedException e) {
275
276 }
277
278 redshuf_start_rec.state_type = new StateType(StateType.STATE_START);
279 redshuf_start_rec.state_mapred = new MapRedState(MapRedState.REDUCE_SHUFFLEWAIT);
280
281 redshuf_start_rec.timestamp = this_rec.timestamp;
282 redshuf_start_rec.time_start = this_rec.timestamp;
283 redshuf_start_rec.time_end = "";
284
285 redshuf_start_rec.generateUniqueID();
286
287 log.debug("Collecting record " + redshuf_start_rec +
288 "("+redshuf_start_rec.state_type+") (ReduceType "+FSM_CRK_ReduceType+")");
289 output.collect(
290 new ChukwaRecordKey(FSM_CRK_ReduceType,redshuf_start_rec.getUniqueID()),
291 redshuf_start_rec
292 );
293
294 return true;
295 }
296
297
298
299 protected boolean expandReduceEnd
300 (ChukwaRecordKey key, ChukwaRecord val,
301 OutputCollector<ChukwaRecordKey, FSMIntermedEntry> output,
302 Reporter reporter, FSMIntermedEntry this_rec)
303 throws IOException
304 {
305
306
307
308 FSMIntermedEntry redshuf_end_rec = null;
309 FSMIntermedEntry redsort_start_rec = null, redsort_end_rec = null;
310 FSMIntermedEntry redred_start_rec = null, redred_end_rec = null;
311
312
313 String [] fieldNames = val.getFields();
314 ArrayList<String> fieldNamesList = new ArrayList<String>(fieldNames.length);
315 for (int i = 0; i < fieldNames.length; i++) fieldNamesList.add(fieldNames[i]);
316
317 try {
318 redsort_start_rec = this_rec.clone();
319 redred_start_rec = this_rec.clone();
320 redshuf_end_rec = this_rec.clone();
321 redsort_end_rec = this_rec.clone();
322 redred_end_rec = this_rec.clone();
323 } catch (CloneNotSupportedException e) {
324
325 }
326
327 redshuf_end_rec.state_type = new StateType(StateType.STATE_END);
328 redshuf_end_rec.state_mapred = new MapRedState(MapRedState.REDUCE_SHUFFLEWAIT);
329
330 redsort_start_rec.state_type = new StateType(StateType.STATE_START);
331 redsort_end_rec.state_type = new StateType(StateType.STATE_END);
332 redsort_start_rec.state_mapred = new MapRedState(MapRedState.REDUCE_SORT);
333 redsort_end_rec.state_mapred = new MapRedState(MapRedState.REDUCE_SORT);
334
335 redred_start_rec.state_type = new StateType(StateType.STATE_START);
336 redred_end_rec.state_type = new StateType(StateType.STATE_END);
337 redred_start_rec.state_mapred = new MapRedState(MapRedState.REDUCE_REDUCER);
338 redred_end_rec.state_mapred = new MapRedState(MapRedState.REDUCE_REDUCER);
339
340 redshuf_end_rec.generateUniqueID();
341 redsort_start_rec.generateUniqueID();
342 redsort_end_rec.generateUniqueID();
343 redred_start_rec.generateUniqueID();
344 redred_end_rec.generateUniqueID();
345
346 if(fieldNamesList.contains("SHUFFLE_FINISHED") && fieldNamesList.contains("SORT_FINISHED")) {
347 if (val.getValue("SHUFFLE_FINISHED") == null) return false;
348 if (val.getValue("SORT_FINISHED") == null) return false;
349 } else {
350 return false;
351 }
352 redshuf_end_rec.timestamp = val.getValue("SHUFFLE_FINISHED");
353 redshuf_end_rec.time_start = "";
354 redshuf_end_rec.time_end = val.getValue("SHUFFLE_FINISHED");
355 redsort_start_rec.timestamp = val.getValue("SHUFFLE_FINISHED");
356 redsort_start_rec.time_start = val.getValue("SHUFFLE_FINISHED");
357 redsort_start_rec.time_end = "";
358
359 assert(fieldNamesList.contains("SORT_FINISHED"));
360 redsort_end_rec.timestamp = val.getValue("SORT_FINISHED");
361 redsort_end_rec.time_start = "";
362 redsort_end_rec.time_end = val.getValue("SORT_FINISHED");
363 redred_start_rec.timestamp = val.getValue("SORT_FINISHED");
364 redred_start_rec.time_start = val.getValue("SORT_FINISHED");
365 redred_start_rec.time_end = "";
366
367
368
369 log.debug("Collecting record " + redshuf_end_rec +
370 "("+redshuf_end_rec.state_type+") (ReduceType "+FSM_CRK_ReduceType+")");
371 output.collect(
372 new ChukwaRecordKey(FSM_CRK_ReduceType,redshuf_end_rec.getUniqueID()),
373 redshuf_end_rec
374 );
375
376 log.debug("Collecting record " + redsort_start_rec +
377 "("+redsort_start_rec.state_type+") (ReduceType "+FSM_CRK_ReduceType+")");
378 output.collect(
379 new ChukwaRecordKey(FSM_CRK_ReduceType,redsort_start_rec.getUniqueID()),
380 redsort_start_rec
381 );
382
383 log.debug("Collecting record " + redsort_end_rec +
384 "("+redsort_end_rec.state_type+") (ReduceType "+FSM_CRK_ReduceType+")");
385 output.collect(
386 new ChukwaRecordKey(FSM_CRK_ReduceType,redsort_end_rec.getUniqueID()),
387 redsort_end_rec
388 );
389
390 log.debug("Collecting record " + redred_start_rec +
391 "("+redred_start_rec.state_type+") (ReduceType "+FSM_CRK_ReduceType+")");
392 output.collect(
393 new ChukwaRecordKey(FSM_CRK_ReduceType,redred_start_rec.getUniqueID()),
394 redred_start_rec
395 );
396
397 log.debug("Collecting record " + redred_end_rec +
398 "("+redred_end_rec.state_type+") (ReduceType "+FSM_CRK_ReduceType+")");
399 output.collect(
400 new ChukwaRecordKey(FSM_CRK_ReduceType,redred_end_rec.getUniqueID()),
401 redred_end_rec
402 );
403
404 return true;
405 }
406
407 }