This project has retired. For details please refer to its Attic page.
SystemMetrics xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.chukwa.extraction.demux.processor.reducer;
20  
21  
22  import java.io.IOException;
23  import java.util.Iterator;
24  import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
25  import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
26  import org.apache.hadoop.mapred.OutputCollector;
27  import org.apache.hadoop.mapred.Reporter;
28  import org.apache.log4j.Logger;
29  
30  public class SystemMetrics implements ReduceProcessor {
31    static Logger log = Logger.getLogger(SystemMetrics.class);
32  
33    @Override
34    public String getDataType() {
35      return this.getClass().getName();
36    }
37  
38    @Override
39    public void process(ChukwaRecordKey key, Iterator<ChukwaRecord> values,
40        OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter) {
41      try {
42  
43        ChukwaRecord record = null;
44        ChukwaRecord newRecord = new ChukwaRecord();
45  
46        while (values.hasNext()) {
47          record = values.next();
48          newRecord.setTime(record.getTime());
49  
50          if (record.containsField("IFACE")) {
51            if (record.containsField("rxpck/s")) {
52              if (record.containsField("rxbyt/s")
53                  && record.containsField("txbyt/s")) {
54                double netBusyPcnt = 0, netRxByts = 0, netTxByts = 0, netSpeed = 128000000.00;
55                netRxByts = Double.parseDouble(record.getValue("rxbyt/s"));
56                netTxByts = Double.parseDouble(record.getValue("txbyt/s"));
57                netBusyPcnt = (netRxByts / netSpeed * 100)
58                    + (netTxByts / netSpeed * 100);
59                record.add(record.getValue("IFACE") + "_busy_pcnt", ""
60                    + netBusyPcnt);
61                record.add("csource", record.getValue("csource"));
62              }
63              record.add(record.getValue("IFACE") + ".rxbyt/s", record
64                  .getValue("rxbyt/s"));
65              record.add(record.getValue("IFACE") + ".rxpck/s", record
66                  .getValue("rxpck/s"));
67              record.add(record.getValue("IFACE") + ".txbyt/s", record
68                  .getValue("txbyt/s"));
69              record.add(record.getValue("IFACE") + ".txpck/s", record
70                  .getValue("txpck/s"));
71              record.removeValue("rxbyt/s");
72              record.removeValue("rxpck/s");
73              record.removeValue("txbyt/s");
74              record.removeValue("txpck/s");
75            }
76            if (record.containsField("rxerr/s")) {
77              record.add(record.getValue("IFACE") + ".rxerr/s", record
78                  .getValue("rxerr/s"));
79              record.add(record.getValue("IFACE") + ".rxdrop/s", record
80                  .getValue("rxdrop/s"));
81              record.add(record.getValue("IFACE") + ".txerr/s", record
82                  .getValue("txerr/s"));
83              record.add(record.getValue("IFACE") + ".txdrop/s", record
84                  .getValue("txdrop/s"));
85              record.removeValue("rxerr/s");
86              record.removeValue("rxdrop/s");
87              record.removeValue("txerr/s");
88              record.removeValue("txdrop/s");
89            }
90            record.removeValue("IFACE");
91          }
92  
93          if (record.containsField("Device:")) {
94            record.add(record.getValue("Device:") + ".r/s", record
95                .getValue("r/s"));
96            record.add(record.getValue("Device:") + ".w/s", record
97                .getValue("w/s"));
98            record.add(record.getValue("Device:") + ".rkB/s", record
99                .getValue("rkB/s"));
100           record.add(record.getValue("Device:") + ".wkB/s", record
101               .getValue("wkB/s"));
102           record.add(record.getValue("Device:") + ".%util", record
103               .getValue("%util"));
104           record.removeValue("r/s");
105           record.removeValue("w/s");
106           record.removeValue("rkB/s");
107           record.removeValue("wkB/s");
108           record.removeValue("%util");
109           record.removeValue("Device:");
110         }
111 
112         if (record.containsField("swap_free")) {
113           float swapUsedPcnt = 0, swapUsed = 0, swapTotal = 0;
114           swapUsed = Long.parseLong(record.getValue("swap_used"));
115           swapTotal = Long.parseLong(record.getValue("swap_total"));
116           swapUsedPcnt = swapUsed / swapTotal * 100;
117           record.add("swap_used_pcnt", "" + swapUsedPcnt);
118           record.add("csource", record.getValue("csource"));
119         }
120 
121         if (record.containsField("mem_used")) {
122           double memUsedPcnt = 0, memTotal = 0, memUsed = 0;
123           memTotal = Double.parseDouble(record.getValue("mem_total"));
124           memUsed = Double.parseDouble(record.getValue("mem_used"));
125           memUsedPcnt = memUsed / memTotal * 100;
126           record.add("mem_used_pcnt", "" + memUsedPcnt);
127           record.add("csource", record.getValue("csource"));
128         }
129 
130         if (record.containsField("mem_buffers")) {
131           double memBuffersPcnt = 0, memTotal = 0, memBuffers = 0;
132           memTotal = Double.parseDouble(record.getValue("mem_total"));
133           memBuffers = Double.parseDouble(record.getValue("mem_buffers"));
134           memBuffersPcnt = memBuffers / memTotal * 100;
135           record.add("mem_buffers_pcnt", "" + memBuffersPcnt);
136           record.add("csource", record.getValue("csource"));
137         }
138 
139         // Copy over all fields
140         String[] fields = record.getFields();
141         for (String f : fields) {
142           newRecord.add(f, record.getValue(f));
143         }
144       }
145       record.add("capp", "systemMetrics");
146       output.collect(key, newRecord);
147     } catch (IOException e) {
148       log.warn("Unable to collect output in SystemMetricsReduceProcessor ["
149           + key + "]", e);
150       e.printStackTrace();
151     }
152 
153   }
154 }