This project has retired. For details please refer to its Attic page.
SystemMetrics xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  /**
20   * Demux parser for system metrics data collected through
21   * org.apache.hadoop.chukwa.datacollection.adaptor.sigar.SystemMetrics.
22   */
23  package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
24  
25  import java.util.Calendar;
26  import java.util.Iterator;
27  import java.util.TimeZone;
28  
29  import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Table;
30  import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Tables;
31  import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
32  import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
33  import org.apache.hadoop.mapred.OutputCollector;
34  import org.apache.hadoop.mapred.Reporter;
35  import org.json.simple.JSONArray;
36  import org.json.simple.JSONObject;
37  import org.json.simple.JSONValue;
38  
39  @Tables(annotations={
40      @Table(name="SystemMetrics",columnFamily="cpu"),
41      @Table(name="SystemMetrics",columnFamily="system"),
42      @Table(name="SystemMetrics",columnFamily="memory"),
43      @Table(name="SystemMetrics",columnFamily="network"),
44      @Table(name="SystemMetrics",columnFamily="disk")
45      })
46  public class SystemMetrics extends AbstractProcessor {
47  
48    @Override
49    protected void parse(String recordEntry,
50        OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
51        throws Throwable {
52      JSONObject json = (JSONObject) JSONValue.parse(recordEntry);
53      long timestamp = ((Long)json.get("timestamp")).longValue();
54      ChukwaRecord record = new ChukwaRecord();
55      Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
56      cal.setTimeInMillis(timestamp);
57      cal.set(Calendar.SECOND, 0);
58      cal.set(Calendar.MILLISECOND, 0);
59      JSONArray cpuList = (JSONArray) json.get("cpu");
60      double combined = 0.0;
61      double user = 0.0;
62      double sys = 0.0;
63      double idle = 0.0;
64      int actualSize = 0;
65      for(int i = 0; i< cpuList.size(); i++) {
66        JSONObject cpu = (JSONObject) cpuList.get(i);
67        //Work around for sigar returning null sometimes for cpu metrics on pLinux
68        if(cpu.get("combined") == null){
69      	  continue;
70        }
71        actualSize++;
72        Iterator<String> keys = cpu.keySet().iterator();
73        combined = combined + Double.parseDouble(cpu.get("combined").toString());
74        user = user + Double.parseDouble(cpu.get("user").toString());
75        sys = sys + Double.parseDouble(cpu.get("sys").toString());
76        idle = idle + Double.parseDouble(cpu.get("idle").toString());
77        while(keys.hasNext()) {
78          String key = keys.next();
79          record.add(key + "." + i, cpu.get(key).toString());
80        }
81      }
82      combined = combined / actualSize;
83      user = user / actualSize;
84      sys = sys / actualSize;
85      idle = idle / actualSize;
86      record.add("combined", Double.toString(combined));
87      record.add("user", Double.toString(user));
88      record.add("idle", Double.toString(idle));    
89      record.add("sys", Double.toString(sys));
90      buildGenericRecord(record, null, cal.getTimeInMillis(), "cpu");
91      output.collect(key, record);    
92  
93      record = new ChukwaRecord();
94      record.add("Uptime", json.get("uptime").toString());
95      JSONArray loadavg = (JSONArray) json.get("loadavg");
96      record.add("LoadAverage.1", loadavg.get(0).toString());
97      record.add("LoadAverage.5", loadavg.get(1).toString());
98      record.add("LoadAverage.15", loadavg.get(2).toString());
99      buildGenericRecord(record, null, cal.getTimeInMillis(), "system");
100     output.collect(key, record);    
101 
102     record = new ChukwaRecord();
103     JSONObject memory = (JSONObject) json.get("memory");
104     Iterator<String> memKeys = memory.keySet().iterator();
105     while(memKeys.hasNext()) {
106       String key = memKeys.next();
107       record.add(key, memory.get(key).toString());
108     }
109     buildGenericRecord(record, null, cal.getTimeInMillis(), "memory");
110     output.collect(key, record);    
111 
112     record = new ChukwaRecord();
113     JSONObject swap = (JSONObject) json.get("swap");
114     Iterator<String> swapKeys = swap.keySet().iterator();
115     while(swapKeys.hasNext()) {
116       String key = swapKeys.next();
117       record.add(key, swap.get(key).toString());
118     }
119     buildGenericRecord(record, null, cal.getTimeInMillis(), "swap");
120     output.collect(key, record);
121     
122     double rxBytes = 0;
123     double rxDropped = 0;
124     double rxErrors = 0;
125     double rxPackets = 0;
126     double txBytes = 0;
127     double txCollisions = 0;
128     double txErrors = 0;
129     double txPackets = 0;
130     record = new ChukwaRecord();
131     JSONArray netList = (JSONArray) json.get("network");
132     for(int i = 0;i < netList.size(); i++) {
133       JSONObject netIf = (JSONObject) netList.get(i);
134       Iterator<String> keys = netIf.keySet().iterator();
135       while(keys.hasNext()) {
136         String key = keys.next();
137         record.add(key + "." + i, netIf.get(key).toString());
138         if(i!=0) {
139           if(key.equals("RxBytes")) {
140             rxBytes = rxBytes + (Long) netIf.get(key);
141           } else if(key.equals("RxDropped")) {
142             rxDropped = rxDropped + (Long) netIf.get(key);
143           } else if(key.equals("RxErrors")) {          
144             rxErrors = rxErrors + (Long) netIf.get(key);
145           } else if(key.equals("RxPackets")) {
146             rxPackets = rxPackets + (Long) netIf.get(key);
147           } else if(key.equals("TxBytes")) {
148             txBytes = txBytes + (Long) netIf.get(key);
149           } else if(key.equals("TxCollisions")) {
150             txCollisions = txCollisions + (Long) netIf.get(key);
151           } else if(key.equals("TxErrors")) {
152             txErrors = txErrors + (Long) netIf.get(key);
153           } else if(key.equals("TxPackets")) {
154             txPackets = txPackets + (Long) netIf.get(key);
155           }
156         }
157       }
158     }
159     buildGenericRecord(record, null, cal.getTimeInMillis(), "network");
160     record.add("RxBytes", Double.toString(rxBytes));
161     record.add("RxDropped", Double.toString(rxDropped));
162     record.add("RxErrors", Double.toString(rxErrors));
163     record.add("RxPackets", Double.toString(rxPackets));
164     record.add("TxBytes", Double.toString(txBytes));
165     record.add("TxCollisions", Double.toString(txCollisions));
166     record.add("TxErrors", Double.toString(txErrors));
167     record.add("TxPackets", Double.toString(txPackets));
168     output.collect(key, record);    
169     
170     double readBytes = 0;
171     double reads = 0;
172     double writeBytes = 0;
173     double writes = 0;
174     double total = 0;
175     double used = 0;
176     record = new ChukwaRecord();
177     JSONArray diskList = (JSONArray) json.get("disk");
178     for(int i = 0;i < diskList.size(); i++) {
179       JSONObject disk = (JSONObject) diskList.get(i);
180       Iterator<String> keys = disk.keySet().iterator();
181       while(keys.hasNext()) {
182         String key = keys.next();
183         record.add(key + "." + i, disk.get(key).toString());
184         if(key.equals("ReadBytes")) {
185           readBytes = readBytes + (Long) disk.get("ReadBytes");
186         } else if(key.equals("Reads")) {
187           reads = reads + (Long) disk.get("Reads");
188         } else if(key.equals("WriteBytes")) {
189           writeBytes = writeBytes + (Long) disk.get("WriteBytes");
190         } else if(key.equals("Writes")) {
191           writes = writes + (Long) disk.get("Writes");
192         }  else if(key.equals("Total")) {
193           total = total + (Long) disk.get("Total");
194         } else if(key.equals("Used")) {
195           used = used + (Long) disk.get("Used");
196         }
197       }
198     }
199     double percentUsed = used/total; 
200     record.add("ReadBytes", Double.toString(readBytes));
201     record.add("Reads", Double.toString(reads));
202     record.add("WriteBytes", Double.toString(writeBytes));
203     record.add("Writes", Double.toString(writes));
204     record.add("Total", Double.toString(total));
205     record.add("Used", Double.toString(used));
206     record.add("PercentUsed", Double.toString(percentUsed));
207     buildGenericRecord(record, null, cal.getTimeInMillis(), "disk");
208     output.collect(key, record);
209     
210     record = new ChukwaRecord();
211     record.add("cluster", chunk.getTag("cluster"));
212     buildGenericRecord(record, null, cal.getTimeInMillis(), "tags");
213     output.collect(key, record);
214   }
215 
216 }