This project has retired. For details please refer to its Attic page.
Top xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
20  
21  
22  import java.text.SimpleDateFormat;
23  import java.util.Date;
24  import java.util.HashMap;
25  import java.util.Iterator;
26  import java.util.regex.Matcher;
27  import java.util.regex.Pattern;
28  
29  import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Table;
30  import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Tables;
31  import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
32  import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
33  import org.apache.hadoop.mapred.OutputCollector;
34  import org.apache.hadoop.mapred.Reporter;
35  import org.apache.log4j.Logger;
36  
37  @Tables(annotations={
38  @Table(name="SystemMetrics",columnFamily="SystemMetrics"),
39  @Table(name="SystemMetrics",columnFamily="Top")
40  })
41  public class Top extends AbstractProcessor {
42    static Logger log = Logger.getLogger(Top.class);
43    public final String reduceType = "SystemMetrics";
44    public final String recordType = this.getClass().getName();
45  
46    private static String regex = "([0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2},[0-9]{3}) (.*?) (.*?): ";
47    private static Pattern p = null;
48  
49    private Matcher matcher = null;
50    private SimpleDateFormat sdf = null;
51  
52    public Top() {
53      // TODO move that to config
54      sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm");
55      p = Pattern.compile(regex);
56    }
57  
58    @Override
59    protected void parse(String recordEntry,
60        OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
61        throws Throwable {
62  
63      log.debug("Top record: [" + recordEntry + "] type[" + chunk.getDataType()
64          + "]");
65  
66      matcher = p.matcher(recordEntry);
67      while (matcher.find()) {
68        log.debug("Top Processor Matches");
69  
70        try {
71          Date d = sdf.parse(matcher.group(1).trim());
72  
73          ChukwaRecord record = new ChukwaRecord();
74          String[] lines = recordEntry.split("\n");
75          int i = 0;
76          if (lines.length < 2) {
77            return;
78          }
79          String summaryString = "";
80          while (!lines[i].equals("")) {
81            summaryString = summaryString + lines[i] + "\n";
82            i++;
83          }
84          i++;
85          record = new ChukwaRecord();
86          key = new ChukwaRecordKey();
87          parseSummary(record, summaryString);
88          this.buildGenericRecord(record, null, d.getTime(), reduceType);
89          output.collect(key, record);
90  
91          StringBuffer buffer = new StringBuffer();
92          // FIXME please validate this
93          while (i < lines.length) {
94            record = null;
95            buffer.append(lines[i] + "\n");
96            i++;
97  
98          }
99          record = new ChukwaRecord();
100         key = new ChukwaRecordKey();
101         this.buildGenericRecord(record, buffer.toString(), d.getTime(), recordType);
102         // Output Top info to database
103         output.collect(key, record);
104 
105         // End of parsing
106       } catch (Exception e) {
107         e.printStackTrace();
108         throw e;
109       }
110     }
111   }
112 
113   public void parseSummary(ChukwaRecord record, String header) {
114     HashMap<String, Object> keyValues = new HashMap<String, Object>();
115     String[] headers = header.split("\n");
116     Pattern p = Pattern.compile("top - (.*?) up (.*?),\\s+(\\d+) users");
117     Matcher matcher = p.matcher(headers[0]);
118     if (matcher.find()) {
119       record.add("uptime", matcher.group(2));
120       record.add("users", matcher.group(3));
121     }
122     p = Pattern
123         .compile("Tasks:\\s+(\\d+) total,\\s+(\\d+) running,\\s+(\\d+) sleeping,\\s+(\\d+) stopped,\\s+(\\d+) zombie");
124     matcher = p.matcher(headers[1]);
125     if (matcher.find()) {
126       record.add("tasks_total", matcher.group(1));
127       record.add("tasks_running", matcher.group(2));
128       record.add("tasks_sleeping", matcher.group(3));
129       record.add("tasks_stopped", matcher.group(4));
130       record.add("tasks_zombie", matcher.group(5));
131     }
132     p = Pattern
133         .compile("Cpu\\(s\\):\\s*(.*?)%\\s*us,\\s*(.*?)%\\s*sy,\\s*(.*?)%\\s*ni,\\s*(.*?)%\\s*id,\\s*(.*?)%\\s*wa,\\s*(.*?)%\\s*hi,\\s*(.*?)%\\s*si");
134     matcher = p.matcher(headers[2]);
135     if (matcher.find()) {
136       record.add("cpu_user%", matcher.group(1));
137       record.add("cpu_sys%", matcher.group(2));
138       record.add("cpu_nice%", matcher.group(3));
139       record.add("cpu_wait%", matcher.group(4));
140       record.add("cpu_hi%", matcher.group(5));
141       record.add("cpu_si%", matcher.group(6));
142     }
143     p = Pattern
144         .compile("Mem:\\s+(.*?)k total,\\s+(.*?)k used,\\s+(.*?)k free,\\s+(.*?)k buffers");
145     matcher = p.matcher(headers[3]);
146     if (matcher.find()) {
147       record.add("mem_total", matcher.group(1));
148       record.add("mem_used", matcher.group(2));
149       record.add("mem_free", matcher.group(3));
150       record.add("mem_buffers", matcher.group(4));
151     }
152     p = Pattern
153         .compile("Swap:\\s+(.*?)k total,\\s+(.*?)k used,\\s+(.*?)k free,\\s+(.*?)k cached");
154     matcher = p.matcher(headers[4]);
155     if (matcher.find()) {
156       record.add("swap_total", matcher.group(1));
157       record.add("swap_used", matcher.group(2));
158       record.add("swap_free", matcher.group(3));
159       record.add("swap_cached", matcher.group(4));
160     }
161     Iterator<String> ki = keyValues.keySet().iterator();
162     while (ki.hasNext()) {
163       String key = ki.next();
164       log.debug(key + ":" + keyValues.get(key));
165     }
166   }
167 
168   public String getDataType() {
169     return recordType;
170   }
171 }