Demux xref
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.chukwa.extraction.demux;

import java.io.File;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.Iterator;
import java.util.List;

import org.apache.hadoop.chukwa.ChukwaArchiveKey;
import org.apache.hadoop.chukwa.ChunkImpl;
import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
import org.apache.hadoop.chukwa.extraction.demux.processor.ChukwaOutputCollector;
import org.apache.hadoop.chukwa.extraction.demux.processor.mapper.MapProcessor;
import org.apache.hadoop.chukwa.extraction.demux.processor.mapper.MapProcessorFactory;
import org.apache.hadoop.chukwa.extraction.demux.processor.reducer.ReduceProcessor;
import org.apache.hadoop.chukwa.extraction.demux.processor.reducer.ReduceProcessorFactory;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
import org.apache.hadoop.chukwa.util.ExceptionUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobPriority;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.log4j.Logger;

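/**
 * Demux is Chukwa's demultiplexing MapReduce job. It reads archived
 * chunks from SequenceFile input, routes each chunk to a MapProcessor
 * selected by the chunk's data type, and aggregates the emitted records
 * with a ReduceProcessor selected by each key's reduce type.
 */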
public class Demux extends Configured implements Tool {
  static Logger log = Logger.getLogger(Demux.class);

  public static Configuration jobConf = null;

  protected static void setJobConf(JobConf jobConf) {
    Demux.jobConf = jobConf;
  }

  protected Configuration getJobConf() {
    return Demux.jobConf;
  }

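  /**
   * Map side: resolves a MapProcessor from the chunk's data type and
   * delegates parsing of the chunk to it.
   */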
  public static class MapClass extends MapReduceBase implements
      Mapper<ChukwaArchiveKey, ChunkImpl, ChukwaRecordKey, ChukwaRecord> {

    private Configuration jobConf = null;

    @Override
    public void configure(JobConf jobConf) {
      super.configure(jobConf);
      setJobConf(jobConf);
    }

    private void setJobConf(JobConf jobConf) {
      this.jobConf = jobConf;
    }

    public void map(ChukwaArchiveKey key, ChunkImpl chunk,
        OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
        throws IOException {

      ChukwaOutputCollector chukwaOutputCollector = new ChukwaOutputCollector(
          "DemuxMapOutput", output, reporter);
      try {
        long duration = System.currentTimeMillis();
        if (log.isDebugEnabled()) {
          // chunk.getData() returns a byte[]; decode it for readable debug
          // output instead of logging the array reference ("[B@...").
          log.debug("Entry: [" + new String(chunk.getData()) + "] EventType: ["
              + chunk.getDataType() + "]");
        }

        String defaultProcessor = jobConf.get(
            "chukwa.demux.mapper.default.processor",
            "org.apache.hadoop.chukwa.extraction.demux.processor.mapper.DefaultProcessor");

        // Per-data-type values are "<MapProcessor class>,<ReduceProcessor class>"
        // pairs; the mapper uses the first token. The special value "Drop"
        // discards the chunk.
        String processorClass_pri = jobConf.get(chunk.getDataType(),
            defaultProcessor);

        String processorClass = processorClass_pri.split(",")[0];
        if (!processorClass.equalsIgnoreCase("Drop")) {
          reporter.incrCounter("DemuxMapInput", "total chunks", 1);
          reporter.incrCounter("DemuxMapInput",
              chunk.getDataType() + " chunks", 1);

          MapProcessor processor = MapProcessorFactory
              .getProcessor(processorClass);
          processor.process(key, chunk, chukwaOutputCollector, reporter);
          if (log.isDebugEnabled()) {
            duration = System.currentTimeMillis() - duration;
            log.debug("Demux:Map dataType:" + chunk.getDataType()
                + " duration:" + duration + " processor:" + processorClass
                + " recordCount:" + chunk.getRecordOffsets().length);
          }

        } else {
          log.info("action:Demux, dataType:" + chunk.getDataType()
              + " duration:0 processor:Drop recordCount:"
              + chunk.getRecordOffsets().length);
        }

      } catch (Exception e) {
        // log.warn records the exception together with its stack trace.
        log.warn("Exception in Demux:MAP", e);
      }
    }
  }

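  /**
   * Reduce side: resolves a ReduceProcessor from the key's reduce type
   * and delegates aggregation of the grouped records to it.
   */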
  public static class ReduceClass extends MapReduceBase implements
      Reducer<ChukwaRecordKey, ChukwaRecord, ChukwaRecordKey, ChukwaRecord> {

    private Configuration jobConf = null;

    @Override
    public void configure(JobConf jobConf) {
      super.configure(jobConf);
      this.jobConf = jobConf;
    }

    public void reduce(ChukwaRecordKey key, Iterator<ChukwaRecord> values,
        OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
        throws IOException {
      ChukwaOutputCollector chukwaOutputCollector = new ChukwaOutputCollector(
          "DemuxReduceOutput", output, reporter);
      try {
        long duration = System.currentTimeMillis();
        reporter.incrCounter("DemuxReduceInput", "total distinct keys", 1);
        reporter.incrCounter("DemuxReduceInput", key.getReduceType()
            + " total distinct keys", 1);

        String defaultProcessor_classname =
            "org.apache.hadoop.chukwa.extraction.demux.processor.reducer.IdentityReducer";
        // The default is prefixed with "," so it splits into the same
        // two-element "<mapper>,<reducer>" shape as per-type values.
        String defaultProcessor = jobConf.get("chukwa.demux.reducer.default.processor",
            "," + defaultProcessor_classname);

        String processClass_pri = jobConf.get(key.getReduceType(), defaultProcessor);
        String[] processClass_tmps = processClass_pri.split(",");
        // Fall back to the identity reducer when the configured value is
        // not a well-formed pair.
        String processClass;
        if (processClass_tmps.length != 2)
          processClass = defaultProcessor_classname;
        else
          processClass = processClass_tmps[1];

        ReduceProcessor processor = ReduceProcessorFactory.getProcessor(processClass);
        // Log through log4j rather than printing to stdout.
        log.debug(processor.getClass().getName());
        processor.process(key, values, chukwaOutputCollector, reporter);

        if (log.isDebugEnabled()) {
          duration = System.currentTimeMillis() - duration;
          log.debug("Demux:Reduce, dataType:" + key.getReduceType()
              + " duration:" + duration);
        }

      } catch (Exception e) {
        // log.warn records the exception together with its stack trace.
        log.warn("Exception in Demux:Reduce", e);
      }
    }
  }

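  /** Prints command-line usage and returns -1 as the exit status. */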
  static int printUsage() {
    System.out.println("Demux [-m <maps>] [-r <reduces>] <input> <output>");
    ToolRunner.printGenericCommandUsage(System.out);
    return -1;
  }

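  /**
   * Ships every parser JAR found under the "demux" directory beneath
   * chukwa.data.dir to the distributed cache, so that custom processors
   * are on the task classpath.
   */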
  public static void addParsers(Configuration conf) {
    String parserPath = conf.get("chukwa.data.dir") + File.separator + "demux";
    try {
      FileSystem fs = FileSystem.get(new Configuration());
      FileStatus[] fstatus = fs.listStatus(new Path(parserPath));
      if (fstatus != null) {
        String hdfsUrlPrefix = conf.get("fs.defaultFS");

        for (FileStatus parser : fstatus) {
          // DistributedCache expects a path relative to the default
          // filesystem, so strip the fs.defaultFS URL prefix.
          Path jarPath = new Path(parser.getPath().toString().replace(hdfsUrlPrefix, ""));
          log.debug("Adding parser JAR path " + jarPath);
          DistributedCache.addFileToClassPath(jarPath, conf);
        }
      }
    } catch (IOException e) {
      log.error(ExceptionUtil.getStackTrace(e));
    }
  }

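  /**
   * Configures and submits the Demux job: SequenceFile input, Chukwa
   * record output, with -m/-r overriding map and reduce task counts.
   */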
  public int run(String[] args) throws Exception {
    JobConf conf = new JobConf(new ChukwaConfiguration(), Demux.class);

    SimpleDateFormat day = new SimpleDateFormat("yyyyMMdd_HH_mm");
    conf.setJobName("Chukwa-Demux_" + day.format(new Date()));
    conf.setInputFormat(SequenceFileInputFormat.class);
    conf.setMapperClass(Demux.MapClass.class);
    conf.setPartitionerClass(ChukwaRecordPartitioner.class);
    conf.setReducerClass(Demux.ReduceClass.class);

    conf.setOutputKeyClass(ChukwaRecordKey.class);
    conf.setOutputValueClass(ChukwaRecord.class);
    conf.setOutputFormat(ChukwaRecordOutputFormat.class);
    conf.setJobPriority(JobPriority.VERY_HIGH);
    addParsers(conf);

    List<String> other_args = new ArrayList<String>();
    for (int i = 0; i < args.length; ++i) {
      try {
        if ("-m".equals(args[i])) {
          conf.setNumMapTasks(Integer.parseInt(args[++i]));
        } else if ("-r".equals(args[i])) {
          conf.setNumReduceTasks(Integer.parseInt(args[++i]));
        } else {
          other_args.add(args[i]);
        }
      } catch (NumberFormatException except) {
        System.out.println("ERROR: Integer expected instead of " + args[i]);
        return printUsage();
      } catch (ArrayIndexOutOfBoundsException except) {
        System.out.println("ERROR: Required parameter missing from "
            + args[i - 1]);
        return printUsage();
      }
    }

    if (other_args.size() != 2) {
      System.out.println("ERROR: Wrong number of parameters: "
          + other_args.size() + " instead of 2.");
      return printUsage();
    }

    FileInputFormat.setInputPaths(conf, other_args.get(0));
    FileOutputFormat.setOutputPath(conf, new Path(other_args.get(1)));

    JobClient.runJob(conf);
    return 0;
  }

  public static void main(String[] args) throws Exception {
    // Exit with ToolRunner's result so failures surface as a non-zero
    // process status instead of being silently discarded.
    int res = ToolRunner.run(new Configuration(), new Demux(), args);
    System.exit(res);
  }

}
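
Demux's routing is entirely configuration-driven: for each data type, the job configuration maps the type name to a comma-separated "<MapProcessor class>,<ReduceProcessor class>" pair, from which MapClass takes the first token and ReduceClass the second. Below is a minimal sketch of registering such a pair before submitting the job; the MyAppLog data type and the com.example mapper class are hypothetical, while IdentityReducer is the default named in ReduceClass above.

    JobConf conf = new JobConf(new ChukwaConfiguration(), Demux.class);
    // Route "MyAppLog" chunks (hypothetical data type) to a custom
    // MapProcessor, and reduce the resulting records with the stock
    // IdentityReducer.
    conf.set("MyAppLog",
        "com.example.chukwa.MyAppLogProcessor,"
        + "org.apache.hadoop.chukwa.extraction.demux.processor.reducer.IdentityReducer");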