This project has retired. For details please refer to its Attic page.
ClientTraceProcessor xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
20  
21  import java.io.IOException;
22  import java.net.InetAddress;
23  import java.text.ParseException;
24  import java.text.SimpleDateFormat;
25  import java.util.Calendar;
26  import java.util.regex.Matcher;
27  import java.util.regex.Pattern;
28  
29  import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Table;
30  import org.apache.hadoop.chukwa.extraction.engine.Record;
31  import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
32  import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
33  import org.apache.hadoop.mapred.OutputCollector;
34  import org.apache.hadoop.mapred.Reporter;
35  
36  @Table(name="Hadoop",columnFamily="ClientTrace")
37  public class ClientTraceProcessor extends AbstractProcessor {
38    private static final String recordType = "ClientTrace";
39    private final SimpleDateFormat sdf =
40      new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");
41    private final Matcher kvMatcher;
42    private final Matcher idMatcher;
43    private final Matcher ipMatcher;
44    // extract date, source
45    private final Pattern idPattern =
46      Pattern.compile("^(.{23}).*clienttrace.*");
47    // extract "key: value" pairs
48    private final Pattern kvPattern =
49      Pattern.compile("\\s+(\\w+):\\s+([^,]+)");
50    private final Pattern ipPattern =
51      Pattern.compile("[0-9]+\\.[0-9]+\\.[0-9]+\\.[0-9]+");
52  
53    public ClientTraceProcessor() {
54      super();
55      kvMatcher = kvPattern.matcher("");
56      idMatcher = idPattern.matcher("");
57      ipMatcher = ipPattern.matcher("");
58    }
59  
60    public enum Locality {
61      LOCAL("local"), INTRA("intra_rack"), INTER("inter_rack");
62      String lbl;
63      Locality(String lbl) {
64        this.lbl = lbl;
65      }
66      public String getLabel() {
67        return lbl;
68      }
69    };
70  
71    protected Locality getLocality(String src, String dst) throws Exception {
72      if (null == src || null == dst) {
73        throw new IOException("Missing src/dst");
74      }
75      ipMatcher.reset(src);
76      if (!ipMatcher.find()) {
77        throw new IOException("Could not find src");
78      }
79      byte[] srcIP = InetAddress.getByName(ipMatcher.group(0)).getAddress();
80      ipMatcher.reset(dst);
81      if (!ipMatcher.find()) {
82        throw new IOException("Could not find dst");
83      }
84      byte[] dstIP = InetAddress.getByName(ipMatcher.group(0)).getAddress();
85      for (int i = 0; i < 4; ++i) {
86        if (srcIP[i] != dstIP[i]) {
87          return (3 == i && (srcIP[i] & 0xC0) == (dstIP[i] & 0xC0))
88            ? Locality.INTRA
89            : Locality.INTER;
90        }
91      }
92      return Locality.LOCAL;
93    }
94  
95    @Override
96    public void parse(String recordEntry,
97        OutputCollector<ChukwaRecordKey,ChukwaRecord> output, Reporter reporter)
98        throws Throwable {
99      try {
100       idMatcher.reset(recordEntry);
101       long ms;
102       long ms_fullresolution;
103       if (idMatcher.find()) {
104         ms = sdf.parse(idMatcher.group(1)).getTime();
105         ms_fullresolution = ms;
106       } else {
107         throw new IOException("Could not find date/source");
108       }
109       kvMatcher.reset(recordEntry);
110       if (!kvMatcher.find()) {
111         throw new IOException("Failed to find record");
112       }
113       ChukwaRecord rec = new ChukwaRecord();
114       do {
115         rec.add(kvMatcher.group(1), kvMatcher.group(2));
116       } while (kvMatcher.find());
117       Locality loc = getLocality(rec.getValue("src"), rec.getValue("dest"));
118       rec.add("locality", loc.getLabel());
119 
120       calendar.setTimeInMillis(ms);
121       calendar.set(Calendar.SECOND, 0);
122       calendar.set(Calendar.MILLISECOND, 0);
123       ms = calendar.getTimeInMillis();
124       calendar.set(Calendar.MINUTE, 0);
125       key.setKey(calendar.getTimeInMillis() + "/" + loc.getLabel() + "/" +
126                  rec.getValue("op").toLowerCase() + "/" + ms);
127       key.setReduceType("ClientTrace");
128       rec.setTime(ms);
129 
130       rec.add(Record.tagsField, chunk.getTags());
131       rec.add(Record.sourceField, chunk.getSource());
132       rec.add(Record.applicationField, chunk.getStreamName());
133       rec.add("actual_time",Long.toString(ms_fullresolution));
134       output.collect(key, rec);
135 
136     } catch (ParseException e) {
137       log.warn("Unable to parse the date in DefaultProcessor ["
138           + recordEntry + "]", e);
139       e.printStackTrace();
140       throw e;
141     } catch (IOException e) {
142       log.warn("Unable to collect output in DefaultProcessor ["
143           + recordEntry + "]", e);
144       e.printStackTrace();
145       throw e;
146     }
147   }
148 
149   public String getDataType() {
150     return recordType;
151   }
152 }