This project has retired. For details please refer to its
Attic page.
DailyChukwaRecordRolling xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.chukwa.extraction.demux;
20
21
22 import java.io.IOException;
23 import java.net.URI;
24 import java.text.SimpleDateFormat;
25 import java.util.ArrayList;
26 import java.util.Calendar;
27 import java.util.List;
28 import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
29 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
30 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
31 import org.apache.hadoop.chukwa.util.ExceptionUtil;
32 import org.apache.hadoop.chukwa.util.HierarchyDataType;
33 import org.apache.hadoop.conf.Configured;
34 import org.apache.hadoop.fs.FileStatus;
35 import org.apache.hadoop.fs.FileSystem;
36 import org.apache.hadoop.fs.FileUtil;
37 import org.apache.hadoop.fs.Path;
38 import org.apache.hadoop.mapred.FileInputFormat;
39 import org.apache.hadoop.mapred.FileOutputFormat;
40 import org.apache.hadoop.mapred.JobClient;
41 import org.apache.hadoop.mapred.JobConf;
42 import org.apache.hadoop.mapred.JobPriority;
43 import org.apache.hadoop.mapred.SequenceFileInputFormat;
44 import org.apache.hadoop.mapred.SequenceFileOutputFormat;
45 import org.apache.hadoop.mapred.lib.IdentityMapper;
46 import org.apache.hadoop.mapred.lib.IdentityReducer;
47 import org.apache.hadoop.util.Tool;
48 import org.apache.log4j.Logger;
49
50
51 public class DailyChukwaRecordRolling extends Configured implements Tool {
// Log4j logger shared by all static methods of this class.
52 static Logger log = Logger.getLogger(DailyChukwaRecordRolling.class);
53
// Day-stamp format (yyyyMMdd). main() formats the current date with it and parses
// the result as an int so that day directories can be compared numerically.
54 static SimpleDateFormat sdf = new java.text.SimpleDateFormat("yyyyMMdd");
// Chukwa configuration; null until main() assigns it.
55 static ChukwaConfiguration conf = null;
// File system handle; null until main() opens it from "writer.hdfs.filesystem".
56 static FileSystem fs = null;
// NOTE(review): the two constants below are not referenced in the visible part of
// this file; presumably they name Hadoop job side directories -- confirm.
57 static final String HadoopLogDir = "_logs";
58 static final String hadoopTempDir = "_temporary";
59
// Command-line switches; main() overwrites both defaults from its arguments.
60 static boolean rollInSequence = true;
61 static boolean deleteRawdata = false;
62
63 public static void usage() {
64 System.err
65 .println("usage: java org.apache.hadoop.chukwa.extraction.demux.DailyChukwaRecordRolling rollInSequence <True/False> deleteRawdata <True/False>");
66 }
67
68 public static boolean hourlyRolling(String dailyStreamDirectory) {
69
70 Path pHour = null;
71 try {
72 log.info("Checking for HourlyRolling in " + dailyStreamDirectory);
73
74 for (int i=0;i<24;i++) {
75 pHour = new Path(dailyStreamDirectory + "/" + i);
76 if (! fs.exists(pHour)) {
77 log.info("HourlyData is missing for:" + pHour);
78 continue;
79 } else {
80 FileStatus[] files = fs.listStatus(pHour);
81 boolean containsHourly = false;
82 for(FileStatus file: files) {
83 log.info("Debug checking" + file.getPath());
84 if (file.getPath().getName().indexOf("_HourlyDone_") > 0) {
85 containsHourly = true;
86 break;
87 }
88 }
89 if (containsHourly == false) {
90 log.info("HourlyDone is missing for : " + pHour);
91 return false;
92 }
93 }
94 }
95 return true;
96 }catch(Exception e) {
97 e.printStackTrace();
98 return false;
99 }
100 }
/*
 * Walks the daily rolling area for one day.
 *
 * Visible behaviour: lists <rollingFolder>/daily/<workingDay>, treats the name
 * of every child as a cluster name, lists that cluster directory, expands each
 * of its entries with HierarchyDataType.globStatus(fs, path, true) and derives
 * a data-source name for every match with HierarchyDataType.getDataType,
 * passing the cluster directory as the second argument.
 *
 * NOTE(review): the remainder of this method's body is missing from this copy
 * of the file, so what is done with each data source, and how
 * chukwaMainRepository, tempDir and the "alldone" flag are used, cannot be
 * stated from here -- confirm against the complete source.
 *
 * Parameters: chukwaMainRepository and tempDir (not used in the visible
 * part), rollingFolder (root of the rolling area), workingDay (day directory
 * name as an int; main() passes values parsed from yyyyMMdd-style names).
 * Declared to throw IOException.
 */
101 public static void buildDailyFiles(String chukwaMainRepository,
102 String tempDir, String rollingFolder, int workingDay) throws IOException {
103
104
105 boolean alldone = true;
106
107 Path dayPath = new Path(rollingFolder + "/daily/" + workingDay);
108 FileStatus[] clustersFS = fs.listStatus(dayPath);
109 for (FileStatus clusterFs : clustersFS) {
110 String cluster = clusterFs.getPath().getName();
111
112 Path dataSourceClusterHourPaths = new Path(rollingFolder + "/daily/"
113 + workingDay + "/" + cluster);
114 FileStatus[] dataSourcesFS = fs.listStatus(dataSourceClusterHourPaths);
115 for (FileStatus dataSourceFS : dataSourcesFS) {
116
117 for (FileStatus dataSourcePath : HierarchyDataType.globStatus(fs,
118 dataSourceFS.getPath(), true)) {
119 String dataSource = HierarchyDataType.getDataType(
120 dataSourcePath.getPath(),
121 fs.getFileStatus(dataSourceClusterHourPaths).getPath());
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208 public static void main(String[] args) throws Exception {
209
210
211 conf = new ChukwaConfiguration();
212 String fsName = conf.get("writer.hdfs.filesystem");
213 fs = FileSystem.get(new URI(fsName), conf);
214
215
216 String rollingFolder = "/chukwa/rolling/";
217 String chukwaMainRepository = "/chukwa/repos/";
218 String tempDir = "/chukwa/temp/dailyRolling/";
219
220
221 if (args.length != 4) {
222 usage();
223 return;
224 }
225
226 if (!args[0].equalsIgnoreCase("rollInSequence")) {
227 usage();
228 return;
229 }
230
231 if (!args[2].equalsIgnoreCase("deleteRawdata")) {
232 usage();
233 return;
234 }
235
236 if (args[1].equalsIgnoreCase("true")) {
237 rollInSequence = true;
238 } else {
239 rollInSequence = false;
240 }
241
242 if (args[3].equalsIgnoreCase("true")) {
243 deleteRawdata = true;
244 } else {
245 deleteRawdata = false;
246 }
247
248 log.info("rollInSequence: " + rollInSequence);
249 log.info("deleteRawdata: " + deleteRawdata);
250
251 Calendar calendar = Calendar.getInstance();
252 int currentDay = Integer.parseInt(sdf.format(calendar.getTime()));
253 int currentHour = calendar.get(Calendar.HOUR_OF_DAY);
254 log.info("CurrentDay: " + currentDay);
255 log.info("currentHour" + currentHour);
256
257 Path rootFolder = new Path(rollingFolder + "/daily/");
258
259 FileStatus[] daysFS = fs.listStatus(rootFolder);
260 for (FileStatus dayFS : daysFS) {
261 try {
262 int workingDay = Integer.parseInt(dayFS.getPath().getName());
263 log.info("Daily working on :" + workingDay);
264 if (workingDay < currentDay) {
265
266 try {
267 buildDailyFiles(chukwaMainRepository, tempDir, rollingFolder,
268 workingDay);
269 } catch(Throwable e) {
270 e.printStackTrace();
271 log.warn("Daily rolling failed on :" + rollingFolder +"/" + workingDay ) ;
272 }
273
274 }
275 }
276
277 catch (NumberFormatException e) {
278 log.debug(ExceptionUtil.getStackTrace(e));
279 }
280
281 }
282 }
283
284 public int run(String[] args) throws Exception {
285 JobConf conf = new JobConf(new ChukwaConfiguration(), DailyChukwaRecordRolling.class);
286
287 conf.setJobName("DailyChukwa-Rolling");
288 conf.setInputFormat(SequenceFileInputFormat.class);
289
290 conf.setMapperClass(IdentityMapper.class);
291 conf.setReducerClass(IdentityReducer.class);
292
293 conf.setOutputKeyClass(ChukwaRecordKey.class);
294 conf.setOutputValueClass(ChukwaRecord.class);
295 conf.setOutputFormat(SequenceFileOutputFormat.class);
296
297 log.info("DailyChukwaRecordRolling input: " + args[0]);
298 log.info("DailyChukwaRecordRolling output: " + args[1]);
299
300 FileInputFormat.setInputPaths(conf, args[0]);
301 FileOutputFormat.setOutputPath(conf, new Path(args[1]));
302 conf.setJobPriority(JobPriority.LOW);
303 conf.setNumReduceTasks(1);
304 JobClient.runJob(conf);
305 return 0;
306 }
307
308 }