This project has retired. For details please refer to its
Attic page.
Sar xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
20
21
22 import java.text.SimpleDateFormat;
23 import java.util.Date;
24 import java.util.regex.Matcher;
25 import java.util.regex.Pattern;
26
27 import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Table;
28 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
29 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
30 import org.apache.hadoop.mapred.OutputCollector;
31 import org.apache.hadoop.mapred.Reporter;
32 import org.apache.log4j.Logger;
33
34 @Table(name="SystemMetrics",columnFamily="SystemMetrics")
35 public class Sar extends AbstractProcessor {
36 static Logger log = Logger.getLogger(Sar.class);
37 public static final String reduceType = "SystemMetrics";
38 public final String recordType = this.getClass().getName();
39
40 private static String regex = "([0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2},[0-9]{3}) (.*?) (.*?): (.*?) \\((.*?)\\)";
41 private static Pattern p = null;
42
43 private Matcher matcher = null;
44 private SimpleDateFormat sdf = null;
45
46 public Sar() {
47
48 sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm");
49 p = Pattern.compile(regex);
50 }
51
52 @Override
53 protected void parse(String recordEntry,
54 OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
55 throws Throwable {
56
57 log.debug("Sar record: [" + recordEntry + "] type[" + chunk.getDataType()
58 + "]");
59 int i = 0;
60
61
62
63
64 matcher = p.matcher(recordEntry);
65 while (matcher.find()) {
66 log.debug("Sar Processor Matches");
67
68 try {
69 Date d = sdf.parse(matcher.group(1).trim());
70
71
72
73
74
75
76 key.setKey("" + d.getTime());
77
78 String[] lines = recordEntry.split("\n");
79
80 String[] headers = null;
81 while (i < (lines.length - 1) && lines[i + 1].indexOf("Average:") < 0) {
82
83 log.debug("skip:" + lines[i]);
84 i++;
85 }
86 while (i < lines.length) {
87 ChukwaRecord record = null;
88 if (lines[i].equals("")) {
89 i++;
90 headers = parseHeader(lines[i]);
91 i++;
92 }
93 String data[] = parseData(lines[i]);
94
95
96 if (headers[1].equals("IFACE") && headers[2].equals("rxpck/s")) {
97 log.debug("Matched Sar-Network");
98
99 record = new ChukwaRecord();
100 key = new ChukwaRecordKey();
101 this.buildGenericRecord(record, null, d.getTime(), reduceType);
102 } else if (headers[1].equals("IFACE") && headers[2].equals("rxerr/s")) {
103 log.debug("Matched Sar-Network");
104
105 record = new ChukwaRecord();
106 key = new ChukwaRecordKey();
107 this.buildGenericRecord(record, null, d.getTime(), reduceType);
108 } else if (headers[1].equals("kbmemfree")) {
109 log.debug("Matched Sar-Memory");
110
111 record = new ChukwaRecord();
112 key = new ChukwaRecordKey();
113 this.buildGenericRecord(record, null, d.getTime(), reduceType);
114 } else if (headers[1].equals("totsck")) {
115 log.debug("Matched Sar-NetworkSockets");
116
117 record = new ChukwaRecord();
118 key = new ChukwaRecordKey();
119 this.buildGenericRecord(record, null, d.getTime(), reduceType);
120 } else if (headers[1].equals("runq-sz")) {
121 log.debug("Matched Sar-LoadAverage");
122
123 record = new ChukwaRecord();
124 key = new ChukwaRecordKey();
125 this.buildGenericRecord(record, null, d.getTime(), reduceType);
126 } else {
127 log.debug("No match:" + headers[1] + " " + headers[2]);
128 }
129 if (record != null) {
130 int j = 0;
131
132 log.debug("Data Length: " + data.length);
133 while (j < data.length) {
134 log.debug("header:" + headers[j] + " data:" + data[j]);
135
136
137 if(headers[j].equals("rxkB/s")) {
138 record.add("rxbyt/s", Double.toString(Double.parseDouble(data[j]) * 1000));
139 } else if(headers[j].equals("txkB/s")){
140 record.add("txbyt/s", Double.toString(Double.parseDouble(data[j]) * 1000));
141 } else if (!headers[j].equals("Average:")) {
142 record.add(headers[j], data[j]);
143 }
144 j++;
145 }
146
147 output.collect(key, record);
148 }
149 i++;
150 }
151
152 } catch (Exception e) {
153 e.printStackTrace();
154 throw e;
155 }
156 }
157 }
158
159 public String[] parseHeader(String header) {
160 String[] headers = header.split("\\s+");
161 return headers;
162 }
163
164 public String[] parseData(String dataLine) {
165 String[] data = dataLine.split("\\s+");
166 return data;
167 }
168
169 public String getDataType() {
170 return recordType;
171 }
172 }