ChukwaArchiveBuilder

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.chukwa.extraction.archive;

import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.chukwa.ChukwaArchiveKey;
import org.apache.hadoop.chukwa.ChunkImpl;
import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.apache.hadoop.mapred.lib.IdentityMapper;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.log4j.Logger;

/**
 * Main class for the mapreduce job that archives Chunks.
 *
 * The map class is the identity; the reduce class emits each key exactly
 * once, suppressing duplicate chunks.  The actual archiving logic lives in
 * the Partitioner and OutputFormat classes, which are selected by the first
 * command-line argument.
 */
public class ChukwaArchiveBuilder extends Configured implements Tool {

  static class UniqueKeyReduce extends MapReduceBase implements
      Reducer<ChukwaArchiveKey, ChunkImpl, ChukwaArchiveKey, ChunkImpl> {

    /**
     * Outputs exactly one value for each key; this suppresses duplicates.
     */
    @Override
    public void reduce(ChukwaArchiveKey key, Iterator<ChunkImpl> vals,
        OutputCollector<ChukwaArchiveKey, ChunkImpl> out, Reporter r)
        throws IOException {
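      // Keep the first chunk for this key; count, but do not emit, any
      // further chunks that arrive with an identical ChukwaArchiveKey.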
      ChunkImpl i = vals.next();
      out.collect(key, i);
      int dups = 0;
      while (vals.hasNext()) {
        vals.next();
        dups++;
      }
      r.incrCounter("app", "duplicate chunks", dups);
    }

  }
  static Logger log = Logger.getLogger(ChukwaArchiveBuilder.class);

  static int printUsage() {
    System.out
        .println("ChukwaArchiveBuilder <Stream/DataType/Daily/Hourly> <input> <output>");
    ToolRunner.printGenericCommandUsage(System.out);
    return -1;
  }

  public int run(String[] args) throws Exception {

    // Make sure there are exactly 3 parameters left.
    if (args.length != 3) {
      System.out.println("ERROR: Wrong number of parameters: " + args.length
          + " instead of 3.");
      return printUsage();
    }
    JobConf jobConf = new JobConf(getConf(), ChukwaArchiveBuilder.class);

    jobConf.setInputFormat(SequenceFileInputFormat.class);

    jobConf.setMapperClass(IdentityMapper.class);

    jobConf.setReducerClass(UniqueKeyReduce.class);
//    jobConf.setReducerClass(IdentityReducer.class);
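    // UniqueKeyReduce emits one chunk per ChukwaArchiveKey and counts the
    // rest under the "duplicate chunks" counter; the IdentityReducer
    // alternative (commented out above) would pass duplicates through.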

    if (args[0].equalsIgnoreCase("Daily")) {
      jobConf.setPartitionerClass(ChukwaArchiveDailyPartitioner.class);
      jobConf.setOutputFormat(ChukwaArchiveDailyOutputFormat.class);
      jobConf.setJobName("Chukwa-DailyArchiveBuilder");
    } else if (args[0].equalsIgnoreCase("Hourly")) {
      jobConf.setJobName("Chukwa-HourlyArchiveBuilder");
      jobConf.setPartitionerClass(ChukwaArchiveHourlyPartitioner.class);
      jobConf.setOutputFormat(ChukwaArchiveHourlyOutputFormat.class);
    } else if (args[0].equalsIgnoreCase("DataType")) {
      jobConf.setJobName("Chukwa-ArchiveBuilder-DataType");
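      // The reducer count for this mode is read from the job configuration,
      // e.g. via the generic option -DchukwaArchiveBuilder.reduceCount=<n>.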
      int reduceCount = jobConf.getInt("chukwaArchiveBuilder.reduceCount", 1);
      log.info("Reduce Count:" + reduceCount);
      jobConf.setNumReduceTasks(reduceCount);

      jobConf.setPartitionerClass(ChukwaArchiveDataTypePartitioner.class);
      jobConf.setOutputFormat(ChukwaArchiveDataTypeOutputFormat.class);
    } else if (args[0].equalsIgnoreCase("Stream")) {
      jobConf.setJobName("Chukwa-HourlyArchiveBuilder-Stream");
      int reduceCount = jobConf.getInt("chukwaArchiveBuilder.reduceCount", 1);
      log.info("Reduce Count:" + reduceCount);
      jobConf.setNumReduceTasks(reduceCount);

      jobConf.setPartitionerClass(ChukwaArchiveStreamNamePartitioner.class);
      jobConf.setOutputFormat(ChukwaArchiveStreamNameOutputFormat.class);
    } else {
      System.out.println("ERROR: Wrong time partitioning: " + args[0]
          + " instead of [Stream/DataType/Daily/Hourly].");
      return printUsage();
    }

    jobConf.setOutputKeyClass(ChukwaArchiveKey.class);
    jobConf.setOutputValueClass(ChunkImpl.class);

    FileInputFormat.setInputPaths(jobConf, args[1]);
    FileOutputFormat.setOutputPath(jobConf, new Path(args[2]));

    JobClient.runJob(jobConf);
    return 0;
  }

  public static void main(String[] args) throws Exception {
    // Propagate the tool's result as the process exit code instead of
    // discarding it.
    int res = ToolRunner.run(new ChukwaConfiguration(),
        new ChukwaArchiveBuilder(), args);
    System.exit(res);
  }
}
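
For reference, here is a minimal sketch of driving this job programmatically
rather than from the command line. It follows the same ChukwaConfiguration
plus ToolRunner pattern that main() uses; the wrapper class name, the reduce
count, and the input/output paths below are illustrative assumptions, not
values taken from this class.

    import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
    import org.apache.hadoop.chukwa.extraction.archive.ChukwaArchiveBuilder;
    import org.apache.hadoop.util.ToolRunner;

    public class RunArchiveBuilder {
      public static void main(String[] args) throws Exception {
        ChukwaConfiguration conf = new ChukwaConfiguration();
        // Read by run() in the DataType and Stream modes; defaults to 1.
        conf.setInt("chukwaArchiveBuilder.reduceCount", 4);
        // Arguments mirror printUsage():
        //   ChukwaArchiveBuilder <Stream/DataType/Daily/Hourly> <input> <output>
        // The HDFS paths here are hypothetical.
        int res = ToolRunner.run(conf, new ChukwaArchiveBuilder(),
            new String[] { "DataType", "/chukwa/dataSink", "/chukwa/archive" });
        System.exit(res);
      }
    }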