This project has retired. For details please refer to its
Attic page.
YWatch xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
19
20
21 import java.io.IOException;
22 import java.util.Iterator;
23 import java.util.regex.Matcher;
24 import java.util.regex.Pattern;
25 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
26 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
27 import org.apache.hadoop.mapred.OutputCollector;
28 import org.apache.hadoop.mapred.Reporter;
29 import org.apache.log4j.Logger;
30 import org.json.JSONException;
31 import org.json.JSONObject;
32
33 public class YWatch extends AbstractProcessor {
34 static Logger log = Logger.getLogger(YWatch.class);
35
36 private static final String ywatchType = "YWatch";
37
38 private static String regex = null;
39
40 private static Pattern p = null;
41
42 private Matcher matcher = null;
43
44 public YWatch() {
45
46 regex = "([0-9]{4}\\-[0-9]{2}\\-[0-9]{2} [0-9]{2}\\:[0-9]{2}:[0-9]{2},[0-9]{3}) (INFO|DEBUG|ERROR|WARN) (.*?): (.*)";
47 p = Pattern.compile(regex);
48 matcher = p.matcher("-");
49 }
50
51 @SuppressWarnings("unchecked")
52 @Override
53 protected void parse(String recordEntry,
54 OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
55 throws Throwable {
56 if (log.isDebugEnabled()) {
57 log.debug("YWatchProcessor record: [" + recordEntry + "] type["
58 + chunk.getDataType() + "]");
59 }
60
61 matcher.reset(recordEntry);
62 if (matcher.matches()) {
63 log.info("YWatchProcessor Matches");
64
65 try {
66 String body = matcher.group(4);
67
68 try {
69 JSONObject json = new JSONObject(body);
70
71 String poller = json.getString("poller");
72 String host = json.getString("host");
73 String metricName = json.getString("metricName");
74
75
76 JSONObject jsonData = json.getJSONObject("data")
77 .getJSONObject("data");
78
79 String jsonTs = null;
80 long ts = Long.parseLong(jsonTs);
81
82 String jsonValue = null;
83 Iterator<String> it = jsonData.keys();
84
85 ChukwaRecord record = null;
86
87 while (it.hasNext()) {
88 jsonTs = it.next();
89 jsonValue = jsonData.getString(jsonTs);
90
91 record = new ChukwaRecord();
92 key = new ChukwaRecordKey();
93 this.buildGenericRecord(record, null, ts, "Ywatch");
94 record.add("poller", poller);
95 record.add("host", host);
96 record.add("metricName", metricName);
97 record.add("value", jsonValue);
98 output.collect(key, record);
99 log.info("YWatchProcessor output 1 metric");
100 }
101
102 } catch (IOException e) {
103 log.warn("Unable to collect output in YWatchProcessor ["
104 + recordEntry + "]", e);
105 e.printStackTrace();
106 } catch (JSONException e) {
107 e.printStackTrace();
108 log.warn("Wrong format in YWatchProcessor [" + recordEntry + "]", e);
109 }
110
111 } catch (Exception e) {
112 e.printStackTrace();
113 throw e;
114 }
115 }
116 }
117
118 public String getDataType() {
119 return YWatch.ywatchType;
120 }
121 }