This project has retired. For details please refer to its
Attic page.
JobTrackerProcessor xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
20
21 import java.util.Calendar;
22 import java.util.HashMap;
23 import java.util.Iterator;
24 import java.util.Map;
25 import java.util.TimeZone;
26 import java.util.concurrent.ConcurrentHashMap;
27
28 import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Table;
29 import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Tables;
30 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
31 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
32 import org.apache.hadoop.chukwa.util.ExceptionUtil;
33 import org.apache.hadoop.mapred.OutputCollector;
34 import org.apache.hadoop.mapred.Reporter;
35 import org.apache.log4j.Logger;
36 import org.json.simple.JSONObject;
37 import org.json.simple.JSONValue;
38
39
40 @Tables(annotations={
41 @Table(name="JobTracker",columnFamily="jt"),
42 @Table(name="JobTracker",columnFamily="jvm"),
43 @Table(name="JobTracker",columnFamily="rpc")
44 })
45 public class JobTrackerProcessor extends AbstractProcessor{
46 static Map<String, Long> rateMap = new ConcurrentHashMap<String,Long>();
47 static {
48 long zero = 0L;
49 rateMap.put("SentBytes", zero);
50 rateMap.put("ReceivedBytes", zero);
51 rateMap.put("rpcAuthorizationSuccesses", zero);
52 rateMap.put("rpcAuthorizationFailures", zero);
53 rateMap.put("RpcQueueTime_num_ops", zero);
54 rateMap.put("RpcProcessingTime_num_ops", zero);
55 rateMap.put("heartbeats", zero);
56 rateMap.put("jobs_submitted", zero);
57 rateMap.put("jobs_completed", zero);
58 rateMap.put("jobs_failed", zero);
59 rateMap.put("jobs_killed", zero);
60 rateMap.put("maps_launched", zero);
61 rateMap.put("maps_completed", zero);
62 rateMap.put("maps_failed", zero);
63 rateMap.put("maps_killed", zero);
64 rateMap.put("reduces_launched", zero);
65 rateMap.put("reduces_completed", zero);
66 rateMap.put("reduces_failed", zero);
67 rateMap.put("reduces_killed", zero);
68 rateMap.put("gcCount", zero);
69 }
70
71 @Override
72 protected void parse(String recordEntry,
73 OutputCollector<ChukwaRecordKey, ChukwaRecord> output,
74 Reporter reporter) throws Throwable {
75 Logger log = Logger.getLogger(JobTrackerProcessor.class);
76 long timeStamp = Calendar.getInstance(TimeZone.getTimeZone("UTC")).getTimeInMillis();
77
78 final ChukwaRecord mapred_jt = new ChukwaRecord();
79 final ChukwaRecord jt_jvm = new ChukwaRecord();
80 final ChukwaRecord jt_rpc = new ChukwaRecord();
81
82 Map<String, ChukwaRecord> metricsMap = new HashMap<String, ChukwaRecord>(){
83 private static final long serialVersionUID = 1L;
84 {
85 put("gcCount", jt_jvm);
86 put("gcTimeMillis", jt_jvm);
87 put("logError", jt_jvm);
88 put("logFatal", jt_jvm);
89 put("logInfo", jt_jvm);
90 put("logWarn", jt_jvm);
91 put("memHeapCommittedM", jt_jvm);
92 put("memHeapUsedM", jt_jvm);
93 put("threadsBlocked", jt_jvm);
94 put("threadsNew", jt_jvm);
95 put("threadsRunnable", jt_jvm);
96 put("threadsTerminated", jt_jvm);
97 put("threadsTimedWaiting", jt_jvm);
98 put("threadsWaiting", jt_jvm);
99
100 put("ReceivedBytes", jt_rpc);
101 put("RpcProcessingTime_avg_time", jt_rpc);
102 put("RpcProcessingTime_num_ops", jt_rpc);
103 put("RpcQueueTime_avg_time", jt_rpc);
104 put("RpcQueueTime_num_ops", jt_rpc);
105 put("SentBytes", jt_rpc);
106 put("rpcAuthorizationSuccesses", jt_rpc);
107 put("rpcAuthorizationnFailures", jt_rpc);
108 }
109 };
110 try{
111 JSONObject obj = (JSONObject) JSONValue.parse(recordEntry);
112 String ttTag = chunk.getTag("timeStamp");
113 if(ttTag == null){
114 log.warn("timeStamp tag not set in JMX adaptor for jobtracker");
115 }
116 else{
117 timeStamp = Long.parseLong(ttTag);
118 }
119 Iterator<JSONObject> iter = obj.entrySet().iterator();
120
121 while(iter.hasNext()){
122 Map.Entry entry = (Map.Entry)iter.next();
123 String key = (String) entry.getKey();
124 Object value = entry.getValue();
125 String valueString = value == null?"":value.toString();
126
127
128 if(rateMap.containsKey(key)){
129 long oldValue = rateMap.get(key);
130 long curValue = Long.parseLong(valueString);
131 rateMap.put(key, curValue);
132 long newValue = curValue - oldValue;
133 if(newValue < 0){
134 log.warn("JobTrackerProcessor's rateMap might be reset or corrupted for metric "+key);
135 newValue = 0L;
136 }
137 valueString = Long.toString(newValue);
138 }
139
140
141 if(key.indexOf("Json") >= 0){
142
143 }
144 else if(metricsMap.containsKey(key)){
145 ChukwaRecord rec = metricsMap.get(key);
146 rec.add(key, valueString);
147 }
148 else {
149 mapred_jt.add(key, valueString);
150 }
151 }
152
153 buildGenericRecord(mapred_jt, null, timeStamp, "jt");
154 output.collect(key, mapred_jt);
155 buildGenericRecord(jt_jvm, null, timeStamp, "jvm");
156 output.collect(key, jt_jvm);
157 buildGenericRecord(jt_rpc, null, timeStamp, "rpc");
158 output.collect(key, jt_rpc);
159 }
160 catch(Exception e){
161 log.error(ExceptionUtil.getStackTrace(e));
162 }
163 }
164 }
165