This project has retired. For details please refer to its
Attic page.
TaskTrackerClientTraceMapper xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.chukwa.analysis.salsa.fsm;
20
21 import java.io.IOException;
22 import java.util.ArrayList;
23 import java.util.regex.*;
24 import java.util.Random;
25
26 import org.apache.commons.logging.Log;
27 import org.apache.commons.logging.LogFactory;
28
29 import org.apache.hadoop.chukwa.extraction.demux.*;
30 import org.apache.hadoop.chukwa.extraction.engine.*;
31 import org.apache.hadoop.conf.*;
32 import org.apache.hadoop.mapred.*;
33 import org.apache.hadoop.util.*;
34
35
36
37
38
39
40
41
42
43 public class TaskTrackerClientTraceMapper
44 extends MapReduceBase
45 implements Mapper<ChukwaRecordKey, ChukwaRecord, ChukwaRecordKey, FSMIntermedEntry>
46 {
47 private static Log log = LogFactory.getLog(FSMBuilder.class);
48 protected static final String SEP = "/";
49 protected static String FSM_CRK_ReduceType = FSMType.NAMES[FSMType.MAPREDUCE_FSM];
50 private final Pattern ipPattern =
51 Pattern.compile("([0-9]+\\.[0-9]+\\.[0-9]+\\.[0-9]+)[a-zA-Z\\-_:\\/].*");
52
53 public void map
54 (ChukwaRecordKey key, ChukwaRecord val,
55 OutputCollector<ChukwaRecordKey, FSMIntermedEntry> output,
56 Reporter reporter)
57 throws IOException
58 {
59
60
61 String [] fieldNames = val.getFields();
62 ArrayList<String> fieldNamesList = new ArrayList<String>(fieldNames.length);
63 for (int i = 0; i < fieldNames.length; i++) {
64 fieldNamesList.add(fieldNames[i]);
65 }
66
67
68
69
70 if (key.getReduceType().equals("ClientTraceDetailed")) {
71 assert(fieldNamesList.contains("op"));
72 if (val.getValue("op").startsWith("MAPRED")) {
73 parseClientTraceDetailed(key, val, output, reporter, fieldNamesList);
74 }
75 }
76 }
77
78 protected final int DEFAULT_SHUFFLE_DURATION_MS = 10;
79
80
81
82 protected void parseClientTraceDetailed
83 (ChukwaRecordKey key, ChukwaRecord val,
84 OutputCollector<ChukwaRecordKey, FSMIntermedEntry> output,
85 Reporter reporter, ArrayList<String> fieldNamesList)
86 throws IOException
87 {
88 FSMIntermedEntry start_rec, end_rec;
89 String current_op = null, src_add = null, dest_add = null;
90 String reduce_id = null, map_id = null;
91
92
93 start_rec = new FSMIntermedEntry();
94 end_rec = new FSMIntermedEntry();
95 start_rec.fsm_type = new FSMType(FSMType.MAPREDUCE_FSM);
96 start_rec.state_type = new StateType(StateType.STATE_START);
97 end_rec.fsm_type = new FSMType(FSMType.MAPREDUCE_FSM);
98 end_rec.state_type = new StateType(StateType.STATE_END);
99
100
101 Matcher src_regex = ipPattern.matcher(val.getValue("src"));
102 if (src_regex.matches()) {
103 src_add = src_regex.group(1);
104 } else {
105 log.warn("Failed to match src IP:"+val.getValue("src")+"");
106 src_add = new String("");
107 }
108 Matcher dest_regex = ipPattern.matcher(val.getValue("dest"));
109 if (dest_regex.matches()) {
110 dest_add = dest_regex.group(1);
111 } else {
112 log.warn("Failed to match dest IP:"+val.getValue("dest")+"");
113 dest_add = new String("");
114 }
115 if (fieldNamesList.contains("reduceID")) {
116 reduce_id = new String(val.getValue("reduceID"));
117 } else {
118
119 Random r = new Random();
120 reduce_id = new String("noreduce" + r.nextInt());
121 }
122
123 if (fieldNamesList.contains("cliID")) {
124 map_id = new String(val.getValue("cliID").trim());
125 } else {
126 map_id = new String("nomap");
127 }
128
129 current_op = val.getValue("op");
130
131 start_rec.host_exec = new String(src_add);
132 end_rec.host_exec = new String(src_add);
133 start_rec.host_other = new String(dest_add);
134 end_rec.host_other = new String(dest_add);
135
136
137
138 long actual_time_ms = Long.parseLong(val.getValue("actual_time"));
139 if (fieldNamesList.contains("duration")) {
140 try {
141 actual_time_ms -= (Long.parseLong(val.getValue("duration").trim()) / 1000);
142 } catch (NumberFormatException nef) {
143 log.warn("Failed to parse duration: >>" + val.getValue("duration"));
144 }
145 } else {
146 actual_time_ms -= DEFAULT_SHUFFLE_DURATION_MS;
147 }
148
149 String [] k = key.getKey().split("/");
150
151 start_rec.time_orig_epoch = k[0];
152 start_rec.time_orig = (new Long(actual_time_ms)).toString();
153 start_rec.timestamp = (new Long(actual_time_ms)).toString();
154 start_rec.time_end = new String("");
155 start_rec.time_start = new String(start_rec.timestamp);
156
157 end_rec.time_orig_epoch = k[0];
158 end_rec.time_orig = val.getValue("actual_time");
159 end_rec.timestamp = new String(val.getValue("actual_time"));
160 end_rec.time_end = new String(val.getValue("actual_time"));
161 end_rec.time_start = new String("");
162
163 log.debug("Duration: " + (Long.parseLong(end_rec.time_end) - Long.parseLong(start_rec.time_start)));
164
165 start_rec.job_id = new String(reduce_id);
166 end_rec.job_id = new String(reduce_id);
167
168 if (current_op.equals("MAPRED_SHUFFLE")) {
169 if (src_add != null && src_add.equals(dest_add)) {
170 start_rec.state_mapred = new MapRedState(MapRedState.SHUFFLE_LOCAL);
171 } else {
172 start_rec.state_mapred = new MapRedState(MapRedState.SHUFFLE_REMOTE);
173 }
174 } else {
175 log.warn("Invalid state: " + current_op);
176 }
177 end_rec.state_mapred = start_rec.state_mapred;
178 start_rec.state_name = start_rec.state_mapred.toString();
179 end_rec.state_name = end_rec.state_mapred.toString();
180 start_rec.identifier = new String(reduce_id + "@" + map_id);
181 end_rec.identifier = new String(reduce_id + "@" + map_id);
182
183 start_rec.generateUniqueID();
184 end_rec.generateUniqueID();
185
186 start_rec.add_info.put(Record.tagsField,val.getValue(Record.tagsField));
187 start_rec.add_info.put("csource",val.getValue("csource"));
188 end_rec.add_info.put(Record.tagsField,val.getValue(Record.tagsField));
189 end_rec.add_info.put("csource",val.getValue("csource"));
190 end_rec.add_info.put("STATE_STRING",new String("SUCCESS"));
191
192
193 end_rec.add_info.put("BYTES",val.getValue("bytes"));
194
195 String crk_mid_string_start = new String(start_rec.getUniqueID() + "_" + start_rec.timestamp);
196 String crk_mid_string_end = new String(end_rec.getUniqueID() + "_" + start_rec.timestamp);
197 output.collect(new ChukwaRecordKey(FSM_CRK_ReduceType, crk_mid_string_start), start_rec);
198 output.collect(new ChukwaRecordKey(FSM_CRK_ReduceType, crk_mid_string_end), end_rec);
199
200 }
201
202 }