JobHistoryTaskDataMapper xref
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
 * implied.  See the License for the specific language governing
 * permissions and limitations under the License.
 */

package org.apache.hadoop.chukwa.analysis.salsa.fsm;

import java.io.IOException;
import java.util.ArrayList;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.apache.hadoop.chukwa.extraction.demux.*;
import org.apache.hadoop.chukwa.extraction.engine.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;

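/**
 * FSMBuilder mapper for JobHistory task data: reads demuxed JobHistory
 * ChukwaRecords for MAP and REDUCE task attempts and emits FSMIntermedEntry
 * state entries (start/end states plus, for reduces, shuffle/sort/reducer
 * sub-states), keyed for the MapReduce FSM reducer.
 */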
public class JobHistoryTaskDataMapper
  extends MapReduceBase
  implements Mapper<ChukwaRecordKey, ChukwaRecord, ChukwaRecordKey, FSMIntermedEntry>
{
  private static Log log = LogFactory.getLog(FSMBuilder.class);
  protected static final String SEP = "/";

  protected static String FSM_CRK_ReduceType = FSMType.NAMES[FSMType.MAPREDUCE_FSM];

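  /**
   * Copies map-task counters of interest from the ChukwaRecord into the
   * intermediate entry's add_info map under shortened destination names;
   * FILE_BYTES_READ and INPUT_GROUPS are filled with "0" so map entries
   * carry the same counter fields as reduce entries.
   */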
  protected FSMIntermedEntry populateRecord_MapCounters
    (FSMIntermedEntry this_rec, ChukwaRecord val, ArrayList<String> fieldNamesList)
  {
    String mapCounterNames [] = {
      "Counter:FileSystemCounters:FILE_BYTES_WRITTEN",
      "Counter:org.apache.hadoop.mapred.Task$Counter:COMBINE_INPUT_RECORDS",
      "Counter:org.apache.hadoop.mapred.Task$Counter:COMBINE_OUTPUT_RECORDS",
      "Counter:org.apache.hadoop.mapred.Task$Counter:MAP_INPUT_BYTES",
      "Counter:org.apache.hadoop.mapred.Task$Counter:MAP_INPUT_RECORDS",
      "Counter:org.apache.hadoop.mapred.Task$Counter:MAP_OUTPUT_BYTES",
      "Counter:org.apache.hadoop.mapred.Task$Counter:MAP_OUTPUT_RECORDS",
      "Counter:org.apache.hadoop.mapred.Task$Counter:SPILLED_RECORDS"
    };
    String mapCounterDestNames[] = {
      "FILE_BYTES_WRITTEN",
      "COMBINE_INPUT_RECORDS",
      "COMBINE_OUTPUT_RECORDS",
      "INPUT_BYTES",
      "INPUT_RECORDS",
      "OUTPUT_BYTES",
      "OUTPUT_RECORDS",
      "SPILLED_RECORDS"
    };

    assert(mapCounterDestNames.length == mapCounterNames.length);

    for (int i = 0; i < mapCounterDestNames.length; i++) {
      if (fieldNamesList.contains(mapCounterNames[i])) {
        this_rec.add_info.put(mapCounterDestNames[i], val.getValue(mapCounterNames[i]));
      }
    }
    this_rec.add_info.put("FILE_BYTES_READ",new String("0"));
    this_rec.add_info.put("INPUT_GROUPS",new String("0"));

    return this_rec;
  }

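  /**
   * Copies reduce-task counters of interest from the ChukwaRecord into the
   * intermediate entry's add_info map under shortened destination names
   * (REDUCE_SHUFFLE_BYTES is stored as INPUT_BYTES); OUTPUT_BYTES is filled
   * with "0" so reduce entries carry the same counter fields as map entries.
   */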
  protected FSMIntermedEntry populateRecord_ReduceCounters
    (FSMIntermedEntry this_rec, ChukwaRecord val, ArrayList<String> fieldNamesList)
  {
    String redCounterNames [] = {
      "Counter:FileSystemCounters:FILE_BYTES_READ",
      "Counter:FileSystemCounters:FILE_BYTES_WRITTEN",
      "Counter:org.apache.hadoop.mapred.Task$Counter:COMBINE_INPUT_RECORDS",
      "Counter:org.apache.hadoop.mapred.Task$Counter:COMBINE_OUTPUT_RECORDS",
      "Counter:org.apache.hadoop.mapred.Task$Counter:REDUCE_INPUT_GROUPS",
      "Counter:org.apache.hadoop.mapred.Task$Counter:REDUCE_INPUT_RECORDS",
      "Counter:org.apache.hadoop.mapred.Task$Counter:REDUCE_OUTPUT_RECORDS",
      "Counter:org.apache.hadoop.mapred.Task$Counter:REDUCE_SHUFFLE_BYTES",
      "Counter:org.apache.hadoop.mapred.Task$Counter:SPILLED_RECORDS"
    };
    String redCounterDestNames[] = {
      "FILE_BYTES_READ",
      "FILE_BYTES_WRITTEN",
      "COMBINE_INPUT_RECORDS",
      "COMBINE_OUTPUT_RECORDS",
      "INPUT_GROUPS",
      "INPUT_RECORDS",
      "OUTPUT_RECORDS",
      "INPUT_BYTES",
      "SPILLED_RECORDS"
    };

    assert(redCounterDestNames.length == redCounterNames.length);

    for (int i = 0; i < redCounterDestNames.length; i++) {
      if (fieldNamesList.contains(redCounterNames[i])) {
        this_rec.add_info.put(redCounterDestNames[i], val.getValue(redCounterNames[i]));
      }
    }

    this_rec.add_info.put("OUTPUT_BYTES",new String("0"));

    return this_rec;
  }

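  /**
   * Maps a single JobHistory ChukwaRecord to one or more FSMIntermedEntry
   * records. Records without a task attempt ID or a MAP/REDUCE task type are
   * skipped; killed and failed attempts do not produce a top-level state
   * record. Reduce attempts are additionally expanded into shuffle, sort,
   * and reducer sub-states via expandReduceStart/expandReduceEnd.
   */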
  public void map
    (ChukwaRecordKey key, ChukwaRecord val,
     OutputCollector<ChukwaRecordKey, FSMIntermedEntry> output,
     Reporter reporter)
    throws IOException
  {
    String task_type;
    FSMIntermedEntry this_rec = new FSMIntermedEntry();
    boolean add_record = true;

    /* Extract field names for checking */
    String [] fieldNames = val.getFields();
    ArrayList<String> fieldNamesList = new ArrayList<String>(fieldNames.length);
    for (int i = 0; i < fieldNames.length; i++) fieldNamesList.add(fieldNames[i]);

    /* Only task attempts of type MAP or REDUCE are of interest */
    if (!fieldNamesList.contains("TASK_ATTEMPT_ID")) return;
    if (!fieldNamesList.contains("TASK_TYPE")) {
      return;
    } else {
      task_type = val.getValue("TASK_TYPE");
      if (!task_type.equals("MAP") && !task_type.equals("REDUCE")) {
        return;
      }
    }

    /* Set state type and timestamps from START_TIME or FINISH_TIME;
       records with malformed (too-short) times are not emitted */
    if (fieldNamesList.contains("START_TIME")) {
      this_rec.state_type.val = StateType.STATE_START;
      this_rec.timestamp = new String(val.getValue("START_TIME"));
      this_rec.time_start = new String(val.getValue("START_TIME"));
      this_rec.time_end = new String("");
      if (val.getValue("START_TIME").length() < 4+2) {
        add_record = add_record & false;
      }
    } else if (fieldNamesList.contains("FINISH_TIME")) {
      this_rec.state_type.val = StateType.STATE_END;
      this_rec.timestamp = new String(val.getValue("FINISH_TIME"));
      this_rec.time_start = new String("");
      this_rec.time_end = new String(val.getValue("FINISH_TIME"));
      if (val.getValue("FINISH_TIME").length() < 4+2) {
        add_record = add_record & false;
      }
    } else {
      this_rec.state_type.val = StateType.STATE_NOOP;
    }

    /* Split the ChukwaRecordKey into its components; skip the record if the key is malformed */
    try {
      this_rec = ParseUtilities.splitChukwaRecordKey(key.getKey().trim(),this_rec,SEP);
    } catch (Exception e) {
      log.warn("Error occurred splitting ChukwaRecordKey ["+key.getKey().trim()+"]: " + e.toString());
      return;
    }

    this_rec.fsm_type = new FSMType(FSMType.MAPREDUCE_FSM);
    if (task_type.equals("MAP")) {
      this_rec.state_mapred = new MapRedState(MapRedState.MAP);
    } else if (task_type.equals("REDUCE")) {
      this_rec.state_mapred = new MapRedState(MapRedState.REDUCE);
    } else {
      this_rec.state_mapred = new MapRedState(MapRedState.NONE);
    }

    /* State name and unique identifier (task attempt ID) */
    this_rec.state_name = new String(this_rec.state_mapred.toString());
    this_rec.identifier = new String(val.getValue("TASK_ATTEMPT_ID"));
    this_rec.generateUniqueID();

    /* Determine the executing host from HOSTNAME (stripping the rack prefix)
       or, failing that, from TRACKER_NAME */
    if (fieldNamesList.contains("HOSTNAME")) {
      this_rec.host_exec = new String(val.getValue("HOSTNAME"));
      this_rec.host_exec = ParseUtilities.removeRackFromHostname(this_rec.host_exec);
    } else if (fieldNamesList.contains("TRACKER_NAME")) {
      this_rec.host_exec = ParseUtilities.extractHostnameFromTrackerName(val.getValue("TRACKER_NAME"));
    } else {
      this_rec.host_exec = new String("");
    }

    if (this_rec.state_type.val == StateType.STATE_END) {
      assert(fieldNamesList.contains("TASK_STATUS"));
      String tmpstring = null;
      tmpstring = val.getValue("TASK_STATUS");
      if (tmpstring.equals("KILLED") || tmpstring.equals("FAILED")) {
        add_record = add_record & false; // do not emit killed/failed attempts
      }
      if (tmpstring != null && tmpstring.length() > 0) {
        this_rec.add_info.put("STATE_STRING",tmpstring);
      } else {
        this_rec.add_info.put("STATE_STRING",new String(""));
      }

      /* Attach per-task counters for completed tasks */
      switch(this_rec.state_mapred.val) {
        case MapRedState.MAP:
          this_rec = populateRecord_MapCounters(this_rec, val, fieldNamesList);
          break;
        case MapRedState.REDUCE:
          this_rec = populateRecord_ReduceCounters(this_rec, val, fieldNamesList);
          break;
        default:
          break;
      }
    }

    /* Carry over Chukwa tags and collection source */
    assert(fieldNamesList.contains(Record.tagsField));
    assert(fieldNamesList.contains("csource"));
    this_rec.add_info.put(Record.tagsField,val.getValue(Record.tagsField));
    this_rec.add_info.put("csource",val.getValue("csource"));

    /* Reduce attempts are expanded into shuffle/sort/reducer sub-states */
    if (task_type.equals("REDUCE")) {
      if (this_rec.state_type.val == StateType.STATE_END) {
        add_record = add_record & expandReduceEnd(key,val,output,reporter,this_rec);
      } else if (this_rec.state_type.val == StateType.STATE_START) {
        add_record = add_record & expandReduceStart(key,val,output,reporter,this_rec);
      }
    } else if (task_type.equals("MAP")) {
      add_record = add_record & true;
    }

    if (add_record) {
      log.debug("Collecting record " + this_rec + "("+this_rec.state_type+") (ReduceType "+FSM_CRK_ReduceType+")");
      output.collect(new ChukwaRecordKey(FSM_CRK_ReduceType,this_rec.getUniqueID()),this_rec);
    }

  }

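  /**
   * On a reduce start record, emits an additional REDUCE_SHUFFLEWAIT start
   * entry cloned from the reduce entry and stamped with the same start time.
   */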
  protected boolean expandReduceStart
    (ChukwaRecordKey key, ChukwaRecord val,
     OutputCollector<ChukwaRecordKey, FSMIntermedEntry> output,
     Reporter reporter, FSMIntermedEntry this_rec)
    throws IOException
  {
    FSMIntermedEntry redshuf_start_rec = null;

    try {
      redshuf_start_rec = this_rec.clone();
    } catch (CloneNotSupportedException e) {
      // not expected: FSMIntermedEntry instances are cloneable
    }

    redshuf_start_rec.state_type = new StateType(StateType.STATE_START);
    redshuf_start_rec.state_mapred = new MapRedState(MapRedState.REDUCE_SHUFFLEWAIT);

    redshuf_start_rec.timestamp = new String(this_rec.timestamp);
    redshuf_start_rec.time_start = new String(this_rec.timestamp);
    redshuf_start_rec.time_end = new String("");

    redshuf_start_rec.generateUniqueID();

    log.debug("Collecting record " + redshuf_start_rec +
      "("+redshuf_start_rec.state_type+") (ReduceType "+FSM_CRK_ReduceType+")");
    output.collect(
      new ChukwaRecordKey(FSM_CRK_ReduceType,redshuf_start_rec.getUniqueID()),
      redshuf_start_rec
    );

    return true;
  }

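  /**
   * On a reduce finish record, emits the remaining reduce sub-states:
   * REDUCE_SHUFFLEWAIT end and REDUCE_SORT start at SHUFFLE_FINISHED,
   * REDUCE_SORT end and REDUCE_REDUCER start at SORT_FINISHED, and a
   * REDUCE_REDUCER end at the reduce finish time. Returns false (and emits
   * nothing) if SHUFFLE_FINISHED or SORT_FINISHED is missing.
   */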
  protected boolean expandReduceEnd
    (ChukwaRecordKey key, ChukwaRecord val,
     OutputCollector<ChukwaRecordKey, FSMIntermedEntry> output,
     Reporter reporter, FSMIntermedEntry this_rec)
    throws IOException
  {
    /* Split the single reduce end record into separate
       shuffle, sort, and reducer state entries */
    FSMIntermedEntry redshuf_end_rec = null;
    FSMIntermedEntry redsort_start_rec = null, redsort_end_rec = null;
    FSMIntermedEntry redred_start_rec = null, redred_end_rec = null;

    String [] fieldNames = val.getFields();
    ArrayList<String> fieldNamesList = new ArrayList<String>(fieldNames.length);
    for (int i = 0; i < fieldNames.length; i++) fieldNamesList.add(fieldNames[i]);

    try {
      redsort_start_rec = this_rec.clone();
      redred_start_rec = this_rec.clone();
      redshuf_end_rec = this_rec.clone();
      redsort_end_rec = this_rec.clone();
      redred_end_rec = this_rec.clone();
    } catch (CloneNotSupportedException e) {
      // not expected: FSMIntermedEntry instances are cloneable
    }

    redshuf_end_rec.state_type = new StateType(StateType.STATE_END);
    redshuf_end_rec.state_mapred = new MapRedState(MapRedState.REDUCE_SHUFFLEWAIT);

    redsort_start_rec.state_type = new StateType(StateType.STATE_START);
    redsort_end_rec.state_type = new StateType(StateType.STATE_END);
    redsort_start_rec.state_mapred = new MapRedState(MapRedState.REDUCE_SORT);
    redsort_end_rec.state_mapred = new MapRedState(MapRedState.REDUCE_SORT);

    redred_start_rec.state_type = new StateType(StateType.STATE_START);
    redred_end_rec.state_type = new StateType(StateType.STATE_END);
    redred_start_rec.state_mapred = new MapRedState(MapRedState.REDUCE_REDUCER);
    redred_end_rec.state_mapred = new MapRedState(MapRedState.REDUCE_REDUCER);

    redshuf_end_rec.generateUniqueID();
    redsort_start_rec.generateUniqueID();
    redsort_end_rec.generateUniqueID();
    redred_start_rec.generateUniqueID();
    redred_end_rec.generateUniqueID();

    /* Both SHUFFLE_FINISHED and SORT_FINISHED must be present to place the sub-states */
    if (fieldNamesList.contains("SHUFFLE_FINISHED") && fieldNamesList.contains("SORT_FINISHED")) {
      if (val.getValue("SHUFFLE_FINISHED") == null) return false;
      if (val.getValue("SORT_FINISHED") == null) return false;
    } else {
      return false;
    }
    redshuf_end_rec.timestamp = new String(val.getValue("SHUFFLE_FINISHED"));
    redshuf_end_rec.time_start = new String("");
    redshuf_end_rec.time_end = new String(val.getValue("SHUFFLE_FINISHED"));
    redsort_start_rec.timestamp = new String(val.getValue("SHUFFLE_FINISHED"));
    redsort_start_rec.time_start = new String(val.getValue("SHUFFLE_FINISHED"));
    redsort_start_rec.time_end = new String("");

    assert(fieldNamesList.contains("SORT_FINISHED"));
    redsort_end_rec.timestamp = new String(val.getValue("SORT_FINISHED"));
    redsort_end_rec.time_start = new String("");
    redsort_end_rec.time_end = new String(val.getValue("SORT_FINISHED"));
    redred_start_rec.timestamp = new String(val.getValue("SORT_FINISHED"));
    redred_start_rec.time_start = new String(val.getValue("SORT_FINISHED"));
    redred_start_rec.time_end = new String("");

    /* Emit the five synthetic reduce sub-state entries */
    log.debug("Collecting record " + redshuf_end_rec +
      "("+redshuf_end_rec.state_type+") (ReduceType "+FSM_CRK_ReduceType+")");
    output.collect(
      new ChukwaRecordKey(FSM_CRK_ReduceType,redshuf_end_rec.getUniqueID()),
      redshuf_end_rec
    );

    log.debug("Collecting record " + redsort_start_rec +
      "("+redsort_start_rec.state_type+") (ReduceType "+FSM_CRK_ReduceType+")");
    output.collect(
      new ChukwaRecordKey(FSM_CRK_ReduceType,redsort_start_rec.getUniqueID()),
      redsort_start_rec
    );

    log.debug("Collecting record " + redsort_end_rec +
      "("+redsort_end_rec.state_type+") (ReduceType "+FSM_CRK_ReduceType+")");
    output.collect(
      new ChukwaRecordKey(FSM_CRK_ReduceType,redsort_end_rec.getUniqueID()),
      redsort_end_rec
    );

    log.debug("Collecting record " + redred_start_rec +
      "("+redred_start_rec.state_type+") (ReduceType "+FSM_CRK_ReduceType+")");
    output.collect(
      new ChukwaRecordKey(FSM_CRK_ReduceType,redred_start_rec.getUniqueID()),
      redred_start_rec
    );

    log.debug("Collecting record " + redred_end_rec +
      "("+redred_end_rec.state_type+") (ReduceType "+FSM_CRK_ReduceType+")");
    output.collect(
      new ChukwaRecordKey(FSM_CRK_ReduceType,redred_end_rec.getUniqueID()),
      redred_end_rec
    );

    return true;
  }

}