This project has retired. For details please refer to its
Attic page.
JobTrackerProcessor xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
20
21 import java.util.Calendar;
22 import java.util.HashMap;
23 import java.util.Iterator;
24 import java.util.Map;
25 import java.util.Map.Entry;
26 import java.util.Set;
27 import java.util.TimeZone;
28 import java.util.concurrent.ConcurrentHashMap;
29
30 import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Table;
31 import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Tables;
32 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
33 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
34 import org.apache.hadoop.chukwa.util.ExceptionUtil;
35 import org.apache.hadoop.mapred.OutputCollector;
36 import org.apache.hadoop.mapred.Reporter;
37 import org.apache.log4j.Logger;
38 import org.json.simple.JSONObject;
39 import org.json.simple.JSONValue;
40
41 @Tables(annotations = { @Table(name = "JobTracker", columnFamily = "jt"),
42 @Table(name = "JobTracker", columnFamily = "jvm"),
43 @Table(name = "JobTracker", columnFamily = "rpc") })
44 public class JobTrackerProcessor extends AbstractProcessor {
45 static Map<String, Long> rateMap = new ConcurrentHashMap<String, Long>();
46 static {
47 long zero = 0L;
48 rateMap.put("SentBytes", zero);
49 rateMap.put("ReceivedBytes", zero);
50 rateMap.put("rpcAuthorizationSuccesses", zero);
51 rateMap.put("rpcAuthorizationFailures", zero);
52 rateMap.put("RpcQueueTime_num_ops", zero);
53 rateMap.put("RpcProcessingTime_num_ops", zero);
54 rateMap.put("heartbeats", zero);
55 rateMap.put("jobs_submitted", zero);
56 rateMap.put("jobs_completed", zero);
57 rateMap.put("jobs_failed", zero);
58 rateMap.put("jobs_killed", zero);
59 rateMap.put("maps_launched", zero);
60 rateMap.put("maps_completed", zero);
61 rateMap.put("maps_failed", zero);
62 rateMap.put("maps_killed", zero);
63 rateMap.put("reduces_launched", zero);
64 rateMap.put("reduces_completed", zero);
65 rateMap.put("reduces_failed", zero);
66 rateMap.put("reduces_killed", zero);
67 rateMap.put("gcCount", zero);
68 }
69
70 @Override
71 protected void parse(String recordEntry,
72 OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
73 throws Throwable {
74 Logger log = Logger.getLogger(JobTrackerProcessor.class);
75 long timeStamp = Calendar.getInstance(TimeZone.getTimeZone("UTC"))
76 .getTimeInMillis();
77
78 final ChukwaRecord mapred_jt = new ChukwaRecord();
79 final ChukwaRecord jt_jvm = new ChukwaRecord();
80 final ChukwaRecord jt_rpc = new ChukwaRecord();
81
82 Map<String, ChukwaRecord> metricsMap = new HashMap<String, ChukwaRecord>() {
83 private static final long serialVersionUID = 1L;
84 {
85 put("gcCount", jt_jvm);
86 put("gcTimeMillis", jt_jvm);
87 put("logError", jt_jvm);
88 put("logFatal", jt_jvm);
89 put("logInfo", jt_jvm);
90 put("logWarn", jt_jvm);
91 put("memHeapCommittedM", jt_jvm);
92 put("memHeapUsedM", jt_jvm);
93 put("threadsBlocked", jt_jvm);
94 put("threadsNew", jt_jvm);
95 put("threadsRunnable", jt_jvm);
96 put("threadsTerminated", jt_jvm);
97 put("threadsTimedWaiting", jt_jvm);
98 put("threadsWaiting", jt_jvm);
99
100 put("ReceivedBytes", jt_rpc);
101 put("RpcProcessingTime_avg_time", jt_rpc);
102 put("RpcProcessingTime_num_ops", jt_rpc);
103 put("RpcQueueTime_avg_time", jt_rpc);
104 put("RpcQueueTime_num_ops", jt_rpc);
105 put("SentBytes", jt_rpc);
106 put("rpcAuthorizationSuccesses", jt_rpc);
107 put("rpcAuthorizationnFailures", jt_rpc);
108 }
109 };
110 try {
111 JSONObject obj = (JSONObject) JSONValue.parse(recordEntry);
112 String ttTag = chunk.getTag("timeStamp");
113 if (ttTag == null) {
114 log.warn("timeStamp tag not set in JMX adaptor for jobtracker");
115 } else {
116 timeStamp = Long.parseLong(ttTag);
117 }
118 for(Entry<String, Object> entry : (Set<Entry<String, Object>>) obj.entrySet()) {
119 String key = entry.getKey();
120 String valueString = entry.getValue().toString();
121
122
123 if (rateMap.containsKey(key)) {
124 long oldValue = rateMap.get(key);
125 long curValue = Long.parseLong(valueString);
126 rateMap.put(key, curValue);
127 long newValue = curValue - oldValue;
128 if (newValue < 0) {
129 log.warn("JobTrackerProcessor's rateMap might be reset or corrupted for metric "
130 + key);
131 newValue = 0L;
132 }
133 valueString = Long.toString(newValue);
134 }
135
136
137
138 if (key.indexOf("Json") >= 0) {
139
140
141 } else if (metricsMap.containsKey(key)) {
142 ChukwaRecord rec = metricsMap.get(key);
143 rec.add(key, valueString);
144 } else {
145 mapred_jt.add(key, valueString);
146 }
147 }
148
149 buildGenericRecord(mapred_jt, null, timeStamp, "jt");
150 output.collect(key, mapred_jt);
151 buildGenericRecord(jt_jvm, null, timeStamp, "jvm");
152 output.collect(key, jt_jvm);
153 buildGenericRecord(jt_rpc, null, timeStamp, "rpc");
154 output.collect(key, jt_rpc);
155 } catch (Exception e) {
156 log.error(ExceptionUtil.getStackTrace(e));
157 }
158 }
159 }