
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.chukwa.extraction.demux;

import java.io.File;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.Iterator;
import java.util.List;

import org.apache.hadoop.chukwa.ChukwaArchiveKey;
import org.apache.hadoop.chukwa.ChunkImpl;
import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
import org.apache.hadoop.chukwa.extraction.demux.processor.ChukwaOutputCollector;
import org.apache.hadoop.chukwa.extraction.demux.processor.mapper.MapProcessor;
import org.apache.hadoop.chukwa.extraction.demux.processor.mapper.MapProcessorFactory;
import org.apache.hadoop.chukwa.extraction.demux.processor.reducer.ReduceProcessor;
import org.apache.hadoop.chukwa.extraction.demux.processor.reducer.ReduceProcessorFactory;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
import org.apache.hadoop.chukwa.util.ExceptionUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobPriority;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.log4j.Logger;

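/**
 * Demux is the Chukwa MapReduce job that turns archived chunks of raw data
 * into typed ChukwaRecords: the mapper hands each chunk to a MapProcessor
 * selected by the chunk's data type, and the reducer post-processes each
 * output key with the matching ReduceProcessor.
 */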
public class Demux extends Configured implements Tool {
  static Logger log = Logger.getLogger(Demux.class);
  public static Configuration jobConf = null;

  protected static void setJobConf(JobConf jobConf) {
    Demux.jobConf = jobConf;
  }

  protected Configuration getJobConf() {
    return Demux.jobConf;
  }

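  /**
   * Mapper: resolves a MapProcessor for each chunk's data type and delegates
   * the chunk to it. A data type mapped to "Drop" is counted and discarded;
   * unmapped types fall back to DefaultProcessor.
   */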
  public static class MapClass extends MapReduceBase implements
      Mapper<ChukwaArchiveKey, ChunkImpl, ChukwaRecordKey, ChukwaRecord> {

    private Configuration jobConf = null;

    @Override
    public void configure(JobConf jobConf) {
      super.configure(jobConf);
      setJobConf(jobConf);
    }

    private void setJobConf(JobConf jobConf) {
      this.jobConf = jobConf;
    }

    public void map(ChukwaArchiveKey key, ChunkImpl chunk,
        OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
        throws IOException {

      ChukwaOutputCollector chukwaOutputCollector = new ChukwaOutputCollector(
          "DemuxMapOutput", output, reporter);
      try {
        long duration = System.currentTimeMillis();
        if (log.isDebugEnabled()) {
          // getData() returns raw bytes; decode them rather than relying on
          // String.valueOf(byte[]), which only prints the array reference.
          log.debug("Entry: [" + new String(chunk.getData()) + "] EventType: ["
              + chunk.getDataType() + "]");
        }

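        // Processor lookup: the data type name itself is the configuration
        // key. Its value may be a single mapper class or a comma-separated
        // "MapProcessorClass,ReduceProcessorClass" pair, of which the mapper
        // uses the first entry.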
        String defaultProcessor = jobConf.get(
            "chukwa.demux.mapper.default.processor",
            "org.apache.hadoop.chukwa.extraction.demux.processor.mapper.DefaultProcessor");

        String processorClass_pri = jobConf.get(chunk.getDataType(),
            defaultProcessor);

        String processorClass = processorClass_pri.split(",")[0];
        if (!processorClass.equalsIgnoreCase("Drop")) {
          reporter.incrCounter("DemuxMapInput", "total chunks", 1);
          reporter.incrCounter("DemuxMapInput",
              chunk.getDataType() + " chunks", 1);

          MapProcessor processor = MapProcessorFactory
              .getProcessor(processorClass);
          processor.process(key, chunk, chukwaOutputCollector, reporter);
          if (log.isDebugEnabled()) {
            duration = System.currentTimeMillis() - duration;
            log.debug("Demux:Map dataType:" + chunk.getDataType()
                + " duration:" + duration + " processor:" + processorClass
                + " recordCount:" + chunk.getRecordOffsets().length);
          }

        } else {
          log.info("action:Demux, dataType:" + chunk.getDataType()
              + " duration:0 processor:Drop recordCount:"
              + chunk.getRecordOffsets().length);
        }

      } catch (Exception e) {
        // log.warn already records the stack trace; a duplicate
        // printStackTrace() to stderr is unnecessary.
        log.warn("Exception in Demux:MAP", e);
      }
    }
  }

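  /**
   * Reducer: resolves a ReduceProcessor from the second entry of the
   * configured "MapProcessorClass,ReduceProcessorClass" pair for the key's
   * reduce type, defaulting to IdentityReducer.
   */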
  public static class ReduceClass extends MapReduceBase implements
      Reducer<ChukwaRecordKey, ChukwaRecord, ChukwaRecordKey, ChukwaRecord> {

    private Configuration jobConf = null;

    @Override
    public void configure(JobConf jobConf) {
      super.configure(jobConf);
      this.jobConf = jobConf;
    }

    public void reduce(ChukwaRecordKey key, Iterator<ChukwaRecord> values,
        OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
        throws IOException {
      ChukwaOutputCollector chukwaOutputCollector = new ChukwaOutputCollector(
          "DemuxReduceOutput", output, reporter);
      try {
        long duration = System.currentTimeMillis();
        reporter.incrCounter("DemuxReduceInput", "total distinct keys", 1);
        reporter.incrCounter("DemuxReduceInput", key.getReduceType()
            + " total distinct keys", 1);

        String defaultProcessor_classname =
            "org.apache.hadoop.chukwa.extraction.demux.processor.reducer.IdentityReducer";
        String defaultProcessor = jobConf.get("chukwa.demux.reducer.default.processor",
            "," + defaultProcessor_classname);

        String processClass_pri = jobConf.get(key.getReduceType(), defaultProcessor);
        String[] processClass_tmps = processClass_pri.split(",");
        String processClass = null;
        if (processClass_tmps.length != 2)
          processClass = defaultProcessor_classname;
        else
          processClass = processClass_tmps[1];

        ReduceProcessor processor = ReduceProcessorFactory.getProcessor(processClass);
        // Use the logger rather than a stray System.out.println() so the
        // chosen processor shows up in the task logs at debug level.
        if (log.isDebugEnabled()) {
          log.debug("Demux:Reduce, processor:" + processor.getClass().getName());
        }
        processor.process(key, values, chukwaOutputCollector, reporter);

        if (log.isDebugEnabled()) {
          duration = System.currentTimeMillis() - duration;
          log.debug("Demux:Reduce, dataType:" + key.getReduceType()
              + " duration:" + duration);
        }

      } catch (Exception e) {
        // The logger records the stack trace; no need for printStackTrace().
        log.warn("Exception in Demux:Reduce", e);
      }
    }
  }

  static int printUsage() {
    System.out.println("Demux [-m <maps>] [-r <reduces>] <input> <output>");
    ToolRunner.printGenericCommandUsage(System.out);
    return -1;
  }

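  /**
   * Adds every parser JAR found under &lt;chukwa.data.dir&gt;/demux on the
   * default FileSystem to the job classpath via the DistributedCache, so
   * additional processors can be deployed alongside the job.
   */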
  public static void addParsers(Configuration conf) {
    String parserPath = conf.get("chukwa.data.dir") + File.separator + "demux";
    try {
      FileSystem fs = FileSystem.get(new Configuration());
      FileStatus[] fstatus = fs.listStatus(new Path(parserPath));
      if (fstatus != null) {
        String hdfsUrlPrefix = conf.get("fs.defaultFS");

        for (FileStatus parser : fstatus) {
          // The DistributedCache expects a path relative to the default
          // FileSystem, so strip the scheme/authority prefix.
          Path jarPath = new Path(parser.getPath().toString().replace(hdfsUrlPrefix, ""));
          log.debug("Adding parser JAR path " + jarPath);
          DistributedCache.addFileToClassPath(jarPath, conf);
        }
      }
    } catch (IOException e) {
      log.error(ExceptionUtil.getStackTrace(e));
    }
  }

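  /**
   * Configures and submits the Demux job: sequence-file input of archived
   * chunks, MapClass/ReduceClass with ChukwaRecordPartitioner, and
   * ChukwaRecordOutputFormat. Accepts -m/-r to set the task counts,
   * followed by the input and output paths.
   */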
  public int run(String[] args) throws Exception {
    JobConf conf = new JobConf(new ChukwaConfiguration(), Demux.class);

    SimpleDateFormat day = new SimpleDateFormat("yyyyMMdd_HH_mm");
    conf.setJobName("Chukwa-Demux_" + day.format(new Date()));
    conf.setInputFormat(SequenceFileInputFormat.class);
    conf.setMapperClass(Demux.MapClass.class);
    conf.setPartitionerClass(ChukwaRecordPartitioner.class);
    conf.setReducerClass(Demux.ReduceClass.class);

    conf.setOutputKeyClass(ChukwaRecordKey.class);
    conf.setOutputValueClass(ChukwaRecord.class);
    conf.setOutputFormat(ChukwaRecordOutputFormat.class);
    conf.setJobPriority(JobPriority.VERY_HIGH);
    addParsers(conf);

    List<String> other_args = new ArrayList<String>();
    for (int i = 0; i < args.length; ++i) {
      try {
        if ("-m".equals(args[i])) {
          conf.setNumMapTasks(Integer.parseInt(args[++i]));
        } else if ("-r".equals(args[i])) {
          conf.setNumReduceTasks(Integer.parseInt(args[++i]));
        } else {
          other_args.add(args[i]);
        }
      } catch (NumberFormatException except) {
        System.out.println("ERROR: Integer expected instead of " + args[i]);
        return printUsage();
      } catch (ArrayIndexOutOfBoundsException except) {
        System.out.println("ERROR: Required parameter missing from "
            + args[i - 1]);
        return printUsage();
      }
    }
    // Make sure there are exactly 2 parameters left: <input> and <output>.
    if (other_args.size() != 2) {
      System.out.println("ERROR: Wrong number of parameters: "
          + other_args.size() + " instead of 2.");
      return printUsage();
    }

    FileInputFormat.setInputPaths(conf, other_args.get(0));
    FileOutputFormat.setOutputPath(conf, new Path(other_args.get(1)));

    JobClient.runJob(conf);
    return 0;
  }

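  /**
   * Entry point. An illustrative invocation (the jar name and paths are
   * examples and depend on the build and deployment):
   *
   *   hadoop jar chukwa-core.jar org.apache.hadoop.chukwa.extraction.demux.Demux \
   *       -m 10 -r 4 /chukwa/archives /chukwa/demux-output
   *
   * The exit status is the value returned by {@link #run(String[])}.
   */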
  public static void main(String[] args) throws Exception {
    int res = ToolRunner.run(new Configuration(), new Demux(), args);
    // Propagate the job's exit status instead of silently discarding it.
    System.exit(res);
  }

}