This project has retired. For details please refer to its
Attic page.
SystemMetrics xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.chukwa.extraction.demux.processor.reducer;
20
21
22 import java.io.IOException;
23 import java.util.Iterator;
24 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
25 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
26 import org.apache.hadoop.mapred.OutputCollector;
27 import org.apache.hadoop.mapred.Reporter;
28 import org.apache.log4j.Logger;
29
30 public class SystemMetrics implements ReduceProcessor {
31 static Logger log = Logger.getLogger(SystemMetrics.class);
32
33 @Override
34 public String getDataType() {
35 return this.getClass().getName();
36 }
37
38 @Override
39 public void process(ChukwaRecordKey key, Iterator<ChukwaRecord> values,
40 OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter) {
41 try {
42
43 ChukwaRecord record = null;
44 ChukwaRecord newRecord = new ChukwaRecord();
45
46 while (values.hasNext()) {
47 record = values.next();
48 newRecord.setTime(record.getTime());
49
50 if (record.containsField("IFACE")) {
51 if (record.containsField("rxpck/s")) {
52 if (record.containsField("rxbyt/s")
53 && record.containsField("txbyt/s")) {
54 double netBusyPcnt = 0, netRxByts = 0, netTxByts = 0, netSpeed = 128000000.00;
55 netRxByts = Double.parseDouble(record.getValue("rxbyt/s"));
56 netTxByts = Double.parseDouble(record.getValue("txbyt/s"));
57 netBusyPcnt = (netRxByts / netSpeed * 100)
58 + (netTxByts / netSpeed * 100);
59 record.add(record.getValue("IFACE") + "_busy_pcnt", ""
60 + netBusyPcnt);
61 record.add("csource", record.getValue("csource"));
62 }
63 record.add(record.getValue("IFACE") + ".rxbyt/s", record
64 .getValue("rxbyt/s"));
65 record.add(record.getValue("IFACE") + ".rxpck/s", record
66 .getValue("rxpck/s"));
67 record.add(record.getValue("IFACE") + ".txbyt/s", record
68 .getValue("txbyt/s"));
69 record.add(record.getValue("IFACE") + ".txpck/s", record
70 .getValue("txpck/s"));
71 record.removeValue("rxbyt/s");
72 record.removeValue("rxpck/s");
73 record.removeValue("txbyt/s");
74 record.removeValue("txpck/s");
75 }
76 if (record.containsField("rxerr/s")) {
77 record.add(record.getValue("IFACE") + ".rxerr/s", record
78 .getValue("rxerr/s"));
79 record.add(record.getValue("IFACE") + ".rxdrop/s", record
80 .getValue("rxdrop/s"));
81 record.add(record.getValue("IFACE") + ".txerr/s", record
82 .getValue("txerr/s"));
83 record.add(record.getValue("IFACE") + ".txdrop/s", record
84 .getValue("txdrop/s"));
85 record.removeValue("rxerr/s");
86 record.removeValue("rxdrop/s");
87 record.removeValue("txerr/s");
88 record.removeValue("txdrop/s");
89 }
90 record.removeValue("IFACE");
91 }
92
93 if (record.containsField("Device:")) {
94 record.add(record.getValue("Device:") + ".r/s", record
95 .getValue("r/s"));
96 record.add(record.getValue("Device:") + ".w/s", record
97 .getValue("w/s"));
98 record.add(record.getValue("Device:") + ".rkB/s", record
99 .getValue("rkB/s"));
100 record.add(record.getValue("Device:") + ".wkB/s", record
101 .getValue("wkB/s"));
102 record.add(record.getValue("Device:") + ".%util", record
103 .getValue("%util"));
104 record.removeValue("r/s");
105 record.removeValue("w/s");
106 record.removeValue("rkB/s");
107 record.removeValue("wkB/s");
108 record.removeValue("%util");
109 record.removeValue("Device:");
110 }
111
112 if (record.containsField("swap_free")) {
113 float swapUsedPcnt = 0, swapUsed = 0, swapTotal = 0;
114 swapUsed = Long.parseLong(record.getValue("swap_used"));
115 swapTotal = Long.parseLong(record.getValue("swap_total"));
116 swapUsedPcnt = swapUsed / swapTotal * 100;
117 record.add("swap_used_pcnt", "" + swapUsedPcnt);
118 record.add("csource", record.getValue("csource"));
119 }
120
121 if (record.containsField("mem_used")) {
122 double memUsedPcnt = 0, memTotal = 0, memUsed = 0;
123 memTotal = Double.parseDouble(record.getValue("mem_total"));
124 memUsed = Double.parseDouble(record.getValue("mem_used"));
125 memUsedPcnt = memUsed / memTotal * 100;
126 record.add("mem_used_pcnt", "" + memUsedPcnt);
127 record.add("csource", record.getValue("csource"));
128 }
129
130 if (record.containsField("mem_buffers")) {
131 double memBuffersPcnt = 0, memTotal = 0, memBuffers = 0;
132 memTotal = Double.parseDouble(record.getValue("mem_total"));
133 memBuffers = Double.parseDouble(record.getValue("mem_buffers"));
134 memBuffersPcnt = memBuffers / memTotal * 100;
135 record.add("mem_buffers_pcnt", "" + memBuffersPcnt);
136 record.add("csource", record.getValue("csource"));
137 }
138
139
140 String[] fields = record.getFields();
141 for (String f : fields) {
142 newRecord.add(f, record.getValue(f));
143 }
144 }
145 record.add("capp", "systemMetrics");
146 output.collect(key, newRecord);
147 } catch (IOException e) {
148 log.warn("Unable to collect output in SystemMetricsReduceProcessor ["
149 + key + "]", e);
150 e.printStackTrace();
151 }
152
153 }
154 }