This project has retired. For details please refer to its Attic page.
JobTrackerProcessor xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
20  
21  import java.util.Calendar;
22  import java.util.HashMap;
23  import java.util.Iterator;
24  import java.util.Map;
25  import java.util.Map.Entry;
26  import java.util.Set;
27  import java.util.TimeZone;
28  import java.util.concurrent.ConcurrentHashMap;
29  
30  import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Table;
31  import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Tables;
32  import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
33  import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
34  import org.apache.hadoop.chukwa.util.ExceptionUtil;
35  import org.apache.hadoop.mapred.OutputCollector;
36  import org.apache.hadoop.mapred.Reporter;
37  import org.apache.log4j.Logger;
38  import org.json.simple.JSONObject;
39  import org.json.simple.JSONValue;
40  
41  @Tables(annotations = { @Table(name = "JobTracker", columnFamily = "jt"),
42      @Table(name = "JobTracker", columnFamily = "jvm"),
43      @Table(name = "JobTracker", columnFamily = "rpc") })
44  public class JobTrackerProcessor extends AbstractProcessor {
45    static Map<String, Long> rateMap = new ConcurrentHashMap<String, Long>();
46    static {
47      long zero = 0L;
48      rateMap.put("SentBytes", zero);
49      rateMap.put("ReceivedBytes", zero);
50      rateMap.put("rpcAuthorizationSuccesses", zero);
51      rateMap.put("rpcAuthorizationFailures", zero);
52      rateMap.put("RpcQueueTime_num_ops", zero);
53      rateMap.put("RpcProcessingTime_num_ops", zero);
54      rateMap.put("heartbeats", zero);
55      rateMap.put("jobs_submitted", zero);
56      rateMap.put("jobs_completed", zero);
57      rateMap.put("jobs_failed", zero);
58      rateMap.put("jobs_killed", zero);
59      rateMap.put("maps_launched", zero);
60      rateMap.put("maps_completed", zero);
61      rateMap.put("maps_failed", zero);
62      rateMap.put("maps_killed", zero);
63      rateMap.put("reduces_launched", zero);
64      rateMap.put("reduces_completed", zero);
65      rateMap.put("reduces_failed", zero);
66      rateMap.put("reduces_killed", zero);
67      rateMap.put("gcCount", zero);
68    }
69  
70    @Override
71    protected void parse(String recordEntry,
72        OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
73        throws Throwable {
74      Logger log = Logger.getLogger(JobTrackerProcessor.class);
75      long timeStamp = Calendar.getInstance(TimeZone.getTimeZone("UTC"))
76          .getTimeInMillis();
77  
78      final ChukwaRecord mapred_jt = new ChukwaRecord();
79      final ChukwaRecord jt_jvm = new ChukwaRecord();
80      final ChukwaRecord jt_rpc = new ChukwaRecord();
81  
82      Map<String, ChukwaRecord> metricsMap = new HashMap<String, ChukwaRecord>() {
83        private static final long serialVersionUID = 1L;
84        {
85          put("gcCount", jt_jvm);
86          put("gcTimeMillis", jt_jvm);
87          put("logError", jt_jvm);
88          put("logFatal", jt_jvm);
89          put("logInfo", jt_jvm);
90          put("logWarn", jt_jvm);
91          put("memHeapCommittedM", jt_jvm);
92          put("memHeapUsedM", jt_jvm);
93          put("threadsBlocked", jt_jvm);
94          put("threadsNew", jt_jvm);
95          put("threadsRunnable", jt_jvm);
96          put("threadsTerminated", jt_jvm);
97          put("threadsTimedWaiting", jt_jvm);
98          put("threadsWaiting", jt_jvm);
99  
100         put("ReceivedBytes", jt_rpc);
101         put("RpcProcessingTime_avg_time", jt_rpc);
102         put("RpcProcessingTime_num_ops", jt_rpc);
103         put("RpcQueueTime_avg_time", jt_rpc);
104         put("RpcQueueTime_num_ops", jt_rpc);
105         put("SentBytes", jt_rpc);
106         put("rpcAuthorizationSuccesses", jt_rpc);
107         put("rpcAuthorizationnFailures", jt_rpc);
108       }
109     };
110     try {
111       JSONObject obj = (JSONObject) JSONValue.parse(recordEntry);
112       String ttTag = chunk.getTag("timeStamp");
113       if (ttTag == null) {
114         log.warn("timeStamp tag not set in JMX adaptor for jobtracker");
115       } else {
116         timeStamp = Long.parseLong(ttTag);
117       }
118       for(Entry<String, Object> entry : (Set<Entry<String, Object>>) obj.entrySet()) {
119         String key = entry.getKey();
120         String valueString = entry.getValue().toString();
121 
122         // Calculate rate for some of the metrics
123         if (rateMap.containsKey(key)) {
124           long oldValue = rateMap.get(key);
125           long curValue = Long.parseLong(valueString);
126           rateMap.put(key, curValue);
127           long newValue = curValue - oldValue;
128           if (newValue < 0) {
129             log.warn("JobTrackerProcessor's rateMap might be reset or corrupted for metric "
130                 + key);
131             newValue = 0L;
132           }
133           valueString = Long.toString(newValue);
134         }
135 
136         // These metrics are string types with JSON structure. So we parse them
137         // and get the count
138         if (key.indexOf("Json") >= 0) {
139           // ignore these for now. Parsing of JSON array is throwing class cast
140           // exception.
141         } else if (metricsMap.containsKey(key)) {
142           ChukwaRecord rec = metricsMap.get(key);
143           rec.add(key, valueString);
144         } else {
145           mapred_jt.add(key, valueString);
146         }
147       }
148 
149       buildGenericRecord(mapred_jt, null, timeStamp, "jt");
150       output.collect(key, mapred_jt);
151       buildGenericRecord(jt_jvm, null, timeStamp, "jvm");
152       output.collect(key, jt_jvm);
153       buildGenericRecord(jt_rpc, null, timeStamp, "rpc");
154       output.collect(key, jt_rpc);
155     } catch (Exception e) {
156       log.error(ExceptionUtil.getStackTrace(e));
157     }
158   }
159 }