This project has been retired. For details, please refer to its
Attic page.
ChukwaArchiveBuilder xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.chukwa.extraction.archive;
20
21
22 import java.io.IOException;
23 import java.util.Iterator;
24 import org.apache.hadoop.chukwa.ChukwaArchiveKey;
25 import org.apache.hadoop.chukwa.ChunkImpl;
26 import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
27 import org.apache.hadoop.conf.Configured;
28 import org.apache.hadoop.fs.Path;
29 import org.apache.hadoop.mapred.FileInputFormat;
30 import org.apache.hadoop.mapred.FileOutputFormat;
31 import org.apache.hadoop.mapred.JobClient;
32 import org.apache.hadoop.mapred.JobConf;
33 import org.apache.hadoop.mapred.MapReduceBase;
34 import org.apache.hadoop.mapred.OutputCollector;
35 import org.apache.hadoop.mapred.Reducer;
36 import org.apache.hadoop.mapred.Reporter;
37 import org.apache.hadoop.mapred.SequenceFileInputFormat;
38 import org.apache.hadoop.mapred.lib.IdentityMapper;
39 import org.apache.hadoop.util.Tool;
40 import org.apache.hadoop.util.ToolRunner;
41 import org.apache.log4j.Logger;
42
43
44
45
46
47
48
49
50
51
52 public class ChukwaArchiveBuilder extends Configured implements Tool {
53
54
55 static class UniqueKeyReduce extends MapReduceBase implements
56 Reducer<ChukwaArchiveKey, ChunkImpl, ChukwaArchiveKey, ChunkImpl> {
57
58
59
60
61 @Override
62 public void reduce(ChukwaArchiveKey key, Iterator<ChunkImpl> vals,
63 OutputCollector<ChukwaArchiveKey, ChunkImpl> out, Reporter r)
64 throws IOException {
65 ChunkImpl i = vals.next();
66 out.collect(key, i);
67 int dups = 0;
68 while(vals.hasNext()) {
69 vals.next();
70 dups ++;
71 }
72 r.incrCounter("app", "duplicate chunks", dups);
73 }
74
75 }
76 static Logger log = Logger.getLogger(ChukwaArchiveBuilder.class);
77
78 static int printUsage() {
79 System.out
80 .println("ChukwaArchiveBuilder <Stream/DataType/Daily/Hourly> <input> <output>");
81 ToolRunner.printGenericCommandUsage(System.out);
82 return -1;
83 }
84
85 public int run(String[] args) throws Exception {
86
87
88 if (args.length != 3) {
89 System.out.println("ERROR: Wrong number of parameters: " + args.length
90 + " instead of 3.");
91 return printUsage();
92 }
93 JobConf jobConf = new JobConf(getConf(), ChukwaArchiveBuilder.class);
94
95 jobConf.setInputFormat(SequenceFileInputFormat.class);
96
97 jobConf.setMapperClass(IdentityMapper.class);
98
99 jobConf.setReducerClass(UniqueKeyReduce.class);
100
101
102 if (args[0].equalsIgnoreCase("Daily")) {
103 jobConf.setPartitionerClass(ChukwaArchiveDailyPartitioner.class);
104 jobConf.setOutputFormat(ChukwaArchiveDailyOutputFormat.class);
105 jobConf.setJobName("Chukwa-DailyArchiveBuilder");
106 } else if (args[0].equalsIgnoreCase("Hourly")) {
107 jobConf.setJobName("Chukwa-HourlyArchiveBuilder");
108 jobConf.setPartitionerClass(ChukwaArchiveHourlyPartitioner.class);
109 jobConf.setOutputFormat(ChukwaArchiveHourlyOutputFormat.class);
110 } else if (args[0].equalsIgnoreCase("DataType")) {
111 jobConf.setJobName("Chukwa-ArchiveBuilder-DataType");
112 int reduceCount = jobConf.getInt("chukwaArchiveBuilder.reduceCount", 1);
113 log.info("Reduce Count:" + reduceCount);
114 jobConf.setNumReduceTasks(reduceCount);
115
116 jobConf.setPartitionerClass(ChukwaArchiveDataTypePartitioner.class);
117 jobConf.setOutputFormat(ChukwaArchiveDataTypeOutputFormat.class);
118 } else if (args[0].equalsIgnoreCase("Stream")) {
119 jobConf.setJobName("Chukwa-HourlyArchiveBuilder-Stream");
120 int reduceCount = jobConf.getInt("chukwaArchiveBuilder.reduceCount", 1);
121 log.info("Reduce Count:" + reduceCount);
122 jobConf.setNumReduceTasks(reduceCount);
123
124 jobConf.setPartitionerClass(ChukwaArchiveStreamNamePartitioner.class);
125 jobConf.setOutputFormat(ChukwaArchiveStreamNameOutputFormat.class);
126 } else {
127 System.out.println("ERROR: Wrong Time partionning: " + args[0]
128 + " instead of [Stream/DataType/Hourly/Daily].");
129 return printUsage();
130 }
131
132 jobConf.setOutputKeyClass(ChukwaArchiveKey.class);
133 jobConf.setOutputValueClass(ChunkImpl.class);
134
135 FileInputFormat.setInputPaths(jobConf, args[1]);
136 FileOutputFormat.setOutputPath(jobConf, new Path(args[2]));
137
138 JobClient.runJob(jobConf);
139 return 0;
140 }
141
142 public static void main(String[] args) throws Exception {
143 int res = ToolRunner.run(new ChukwaConfiguration(), new ChukwaArchiveBuilder(),
144 args);
145 return;
146 }
147 }