DataNodeClientTraceMapper xref
package org.apache.hadoop.chukwa.analysis.salsa.fsm;

import java.io.IOException;
import java.util.ArrayList;
import java.util.regex.*;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.apache.hadoop.chukwa.extraction.demux.*;
import org.apache.hadoop.chukwa.extraction.engine.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;
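/**
 * Mapper for FSMBuilder: turns DataNode ClientTrace records describing
 * HDFS_READ / HDFS_WRITE operations into paired start/end FSMIntermedEntry
 * state records keyed under the filesystem FSM reduce type.
 */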
public class DataNodeClientTraceMapper
  extends MapReduceBase
  implements Mapper<ChukwaRecordKey, ChukwaRecord, ChukwaRecordKey, FSMIntermedEntry>
{
  private static Log log = LogFactory.getLog(FSMBuilder.class);
  protected static final String SEP = "/";
  protected static String FSM_CRK_ReduceType = FSMType.NAMES[FSMType.FILESYSTEM_FSM];
  // Captures an IPv4 address embedded in values such as "/10.251.43.210:50010"
  private final Pattern ipPattern =
    Pattern.compile(".*[a-zA-Z\\-_:\\/]([0-9]+\\.[0-9]+\\.[0-9]+\\.[0-9]+)[a-zA-Z0-9\\-_:\\/].*");
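  /**
   * Forwards detailed ClientTrace records whose "op" field names an HDFS
   * operation to parseClientTraceDetailed; all other records are ignored.
   */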
  public void map
    (ChukwaRecordKey key, ChukwaRecord val,
     OutputCollector<ChukwaRecordKey, FSMIntermedEntry> output,
     Reporter reporter)
    throws IOException
  {
    String [] fieldNames = val.getFields();
    ArrayList<String> fieldNamesList = new ArrayList<String>(fieldNames.length);
    for (int i = 0; i < fieldNames.length; i++) {
      fieldNamesList.add(fieldNames[i]);
    }

    // Only detailed ClientTrace records for HDFS operations are parsed here
    if (key.getReduceType().equals("ClientTraceDetailed")) {
      assert(fieldNamesList.contains("op"));
      if (val.getValue("op").startsWith("HDFS")) {
        parseClientTraceDetailed(key, val, output, reporter, fieldNamesList);
      }
    }
  }
  // Fallback duration (ms) used when a ClientTrace record carries no "duration" field
  protected final int DEFAULT_READ_DURATION_MS = 10;
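  /**
   * Builds a start and an end FSMIntermedEntry for a single detailed
   * ClientTrace record describing an HDFS_READ or HDFS_WRITE operation,
   * and emits both keyed under the filesystem FSM reduce type.
   */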
  protected void parseClientTraceDetailed
    (ChukwaRecordKey key, ChukwaRecord val,
     OutputCollector<ChukwaRecordKey, FSMIntermedEntry> output,
     Reporter reporter, ArrayList<String> fieldNamesList)
    throws IOException
  {
    FSMIntermedEntry start_rec, end_rec;
    String current_op = null, src_add = null, dest_add = null;
    String datanodeserver_add = null, blkid = null, cli_id = null;

    // One intermediate entry marks the start of the state, the other its end
    start_rec = new FSMIntermedEntry();
    end_rec = new FSMIntermedEntry();
    start_rec.fsm_type = new FSMType(FSMType.FILESYSTEM_FSM);
    start_rec.state_type = new StateType(StateType.STATE_START);
    end_rec.fsm_type = new FSMType(FSMType.FILESYSTEM_FSM);
    end_rec.state_type = new StateType(StateType.STATE_END);
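    // Extract bare IPv4 addresses from the src, dest, and srvID fields,
    // falling back to an empty string when no address can be matched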
    Matcher src_regex = ipPattern.matcher(val.getValue("src"));
    if (src_regex.matches()) {
      src_add = src_regex.group(1);
    } else {
      log.warn("Failed to match src IP:"+val.getValue("src")+"");
      src_add = new String("");
    }
    Matcher dest_regex = ipPattern.matcher(val.getValue("dest"));
    if (dest_regex.matches()) {
      dest_add = dest_regex.group(1);
    } else {
      log.warn("Failed to match dest IP:"+val.getValue("dest")+"");
      dest_add = new String("");
    }
    Matcher datanodeserver_regex = ipPattern.matcher(val.getValue("srvID"));
    if (datanodeserver_regex.matches()) {
      datanodeserver_add = datanodeserver_regex.group(1);
    } else {
      log.warn("Failed to match DataNode server address:"+val.getValue("srvID")+"");
      datanodeserver_add = new String("");
    }
    start_rec.host_exec = new String(src_add);
    end_rec.host_exec = new String(src_add);

    blkid = val.getValue("blockid").trim();
    if (fieldNamesList.contains("cliID")) {
      cli_id = val.getValue("cliID").trim();
      if (cli_id.startsWith("DFSClient_")) {
        cli_id = cli_id.substring(10);   // drop the "DFSClient_" prefix
      }
    } else {
      cli_id = new String("");
    }
    current_op = val.getValue("op");
    String [] k = key.getKey().split("/");
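    // Derive the state's start time from the record's end time ("actual_time"),
    // backing it off by duration/1000 when a "duration" field is present, or by
    // DEFAULT_READ_DURATION_MS otherwise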
    long actual_time_ms = Long.parseLong(val.getValue("actual_time"));
    if (fieldNamesList.contains("duration")) {
      try {
        actual_time_ms -= (Long.parseLong(val.getValue("duration").trim()) / 1000);
      } catch (NumberFormatException nef) {
        log.warn("Failed to parse duration: >>" + val.getValue("duration"));
      }
    } else {
      actual_time_ms -= DEFAULT_READ_DURATION_MS;
    }
    // The start record carries the derived start time; the end record the log's actual_time
    start_rec.time_orig_epoch = k[0];
    start_rec.time_orig = (new Long(actual_time_ms)).toString();
    start_rec.timestamp = (new Long(actual_time_ms)).toString();
    start_rec.time_end = new String("");
    start_rec.time_start = new String(start_rec.timestamp);

    end_rec.time_orig_epoch = k[0];
    end_rec.time_orig = val.getValue("actual_time");
    end_rec.timestamp = new String(val.getValue("actual_time"));
    end_rec.time_end = new String(val.getValue("actual_time"));
    end_rec.time_start = new String("");

    log.debug("Duration: " + (Long.parseLong(end_rec.time_end) - Long.parseLong(start_rec.time_start)));

    // The DFS client ID doubles as the job ID for filesystem states
    end_rec.job_id = new String(cli_id);
    start_rec.job_id = new String(cli_id);
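    // Classify the operation: a read is local when the source and destination
    // addresses match; a write is local, remote, or replicated depending on how
    // the destination compares with the DataNode's own server address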
    if (current_op.equals("HDFS_READ")) {
      if (src_add != null && src_add.equals(dest_add)) {
        start_rec.state_hdfs = new HDFSState(HDFSState.READ_LOCAL);
      } else {
        start_rec.state_hdfs = new HDFSState(HDFSState.READ_REMOTE);
      }
      start_rec.host_other = new String(dest_add);
      end_rec.host_other = new String(dest_add);
    } else if (current_op.equals("HDFS_WRITE")) {
      if (src_add != null && dest_add.equals(datanodeserver_add)) {
        start_rec.state_hdfs = new HDFSState(HDFSState.WRITE_LOCAL);
      } else if (dest_add != null && !dest_add.equals(datanodeserver_add)) {
        start_rec.state_hdfs = new HDFSState(HDFSState.WRITE_REMOTE);
      } else {
        start_rec.state_hdfs = new HDFSState(HDFSState.WRITE_REPLICATED);
      }
      start_rec.host_other = new String(dest_add);
      end_rec.host_other = new String(dest_add);
    } else {
      log.warn("Invalid state: " + current_op);
    }
    end_rec.state_hdfs = start_rec.state_hdfs;
    start_rec.state_name = start_rec.state_hdfs.toString();
    end_rec.state_name = end_rec.state_hdfs.toString();
    start_rec.identifier = new String(blkid);
    end_rec.identifier = new String(blkid);

    // Unique ID has the form <state name>@<block id>@<client id>
    start_rec.unique_id = new String(start_rec.state_name + "@" +
      start_rec.identifier + "@" + start_rec.job_id);
    end_rec.unique_id = new String(end_rec.state_name + "@" +
      end_rec.identifier + "@" + end_rec.job_id);
    // Carry tags, collector source, outcome, and byte count along as extra info
    start_rec.add_info.put(Record.tagsField,val.getValue(Record.tagsField));
    start_rec.add_info.put("csource",val.getValue("csource"));
    end_rec.add_info.put(Record.tagsField,val.getValue(Record.tagsField));
    end_rec.add_info.put("csource",val.getValue("csource"));
    end_rec.add_info.put("STATE_STRING",new String("SUCCESS"));
    end_rec.add_info.put("BYTES",val.getValue("bytes"));

    // Emit both entries under the filesystem FSM reduce type; both keys use the start timestamp
    String crk_mid_string_start = new String(start_rec.getUniqueID() + "_" + start_rec.timestamp);
    String crk_mid_string_end = new String(end_rec.getUniqueID() + "_" + start_rec.timestamp);
    output.collect(new ChukwaRecordKey(FSM_CRK_ReduceType, crk_mid_string_start), start_rec);
    output.collect(new ChukwaRecordKey(FSM_CRK_ReduceType, crk_mid_string_end), end_rec);
  }
}