This project has retired. For details please refer to its Attic page.
Demux xref
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.chukwa.extraction.demux;

import java.io.File;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.Iterator;
import java.util.List;

import org.apache.hadoop.chukwa.ChukwaArchiveKey;
import org.apache.hadoop.chukwa.ChunkImpl;
import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
import org.apache.hadoop.chukwa.extraction.demux.processor.ChukwaOutputCollector;
import org.apache.hadoop.chukwa.extraction.demux.processor.mapper.MapProcessor;
import org.apache.hadoop.chukwa.extraction.demux.processor.mapper.MapProcessorFactory;
import org.apache.hadoop.chukwa.extraction.demux.processor.reducer.ReduceProcessor;
import org.apache.hadoop.chukwa.extraction.demux.processor.reducer.ReduceProcessorFactory;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
import org.apache.hadoop.chukwa.util.ExceptionUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobPriority;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.log4j.Logger;
public class Demux extends Configured implements Tool {
  static Logger log = Logger.getLogger(Demux.class);
  // Timestamp suffix for the job name set in run().
  static SimpleDateFormat day = new java.text.SimpleDateFormat("yyyyMMdd_HH_mm");
  // Job configuration captured in configure() so the processor lookups in
  // map() and reduce() can read the per-data-type mappings.
  public static Configuration jobConf = null;

  public static class MapClass extends MapReduceBase implements
      Mapper<ChukwaArchiveKey, ChunkImpl, ChukwaRecordKey, ChukwaRecord> {

    @Override
    public void configure(JobConf jobConf) {
      super.configure(jobConf);
      Demux.jobConf = jobConf;
    }

    public void map(ChukwaArchiveKey key, ChunkImpl chunk,
        OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
        throws IOException {

      ChukwaOutputCollector chukwaOutputCollector = new ChukwaOutputCollector(
          "DemuxMapOutput", output, reporter);
      try {
        long duration = System.currentTimeMillis();
        if (log.isDebugEnabled()) {
          log.debug("Entry: [" + chunk.getData() + "] EventType: ["
              + chunk.getDataType() + "]");
        }

        // Resolve the mapper-side processor for this chunk: the chunk's data
        // type itself is the configuration key, with DefaultProcessor as the
        // fallback.
        String defaultProcessor = Demux.jobConf.get(
            "chukwa.demux.mapper.default.processor",
            "org.apache.hadoop.chukwa.extraction.demux.processor.mapper.DefaultProcessor");

        String processorClass_pri = Demux.jobConf.get(chunk.getDataType(),
            defaultProcessor);

        // The mapping may be a "mapperClass,reducerClass" pair; the mapper
        // only needs the first element.
        String processorClass = processorClass_pri.split(",")[0];
        if (!processorClass.equalsIgnoreCase("Drop")) {
          reporter.incrCounter("DemuxMapInput", "total chunks", 1);
          reporter.incrCounter("DemuxMapInput",
              chunk.getDataType() + " chunks", 1);

          MapProcessor processor = MapProcessorFactory
              .getProcessor(processorClass);
          processor.process(key, chunk, chukwaOutputCollector, reporter);
          if (log.isDebugEnabled()) {
            duration = System.currentTimeMillis() - duration;
            log.debug("Demux:Map dataType:" + chunk.getDataType()
                + " duration:" + duration + " processor:" + processorClass
                + " recordCount:" + chunk.getRecordOffsets().length);
          }

        } else {
          // A mapping of "Drop" discards every chunk of this data type.
          log.info("action:Demux, dataType:" + chunk.getDataType()
              + " duration:0 processor:Drop recordCount:"
              + chunk.getRecordOffsets().length);
        }

      } catch (Exception e) {
        // Log and continue so one bad chunk does not fail the whole task.
        log.warn("Exception in Demux:MAP", e);
      }
    }
  }
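
  /*
   * Note: the lookup above means a chunk's data type doubles as a
   * configuration key, so routing is pure configuration (normally supplied
   * by Chukwa's demux configuration file, e.g. chukwa-demux-conf.xml). A
   * minimal sketch of wiring types to mapper processors when building the
   * JobConf; the "SysLog" and "DebugTrace" type names here are illustrative,
   * not part of this file:
   *
   *   // Route one data type to a specific mapper processor class.
   *   conf.set("SysLog",
   *       "org.apache.hadoop.chukwa.extraction.demux.processor.mapper.SysLog");
   *   // Discard chunks of a noisy type entirely.
   *   conf.set("DebugTrace", "Drop");
   */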

  public static class ReduceClass extends MapReduceBase implements
      Reducer<ChukwaRecordKey, ChukwaRecord, ChukwaRecordKey, ChukwaRecord> {

    @Override
    public void configure(JobConf jobConf) {
      super.configure(jobConf);
      Demux.jobConf = jobConf;
    }

    public void reduce(ChukwaRecordKey key, Iterator<ChukwaRecord> values,
        OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
        throws IOException {
      ChukwaOutputCollector chukwaOutputCollector = new ChukwaOutputCollector(
          "DemuxReduceOutput", output, reporter);
      try {
        long duration = System.currentTimeMillis();
        reporter.incrCounter("DemuxReduceInput", "total distinct keys", 1);
        reporter.incrCounter("DemuxReduceInput", key.getReduceType()
            + " total distinct keys", 1);

        // Reducer-side mappings are "mapperClass,reducerClass" pairs keyed
        // by reduce type; anything that does not split into exactly two
        // elements falls back to IdentityReducer.
        String defaultProcessor_classname = "org.apache.hadoop.chukwa.extraction.demux.processor.reducer" +
            ".IdentityReducer";
        String defaultProcessor = Demux.jobConf.get("chukwa.demux.reducer.default.processor",
            "," + defaultProcessor_classname);

        String processClass_pri = Demux.jobConf.get(key.getReduceType(), defaultProcessor);
        String[] processClass_tmps = processClass_pri.split(",");
        String processClass = null;
        if (processClass_tmps.length != 2)
          processClass = defaultProcessor_classname;
        else
          processClass = processClass_tmps[1];

        ReduceProcessor processor = ReduceProcessorFactory.getProcessor(processClass);
        log.debug("Using reduce processor: " + processor.getClass().getName());
        processor.process(key, values, chukwaOutputCollector, reporter);

        if (log.isDebugEnabled()) {
          duration = System.currentTimeMillis() - duration;
          log.debug("Demux:Reduce, dataType:" + key.getReduceType()
              + " duration:" + duration);
        }

      } catch (Exception e) {
        // Log and continue so one bad key does not fail the whole task.
        log.warn("Exception in Demux:Reduce", e);
      }
    }
  }
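
  /*
   * Note: reduce() takes element [1] of the comma-separated value while
   * map() takes element [0], so one property can name both processors for a
   * type. A minimal sketch (the "SysLog" key and class names are
   * illustrative):
   *
   *   // Format: "<mapperClass>,<reducerClass>"
   *   conf.set("SysLog",
   *       "org.apache.hadoop.chukwa.extraction.demux.processor.mapper.SysLog,"
   *       + "org.apache.hadoop.chukwa.extraction.demux.processor.reducer.IdentityReducer");
   */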

  static int printUsage() {
    System.out.println("Demux [-m <maps>] [-r <reduces>] <input> <output>");
    ToolRunner.printGenericCommandUsage(System.out);
    return -1;
  }

  public static void addParsers(Configuration conf) {
    // Jars staged under <chukwa.data.dir>/demux are shipped to the cluster
    // through the distributed cache so custom processors are loadable.
    String parserPath = conf.get("chukwa.data.dir") + File.separator + "demux";
    try {
      FileSystem fs = FileSystem.get(conf);
      FileStatus[] fstatus = fs.listStatus(new Path(parserPath));
      if (fstatus != null) {
        String hdfsUrlPrefix = conf.get("fs.default.name");

        for (FileStatus parser : fstatus) {
          // Strip the scheme/authority prefix; the distributed cache wants a
          // filesystem-relative path.
          Path jarPath = new Path(parser.getPath().toString().replace(hdfsUrlPrefix, ""));
          log.debug("Adding parser JAR path " + jarPath);
          DistributedCache.addFileToClassPath(jarPath, conf);
        }
      }
    } catch (IOException e) {
      log.error(ExceptionUtil.getStackTrace(e));
    }
  }
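
  /*
   * Note: making a custom processor loadable only requires its jar to sit
   * under <chukwa.data.dir>/demux before run() executes. A minimal sketch,
   * assuming /chukwa as the HDFS data dir (the jar name is hypothetical):
   *
   *   FileSystem fs = FileSystem.get(conf);
   *   // Stage a custom parser jar where addParsers() will pick it up.
   *   fs.copyFromLocalFile(new Path("/tmp/my-parsers.jar"),
   *       new Path("/chukwa/demux/my-parsers.jar"));
   */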

  public int run(String[] args) throws Exception {
    JobConf conf = new JobConf(new ChukwaConfiguration(), Demux.class);

    conf.setJobName("Chukwa-Demux_" + day.format(new Date()));
    conf.setInputFormat(SequenceFileInputFormat.class);
    conf.setMapperClass(Demux.MapClass.class);
    conf.setPartitionerClass(ChukwaRecordPartitioner.class);
    conf.setReducerClass(Demux.ReduceClass.class);

    conf.setOutputKeyClass(ChukwaRecordKey.class);
    conf.setOutputValueClass(ChukwaRecord.class);
    conf.setOutputFormat(ChukwaRecordOutputFormat.class);
    conf.setJobPriority(JobPriority.VERY_HIGH);
    addParsers(conf);

    List<String> other_args = new ArrayList<String>();
    for (int i = 0; i < args.length; ++i) {
      try {
        if ("-m".equals(args[i])) {
          conf.setNumMapTasks(Integer.parseInt(args[++i]));
        } else if ("-r".equals(args[i])) {
          conf.setNumReduceTasks(Integer.parseInt(args[++i]));
        } else {
          other_args.add(args[i]);
        }
      } catch (NumberFormatException except) {
        System.out.println("ERROR: Integer expected instead of " + args[i]);
        return printUsage();
      } catch (ArrayIndexOutOfBoundsException except) {
        System.out.println("ERROR: Required parameter missing from "
            + args[i - 1]);
        return printUsage();
      }
    }

    if (other_args.size() != 2) {
      System.out.println("ERROR: Wrong number of parameters: "
          + other_args.size() + " instead of 2.");
      return printUsage();
    }

    FileInputFormat.setInputPaths(conf, other_args.get(0));
    FileOutputFormat.setOutputPath(conf, new Path(other_args.get(1)));

    JobClient.runJob(conf);
    return 0;
  }

  public static void main(String[] args) throws Exception {
    int res = ToolRunner.run(new Configuration(), new Demux(), args);
    System.exit(res);
  }

}
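
Because Demux implements Tool, the job can also be launched programmatically rather than through the hadoop command line. A minimal sketch; the map and reduce counts and the input and output paths are illustrative:

    // Same effect as: Demux -m 10 -r 4 /chukwa/logs/archive /chukwa/demux-out
    String[] demuxArgs = { "-m", "10", "-r", "4",
        "/chukwa/logs/archive", "/chukwa/demux-out" };
    System.exit(ToolRunner.run(new Configuration(), new Demux(), demuxArgs));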