This project has been retired. For details, please refer to its Attic page.
ClientTraceProcessor xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
20
import java.io.IOException;
import java.net.InetAddress;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Locale;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Table;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
import org.apache.hadoop.chukwa.extraction.engine.Record;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
35
36 @Table(name="Hadoop",columnFamily="ClientTrace")
37 public class ClientTraceProcessor extends AbstractProcessor {
38 private static final String recordType = "ClientTrace";
39 private final SimpleDateFormat sdf =
40 new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");
41 private final Matcher kvMatcher;
42 private final Matcher idMatcher;
43 private final Matcher ipMatcher;
44
45 private final Pattern idPattern =
46 Pattern.compile("^(.{23}).*clienttrace.*");
47
48 private final Pattern kvPattern =
49 Pattern.compile("\\s+(\\w+):\\s+([^,]+)");
50 private final Pattern ipPattern =
51 Pattern.compile("[0-9]+\\.[0-9]+\\.[0-9]+\\.[0-9]+");
52
53 public ClientTraceProcessor() {
54 super();
55 kvMatcher = kvPattern.matcher("");
56 idMatcher = idPattern.matcher("");
57 ipMatcher = ipPattern.matcher("");
58 }
59
60 public enum Locality {
61 LOCAL("local"), INTRA("intra_rack"), INTER("inter_rack");
62 String lbl;
63 Locality(String lbl) {
64 this.lbl = lbl;
65 }
66 public String getLabel() {
67 return lbl;
68 }
69 };
70
71 protected Locality getLocality(String src, String dst) throws Exception {
72 if (null == src || null == dst) {
73 throw new IOException("Missing src/dst");
74 }
75 ipMatcher.reset(src);
76 if (!ipMatcher.find()) {
77 throw new IOException("Could not find src");
78 }
79 byte[] srcIP = InetAddress.getByName(ipMatcher.group(0)).getAddress();
80 ipMatcher.reset(dst);
81 if (!ipMatcher.find()) {
82 throw new IOException("Could not find dst");
83 }
84 byte[] dstIP = InetAddress.getByName(ipMatcher.group(0)).getAddress();
85 for (int i = 0; i < 4; ++i) {
86 if (srcIP[i] != dstIP[i]) {
87 return (3 == i && (srcIP[i] & 0xC0) == (dstIP[i] & 0xC0))
88 ? Locality.INTRA
89 : Locality.INTER;
90 }
91 }
92 return Locality.LOCAL;
93 }
94
95 @Override
96 public void parse(String recordEntry,
97 OutputCollector<ChukwaRecordKey,ChukwaRecord> output, Reporter reporter)
98 throws Throwable {
99 try {
100 idMatcher.reset(recordEntry);
101 long ms;
102 long ms_fullresolution;
103 if (idMatcher.find()) {
104 ms = sdf.parse(idMatcher.group(1)).getTime();
105 ms_fullresolution = ms;
106 } else {
107 throw new IOException("Could not find date/source");
108 }
109 kvMatcher.reset(recordEntry);
110 if (!kvMatcher.find()) {
111 throw new IOException("Failed to find record");
112 }
113 ChukwaRecord rec = new ChukwaRecord();
114 do {
115 rec.add(kvMatcher.group(1), kvMatcher.group(2));
116 } while (kvMatcher.find());
117 Locality loc = getLocality(rec.getValue("src"), rec.getValue("dest"));
118 rec.add("locality", loc.getLabel());
119
120 calendar.setTimeInMillis(ms);
121 calendar.set(Calendar.SECOND, 0);
122 calendar.set(Calendar.MILLISECOND, 0);
123 ms = calendar.getTimeInMillis();
124 calendar.set(Calendar.MINUTE, 0);
125 key.setKey(calendar.getTimeInMillis() + "/" + loc.getLabel() + "/" +
126 rec.getValue("op").toLowerCase() + "/" + ms);
127 key.setReduceType("ClientTrace");
128 rec.setTime(ms);
129
130 rec.add(Record.tagsField, chunk.getTags());
131 rec.add(Record.sourceField, chunk.getSource());
132 rec.add(Record.applicationField, chunk.getStreamName());
133 rec.add("actual_time",Long.toString(ms_fullresolution));
134 output.collect(key, rec);
135
136 } catch (ParseException e) {
137 log.warn("Unable to parse the date in DefaultProcessor ["
138 + recordEntry + "]", e);
139 e.printStackTrace();
140 throw e;
141 } catch (IOException e) {
142 log.warn("Unable to collect output in DefaultProcessor ["
143 + recordEntry + "]", e);
144 e.printStackTrace();
145 throw e;
146 }
147 }
148
149 public String getDataType() {
150 return recordType;
151 }
152 }