/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.chukwa.extraction.demux;

import java.io.File;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.Iterator;
import java.util.List;

import org.apache.hadoop.chukwa.ChukwaArchiveKey;
import org.apache.hadoop.chukwa.ChunkImpl;
import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
import org.apache.hadoop.chukwa.extraction.demux.processor.ChukwaOutputCollector;
import org.apache.hadoop.chukwa.extraction.demux.processor.mapper.MapProcessor;
import org.apache.hadoop.chukwa.extraction.demux.processor.mapper.MapProcessorFactory;
import org.apache.hadoop.chukwa.extraction.demux.processor.reducer.ReduceProcessor;
import org.apache.hadoop.chukwa.extraction.demux.processor.reducer.ReduceProcessorFactory;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
import org.apache.hadoop.chukwa.util.ExceptionUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobPriority;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.log4j.Logger;

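/**
 * Demux is the Chukwa Map/Reduce job that turns raw collected chunks
 * (sequence files of ChukwaArchiveKey/ChunkImpl pairs) into parsed
 * ChukwaRecords: each chunk is routed to a per-datatype MapProcessor on the
 * map side, and a ReduceProcessor emits the final records on the reduce side.
 *
 * Usage: Demux [-m <maps>] [-r <reduces>] <input> <output>
 */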
public class Demux extends Configured implements Tool {
  static Logger log = Logger.getLogger(Demux.class);
  static SimpleDateFormat day = new SimpleDateFormat("yyyyMMdd_HH_mm");
  public static Configuration jobConf = null;

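  /**
   * Map side of Demux. Looks up the MapProcessor registered for each chunk's
   * data type (falling back to chukwa.demux.mapper.default.processor) and
   * delegates parsing to it; chunks whose processor is "Drop" are logged at
   * INFO and discarded.
   */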
  public static class MapClass extends MapReduceBase implements
          Mapper<ChukwaArchiveKey, ChunkImpl, ChukwaRecordKey, ChukwaRecord> {

    @Override
    public void configure(JobConf jobConf) {
      super.configure(jobConf);
      Demux.jobConf = jobConf;
    }

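    /**
     * The configured value for a data type may be a comma-separated
     * "mapperClass,reducerClass" pair; only the first element is used here.
     */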
    public void map(ChukwaArchiveKey key, ChunkImpl chunk,
                    OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
            throws IOException {

      ChukwaOutputCollector chukwaOutputCollector = new ChukwaOutputCollector(
              "DemuxMapOutput", output, reporter);
      try {
        long duration = System.currentTimeMillis();
        if (log.isDebugEnabled()) {
          // getData() returns raw bytes; decode them for a readable log entry.
          log.debug("Entry: [" + new String(chunk.getData()) + "] EventType: ["
                  + chunk.getDataType() + "]");
        }

        String defaultProcessor = Demux.jobConf.get(
                "chukwa.demux.mapper.default.processor",
                "org.apache.hadoop.chukwa.extraction.demux.processor.mapper.DefaultProcessor");

        String processorClass_pri = Demux.jobConf.get(chunk.getDataType(),
                defaultProcessor);

        String processorClass = processorClass_pri.split(",")[0];
        if (!processorClass.equalsIgnoreCase("Drop")) {
          reporter.incrCounter("DemuxMapInput", "total chunks", 1);
          reporter.incrCounter("DemuxMapInput",
                  chunk.getDataType() + " chunks", 1);

          MapProcessor processor = MapProcessorFactory
                  .getProcessor(processorClass);
          processor.process(key, chunk, chukwaOutputCollector, reporter);
          if (log.isDebugEnabled()) {
            duration = System.currentTimeMillis() - duration;
            log.debug("Demux:Map dataType:" + chunk.getDataType()
                    + " duration:" + duration + " processor:" + processorClass
                    + " recordCount:" + chunk.getRecordOffsets().length);
          }

        } else {
          log.info("action:Demux, dataType:" + chunk.getDataType()
                  + " duration:0 processor:Drop recordCount:"
                  + chunk.getRecordOffsets().length);
        }

      } catch (Exception e) {
        // log.warn already records the stack trace; no need to also print it.
        log.warn("Exception in Demux:MAP", e);
      }
    }
  }

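  /**
   * Reduce side of Demux. Selects a ReduceProcessor based on the key's reduce
   * type and lets it emit the final records; unknown types fall back to the
   * identity reducer processor.
   */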
  public static class ReduceClass extends MapReduceBase implements
          Reducer<ChukwaRecordKey, ChukwaRecord, ChukwaRecordKey, ChukwaRecord> {

    @Override
    public void configure(JobConf jobConf) {
      super.configure(jobConf);
      Demux.jobConf = jobConf;
    }

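    /**
     * The configured value for a reduce type is expected to be a
     * "mapperClass,reducerClass" pair; the second element names the
     * ReduceProcessor used here.
     */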
    public void reduce(ChukwaRecordKey key, Iterator<ChukwaRecord> values,
                       OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
            throws IOException {
      ChukwaOutputCollector chukwaOutputCollector = new ChukwaOutputCollector(
              "DemuxReduceOutput", output, reporter);
      try {
        long duration = System.currentTimeMillis();
        reporter.incrCounter("DemuxReduceInput", "total distinct keys", 1);
        reporter.incrCounter("DemuxReduceInput", key.getReduceType()
                + " total distinct keys", 1);

        String defaultProcessor_classname = "org.apache.hadoop.chukwa.extraction.demux.processor.reducer" +
                ".IdentityReducer";
        String defaultProcessor = Demux.jobConf.get("chukwa.demux.reducer.default.processor",
                "," + defaultProcessor_classname);

        String processClass_pri = Demux.jobConf.get(key.getReduceType(), defaultProcessor);
        String[] processClass_tmps = processClass_pri.split(",");
        String processClass;
        if (processClass_tmps.length != 2)
          processClass = defaultProcessor_classname;
        else
          processClass = processClass_tmps[1];

        ReduceProcessor processor = ReduceProcessorFactory.getProcessor(processClass);
        processor.process(key, values, chukwaOutputCollector, reporter);

        if (log.isDebugEnabled()) {
          duration = System.currentTimeMillis() - duration;
          log.debug("Demux:Reduce, dataType:" + key.getReduceType()
                  + " duration:" + duration
                  + " processor:" + processor.getClass().getName());
        }

      } catch (Exception e) {
        // log.warn already records the stack trace; no need to also print it.
        log.warn("Exception in Demux:Reduce", e);
      }
    }
  }

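  /** Prints command-line usage and returns a non-zero status for the caller. */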
  static int printUsage() {
    System.out.println("Demux [-m <maps>] [-r <reduces>] <input> <output>");
    ToolRunner.printGenericCommandUsage(System.out);
    return -1;
  }

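  /**
   * Adds any parser jars found under <chukwa.data.dir>/demux to the job
   * classpath via the DistributedCache, stripping the fs.default.name prefix
   * so the cached paths stay filesystem-relative.
   */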
  public static void addParsers(Configuration conf) {
    String parserPath = conf.get("chukwa.data.dir") + File.separator + "demux";
    try {
      // Use the passed-in configuration so the lookup honors the job's
      // filesystem settings rather than a freshly created default config.
      FileSystem fs = FileSystem.get(conf);
      FileStatus[] fstatus = fs.listStatus(new Path(parserPath));
      if (fstatus != null) {
        String hdfsUrlPrefix = conf.get("fs.default.name");

        for (FileStatus parser : fstatus) {
          Path jarPath = new Path(parser.getPath().toString().replace(hdfsUrlPrefix, ""));
          log.debug("Adding parser JAR path " + jarPath);
          DistributedCache.addFileToClassPath(jarPath, conf);
        }
      }
    } catch (IOException e) {
      log.error(ExceptionUtil.getStackTrace(e));
    }
  }

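  /**
   * Configures and submits the job: sequence-file input, MapClass/ReduceClass
   * with the ChukwaRecordPartitioner, ChukwaRecordOutputFormat output, and
   * VERY_HIGH priority. Accepts -m/-r overrides for the task counts plus
   * exactly two positional arguments: the input path(s) and the output path.
   */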
  public int run(String[] args) throws Exception {
    JobConf conf = new JobConf(new ChukwaConfiguration(), Demux.class);

    conf.setJobName("Chukwa-Demux_" + day.format(new Date()));
    conf.setInputFormat(SequenceFileInputFormat.class);
    conf.setMapperClass(Demux.MapClass.class);
    conf.setPartitionerClass(ChukwaRecordPartitioner.class);
    conf.setReducerClass(Demux.ReduceClass.class);

    conf.setOutputKeyClass(ChukwaRecordKey.class);
    conf.setOutputValueClass(ChukwaRecord.class);
    conf.setOutputFormat(ChukwaRecordOutputFormat.class);
    conf.setJobPriority(JobPriority.VERY_HIGH);
    addParsers(conf);

    List<String> other_args = new ArrayList<String>();
    for (int i = 0; i < args.length; ++i) {
      try {
        if ("-m".equals(args[i])) {
          conf.setNumMapTasks(Integer.parseInt(args[++i]));
        } else if ("-r".equals(args[i])) {
          conf.setNumReduceTasks(Integer.parseInt(args[++i]));
        } else {
          other_args.add(args[i]);
        }
      } catch (NumberFormatException except) {
        System.out.println("ERROR: Integer expected instead of " + args[i]);
        return printUsage();
      } catch (ArrayIndexOutOfBoundsException except) {
        System.out.println("ERROR: Required parameter missing from "
                + args[i - 1]);
        return printUsage();
      }
    }
    // Make sure there are exactly 2 parameters left.
    if (other_args.size() != 2) {
      System.out.println("ERROR: Wrong number of parameters: "
              + other_args.size() + " instead of 2.");
      return printUsage();
    }

    FileInputFormat.setInputPaths(conf, other_args.get(0));
    FileOutputFormat.setOutputPath(conf, new Path(other_args.get(1)));

    JobClient.runJob(conf);
    return 0;
  }

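  /** Standard Tool entry point; the process exit code is run()'s result. */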
  public static void main(String[] args) throws Exception {
    int res = ToolRunner.run(new Configuration(), new Demux(), args);
    System.exit(res);
  }

}