HadoopMetricsProcessor.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;


import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
import java.util.Iterator;

import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Tables;
import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Table;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
import org.apache.hadoop.chukwa.extraction.engine.Record;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.log4j.Logger;
import org.json.simple.JSONObject;
import org.json.simple.JSONValue;

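// HBase column families declared for these metrics (all under the "Hadoop"
// table); the family names mirror the "<contextName>_<recordName>" data
// sources built in parse() below.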
@Tables(annotations={
  @Table(name="Hadoop",columnFamily="jvm_metrics"),
  @Table(name="Hadoop",columnFamily="mapred_metrics"),
  @Table(name="Hadoop",columnFamily="dfs_metrics"),
  @Table(name="Hadoop",columnFamily="dfs_namenode"),
  @Table(name="Hadoop",columnFamily="dfs_FSNamesystem"),
  @Table(name="Hadoop",columnFamily="dfs_datanode"),
  @Table(name="Hadoop",columnFamily="mapred_jobtracker"),
  @Table(name="Hadoop",columnFamily="mapred_shuffleInput"),
  @Table(name="Hadoop",columnFamily="mapred_shuffleOutput"),
  @Table(name="Hadoop",columnFamily="mapred_tasktracker"),
  @Table(name="Hadoop",columnFamily="rpc_metrics")
})
public class HadoopMetricsProcessor extends AbstractProcessor {
//  public static final String jvm = "jvm_metrics";
//  public static final String mapred = "mapred_metrics";
//  public static final String dfs = "dfs_metrics";
//  public static final String namenode = "dfs_namenode";
//  public static final String fsdir = "dfs_FSDirectory";
//  public static final String fsname = "dfs_FSNamesystem";
//  public static final String datanode = "dfs_datanode";
//  public static final String jobtracker = "mapred_jobtracker";
//  public static final String shuffleIn = "mapred_shuffleInput";
//  public static final String shuffleOut = "mapred_shuffleOutput";
//  public static final String tasktracker = "mapred_tasktracker";
//  public static final String mr = "mapred_job";

  static Logger log = Logger.getLogger(HadoopMetricsProcessor.class);
  static final String chukwaTimestampField = "chukwa_timestamp";
  static final String contextNameField = "contextName";
  static final String recordNameField = "recordName";

  private SimpleDateFormat sdf = null;

  public HadoopMetricsProcessor() {
    // TODO move that to config
    sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm");
  }

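  /*
   * parse() expects a log4j-style line, optionally prefixed with a syslog
   * PRI field ("<nn>"): a timestamp, a log level, a logger/class name, and
   * then a JSON body carrying the metric fields. A hypothetical example
   * (layout inferred from the parsing below, not a verbatim sample):
   *
   *   <13>2009-06-01 12:30:15,123 INFO chukwa.metrics.jvm: {"contextName":"jvm","recordName":"metrics",...}
   *
   * Everything before the first '{' is positional; the JSON body supplies
   * the actual metric key/value pairs.
   */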
  @SuppressWarnings("unchecked")
  @Override
  protected void parse(String recordEntry,
      OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
      throws Throwable {
    try {
      // Look for a syslog PRI prefix; if '>' is not found, indexOf returns -1
      // and the timestamp is read starting at offset 0.
      int idx = recordEntry.indexOf('>', 0);
      String dStr = recordEntry.substring(idx + 1, idx + 23);
      int start = idx + 25;
      idx = recordEntry.indexOf(' ', start);
      // String level = recordEntry.substring(start, idx);
      start = idx + 1;
      idx = recordEntry.indexOf(' ', start);
      // String className = recordEntry.substring(start, idx-1);
      String body = recordEntry.substring(idx + 1);
      body = body.replaceAll("\n", "");
      // log.info("record [" + recordEntry + "] body [" + body +"]");
      Date d = sdf.parse(dStr);

      start = body.indexOf('{');
      JSONObject json = (JSONObject) JSONValue.parse(body.substring(start));

      ChukwaRecord record = new ChukwaRecord();
      StringBuilder datasource = new StringBuilder();
      String contextName = null;
      String recordName = null;

      Iterator<String> ki = json.keySet().iterator();
      while (ki.hasNext()) {
        String keyName = ki.next();
        if (chukwaTimestampField.equals(keyName)) {
          // Round the metric timestamp down to the whole minute.
          d = new Date((Long) json.get(keyName));
          Calendar cal = Calendar.getInstance();
          cal.setTimeInMillis(d.getTime());
          cal.set(Calendar.SECOND, 0);
          cal.set(Calendar.MILLISECOND, 0);
          d.setTime(cal.getTimeInMillis());
        } else if (contextNameField.equals(keyName)) {
          contextName = (String) json.get(keyName);
        } else if (recordNameField.equals(keyName)) {
          recordName = (String) json.get(keyName);
          record.add(keyName, json.get(keyName).toString());
        } else {
          if (json.get(keyName) != null) {
            record.add(keyName, json.get(keyName).toString());
          }
        }
      }
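      // The data source becomes "<contextName>_<recordName>" (e.g. "jvm_metrics"),
      // matching the column families declared in the @Tables annotation above.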
      if (contextName != null) {
        datasource.append(contextName);
        datasource.append("_");
      }
      datasource.append(recordName);
      record.add("cluster", chunk.getTag("cluster"));
      if (contextName != null && contextName.equals("jvm")) {
        buildJVMRecord(record, d.getTime(), datasource.toString());
      } else {
        buildGenericRecord(record, null, d.getTime(), datasource.toString());
      }
      output.collect(key, record);
    } catch (ParseException e) {
      log.warn("Wrong format in HadoopMetricsProcessor [" + recordEntry + "]",
          e);
      throw e;
    } catch (IOException e) {
      log.warn("Unable to collect output in HadoopMetricsProcessor ["
          + recordEntry + "]", e);
      throw e;
    } catch (Exception e) {
      log.warn("Wrong format in HadoopMetricsProcessor [" + recordEntry + "]",
          e);
      throw e;
    }

  }

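  /*
   * JVM metrics are keyed per process: the key combines the hour bucket, the
   * source host, the JVM process name, and the exact timestamp, so records
   * from different daemons on the same host stay distinct.
   */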
  protected void buildJVMRecord(ChukwaRecord record, long timestamp, String dataSource) {
    calendar.setTimeInMillis(timestamp);
    calendar.set(Calendar.MINUTE, 0);
    calendar.set(Calendar.SECOND, 0);
    calendar.set(Calendar.MILLISECOND, 0);

    key.setKey("" + calendar.getTimeInMillis() + "/" + chunk.getSource() + ":" +
        record.getValue("processName") + "/" + timestamp);
    key.setReduceType(dataSource);
    record.setTime(timestamp);

    record.add(Record.tagsField, chunk.getTags());
    record.add(Record.sourceField, chunk.getSource());
    record.add(Record.applicationField, chunk.getStreamName());
  }

  public String getDataType() {
    return HadoopMetricsProcessor.class.getName();
  }

}