This project has retired. For details please refer to its
Attic page.
Iostat xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
20
21
22 import java.text.SimpleDateFormat;
23 import java.util.Date;
24 import java.util.regex.Matcher;
25 import java.util.regex.Pattern;
26
27 import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Table;
28 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
29 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
30 import org.apache.hadoop.mapred.OutputCollector;
31 import org.apache.hadoop.mapred.Reporter;
32 import org.apache.log4j.Logger;
33
34 @Table(name="SystemMetrics",columnFamily="SystemMetrics")
35 public class Iostat extends AbstractProcessor {
36 static Logger log = Logger.getLogger(Iostat.class);
37 public final String recordType = this.getClass().getName();
38
39 private static String regex = "([0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2},[0-9]{3}) (.*?) (.*?): (.*?) \\((.*?)\\)";
40 private static Pattern p = null;
41
42 private Matcher matcher = null;
43 private SimpleDateFormat sdf = null;
44
45 public Iostat() {
46
47 sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm");
48 p = Pattern.compile(regex);
49 }
50
51 @Override
52 protected void parse(String recordEntry,
53 OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
54 throws Throwable {
55
56 log.debug("Iostat record: [" + recordEntry + "] type["
57 + chunk.getDataType() + "]");
58 int i = 0;
59
60 matcher = p.matcher(recordEntry);
61 while (matcher.find()) {
62 log.debug("Iostat Processor Matches");
63
64 try {
65 Date d = sdf.parse(matcher.group(1).trim());
66
67 String[] lines = recordEntry.split("\n");
68 String[] headers = null;
69 for (int skip = 0; skip < 2; skip++) {
70 i++;
71 while (i < lines.length && lines[i].indexOf("avg-cpu") < 0) {
72
73
74 log.debug("skip line:" + lines[i]);
75 i++;
76 }
77 }
78 while (i < lines.length) {
79 ChukwaRecord record = null;
80
81 if (lines[i].indexOf("avg-cpu") >= 0
82 || lines[i].indexOf("Device") >= 0) {
83 headers = parseHeader(lines[i]);
84 i++;
85 }
86 String data[] = parseData(lines[i]);
87 if (headers[0].equals("avg-cpu:")) {
88 log.debug("Matched CPU-Utilization");
89 record = new ChukwaRecord();
90 key = new ChukwaRecordKey();
91 buildGenericRecord(record, null, d.getTime(), "SystemMetrics");
92 } else if (headers[0].equals("Device:")) {
93 log.debug("Matched Iostat");
94 record = new ChukwaRecord();
95 key = new ChukwaRecordKey();
96 buildGenericRecord(record, null, d.getTime(), "SystemMetrics");
97 } else {
98 log.debug("No match:" + headers[0]);
99 }
100 if (record != null) {
101 int j = 0;
102 log.debug("Data Length: " + data.length);
103 while (j < data.length) {
104 log.debug("header:" + headers[j] + " data:" + data[j]);
105 if (!headers[j].equals("avg-cpu:")) {
106 try {
107
108 long x=Long.parseLong(data[j]);
109 if(x<100000000000L) {
110 record.add(headers[j],data[j]);
111 }
112 } catch(NumberFormatException ex) {
113 record.add(headers[j],data[j]);
114 }
115 }
116 j++;
117 }
118 record.setTime(d.getTime());
119 if (data.length > 3) {
120 output.collect(key, record);
121 }
122 }
123 i++;
124 }
125
126 } catch (Exception e) {
127 e.printStackTrace();
128 throw e;
129 }
130 }
131 }
132
133 public String[] parseHeader(String header) {
134 String[] headers = header.split("\\s+");
135 return headers;
136 }
137
138 public String[] parseData(String dataLine) {
139 String[] data = dataLine.split("\\s+");
140 return data;
141 }
142
143 public String getDataType() {
144 return recordType;
145 }
146 }