JobHistoryTaskDataMapper.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.chukwa.analysis.salsa.fsm;

import java.io.IOException;
import java.util.ArrayList;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.apache.hadoop.chukwa.extraction.demux.*;
import org.apache.hadoop.chukwa.extraction.engine.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;

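/*
 * Illustration (field values below are hypothetical, not taken from a real
 * log): a 0.20-era JobHistory line for a finished map attempt arrives as a
 * ChukwaRecord with fields such as
 *
 *   TASK_TYPE="MAP"
 *   TASK_ATTEMPT_ID="attempt_200906152100_0001_m_000001_0"
 *   FINISH_TIME="1245100221635"
 *   TASK_STATUS="SUCCESS"
 *   HOSTNAME="/rack1/node1.example.com"
 *
 * and leaves map() as a single (ChukwaRecordKey, FSMIntermedEntry) pair keyed
 * by FSM_CRK_ReduceType plus the entry's unique ID. Reduce attempts are
 * additionally expanded into shuffle/sort/reducer sub-states; see
 * expandReduceStart() and expandReduceEnd() below.
 */
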
/**
 * Pluggable mapper for FSMBuilder.
 * Supports only 0.20+ JobHistory files, because the counter names
 * are explicitly coded into this class.
 *
 * K2 = state name + state ID
 * (ChukwaRecordKey is reused since it already implements a number of
 *  useful things, such as Comparators.)
 * V2 = FSMIntermedEntry (whose add_info TreeMap carries the counters)
 */
public class JobHistoryTaskDataMapper
  extends MapReduceBase
  implements Mapper<ChukwaRecordKey, ChukwaRecord, ChukwaRecordKey, FSMIntermedEntry>
{
  private static Log log = LogFactory.getLog(FSMBuilder.class);
  protected static final String SEP = "/";

  protected static String FSM_CRK_ReduceType = FSMType.NAMES[FSMType.MAPREDUCE_FSM];

  /*
   * Helper for map() that populates the add_info TreeMap of an
   * FSMIntermedEntry with input/output counters for Map records.
   */
  protected FSMIntermedEntry populateRecord_MapCounters
    (FSMIntermedEntry this_rec, ChukwaRecord val, ArrayList<String> fieldNamesList)
  {
    String mapCounterNames [] = {
      "Counter:FileSystemCounters:FILE_BYTES_WRITTEN",
      "Counter:org.apache.hadoop.mapred.Task$Counter:COMBINE_INPUT_RECORDS",
      "Counter:org.apache.hadoop.mapred.Task$Counter:COMBINE_OUTPUT_RECORDS",
      "Counter:org.apache.hadoop.mapred.Task$Counter:MAP_INPUT_BYTES",
      "Counter:org.apache.hadoop.mapred.Task$Counter:MAP_INPUT_RECORDS",
      "Counter:org.apache.hadoop.mapred.Task$Counter:MAP_OUTPUT_BYTES",
      "Counter:org.apache.hadoop.mapred.Task$Counter:MAP_OUTPUT_RECORDS",
      "Counter:org.apache.hadoop.mapred.Task$Counter:SPILLED_RECORDS"
    };
    String mapCounterDestNames[] = {
      "FILE_BYTES_WRITTEN",
      "COMBINE_INPUT_RECORDS",
      "COMBINE_OUTPUT_RECORDS",
      "INPUT_BYTES",
      "INPUT_RECORDS",
      "OUTPUT_BYTES",
      "OUTPUT_RECORDS",
      "SPILLED_RECORDS"
    };

    assert(mapCounterDestNames.length == mapCounterNames.length);

    for (int i = 0; i < mapCounterDestNames.length; i++) {
      if (fieldNamesList.contains(mapCounterNames[i])) {
        this_rec.add_info.put(mapCounterDestNames[i], val.getValue(mapCounterNames[i]));
      }
    }
    this_rec.add_info.put("FILE_BYTES_READ", "0"); // zero-filled to have the same fields as reduce
    this_rec.add_info.put("INPUT_GROUPS", "0");    // zero-filled to have the same fields as reduce

    return this_rec;
  }
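
  /*
   * For example (values are hypothetical): after populateRecord_MapCounters,
   * a map-end entry's add_info TreeMap might contain
   *
   *   FILE_BYTES_WRITTEN=57117, INPUT_BYTES=67108864, INPUT_RECORDS=262144,
   *   OUTPUT_BYTES=1048576, OUTPUT_RECORDS=262144, SPILLED_RECORDS=262144,
   *   FILE_BYTES_READ=0, INPUT_GROUPS=0
   *
   * where the last two are zero-filled so that map and reduce entries expose
   * an identical set of counter fields to the reducer.
   */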

  /*
   * Helper for map() that populates the add_info TreeMap of an
   * FSMIntermedEntry with input/output counters for Reduce records.
   */
  protected FSMIntermedEntry populateRecord_ReduceCounters
    (FSMIntermedEntry this_rec, ChukwaRecord val, ArrayList<String> fieldNamesList)
  {
    String redCounterNames [] = {
      "Counter:FileSystemCounters:FILE_BYTES_READ",
      "Counter:FileSystemCounters:FILE_BYTES_WRITTEN",
      "Counter:org.apache.hadoop.mapred.Task$Counter:COMBINE_INPUT_RECORDS",
      "Counter:org.apache.hadoop.mapred.Task$Counter:COMBINE_OUTPUT_RECORDS",
      "Counter:org.apache.hadoop.mapred.Task$Counter:REDUCE_INPUT_GROUPS",
      "Counter:org.apache.hadoop.mapred.Task$Counter:REDUCE_INPUT_RECORDS",
      "Counter:org.apache.hadoop.mapred.Task$Counter:REDUCE_OUTPUT_RECORDS",
      "Counter:org.apache.hadoop.mapred.Task$Counter:REDUCE_SHUFFLE_BYTES",
      "Counter:org.apache.hadoop.mapred.Task$Counter:SPILLED_RECORDS"
    };
    String redCounterDestNames[] = {
      "FILE_BYTES_READ",
      "FILE_BYTES_WRITTEN",
      "COMBINE_INPUT_RECORDS",
      "COMBINE_OUTPUT_RECORDS",
      "INPUT_GROUPS",
      "INPUT_RECORDS",
      "OUTPUT_RECORDS",
      "INPUT_BYTES", // NOTE: shuffle bytes are mapped to "INPUT_BYTES"
      "SPILLED_RECORDS"
    };

    assert(redCounterDestNames.length == redCounterNames.length);

    for (int i = 0; i < redCounterDestNames.length; i++) {
      if (fieldNamesList.contains(redCounterNames[i])) {
        this_rec.add_info.put(redCounterDestNames[i], val.getValue(redCounterNames[i]));
      }
    }

    this_rec.add_info.put("OUTPUT_BYTES", "0"); // zero-filled to have the same fields as map

    return this_rec;
  }

  public void map
    (ChukwaRecordKey key, ChukwaRecord val,
     OutputCollector<ChukwaRecordKey, FSMIntermedEntry> output,
     Reporter reporter)
    throws IOException
  {
    String task_type;
    FSMIntermedEntry this_rec = new FSMIntermedEntry();
    boolean add_record = true;

    /* Extract field names for checking */
    String [] fieldNames = val.getFields();
    ArrayList<String> fieldNamesList = new ArrayList<String>(fieldNames.length);
    for (int i = 0; i < fieldNames.length; i++) fieldNamesList.add(fieldNames[i]);

    /* Check state (Map or Reduce), generate unique ID */
    if (!fieldNamesList.contains("TASK_ATTEMPT_ID")) return; // ignore "TASK" entries
    if (!fieldNamesList.contains("TASK_TYPE")) { // malformed; ignore
      return;
    } else {
      task_type = val.getValue("TASK_TYPE");
      if (!task_type.equals("MAP") && !task_type.equals("REDUCE")) {
        return; // do nothing
      }
    }

    /* Check whether this is a start or end entry, set the state type,
     * and extract the start/end times */
    if (fieldNamesList.contains("START_TIME")) {
      this_rec.state_type.val = StateType.STATE_START;
      this_rec.timestamp = val.getValue("START_TIME");
      this_rec.time_start = val.getValue("START_TIME");
      this_rec.time_end = "";
      if (val.getValue("START_TIME").length() < 6) {
        // timestamp too short to carry millisecond precision; drop the record
        add_record = false;
      }
    } else if (fieldNamesList.contains("FINISH_TIME")) {
      this_rec.state_type.val = StateType.STATE_END;
      this_rec.timestamp = val.getValue("FINISH_TIME");
      this_rec.time_start = "";
      this_rec.time_end = val.getValue("FINISH_TIME");
      if (val.getValue("FINISH_TIME").length() < 6) {
        // timestamp too short to carry millisecond precision; drop the record
        add_record = false;
      }
    } else {
      this_rec.state_type.val = StateType.STATE_NOOP;
    }

    /* Fill in common intermediate state entry information */

    // Extract original ChukwaRecordKey values for later key reconstruction by the reducer
    try {
      this_rec = ParseUtilities.splitChukwaRecordKey(key.getKey().trim(), this_rec, SEP);
    } catch (Exception e) {
      log.warn("Error occurred splitting ChukwaRecordKey ["
        + key.getKey().trim() + "]: " + e.toString());
      return;
    }

    // Populate state enum information
    this_rec.fsm_type = new FSMType(FSMType.MAPREDUCE_FSM);
    if (task_type.equals("MAP")) {
      this_rec.state_mapred = new MapRedState(MapRedState.MAP);
    } else if (task_type.equals("REDUCE")) {
      this_rec.state_mapred = new MapRedState(MapRedState.REDUCE);
    } else {
      this_rec.state_mapred = new MapRedState(MapRedState.NONE); // unreachable given the check above
    }

    // Fill in state name and unique ID
    this_rec.state_name = this_rec.state_mapred.toString();
    this_rec.identifier = val.getValue("TASK_ATTEMPT_ID");
    this_rec.generateUniqueID();

    // Extract hostname from the tracker name (if present), or take it
    // directly from the HOSTNAME field (<= 0.18 logs)
    if (fieldNamesList.contains("HOSTNAME")) {
      this_rec.host_exec = val.getValue("HOSTNAME");
      this_rec.host_exec = ParseUtilities.removeRackFromHostname(this_rec.host_exec);
    } else if (fieldNamesList.contains("TRACKER_NAME")) {
      this_rec.host_exec = ParseUtilities.extractHostnameFromTrackerName(val.getValue("TRACKER_NAME"));
    } else {
      this_rec.host_exec = "";
    }

    if (this_rec.state_type.val == StateType.STATE_END) {
      assert(fieldNamesList.contains("TASK_STATUS"));
      String tmpstring = val.getValue("TASK_STATUS");
      // Guard against a null status before comparing, then drop killed/failed attempts
      if (tmpstring != null && (tmpstring.equals("KILLED") || tmpstring.equals("FAILED"))) {
        add_record = false;
      }
      if (tmpstring != null && tmpstring.length() > 0) {
        this_rec.add_info.put("STATE_STRING", tmpstring);
      } else {
        this_rec.add_info.put("STATE_STRING", "");
      }

      switch (this_rec.state_mapred.val) {
        case MapRedState.MAP:
          this_rec = populateRecord_MapCounters(this_rec, val, fieldNamesList);
          break;
        case MapRedState.REDUCE:
          this_rec = populateRecord_ReduceCounters(this_rec, val, fieldNamesList);
          break;
        default:
          // do nothing
          break;
      }
    }
    // Manually add the cluster name etc.
    assert(fieldNamesList.contains(Record.tagsField));
    assert(fieldNamesList.contains("csource"));
    this_rec.add_info.put(Record.tagsField, val.getValue(Record.tagsField));
    this_rec.add_info.put("csource", val.getValue("csource"));

    /* Special handling for reduce starts/ends: expand into sub-states.
     * (Map records need no expansion.) */
    if (task_type.equals("REDUCE")) {
      if (this_rec.state_type.val == StateType.STATE_END) {
        add_record = add_record & expandReduceEnd(key, val, output, reporter, this_rec);
      } else if (this_rec.state_type.val == StateType.STATE_START) {
        add_record = add_record & expandReduceStart(key, val, output, reporter, this_rec);
      }
    }

    if (add_record) {
      log.debug("Collecting record " + this_rec + " (" + this_rec.state_type
        + ") (ReduceType " + FSM_CRK_ReduceType + ")");
      output.collect(new ChukwaRecordKey(FSM_CRK_ReduceType, this_rec.getUniqueID()), this_rec);
    }

  } // end of map()

  protected boolean expandReduceStart
      (ChukwaRecordKey key, ChukwaRecord val,
       OutputCollector<ChukwaRecordKey, FSMIntermedEntry> output,
       Reporter reporter, FSMIntermedEntry this_rec)
      throws IOException
  {
    FSMIntermedEntry redshuf_start_rec = null;

    try {
      redshuf_start_rec = this_rec.clone();
    } catch (CloneNotSupportedException e) {
      // Bail out rather than dereference the null record below
      log.warn("Error cloning FSMIntermedEntry: " + e.toString());
      return false;
    }

    redshuf_start_rec.state_type   = new StateType(StateType.STATE_START);
    redshuf_start_rec.state_mapred = new MapRedState(MapRedState.REDUCE_SHUFFLEWAIT);

    redshuf_start_rec.timestamp  = this_rec.timestamp;
    redshuf_start_rec.time_start = this_rec.timestamp;
    redshuf_start_rec.time_end   = "";

    redshuf_start_rec.generateUniqueID();

    log.debug("Collecting record " + redshuf_start_rec +
      " (" + redshuf_start_rec.state_type + ") (ReduceType " + FSM_CRK_ReduceType + ")");
    output.collect(
      new ChukwaRecordKey(FSM_CRK_ReduceType, redshuf_start_rec.getUniqueID()),
      redshuf_start_rec
    );

    return true;
  }
  /*
   * Generates five extra FSMIntermedEntry records for a given reduce-end entry.
   */
  protected boolean expandReduceEnd
    (ChukwaRecordKey key, ChukwaRecord val,
     OutputCollector<ChukwaRecordKey, FSMIntermedEntry> output,
     Reporter reporter, FSMIntermedEntry this_rec)
    throws IOException
  {
    /* Split into ReduceShuffleWait, ReduceSort, and ReduceReducer,
     * but also retain the original combined Reduce at the same time.
     */
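
    /*
     * Timeline of the synthesized states, using the record's SHUFFLE_FINISHED
     * and SORT_FINISHED fields as the internal boundaries:
     *
     *   reduce start       SHUFFLE_FINISHED     SORT_FINISHED    FINISH_TIME
     *     |-- REDUCE_SHUFFLEWAIT --|-- REDUCE_SORT --|-- REDUCE_REDUCER --|
     *     |--------------------- REDUCE (original) ----------------------|
     *
     * The REDUCE_SHUFFLEWAIT start was already emitted by expandReduceStart();
     * this method emits the remaining five entries.
     */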
    FSMIntermedEntry redshuf_end_rec = null;
    FSMIntermedEntry redsort_start_rec = null, redsort_end_rec = null;
    FSMIntermedEntry redred_start_rec = null, redred_end_rec = null;

    /* Extract field names for checking */
    String [] fieldNames = val.getFields();
    ArrayList<String> fieldNamesList = new ArrayList<String>(fieldNames.length);
    for (int i = 0; i < fieldNames.length; i++) fieldNamesList.add(fieldNames[i]);

    try {
      redsort_start_rec = this_rec.clone();
      redred_start_rec  = this_rec.clone();
      redshuf_end_rec   = this_rec.clone();
      redsort_end_rec   = this_rec.clone();
      redred_end_rec    = this_rec.clone();
    } catch (CloneNotSupportedException e) {
      // Bail out rather than dereference the null records below
      log.warn("Error cloning FSMIntermedEntry: " + e.toString());
      return false;
    }

    redshuf_end_rec.state_type   = new StateType(StateType.STATE_END);
    redshuf_end_rec.state_mapred = new MapRedState(MapRedState.REDUCE_SHUFFLEWAIT);

    redsort_start_rec.state_type   = new StateType(StateType.STATE_START);
    redsort_end_rec.state_type     = new StateType(StateType.STATE_END);
    redsort_start_rec.state_mapred = new MapRedState(MapRedState.REDUCE_SORT);
    redsort_end_rec.state_mapred   = new MapRedState(MapRedState.REDUCE_SORT);

    redred_start_rec.state_type   = new StateType(StateType.STATE_START);
    redred_end_rec.state_type     = new StateType(StateType.STATE_END);
    redred_start_rec.state_mapred = new MapRedState(MapRedState.REDUCE_REDUCER);
    redred_end_rec.state_mapred   = new MapRedState(MapRedState.REDUCE_REDUCER);

    redshuf_end_rec.generateUniqueID();
    redsort_start_rec.generateUniqueID();
    redsort_end_rec.generateUniqueID();
    redred_start_rec.generateUniqueID();
    redred_end_rec.generateUniqueID();

    // Both boundary timestamps must be present and non-null, or nothing is emitted
    if (fieldNamesList.contains("SHUFFLE_FINISHED") && fieldNamesList.contains("SORT_FINISHED")) {
      if (val.getValue("SHUFFLE_FINISHED") == null) return false;
      if (val.getValue("SORT_FINISHED") == null) return false;
    } else {
      return false;
    }

    redshuf_end_rec.timestamp  = val.getValue("SHUFFLE_FINISHED");
    redshuf_end_rec.time_start = "";
    redshuf_end_rec.time_end   = val.getValue("SHUFFLE_FINISHED");
    redsort_start_rec.timestamp  = val.getValue("SHUFFLE_FINISHED");
    redsort_start_rec.time_start = val.getValue("SHUFFLE_FINISHED");
    redsort_start_rec.time_end   = "";

    redsort_end_rec.timestamp  = val.getValue("SORT_FINISHED");
    redsort_end_rec.time_start = "";
    redsort_end_rec.time_end   = val.getValue("SORT_FINISHED");
    redred_start_rec.timestamp  = val.getValue("SORT_FINISHED");
    redred_start_rec.time_start = val.getValue("SORT_FINISHED");
    redred_start_rec.time_end   = "";

    /* redred_end times are exactly the same as the original red_end times */

    log.debug("Collecting record " + redshuf_end_rec +
      " (" + redshuf_end_rec.state_type + ") (ReduceType " + FSM_CRK_ReduceType + ")");
    output.collect(
      new ChukwaRecordKey(FSM_CRK_ReduceType, redshuf_end_rec.getUniqueID()),
      redshuf_end_rec
    );

    log.debug("Collecting record " + redsort_start_rec +
      " (" + redsort_start_rec.state_type + ") (ReduceType " + FSM_CRK_ReduceType + ")");
    output.collect(
      new ChukwaRecordKey(FSM_CRK_ReduceType, redsort_start_rec.getUniqueID()),
      redsort_start_rec
    );

    log.debug("Collecting record " + redsort_end_rec +
      " (" + redsort_end_rec.state_type + ") (ReduceType " + FSM_CRK_ReduceType + ")");
    output.collect(
      new ChukwaRecordKey(FSM_CRK_ReduceType, redsort_end_rec.getUniqueID()),
      redsort_end_rec
    );

    log.debug("Collecting record " + redred_start_rec +
      " (" + redred_start_rec.state_type + ") (ReduceType " + FSM_CRK_ReduceType + ")");
    output.collect(
      new ChukwaRecordKey(FSM_CRK_ReduceType, redred_start_rec.getUniqueID()),
      redred_start_rec
    );

    log.debug("Collecting record " + redred_end_rec +
      " (" + redred_end_rec.state_type + ") (ReduceType " + FSM_CRK_ReduceType + ")");
    output.collect(
      new ChukwaRecordKey(FSM_CRK_ReduceType, redred_end_rec.getUniqueID()),
      redred_end_rec
    );

    return true;
  }
} // end of mapper class
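
/*
 * A minimal usage sketch (an assumption for illustration; the actual job
 * setup lives in FSMBuilder and may differ), wiring this mapper into an
 * old-style mapred job:
 *
 *   JobConf conf = new JobConf(FSMBuilder.class);
 *   conf.setMapperClass(JobHistoryTaskDataMapper.class);
 *   conf.setMapOutputKeyClass(ChukwaRecordKey.class);
 *   conf.setMapOutputValueClass(FSMIntermedEntry.class);
 */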