This project has retired. For details, please refer to its
Attic page.
HadoopMetricsProcessor xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
20
21
22 import java.io.IOException;
23 import java.text.ParseException;
24 import java.text.SimpleDateFormat;
25 import java.util.Calendar;
26 import java.util.Date;
27 import java.util.Iterator;
28
29 import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Tables;
30 import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Table;
31 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
32 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
33 import org.apache.hadoop.chukwa.extraction.engine.Record;
34 import org.apache.hadoop.mapred.OutputCollector;
35 import org.apache.hadoop.mapred.Reporter;
36 import org.apache.log4j.Logger;
37 import org.json.simple.JSONObject;
38 import org.json.simple.JSONValue;
39
40 @Tables(annotations={
41 @Table(name="Hadoop",columnFamily="jvm_metrics"),
42 @Table(name="Hadoop",columnFamily="mapred_metrics"),
43 @Table(name="Hadoop",columnFamily="dfs_metrics"),
44 @Table(name="Hadoop",columnFamily="dfs_namenode"),
45 @Table(name="Hadoop",columnFamily="dfs_FSNamesystem"),
46 @Table(name="Hadoop",columnFamily="dfs_datanode"),
47 @Table(name="Hadoop",columnFamily="mapred_jobtracker"),
48 @Table(name="Hadoop",columnFamily="mapred_shuffleInput"),
49 @Table(name="Hadoop",columnFamily="mapred_shuffleOutput"),
50 @Table(name="Hadoop",columnFamily="mapred_tasktracker"),
51 @Table(name="Hadoop",columnFamily="rpc_metrics")
52 })
53 public class HadoopMetricsProcessor extends AbstractProcessor {
54
55
56
57
58
59
60
61
62
63
64
65
66
67 static Logger log = Logger.getLogger(HadoopMetricsProcessor.class);
68 static final String chukwaTimestampField = "chukwa_timestamp";
69 static final String contextNameField = "contextName";
70 static final String recordNameField = "recordName";
71
72 private SimpleDateFormat sdf = null;
73
74 public HadoopMetricsProcessor() {
75
76 sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm");
77 }
78
79 @SuppressWarnings("unchecked")
80 @Override
81 protected void parse(String recordEntry,
82 OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
83 throws Throwable {
84 try {
85
86 int idx = recordEntry.indexOf('>', 0);
87 String dStr = recordEntry.substring(idx+1, idx+23);
88 int start = idx + 25;
89 idx = recordEntry.indexOf(' ', start);
90
91 start = idx + 1;
92 idx = recordEntry.indexOf(' ', start);
93
94 String body = recordEntry.substring(idx + 1);
95 body = body.replaceAll("\n", "");
96
97 Date d = sdf.parse(dStr);
98
99 start = body.indexOf('{');
100 JSONObject json = (JSONObject) JSONValue.parse(body.substring(start));
101
102 ChukwaRecord record = new ChukwaRecord();
103 StringBuilder datasource = new StringBuilder();
104 String contextName = null;
105 String recordName = null;
106
107 Iterator<String> ki = json.keySet().iterator();
108 while (ki.hasNext()) {
109 String keyName = ki.next();
110 if (chukwaTimestampField.intern() == keyName.intern()) {
111 d = new Date((Long) json.get(keyName));
112 Calendar cal = Calendar.getInstance();
113 cal.setTimeInMillis(d.getTime());
114 cal.set(Calendar.SECOND, 0);
115 cal.set(Calendar.MILLISECOND, 0);
116 d.setTime(cal.getTimeInMillis());
117 } else if (contextNameField.intern() == keyName.intern()) {
118 contextName = (String) json.get(keyName);
119 } else if (recordNameField.intern() == keyName.intern()) {
120 recordName = (String) json.get(keyName);
121 record.add(keyName, json.get(keyName).toString());
122 } else {
123 if(json.get(keyName)!=null) {
124 record.add(keyName, json.get(keyName).toString());
125 }
126 }
127 }
128 if(contextName!=null) {
129 datasource.append(contextName);
130 datasource.append("_");
131 }
132 datasource.append(recordName);
133 record.add("cluster", chunk.getTag("cluster"));
134 if(contextName!=null && contextName.equals("jvm")) {
135 buildJVMRecord(record, d.getTime(), datasource.toString());
136 } else {
137 buildGenericRecord(record, null, d.getTime(), datasource.toString());
138 }
139 output.collect(key, record);
140 } catch (ParseException e) {
141 log.warn("Wrong format in HadoopMetricsProcessor [" + recordEntry + "]",
142 e);
143 throw e;
144 } catch (IOException e) {
145 log.warn("Unable to collect output in HadoopMetricsProcessor ["
146 + recordEntry + "]", e);
147 throw e;
148 } catch (Exception e) {
149 log.warn("Wrong format in HadoopMetricsProcessor [" + recordEntry + "]",
150 e);
151 throw e;
152 }
153
154 }
155
156 protected void buildJVMRecord(ChukwaRecord record, long timestamp, String dataSource) {
157 calendar.setTimeInMillis(timestamp);
158 calendar.set(Calendar.MINUTE, 0);
159 calendar.set(Calendar.SECOND, 0);
160 calendar.set(Calendar.MILLISECOND, 0);
161
162 key.setKey("" + calendar.getTimeInMillis() + "/" + chunk.getSource() + ":" +
163 record.getValue("processName")+ "/" + timestamp);
164 key.setReduceType(dataSource);
165 record.setTime(timestamp);
166
167 record.add(Record.tagsField, chunk.getTags());
168 record.add(Record.sourceField, chunk.getSource());
169 record.add(Record.applicationField, chunk.getStreamName());
170 }
171
172 public String getDataType() {
173 return HadoopMetricsProcessor.class.getName();
174 }
175
176 }