This project has retired. For details please refer to its Attic page.
JobTrackerProcessor xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
20  
21  import java.util.Calendar;
22  import java.util.HashMap;
23  import java.util.Iterator;
24  import java.util.Map;
25  import java.util.TimeZone;
26  import java.util.concurrent.ConcurrentHashMap;
27  
28  import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Table;
29  import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Tables;
30  import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
31  import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
32  import org.apache.hadoop.chukwa.util.ExceptionUtil;
33  import org.apache.hadoop.mapred.OutputCollector;
34  import org.apache.hadoop.mapred.Reporter;
35  import org.apache.log4j.Logger;
36  import org.json.simple.JSONObject;
37  import org.json.simple.JSONValue;
38  
39  
40  @Tables(annotations={
41  @Table(name="JobTracker",columnFamily="jt"),
42  @Table(name="JobTracker",columnFamily="jvm"),
43  @Table(name="JobTracker",columnFamily="rpc")
44  })
45  public class JobTrackerProcessor extends AbstractProcessor{
46  	static Map<String, Long> rateMap = new ConcurrentHashMap<String,Long>();
47  	static {
48  		long zero = 0L;	
49  		rateMap.put("SentBytes", zero);
50  		rateMap.put("ReceivedBytes", zero);
51  		rateMap.put("rpcAuthorizationSuccesses", zero);
52  		rateMap.put("rpcAuthorizationFailures", zero);
53  		rateMap.put("RpcQueueTime_num_ops", zero);
54  		rateMap.put("RpcProcessingTime_num_ops", zero);
55  		rateMap.put("heartbeats", zero);
56  		rateMap.put("jobs_submitted", zero);
57  		rateMap.put("jobs_completed", zero);
58  		rateMap.put("jobs_failed", zero);
59  		rateMap.put("jobs_killed", zero);
60  		rateMap.put("maps_launched", zero);
61  		rateMap.put("maps_completed", zero);
62  		rateMap.put("maps_failed", zero);
63  		rateMap.put("maps_killed", zero);
64  		rateMap.put("reduces_launched", zero);
65  		rateMap.put("reduces_completed", zero);
66  		rateMap.put("reduces_failed", zero);
67  		rateMap.put("reduces_killed", zero);
68  		rateMap.put("gcCount", zero);
69  	}
70  
71  	@Override
72  	protected void parse(String recordEntry,
73  			OutputCollector<ChukwaRecordKey, ChukwaRecord> output,
74  			Reporter reporter) throws Throwable {
75  		Logger log = Logger.getLogger(JobTrackerProcessor.class); 
76  		long timeStamp = Calendar.getInstance(TimeZone.getTimeZone("UTC")).getTimeInMillis();
77  		
78  		final ChukwaRecord mapred_jt = new ChukwaRecord();
79  		final ChukwaRecord jt_jvm = new ChukwaRecord();
80  		final ChukwaRecord jt_rpc = new ChukwaRecord();
81  		
82  		Map<String, ChukwaRecord> metricsMap = new HashMap<String, ChukwaRecord>(){
83  			private static final long serialVersionUID = 1L;
84  			{
85  				put("gcCount", jt_jvm);
86  				put("gcTimeMillis", jt_jvm);
87  				put("logError", jt_jvm);
88  				put("logFatal", jt_jvm);
89  				put("logInfo", jt_jvm);
90  				put("logWarn", jt_jvm);
91  				put("memHeapCommittedM", jt_jvm);
92  				put("memHeapUsedM", jt_jvm);
93  				put("threadsBlocked", jt_jvm);
94  				put("threadsNew", jt_jvm);
95  				put("threadsRunnable", jt_jvm);
96  				put("threadsTerminated", jt_jvm);
97  				put("threadsTimedWaiting", jt_jvm);
98  				put("threadsWaiting", jt_jvm);
99  
100 				put("ReceivedBytes", jt_rpc);				
101 				put("RpcProcessingTime_avg_time", jt_rpc);	
102 				put("RpcProcessingTime_num_ops", jt_rpc);	
103 				put("RpcQueueTime_avg_time", jt_rpc);	
104 				put("RpcQueueTime_num_ops", jt_rpc);	
105 				put("SentBytes", jt_rpc);	
106 				put("rpcAuthorizationSuccesses", jt_rpc);	
107 				put("rpcAuthorizationnFailures", jt_rpc);	
108 			}
109 		};		
110 		try{
111 			JSONObject obj = (JSONObject) JSONValue.parse(recordEntry);	
112 			String ttTag = chunk.getTag("timeStamp");
113 			if(ttTag == null){
114 				log.warn("timeStamp tag not set in JMX adaptor for jobtracker");
115 			}
116 			else{
117 				timeStamp = Long.parseLong(ttTag);
118 			}
119 			Iterator<JSONObject> iter = obj.entrySet().iterator();
120 			
121 			while(iter.hasNext()){
122 				Map.Entry entry = (Map.Entry)iter.next();
123 				String key = (String) entry.getKey();
124 				Object value = entry.getValue();
125 				String valueString = value == null?"":value.toString();	
126 				
127 				//Calculate rate for some of the metrics
128 				if(rateMap.containsKey(key)){
129 					long oldValue = rateMap.get(key);
130 					long curValue = Long.parseLong(valueString);
131 					rateMap.put(key, curValue);
132 					long newValue = curValue - oldValue;
133 					if(newValue < 0){
134 						log.warn("JobTrackerProcessor's rateMap might be reset or corrupted for metric "+key);						
135 						newValue = 0L;
136 					}					
137 					valueString = Long.toString(newValue);
138 				}
139 				
140 				//These metrics are string types with JSON structure. So we parse them and get the count
141 				if(key.indexOf("Json") >= 0){	
142 					//ignore these for now. Parsing of JSON array is throwing class cast exception. 
143 				}	
144 				else if(metricsMap.containsKey(key)){
145 					ChukwaRecord rec = metricsMap.get(key);
146 					rec.add(key, valueString);
147 				}
148 				else {
149 					mapred_jt.add(key, valueString);
150 				}
151 			}			
152 			
153 			buildGenericRecord(mapred_jt, null, timeStamp, "jt");
154 			output.collect(key, mapred_jt);
155 			buildGenericRecord(jt_jvm, null, timeStamp, "jvm");
156 			output.collect(key, jt_jvm);
157 			buildGenericRecord(jt_rpc, null, timeStamp, "rpc");
158 			output.collect(key, jt_rpc);
159 		}
160 		catch(Exception e){
161 			log.error(ExceptionUtil.getStackTrace(e));
162 		}
163 	}
164 }
165