This project has retired. For details please refer to its Attic page.
Sar xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
20  
21  
22  import java.text.SimpleDateFormat;
23  import java.util.Date;
24  import java.util.regex.Matcher;
25  import java.util.regex.Pattern;
26  
27  import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Table;
28  import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
29  import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
30  import org.apache.hadoop.mapred.OutputCollector;
31  import org.apache.hadoop.mapred.Reporter;
32  import org.apache.log4j.Logger;
33  
34  @Table(name="SystemMetrics",columnFamily="SystemMetrics")
35  public class Sar extends AbstractProcessor {
36    static Logger log = Logger.getLogger(Sar.class);
37    public static final String reduceType = "SystemMetrics";
38    public final String recordType = this.getClass().getName();
39  
40    private static String regex = "([0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2},[0-9]{3}) (.*?) (.*?): (.*?) \\((.*?)\\)";
41    private static Pattern p = null;
42  
43    private Matcher matcher = null;
44    private SimpleDateFormat sdf = null;
45  
46    public Sar() {
47      // TODO move that to config
48      sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm");
49      p = Pattern.compile(regex);
50    }
51  
52    @Override
53    protected void parse(String recordEntry,
54        OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
55        throws Throwable {
56  
57      log.debug("Sar record: [" + recordEntry + "] type[" + chunk.getDataType()
58          + "]");
59      int i = 0;
60  
61      // String logLevel = null;
62      // String className = null;
63  
64      matcher = p.matcher(recordEntry);
65      while (matcher.find()) {
66        log.debug("Sar Processor Matches");
67  
68        try {
69          Date d = sdf.parse(matcher.group(1).trim());
70  
71          // logLevel = matcher.group(2);
72          // className = matcher.group(3);
73  
74          // TODO create a more specific key structure
75          // part of ChukwaArchiveKey + record index if needed
76          key.setKey("" + d.getTime());
77  
78          String[] lines = recordEntry.split("\n");
79  
80          String[] headers = null;
81          while (i < (lines.length - 1) && lines[i + 1].indexOf("Average:") < 0) {
82            // Skip to the average lines
83            log.debug("skip:" + lines[i]);
84            i++;
85          }
86          while (i < lines.length) {
87            ChukwaRecord record = null;
88            if (lines[i].equals("")) {
89              i++;
90              headers = parseHeader(lines[i]);
91              i++;
92            }
93            String data[] = parseData(lines[i]);
94  
95            // FIXME please validate this
96            if (headers[1].equals("IFACE") && headers[2].equals("rxpck/s")) {
97              log.debug("Matched Sar-Network");
98  
99              record = new ChukwaRecord();
100             key = new ChukwaRecordKey();
101             this.buildGenericRecord(record, null, d.getTime(), reduceType);
102           } else if (headers[1].equals("IFACE") && headers[2].equals("rxerr/s")) {
103             log.debug("Matched Sar-Network");
104 
105             record = new ChukwaRecord();
106             key = new ChukwaRecordKey();
107             this.buildGenericRecord(record, null, d.getTime(), reduceType);
108           } else if (headers[1].equals("kbmemfree")) {
109             log.debug("Matched Sar-Memory");
110 
111             record = new ChukwaRecord();
112             key = new ChukwaRecordKey();
113             this.buildGenericRecord(record, null, d.getTime(), reduceType);
114           } else if (headers[1].equals("totsck")) {
115             log.debug("Matched Sar-NetworkSockets");
116 
117             record = new ChukwaRecord();
118             key = new ChukwaRecordKey();
119             this.buildGenericRecord(record, null, d.getTime(), reduceType);
120           } else if (headers[1].equals("runq-sz")) {
121             log.debug("Matched Sar-LoadAverage");
122 
123             record = new ChukwaRecord();
124             key = new ChukwaRecordKey();
125             this.buildGenericRecord(record, null, d.getTime(), reduceType);
126           } else {
127             log.debug("No match:" + headers[1] + " " + headers[2]);
128           }
129           if (record != null) {
130             int j = 0;
131 
132             log.debug("Data Length: " + data.length);
133             while (j < data.length) {
134               log.debug("header:" + headers[j] + " data:" + data[j]);
135               
136                 //special case code to work around peculiar versions of Sar
137               if(headers[j].equals("rxkB/s")) {
138                 record.add("rxbyt/s", Double.toString(Double.parseDouble(data[j]) * 1000));
139               } else if(headers[j].equals("txkB/s")){
140                 record.add("txbyt/s", Double.toString(Double.parseDouble(data[j]) * 1000));
141               } else if (!headers[j].equals("Average:")) { //common case
142                 record.add(headers[j], data[j]);
143               }
144               j++;
145             }
146 
147             output.collect(key, record);
148           }
149           i++;
150         }
151         // End of parsing
152       } catch (Exception e) {
153         e.printStackTrace();
154         throw e;
155       }
156     }
157   }
158 
159   public String[] parseHeader(String header) {
160     String[] headers = header.split("\\s+");
161     return headers;
162   }
163 
164   public String[] parseData(String dataLine) {
165     String[] data = dataLine.split("\\s+");
166     return data;
167   }
168 
169   public String getDataType() {
170     return recordType;
171   }
172 }