JobHistoryTaskDataMapper xref

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.chukwa.analysis.salsa.fsm;

import java.io.IOException;
import java.util.ArrayList;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.apache.hadoop.chukwa.extraction.engine.*;
import org.apache.hadoop.mapred.*;

/**
 * Pluggable mapper for FSMBuilder.
 * Supports only Hadoop 0.20+ JobHistory files, because the counter names
 * it looks for are explicitly coded.
 *
 * K2 = state name + state ID
 * (ChukwaRecordKey is reused here since it already implements useful
 *  machinery such as comparators.)
 * V2 = TreeMap
 */
public class JobHistoryTaskDataMapper
  extends MapReduceBase
  implements Mapper<ChukwaRecordKey, ChukwaRecord, ChukwaRecordKey, FSMIntermedEntry>
{
  private static Log log = LogFactory.getLog(FSMBuilder.class);

  /** Delimiter used when splitting the incoming ChukwaRecordKey into its components. */
  protected static final String SEP = "/";

  /** Reduce type written into every emitted ChukwaRecordKey, marking records for the MapReduce FSM. */
  protected static final String FSM_CRK_ReduceType = FSMType.NAMES[FSMType.MAPREDUCE_FSM];

  /*
   * Helper function for the mapper to populate the TreeMap of an FSMIntermedEntry
   * with input/output counters for Map records
   */
  protected FSMIntermedEntry populateRecord_MapCounters
    (FSMIntermedEntry this_rec, ChukwaRecord val, ArrayList<String> fieldNamesList)
  {
    String mapCounterNames [] = {
      "Counter:FileSystemCounters:FILE_BYTES_WRITTEN",
      "Counter:org.apache.hadoop.mapred.Task$Counter:COMBINE_INPUT_RECORDS",
      "Counter:org.apache.hadoop.mapred.Task$Counter:COMBINE_OUTPUT_RECORDS",
      "Counter:org.apache.hadoop.mapred.Task$Counter:MAP_INPUT_BYTES",
      "Counter:org.apache.hadoop.mapred.Task$Counter:MAP_INPUT_RECORDS",
      "Counter:org.apache.hadoop.mapred.Task$Counter:MAP_OUTPUT_BYTES",
      "Counter:org.apache.hadoop.mapred.Task$Counter:MAP_OUTPUT_RECORDS",
      "Counter:org.apache.hadoop.mapred.Task$Counter:SPILLED_RECORDS"
    };
    String mapCounterDestNames[] = {
      "FILE_BYTES_WRITTEN",
      "COMBINE_INPUT_RECORDS",
      "COMBINE_OUTPUT_RECORDS",
      "INPUT_BYTES",
      "INPUT_RECORDS",
      "OUTPUT_BYTES",
      "OUTPUT_RECORDS",
      "SPILLED_RECORDS"
    };

    assert(mapCounterDestNames.length == mapCounterNames.length);

    for (int i = 0; i < mapCounterDestNames.length; i++) {
      if (fieldNamesList.contains(mapCounterNames[i])) {
        this_rec.add_info.put(mapCounterDestNames[i], val.getValue(mapCounterNames[i]));
      }
    }
    this_rec.add_info.put("FILE_BYTES_READ","0"); // to have same fields as reduce
    this_rec.add_info.put("INPUT_GROUPS","0"); // to have same fields as reduce

    return this_rec;
  }

  /*
   * Helper function for the mapper to populate the TreeMap of an FSMIntermedEntry
   * with input/output counters for Reduce records
   */
  protected FSMIntermedEntry populateRecord_ReduceCounters
    (FSMIntermedEntry this_rec, ChukwaRecord val, ArrayList<String> fieldNamesList)
  {
    String redCounterNames [] = {
      "Counter:FileSystemCounters:FILE_BYTES_READ",
      "Counter:FileSystemCounters:FILE_BYTES_WRITTEN",
      "Counter:org.apache.hadoop.mapred.Task$Counter:COMBINE_INPUT_RECORDS",
      "Counter:org.apache.hadoop.mapred.Task$Counter:COMBINE_OUTPUT_RECORDS",
      "Counter:org.apache.hadoop.mapred.Task$Counter:REDUCE_INPUT_GROUPS",
      "Counter:org.apache.hadoop.mapred.Task$Counter:REDUCE_INPUT_RECORDS",
      "Counter:org.apache.hadoop.mapred.Task$Counter:REDUCE_OUTPUT_RECORDS",
      "Counter:org.apache.hadoop.mapred.Task$Counter:REDUCE_SHUFFLE_BYTES",
      "Counter:org.apache.hadoop.mapred.Task$Counter:SPILLED_RECORDS"
    };
    String redCounterDestNames[] = {
      "FILE_BYTES_READ",
      "FILE_BYTES_WRITTEN",
      "COMBINE_INPUT_RECORDS",
      "COMBINE_OUTPUT_RECORDS",
      "INPUT_GROUPS",
      "INPUT_RECORDS",
      "OUTPUT_RECORDS",
      "INPUT_BYTES", // NOTE: shuffle bytes are mapped to "INPUT_BYTES"
      "SPILLED_RECORDS"
    };

    assert(redCounterDestNames.length == redCounterNames.length);

    for (int i = 0; i < redCounterDestNames.length; i++) {
      if (fieldNamesList.contains(redCounterNames[i])) {
        this_rec.add_info.put(redCounterDestNames[i], val.getValue(redCounterNames[i]));
      }
    }

    this_rec.add_info.put("OUTPUT_BYTES","0"); // to have same fields as map

    return this_rec;
  }

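  /*
   * Main mapper entry point: consumes one JobHistory ChukwaRecord per call.
   * Overview of the flow below:
   *   1. Skip records without TASK_ATTEMPT_ID or TASK_TYPE, and any task type
   *      other than MAP or REDUCE.
   *   2. Classify the record as a state start (START_TIME) or state end
   *      (FINISH_TIME), discarding timestamps that look truncated.
   *   3. Recover the original ChukwaRecordKey components, fill in state and
   *      host metadata, and generate the unique state ID.
   *   4. For end records, drop KILLED/FAILED attempts and copy the relevant
   *      map or reduce counters into add_info.
   *   5. For reduces, also emit the extra shuffle/sort/reducer sub-state
   *      records via expandReduceStart()/expandReduceEnd().
   */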
  public void map
    (ChukwaRecordKey key, ChukwaRecord val,
     OutputCollector<ChukwaRecordKey, FSMIntermedEntry> output,
     Reporter reporter)
    throws IOException
  {
    String task_type;
    FSMIntermedEntry this_rec = new FSMIntermedEntry();
    boolean add_record = true;

    /* Extract field names for checking */
    String [] fieldNames = val.getFields();
    ArrayList<String> fieldNamesList = new ArrayList<String>(fieldNames.length);
    for (int i = 0; i < fieldNames.length; i++) fieldNamesList.add(fieldNames[i]);

    /* Check state (Map or Reduce), generate unique ID */
    if (!fieldNamesList.contains("TASK_ATTEMPT_ID")) return; // Ignore "TASK" entries
    if (!fieldNamesList.contains("TASK_TYPE")) { // Malformed, ignore
      return;
    } else {
      task_type = val.getValue("TASK_TYPE");
      if (!task_type.equals("MAP") && !task_type.equals("REDUCE")) {
        return; // do nothing
      }
    }

    /* Check if this is a start or end entry, set state type, extract start/end times */
    if (fieldNamesList.contains("START_TIME")) {
      this_rec.state_type.val = StateType.STATE_START;
      this_rec.timestamp = val.getValue("START_TIME");
      this_rec.time_start = val.getValue("START_TIME");
      this_rec.time_end = "";
      if (val.getValue("START_TIME").length() < 4+2) { // needs to at least have milliseconds
        add_record = add_record & false;
      }
    } else if (fieldNamesList.contains("FINISH_TIME")) {
      this_rec.state_type.val = StateType.STATE_END;
      this_rec.timestamp = val.getValue("FINISH_TIME");
      this_rec.time_start = "";
      this_rec.time_end = val.getValue("FINISH_TIME");
      if (val.getValue("FINISH_TIME").length() < 4+2) { // needs to at least have milliseconds
        add_record = add_record & false;
      }
    } else {
      this_rec.state_type.val = StateType.STATE_NOOP;
    }

    /* Fill in common intermediate state entry information */

    // Extract original ChukwaRecordKey values for later key reconstruction by reducer
    try {
      this_rec = ParseUtilities.splitChukwaRecordKey(key.getKey().trim(),this_rec,SEP);
    } catch (Exception e) {
      log.warn("Error occurred splitting ChukwaRecordKey ["+key.getKey().trim()+"]: " + e.toString());
      return;
    }

    // Populate state enum information
    this_rec.fsm_type = new FSMType(FSMType.MAPREDUCE_FSM);
    if (task_type.equals("MAP")) {
      this_rec.state_mapred = new MapRedState(MapRedState.MAP);
    } else if (task_type.equals("REDUCE")) {
      this_rec.state_mapred = new MapRedState(MapRedState.REDUCE);
    } else {
      this_rec.state_mapred = new MapRedState(MapRedState.NONE); // error handling here?
    }

    // Fill state name, unique ID
    this_rec.state_name = this_rec.state_mapred.toString();
    this_rec.identifier = val.getValue("TASK_ATTEMPT_ID");
    this_rec.generateUniqueID();

    // Extract hostname from tracker name (if present), or directly fill from hostname (<= 0.18)
    if (fieldNamesList.contains("HOSTNAME")) {
      this_rec.host_exec = val.getValue("HOSTNAME");
      this_rec.host_exec = ParseUtilities.removeRackFromHostname(this_rec.host_exec);
    } else if (fieldNamesList.contains("TRACKER_NAME")) {
      this_rec.host_exec = ParseUtilities.extractHostnameFromTrackerName(val.getValue("TRACKER_NAME"));
    } else {
      this_rec.host_exec = "";
    }

    if (this_rec.state_type.val == StateType.STATE_END) {
      assert(fieldNamesList.contains("TASK_STATUS"));
      String tmpstring = null;
      tmpstring = val.getValue("TASK_STATUS");
      if (tmpstring != null && (tmpstring.equals("KILLED") || tmpstring.equals("FAILED"))) {
        add_record = add_record & false;
      }
      if (tmpstring != null && tmpstring.length() > 0) {
        this_rec.add_info.put("STATE_STRING",tmpstring);
      } else {
        this_rec.add_info.put("STATE_STRING","");
      }

      switch(this_rec.state_mapred.val) {
        case MapRedState.MAP:
          this_rec = populateRecord_MapCounters(this_rec, val, fieldNamesList);
          break;
        case MapRedState.REDUCE:
          this_rec = populateRecord_ReduceCounters(this_rec, val, fieldNamesList);
          break;
        default:
          // do nothing
          break;
      }
    }
    // manually add clustername etc
    assert(fieldNamesList.contains(Record.tagsField));
    assert(fieldNamesList.contains("csource"));
    this_rec.add_info.put(Record.tagsField,val.getValue(Record.tagsField));
    this_rec.add_info.put("csource",val.getValue("csource"));

    /* Special handling for Reduce Ends */
    if (task_type.equals("REDUCE")) {
      if (this_rec.state_type.val == StateType.STATE_END) {
        add_record = add_record & expandReduceEnd(key,val,output,reporter,this_rec);
      } else if (this_rec.state_type.val == StateType.STATE_START) {
        add_record = add_record & expandReduceStart(key,val,output,reporter,this_rec);
      }
    } else if (task_type.equals("MAP")) {
      add_record = add_record & true;
    }

    if (add_record) {
      log.debug("Collecting record " + this_rec + "("+this_rec.state_type+") (ReduceType "+FSM_CRK_ReduceType+")");
      output.collect(new ChukwaRecordKey(FSM_CRK_ReduceType,this_rec.getUniqueID()),this_rec);
    }

  } // end of map()

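  /*
   * Emits one additional REDUCE_SHUFFLEWAIT start record, cloned from the
   * reduce start record and stamped with the same start time, so that the
   * shuffle-wait sub-state begins when the reduce attempt begins.
   */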
  protected boolean expandReduceStart
      (ChukwaRecordKey key, ChukwaRecord val,
       OutputCollector<ChukwaRecordKey, FSMIntermedEntry> output,
       Reporter reporter, FSMIntermedEntry this_rec)
      throws IOException
  {
    FSMIntermedEntry redshuf_start_rec = null;

    try {
      redshuf_start_rec = this_rec.clone();
    } catch (CloneNotSupportedException e) {
      // TODO: Error handling
    }

    redshuf_start_rec.state_type   = new StateType(StateType.STATE_START);
    redshuf_start_rec.state_mapred = new MapRedState(MapRedState.REDUCE_SHUFFLEWAIT);

    redshuf_start_rec.timestamp = this_rec.timestamp;
    redshuf_start_rec.time_start = this_rec.timestamp;
    redshuf_start_rec.time_end = "";

    redshuf_start_rec.generateUniqueID();

    log.debug("Collecting record " + redshuf_start_rec +
      "("+redshuf_start_rec.state_type+") (ReduceType "+FSM_CRK_ReduceType+")");
    output.collect(
      new ChukwaRecordKey(FSM_CRK_ReduceType,redshuf_start_rec.getUniqueID()),
      redshuf_start_rec
    );

    return true;
  }

  /*
   * Generates five extra FSMIntermedEntry records for a given reduce end entry:
   * a REDUCE_SHUFFLEWAIT end, REDUCE_SORT start/end, and REDUCE_REDUCER
   * start/end, bounded by the SHUFFLE_FINISHED and SORT_FINISHED times.
   */
  protected boolean expandReduceEnd
    (ChukwaRecordKey key, ChukwaRecord val,
     OutputCollector<ChukwaRecordKey, FSMIntermedEntry> output,
     Reporter reporter, FSMIntermedEntry this_rec)
    throws IOException
  {
    /* Split into ReduceShuffleWait, ReduceSort, ReduceReducer,
     * but also retain the original combined Reduce at the same time
     */
    FSMIntermedEntry redshuf_end_rec = null;
    FSMIntermedEntry redsort_start_rec = null, redsort_end_rec = null;
    FSMIntermedEntry redred_start_rec = null, redred_end_rec = null;

    /* Extract field names for checking */
    String [] fieldNames = val.getFields();
    ArrayList<String> fieldNamesList = new ArrayList<String>(fieldNames.length);
    for (int i = 0; i < fieldNames.length; i++) fieldNamesList.add(fieldNames[i]);

    try {
      redsort_start_rec = this_rec.clone();
      redred_start_rec  = this_rec.clone();
      redshuf_end_rec   = this_rec.clone();
      redsort_end_rec   = this_rec.clone();
      redred_end_rec    = this_rec.clone();
    } catch (CloneNotSupportedException e) {
      // TODO: Error handling
    }

    redshuf_end_rec.state_type   = new StateType(StateType.STATE_END);
    redshuf_end_rec.state_mapred = new MapRedState(MapRedState.REDUCE_SHUFFLEWAIT);

    redsort_start_rec.state_type   = new StateType(StateType.STATE_START);
    redsort_end_rec.state_type     = new StateType(StateType.STATE_END);
    redsort_start_rec.state_mapred = new MapRedState(MapRedState.REDUCE_SORT);
    redsort_end_rec.state_mapred   = new MapRedState(MapRedState.REDUCE_SORT);

    redred_start_rec.state_type   = new StateType(StateType.STATE_START);
    redred_end_rec.state_type     = new StateType(StateType.STATE_END);
    redred_start_rec.state_mapred = new MapRedState(MapRedState.REDUCE_REDUCER);
    redred_end_rec.state_mapred   = new MapRedState(MapRedState.REDUCE_REDUCER);

    redshuf_end_rec.generateUniqueID();
    redsort_start_rec.generateUniqueID();
    redsort_end_rec.generateUniqueID();
    redred_start_rec.generateUniqueID();
    redred_end_rec.generateUniqueID();

    if(fieldNamesList.contains("SHUFFLE_FINISHED") && fieldNamesList.contains("SORT_FINISHED")) {
      if (val.getValue("SHUFFLE_FINISHED") == null) return false;
      if (val.getValue("SORT_FINISHED") == null) return false;
    } else {
      return false;
    }
    redshuf_end_rec.timestamp = val.getValue("SHUFFLE_FINISHED");
    redshuf_end_rec.time_start = "";
    redshuf_end_rec.time_end = val.getValue("SHUFFLE_FINISHED");
    redsort_start_rec.timestamp = val.getValue("SHUFFLE_FINISHED");
    redsort_start_rec.time_start = val.getValue("SHUFFLE_FINISHED");
    redsort_start_rec.time_end = "";

    assert(fieldNamesList.contains("SORT_FINISHED"));
    redsort_end_rec.timestamp = val.getValue("SORT_FINISHED");
    redsort_end_rec.time_start = "";
    redsort_end_rec.time_end = val.getValue("SORT_FINISHED");
    redred_start_rec.timestamp = val.getValue("SORT_FINISHED");
    redred_start_rec.time_start = val.getValue("SORT_FINISHED");
    redred_start_rec.time_end = "";

    /* redred_end times are exactly the same as the original red_end times */

    log.debug("Collecting record " + redshuf_end_rec +
      "("+redshuf_end_rec.state_type+") (ReduceType "+FSM_CRK_ReduceType+")");
    output.collect(
      new ChukwaRecordKey(FSM_CRK_ReduceType,redshuf_end_rec.getUniqueID()),
      redshuf_end_rec
    );

    log.debug("Collecting record " + redsort_start_rec +
      "("+redsort_start_rec.state_type+") (ReduceType "+FSM_CRK_ReduceType+")");
    output.collect(
      new ChukwaRecordKey(FSM_CRK_ReduceType,redsort_start_rec.getUniqueID()),
      redsort_start_rec
    );

    log.debug("Collecting record " + redsort_end_rec +
      "("+redsort_end_rec.state_type+") (ReduceType "+FSM_CRK_ReduceType+")");
    output.collect(
      new ChukwaRecordKey(FSM_CRK_ReduceType,redsort_end_rec.getUniqueID()),
      redsort_end_rec
    );

    log.debug("Collecting record " + redred_start_rec +
      "("+redred_start_rec.state_type+") (ReduceType "+FSM_CRK_ReduceType+")");
    output.collect(
      new ChukwaRecordKey(FSM_CRK_ReduceType,redred_start_rec.getUniqueID()),
      redred_start_rec
    );

    log.debug("Collecting record " + redred_end_rec +
      "("+redred_end_rec.state_type+") (ReduceType "+FSM_CRK_ReduceType+")");
    output.collect(
      new ChukwaRecordKey(FSM_CRK_ReduceType,redred_end_rec.getUniqueID()),
      redred_end_rec
    );

    return true;
  }

} // end of mapper class
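
Usage note: the snippet below is a minimal driver sketch showing how this mapper might be wired into an old-API (org.apache.hadoop.mapred) job. Only the mapper class and its ChukwaRecordKey/FSMIntermedEntry map output types come from the source above; the driver class name, job name, argument-based paths, map-only configuration, and the assumption that the input is a SequenceFile of <ChukwaRecordKey, ChukwaRecord> pairs are illustrative. In the real pipeline this wiring is performed by FSMBuilder, which also supplies the reducer.

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.FileInputFormat;
    import org.apache.hadoop.mapred.FileOutputFormat;
    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.SequenceFileInputFormat;
    import org.apache.hadoop.mapred.SequenceFileOutputFormat;

    import org.apache.hadoop.chukwa.analysis.salsa.fsm.FSMIntermedEntry;
    import org.apache.hadoop.chukwa.analysis.salsa.fsm.JobHistoryTaskDataMapper;
    import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;

    // Hypothetical driver class; FSMBuilder does this wiring in the real pipeline.
    public class JobHistoryTaskDataMapperDriverSketch {
      public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(JobHistoryTaskDataMapperDriverSketch.class);
        conf.setJobName("salsa-fsm-jobhistory-sketch"); // hypothetical job name

        // Mapper and map output types, as declared by JobHistoryTaskDataMapper
        conf.setMapperClass(JobHistoryTaskDataMapper.class);
        conf.setMapOutputKeyClass(ChukwaRecordKey.class);
        conf.setMapOutputValueClass(FSMIntermedEntry.class);

        // Assumed input: demuxed JobHistory records stored as a SequenceFile of
        // <ChukwaRecordKey, ChukwaRecord>; paths are taken from the command line.
        conf.setInputFormat(SequenceFileInputFormat.class);
        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        conf.setOutputFormat(SequenceFileOutputFormat.class);
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));

        // Map-only for this sketch, so the intermediate entries are written
        // straight to the output SequenceFile; FSMBuilder's own reducer would
        // normally consume them instead.
        conf.setNumReduceTasks(0);
        conf.setOutputKeyClass(ChukwaRecordKey.class);
        conf.setOutputValueClass(FSMIntermedEntry.class);

        JobClient.runJob(conf);
      }
    }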