This project has retired. For details please refer to its
Attic page.
HourlyChukwaRecordRolling xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.chukwa.extraction.demux;
20
21
22 import java.io.IOException;
23 import java.net.URI;
24 import java.text.SimpleDateFormat;
25 import java.util.ArrayList;
26 import java.util.Calendar;
27 import java.util.List;
28 import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
29 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
30 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
31 import org.apache.hadoop.chukwa.util.HierarchyDataType;
32 import org.apache.hadoop.conf.Configured;
33 import org.apache.hadoop.fs.FileStatus;
34 import org.apache.hadoop.fs.FileSystem;
35 import org.apache.hadoop.fs.FileUtil;
36 import org.apache.hadoop.fs.Path;
37 import org.apache.hadoop.mapred.FileInputFormat;
38 import org.apache.hadoop.mapred.FileOutputFormat;
39 import org.apache.hadoop.mapred.JobClient;
40 import org.apache.hadoop.mapred.JobConf;
41 import org.apache.hadoop.mapred.JobPriority;
42 import org.apache.hadoop.mapred.SequenceFileInputFormat;
43 import org.apache.hadoop.mapred.SequenceFileOutputFormat;
44 import org.apache.hadoop.mapred.lib.IdentityMapper;
45 import org.apache.hadoop.mapred.lib.IdentityReducer;
46 import org.apache.hadoop.util.Tool;
47 import org.apache.log4j.Logger;
48
49
50 public class HourlyChukwaRecordRolling extends Configured implements Tool {
// log4j logger shared by all static and instance methods of this class.
51 static Logger log = Logger.getLogger(HourlyChukwaRecordRolling.class);
52
// Day-stamp format (yyyyMMdd); main() formats the current date with it and
// parses the result as an int to compare against day directory names.
53 static SimpleDateFormat sdf = new java.text.SimpleDateFormat("yyyyMMdd");
// Chukwa configuration and file-system handle; both are assigned in main().
54 static ChukwaConfiguration conf = null;
55 static FileSystem fs = null;
// Directory-name constants. Their use is not visible in this excerpt —
// presumably by-product directories of a Hadoop job; TODO confirm.
56 static final String HadoopLogDir = "_logs";
57 static final String hadoopTempDir = "_temporary";
58
// Flags overwritten by main() from the values following the "rollInSequence"
// and "deleteRawdata" command-line keywords.
59 static boolean rollInSequence = true;
60 static boolean deleteRawdata = false;
61
62 public static void usage() {
63 System.err
64 .println("usage: java org.apache.hadoop.chukwa.extraction.demux.HourlyChukwaRecordRolling rollInSequence <True/False> deleteRawdata <True/False>");
65 }
66
/**
 * Walks the hourly rolling directory of one day/hour:
 * {@code <rollingFolder>/hourly/<workingDay>/<workingHour>/<cluster>/<dataSource>}.
 *
 * NOTE(review): only the opening of this method is visible in this excerpt;
 * the rest of the body is missing, so what is done with each data source
 * is intentionally not documented here.
 *
 * @param chukwaMainRepository not referenced in the visible part of the body
 * @param tempDir not referenced in the visible part of the body
 * @param rollingFolder root folder under which "/hourly/..." is resolved
 * @param workingDay day directory name as an int (main() passes the value
 *        parsed from the directory name)
 * @param workingHour hour directory name as an int
 * @throws IOException declared by the signature; the visible body performs
 *         file-system listing calls
 */
67 public static void buildHourlyFiles(String chukwaMainRepository,
68 String tempDir, String rollingFolder, int workingDay, int workingHour)
69 throws IOException {
70
// Each direct child of the hour directory is treated as a cluster.
71 Path hourPath = new Path(rollingFolder + "/hourly/" + workingDay + "/"
72 + workingHour);
73 FileStatus[] clustersFS = fs.listStatus(hourPath);
74 for (FileStatus clusterFs : clustersFS) {
75 String cluster = clusterFs.getPath().getName();
76
// Children of the cluster directory are the data-source roots.
77 Path dataSourceClusterHourPaths = new Path(rollingFolder + "/hourly/"
78 + workingDay + "/" + workingHour + "/" + cluster);
79 FileStatus[] dataSourcesFS = fs.listStatus(dataSourceClusterHourPaths);
80
81 for (FileStatus dataSourceFS : dataSourcesFS) {
82
// HierarchyDataType.globStatus expands each data-source root into the
// entries to process; the meaning of the boolean flag is not visible
// here — TODO confirm against HierarchyDataType.
83 for (FileStatus dataSourcePath : HierarchyDataType.globStatus(fs,
84 dataSourceFS.getPath(), true)) {
// The data-source name is derived from the entry path and the cluster
// directory path (presumably the relative part — verify in
// HierarchyDataType.getDataType).
85 String dataSource = HierarchyDataType.getDataType(
86 dataSourcePath.getPath(),
87 fs.getFileStatus(dataSourceClusterHourPaths).getPath());
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158 public static void main(String[] args) throws Exception {
159
160 conf = new ChukwaConfiguration();
161 String fsName = conf.get("writer.hdfs.filesystem");
162 fs = FileSystem.get(new URI(fsName), conf);
163
164
165 String rollingFolder = "/chukwa/rolling/";
166 String chukwaMainRepository = "/chukwa/repos/";
167 String tempDir = "/chukwa/temp/hourlyRolling/";
168
169
170 if (args.length != 4) {
171 usage();
172 return;
173 }
174
175 if (!args[0].equalsIgnoreCase("rollInSequence")) {
176 usage();
177 return;
178 }
179
180 if (!args[2].equalsIgnoreCase("deleteRawdata")) {
181 usage();
182 return;
183 }
184
185 if (args[1].equalsIgnoreCase("true")) {
186 rollInSequence = true;
187 } else {
188 rollInSequence = false;
189 }
190
191 if (args[3].equalsIgnoreCase("true")) {
192 deleteRawdata = true;
193 } else {
194 deleteRawdata = false;
195 }
196
197 Calendar calendar = Calendar.getInstance();
198 int currentDay = Integer.parseInt(sdf.format(calendar.getTime()));
199 int currentHour = calendar.get(Calendar.HOUR_OF_DAY);
200 log.info("CurrentDay: " + currentDay);
201 log.info("currentHour" + currentHour);
202
203 Path rootFolder = new Path(rollingFolder + "/hourly/");
204
205 FileStatus[] daysFS = fs.listStatus(rootFolder);
206 for (FileStatus dayFS : daysFS) {
207 try {
208 log.info("dayFs:" + dayFS.getPath().getName());
209 int workingDay = Integer.parseInt(dayFS.getPath().getName());
210
211 Path hourlySrc = new Path(rollingFolder + "/hourly/" + workingDay);
212 FileStatus[] hoursFS = fs.listStatus(hourlySrc);
213 for (FileStatus hourFS : hoursFS) {
214 String workinhHourStr = hourFS.getPath().getName();
215 int workingHour = Integer.parseInt(workinhHourStr);
216 if ((workingDay < currentDay) ||
217 ((workingDay == currentDay) && (workingHour < currentHour))
218
219
220
221
222 ) {
223
224 try {
225 buildHourlyFiles(chukwaMainRepository, tempDir, rollingFolder,
226 workingDay, workingHour);
227 } catch(Throwable e) {
228 e.printStackTrace();
229 log.warn("Hourly rolling failed on :" + rollingFolder +"/" + workingDay +"/" + workingHour ) ;
230 }
231
232 }
233
234 }
235 }
236
237 catch (NumberFormatException e) {
238 log.warn("Exception in hourlyRolling:", e);
239 }
240
241 }
242 }
243
244 public int run(String[] args) throws Exception {
245 JobConf conf = new JobConf(new ChukwaConfiguration(), HourlyChukwaRecordRolling.class);
246
247 conf.setJobName("HourlyChukwa-Rolling");
248 conf.setInputFormat(SequenceFileInputFormat.class);
249
250 conf.setMapperClass(IdentityMapper.class);
251 conf.setReducerClass(IdentityReducer.class);
252
253 conf.setOutputKeyClass(ChukwaRecordKey.class);
254 conf.setOutputValueClass(ChukwaRecord.class);
255 conf.setOutputFormat(SequenceFileOutputFormat.class);
256
257 log.info("HourlyChukwaRecordRolling input: " + args[0]);
258 log.info("HourlyChukwaRecordRolling output: " + args[1]);
259
260 FileInputFormat.setInputPaths(conf, args[0]);
261 FileOutputFormat.setOutputPath(conf, new Path(args[1]));
262 conf.setJobPriority(JobPriority.LOW);
263 conf.setNumReduceTasks(1);
264
265 JobClient.runJob(conf);
266 return 0;
267 }
268
269 }