DataNodeClientTraceMapper.java
package org.apache.hadoop.chukwa.analysis.salsa.fsm;

import java.io.IOException;
import java.util.ArrayList;
import java.util.regex.*;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.apache.hadoop.chukwa.extraction.demux.*;
import org.apache.hadoop.chukwa.extraction.engine.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;

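/**
 * Mapper that turns demuxed DataNode ClientTraceDetailed records for HDFS_READ
 * and HDFS_WRITE operations into paired START/END FSMIntermedEntry state
 * records for the SALSA filesystem state-machine builder (FSMBuilder).
 *
 * Fields read from each record: op, src, dest, srvID, blockid, actual_time,
 * bytes, csource, tags, and optionally cliID and duration.
 */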
public class DataNodeClientTraceMapper
  extends MapReduceBase
  implements Mapper<ChukwaRecordKey, ChukwaRecord, ChukwaRecordKey, FSMIntermedEntry>
{
  private static Log log = LogFactory.getLog(FSMBuilder.class);
  protected static final String SEP = "/";
  protected final static String FSM_CRK_ReduceType = FSMType.NAMES[FSMType.FILESYSTEM_FSM];

  // Extracts a bare IPv4 address from strings such as "/10.0.0.1:50010"
  private final Pattern ipPattern =
    Pattern.compile(".*[a-zA-Z\\-_:\\/]([0-9]+\\.[0-9]+\\.[0-9]+\\.[0-9]+)[a-zA-Z0-9\\-_:\\/].*");

  public void map
    (ChukwaRecordKey key, ChukwaRecord val,
     OutputCollector<ChukwaRecordKey, FSMIntermedEntry> output,
     Reporter reporter)
    throws IOException
  {
    // Collect the record's field names so optional fields can be tested for below
    String [] fieldNames = val.getFields();
    ArrayList<String> fieldNamesList = new ArrayList<String>(fieldNames.length);
    for (int i = 0; i < fieldNames.length; i++) {
      fieldNamesList.add(fieldNames[i]);
    }

    // Only DataNode ClientTraceDetailed records for HDFS operations are parsed here
    if (key.getReduceType().equals("ClientTraceDetailed")) {
      assert(fieldNamesList.contains("op"));
      if (val.getValue("op").startsWith("HDFS")) {
        parseClientTraceDetailed(key, val, output, reporter, fieldNamesList);
      }
    }
  }

  // Fallback duration (in milliseconds) assumed when a record has no "duration" field
  protected final int DEFAULT_READ_DURATION_MS = 10;

  protected void parseClientTraceDetailed
    (ChukwaRecordKey key, ChukwaRecord val,
     OutputCollector<ChukwaRecordKey, FSMIntermedEntry> output,
     Reporter reporter, ArrayList<String> fieldNamesList)
    throws IOException
  {
    FSMIntermedEntry start_rec, end_rec;
    String current_op = null, src_add = null, dest_add = null;
    String datanodeserver_add = null, blkid = null, cli_id = null;

    // Each client-trace line becomes a pair of intermediate FSM entries:
    // a START state and an END state for the same filesystem operation
    start_rec = new FSMIntermedEntry();
    end_rec = new FSMIntermedEntry();
    start_rec.fsm_type = new FSMType(FSMType.FILESYSTEM_FSM);
    start_rec.state_type = new StateType(StateType.STATE_START);
    end_rec.fsm_type = new FSMType(FSMType.FILESYSTEM_FSM);
    end_rec.state_type = new StateType(StateType.STATE_END);

    // Pull the bare IPv4 addresses out of the src, dest, and srvID fields
    Matcher src_regex = ipPattern.matcher(val.getValue("src"));
    if (src_regex.matches()) {
      src_add = src_regex.group(1);
    } else {
      log.warn("Failed to match src IP:" + val.getValue("src"));
      src_add = "";
    }
    Matcher dest_regex = ipPattern.matcher(val.getValue("dest"));
    if (dest_regex.matches()) {
      dest_add = dest_regex.group(1);
    } else {
      log.warn("Failed to match dest IP:" + val.getValue("dest"));
      dest_add = "";
    }
    Matcher datanodeserver_regex = ipPattern.matcher(val.getValue("srvID"));
    if (datanodeserver_regex.matches()) {
      datanodeserver_add = datanodeserver_regex.group(1);
    } else {
      log.warn("Failed to match DataNode server address:" + val.getValue("srvID"));
      datanodeserver_add = "";
    }

    start_rec.host_exec = src_add;
    end_rec.host_exec = src_add;

    // The block ID and (optional) client ID identify the transfer;
    // the "DFSClient_" prefix is stripped from the client ID
    blkid = val.getValue("blockid").trim();
    if (fieldNamesList.contains("cliID")) {
      cli_id = val.getValue("cliID").trim();
      if (cli_id.startsWith("DFSClient_")) {
        cli_id = cli_id.substring(10);
      }
    } else {
      cli_id = "";
    }
    current_op = val.getValue("op");
    String [] k = key.getKey().split("/");

    // The client-trace timestamp marks the end of the operation; subtract the
    // reported duration (scaled to milliseconds), or a default, to estimate the start time
    long actual_time_ms = Long.parseLong(val.getValue("actual_time"));
    if (fieldNamesList.contains("duration")) {
      try {
        actual_time_ms -= (Long.parseLong(val.getValue("duration").trim()) / 1000);
      } catch (NumberFormatException nef) {
        log.warn("Failed to parse duration: >>" + val.getValue("duration"));
      }
    } else {
      actual_time_ms -= DEFAULT_READ_DURATION_MS;
    }

    // The START record carries the estimated start time; the END record keeps
    // the original end-of-operation timestamp from the record
    start_rec.time_orig_epoch = k[0];
    start_rec.time_orig = Long.toString(actual_time_ms);
    start_rec.timestamp = Long.toString(actual_time_ms);
    start_rec.time_end = "";
    start_rec.time_start = start_rec.timestamp;

    end_rec.time_orig_epoch = k[0];
    end_rec.time_orig = val.getValue("actual_time");
    end_rec.timestamp = val.getValue("actual_time");
    end_rec.time_end = val.getValue("actual_time");
    end_rec.time_start = "";

    log.debug("Duration: " + (Long.parseLong(end_rec.time_end) - Long.parseLong(start_rec.time_start)));

    end_rec.job_id = cli_id;
    start_rec.job_id = cli_id;

    // Classify the operation: reads are local when both endpoints share an
    // address; writes are local, remote, or replicated depending on how the
    // destination relates to the serving DataNode's address
    if (current_op.equals("HDFS_READ")) {
      if (src_add != null && src_add.equals(dest_add)) {
        start_rec.state_hdfs = new HDFSState(HDFSState.READ_LOCAL);
      } else {
        start_rec.state_hdfs = new HDFSState(HDFSState.READ_REMOTE);
      }
      start_rec.host_other = dest_add;
      end_rec.host_other = dest_add;
    } else if (current_op.equals("HDFS_WRITE")) {
      if (src_add != null && dest_add.equals(datanodeserver_add)) {
        start_rec.state_hdfs = new HDFSState(HDFSState.WRITE_LOCAL);
      } else if (!dest_add.equals(datanodeserver_add)) {
        start_rec.state_hdfs = new HDFSState(HDFSState.WRITE_REMOTE);
      } else {
        start_rec.state_hdfs = new HDFSState(HDFSState.WRITE_REPLICATED);
      }
      start_rec.host_other = dest_add;
      end_rec.host_other = dest_add;
    } else {
      log.warn("Invalid state: " + current_op);
    }
    end_rec.state_hdfs = start_rec.state_hdfs;
    start_rec.state_name = start_rec.state_hdfs.toString();
    end_rec.state_name = end_rec.state_hdfs.toString();
    start_rec.identifier = blkid;
    end_rec.identifier = blkid;

    // The unique ID (<state>@<blockid>@<clientid>) ties the START and END
    // entries of the same operation together for the reduce side
    start_rec.unique_id = new StringBuilder().append(start_rec.state_name)
      .append("@").append(start_rec.identifier)
      .append("@").append(start_rec.job_id).toString();
    end_rec.unique_id = new StringBuilder().append(end_rec.state_name)
      .append("@").append(end_rec.identifier)
      .append("@").append(end_rec.job_id).toString();

    start_rec.add_info.put(Record.tagsField, val.getValue(Record.tagsField));
    start_rec.add_info.put("csource", val.getValue("csource"));
    end_rec.add_info.put(Record.tagsField, val.getValue(Record.tagsField));
    end_rec.add_info.put("csource", val.getValue("csource"));
    end_rec.add_info.put("STATE_STRING", "SUCCESS");

    end_rec.add_info.put("BYTES", val.getValue("bytes"));

    // Emit both entries keyed under the filesystem FSM reduce type
    String crk_mid_string_start = new StringBuilder().append(start_rec.getUniqueID())
      .append("_").append(start_rec.timestamp).toString();
    String crk_mid_string_end = new StringBuilder().append(end_rec.getUniqueID())
      .append("_").append(start_rec.timestamp).toString();
    output.collect(new ChukwaRecordKey(FSM_CRK_ReduceType, crk_mid_string_start), start_rec);
    output.collect(new ChukwaRecordKey(FSM_CRK_ReduceType, crk_mid_string_end), end_rec);

  }

}
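
For orientation, the sketch below shows how a mapper with this signature could be wired into a job using the old org.apache.hadoop.mapred API. It is illustrative only: in the project itself FSMBuilder configures the real job (including the reducer), and the driver class name, job name, paths, and the SequenceFile input assumption here are all hypothetical.

package org.apache.hadoop.chukwa.analysis.salsa.fsm;

import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.SequenceFileInputFormat;

// Hypothetical driver, for illustration only; the project's actual job setup lives in FSMBuilder.
public class DataNodeClientTraceJobSketch {
  public static void main(String[] args) throws Exception {
    JobConf conf = new JobConf(DataNodeClientTraceJobSketch.class);
    conf.setJobName("salsa-fsm-datanode-clienttrace");

    // Register the mapper and declare its intermediate output types
    conf.setMapperClass(DataNodeClientTraceMapper.class);
    conf.setMapOutputKeyClass(ChukwaRecordKey.class);
    conf.setMapOutputValueClass(FSMIntermedEntry.class);

    // Assumption: input is SequenceFiles of ChukwaRecordKey/ChukwaRecord records
    conf.setInputFormat(SequenceFileInputFormat.class);

    // FSMBuilder's reducer is omitted here; the default identity reducer
    // simply passes the START/END entries through to the output
    conf.setOutputKeyClass(ChukwaRecordKey.class);
    conf.setOutputValueClass(FSMIntermedEntry.class);

    FileInputFormat.setInputPaths(conf, new Path(args[0]));
    FileOutputFormat.setOutputPath(conf, new Path(args[1]));

    JobClient.runJob(conf);
  }
}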