This project has retired. For details please refer to its
Attic page.
SystemMetrics xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23 package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
24
25 import java.util.Calendar;
26 import java.util.Iterator;
27 import java.util.TimeZone;
28
29 import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Table;
30 import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Tables;
31 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
32 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
33 import org.apache.hadoop.mapred.OutputCollector;
34 import org.apache.hadoop.mapred.Reporter;
35 import org.json.simple.JSONArray;
36 import org.json.simple.JSONObject;
37 import org.json.simple.JSONValue;
38
39 @Tables(annotations={
40 @Table(name="SystemMetrics",columnFamily="cpu"),
41 @Table(name="SystemMetrics",columnFamily="system"),
42 @Table(name="SystemMetrics",columnFamily="memory"),
43 @Table(name="SystemMetrics",columnFamily="network"),
44 @Table(name="SystemMetrics",columnFamily="disk")
45 })
46 public class SystemMetrics extends AbstractProcessor {
47
48 @Override
49 protected void parse(String recordEntry,
50 OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
51 throws Throwable {
52 JSONObject json = (JSONObject) JSONValue.parse(recordEntry);
53 long timestamp = ((Long)json.get("timestamp")).longValue();
54 ChukwaRecord record = new ChukwaRecord();
55 Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
56 cal.setTimeInMillis(timestamp);
57 cal.set(Calendar.SECOND, 0);
58 cal.set(Calendar.MILLISECOND, 0);
59 JSONArray cpuList = (JSONArray) json.get("cpu");
60 double combined = 0.0;
61 double user = 0.0;
62 double sys = 0.0;
63 double idle = 0.0;
64 int actualSize = 0;
65 for(int i = 0; i< cpuList.size(); i++) {
66 JSONObject cpu = (JSONObject) cpuList.get(i);
67
68 if(cpu.get("combined") == null){
69 continue;
70 }
71 actualSize++;
72 Iterator<String> keys = cpu.keySet().iterator();
73 combined = combined + Double.parseDouble(cpu.get("combined").toString());
74 user = user + Double.parseDouble(cpu.get("user").toString());
75 sys = sys + Double.parseDouble(cpu.get("sys").toString());
76 idle = idle + Double.parseDouble(cpu.get("idle").toString());
77 while(keys.hasNext()) {
78 String key = keys.next();
79 record.add(key + "." + i, cpu.get(key).toString());
80 }
81 }
82 combined = combined / actualSize;
83 user = user / actualSize;
84 sys = sys / actualSize;
85 idle = idle / actualSize;
86 record.add("combined", Double.toString(combined));
87 record.add("user", Double.toString(user));
88 record.add("idle", Double.toString(idle));
89 record.add("sys", Double.toString(sys));
90 buildGenericRecord(record, null, cal.getTimeInMillis(), "cpu");
91 output.collect(key, record);
92
93 record = new ChukwaRecord();
94 record.add("Uptime", json.get("uptime").toString());
95 JSONArray loadavg = (JSONArray) json.get("loadavg");
96 record.add("LoadAverage.1", loadavg.get(0).toString());
97 record.add("LoadAverage.5", loadavg.get(1).toString());
98 record.add("LoadAverage.15", loadavg.get(2).toString());
99 buildGenericRecord(record, null, cal.getTimeInMillis(), "system");
100 output.collect(key, record);
101
102 record = new ChukwaRecord();
103 JSONObject memory = (JSONObject) json.get("memory");
104 Iterator<String> memKeys = memory.keySet().iterator();
105 while(memKeys.hasNext()) {
106 String key = memKeys.next();
107 record.add(key, memory.get(key).toString());
108 }
109 buildGenericRecord(record, null, cal.getTimeInMillis(), "memory");
110 output.collect(key, record);
111
112 record = new ChukwaRecord();
113 JSONObject swap = (JSONObject) json.get("swap");
114 Iterator<String> swapKeys = swap.keySet().iterator();
115 while(swapKeys.hasNext()) {
116 String key = swapKeys.next();
117 record.add(key, swap.get(key).toString());
118 }
119 buildGenericRecord(record, null, cal.getTimeInMillis(), "swap");
120 output.collect(key, record);
121
122 double rxBytes = 0;
123 double rxDropped = 0;
124 double rxErrors = 0;
125 double rxPackets = 0;
126 double txBytes = 0;
127 double txCollisions = 0;
128 double txErrors = 0;
129 double txPackets = 0;
130 record = new ChukwaRecord();
131 JSONArray netList = (JSONArray) json.get("network");
132 for(int i = 0;i < netList.size(); i++) {
133 JSONObject netIf = (JSONObject) netList.get(i);
134 Iterator<String> keys = netIf.keySet().iterator();
135 while(keys.hasNext()) {
136 String key = keys.next();
137 record.add(key + "." + i, netIf.get(key).toString());
138 if(i!=0) {
139 if(key.equals("RxBytes")) {
140 rxBytes = rxBytes + (Long) netIf.get(key);
141 } else if(key.equals("RxDropped")) {
142 rxDropped = rxDropped + (Long) netIf.get(key);
143 } else if(key.equals("RxErrors")) {
144 rxErrors = rxErrors + (Long) netIf.get(key);
145 } else if(key.equals("RxPackets")) {
146 rxPackets = rxPackets + (Long) netIf.get(key);
147 } else if(key.equals("TxBytes")) {
148 txBytes = txBytes + (Long) netIf.get(key);
149 } else if(key.equals("TxCollisions")) {
150 txCollisions = txCollisions + (Long) netIf.get(key);
151 } else if(key.equals("TxErrors")) {
152 txErrors = txErrors + (Long) netIf.get(key);
153 } else if(key.equals("TxPackets")) {
154 txPackets = txPackets + (Long) netIf.get(key);
155 }
156 }
157 }
158 }
159 buildGenericRecord(record, null, cal.getTimeInMillis(), "network");
160 record.add("RxBytes", Double.toString(rxBytes));
161 record.add("RxDropped", Double.toString(rxDropped));
162 record.add("RxErrors", Double.toString(rxErrors));
163 record.add("RxPackets", Double.toString(rxPackets));
164 record.add("TxBytes", Double.toString(txBytes));
165 record.add("TxCollisions", Double.toString(txCollisions));
166 record.add("TxErrors", Double.toString(txErrors));
167 record.add("TxPackets", Double.toString(txPackets));
168 output.collect(key, record);
169
170 double readBytes = 0;
171 double reads = 0;
172 double writeBytes = 0;
173 double writes = 0;
174 double total = 0;
175 double used = 0;
176 record = new ChukwaRecord();
177 JSONArray diskList = (JSONArray) json.get("disk");
178 for(int i = 0;i < diskList.size(); i++) {
179 JSONObject disk = (JSONObject) diskList.get(i);
180 Iterator<String> keys = disk.keySet().iterator();
181 while(keys.hasNext()) {
182 String key = keys.next();
183 record.add(key + "." + i, disk.get(key).toString());
184 if(key.equals("ReadBytes")) {
185 readBytes = readBytes + (Long) disk.get("ReadBytes");
186 } else if(key.equals("Reads")) {
187 reads = reads + (Long) disk.get("Reads");
188 } else if(key.equals("WriteBytes")) {
189 writeBytes = writeBytes + (Long) disk.get("WriteBytes");
190 } else if(key.equals("Writes")) {
191 writes = writes + (Long) disk.get("Writes");
192 } else if(key.equals("Total")) {
193 total = total + (Long) disk.get("Total");
194 } else if(key.equals("Used")) {
195 used = used + (Long) disk.get("Used");
196 }
197 }
198 }
199 double percentUsed = used/total;
200 record.add("ReadBytes", Double.toString(readBytes));
201 record.add("Reads", Double.toString(reads));
202 record.add("WriteBytes", Double.toString(writeBytes));
203 record.add("Writes", Double.toString(writes));
204 record.add("Total", Double.toString(total));
205 record.add("Used", Double.toString(used));
206 record.add("PercentUsed", Double.toString(percentUsed));
207 buildGenericRecord(record, null, cal.getTimeInMillis(), "disk");
208 output.collect(key, record);
209
210 record = new ChukwaRecord();
211 record.add("cluster", chunk.getTag("cluster"));
212 buildGenericRecord(record, null, cal.getTimeInMillis(), "tags");
213 output.collect(key, record);
214 }
215
216 }