This project has retired. For details, please refer to its Attic page.
DailyChukwaRecordRolling xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.chukwa.extraction.demux;
20
21
22 import java.io.IOException;
23 import java.net.URI;
24 import java.text.SimpleDateFormat;
25 import java.util.ArrayList;
26 import java.util.Calendar;
27 import java.util.List;
28 import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
29 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
30 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
31 import org.apache.hadoop.chukwa.util.DaemonWatcher;
32 import org.apache.hadoop.chukwa.util.ExceptionUtil;
33 import org.apache.hadoop.chukwa.util.HierarchyDataType;
34 import org.apache.hadoop.conf.Configured;
35 import org.apache.hadoop.fs.FileStatus;
36 import org.apache.hadoop.fs.FileSystem;
37 import org.apache.hadoop.fs.FileUtil;
38 import org.apache.hadoop.fs.Path;
39 import org.apache.hadoop.mapred.FileInputFormat;
40 import org.apache.hadoop.mapred.FileOutputFormat;
41 import org.apache.hadoop.mapred.JobClient;
42 import org.apache.hadoop.mapred.JobConf;
43 import org.apache.hadoop.mapred.JobPriority;
44 import org.apache.hadoop.mapred.SequenceFileInputFormat;
45 import org.apache.hadoop.mapred.SequenceFileOutputFormat;
46 import org.apache.hadoop.mapred.lib.IdentityMapper;
47 import org.apache.hadoop.mapred.lib.IdentityReducer;
48 import org.apache.hadoop.util.Tool;
49 import org.apache.log4j.Logger;
50
51
52 public class DailyChukwaRecordRolling extends Configured implements Tool {
53 static Logger log = Logger.getLogger(DailyChukwaRecordRolling.class);
54
55 static SimpleDateFormat sdf = new java.text.SimpleDateFormat("yyyyMMdd");
56 static ChukwaConfiguration conf = null;
57 static FileSystem fs = null;
58 static final String HadoopLogDir = "_logs";
59 static final String hadoopTempDir = "_temporary";
60
61 static boolean rollInSequence = true;
62 static boolean deleteRawdata = false;
63
64 public static void usage() {
65 System.err
66 .println("usage: java org.apache.hadoop.chukwa.extraction.demux.DailyChukwaRecordRolling rollInSequence <True/False> deleteRawdata <True/False>");
67 System.exit(-1);
68 }
69
70 public static boolean hourlyRolling(String dailyStreamDirectory) {
71
72 Path pHour = null;
73 try {
74 log.info("Checking for HourlyRolling in " + dailyStreamDirectory);
75
76 for (int i=0;i<24;i++) {
77 pHour = new Path(dailyStreamDirectory + "/" + i);
78 if (! fs.exists(pHour)) {
79 log.info("HourlyData is missing for:" + pHour);
80 continue;
81 } else {
82 FileStatus[] files = fs.listStatus(pHour);
83 boolean containsHourly = false;
84 for(FileStatus file: files) {
85 log.info("Debug checking" + file.getPath());
86 if (file.getPath().getName().indexOf("_HourlyDone_") > 0) {
87 containsHourly = true;
88 break;
89 }
90 }
91 if (containsHourly == false) {
92 log.info("HourlyDone is missing for : " + pHour);
93 return false;
94 }
95 }
96 }
97 return true;
98 }catch(Exception e) {
99 e.printStackTrace();
100 return false;
101 }
102 }
103 public static void buildDailyFiles(String chukwaMainRepository,
104 String tempDir, String rollingFolder, int workingDay) throws IOException {
105
106
107 boolean alldone = true;
108
109 Path dayPath = new Path(rollingFolder + "/daily/" + workingDay);
110 FileStatus[] clustersFS = fs.listStatus(dayPath);
111 for (FileStatus clusterFs : clustersFS) {
112 String cluster = clusterFs.getPath().getName();
113
114 Path dataSourceClusterHourPaths = new Path(rollingFolder + "/daily/"
115 + workingDay + "/" + cluster);
116 FileStatus[] dataSourcesFS = fs.listStatus(dataSourceClusterHourPaths);
117 for (FileStatus dataSourceFS : dataSourcesFS) {
118
119 for (FileStatus dataSourcePath : HierarchyDataType.globStatus(fs,
120 dataSourceFS.getPath(), true)) {
121 String dataSource = HierarchyDataType.getDataType(
122 dataSourcePath.getPath(),
123 fs.getFileStatus(dataSourceClusterHourPaths).getPath());
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210 public static void main(String[] args) throws Exception {
211
212 DaemonWatcher.createInstance("DailyChukwaRecordRolling");
213
214 conf = new ChukwaConfiguration();
215 String fsName = conf.get("writer.hdfs.filesystem");
216 fs = FileSystem.get(new URI(fsName), conf);
217
218
219 String rollingFolder = "/chukwa/rolling/";
220 String chukwaMainRepository = "/chukwa/repos/";
221 String tempDir = "/chukwa/temp/dailyRolling/";
222
223
224 if (args.length != 4) {
225 usage();
226 }
227
228 if (!args[0].equalsIgnoreCase("rollInSequence")) {
229 usage();
230 }
231
232 if (!args[2].equalsIgnoreCase("deleteRawdata")) {
233 usage();
234 }
235
236 if (args[1].equalsIgnoreCase("true")) {
237 rollInSequence = true;
238 } else {
239 rollInSequence = false;
240 }
241
242 if (args[3].equalsIgnoreCase("true")) {
243 deleteRawdata = true;
244 } else {
245 deleteRawdata = false;
246 }
247
248 log.info("rollInSequence: " + rollInSequence);
249 log.info("deleteRawdata: " + deleteRawdata);
250
251 Calendar calendar = Calendar.getInstance();
252 int currentDay = Integer.parseInt(sdf.format(calendar.getTime()));
253 int currentHour = calendar.get(Calendar.HOUR_OF_DAY);
254 log.info("CurrentDay: " + currentDay);
255 log.info("currentHour" + currentHour);
256
257 Path rootFolder = new Path(rollingFolder + "/daily/");
258
259 FileStatus[] daysFS = fs.listStatus(rootFolder);
260 for (FileStatus dayFS : daysFS) {
261 try {
262 int workingDay = Integer.parseInt(dayFS.getPath().getName());
263 log.info("Daily working on :" + workingDay);
264 if (workingDay < currentDay) {
265
266 try {
267 buildDailyFiles(chukwaMainRepository, tempDir, rollingFolder,
268 workingDay);
269 } catch(Throwable e) {
270 e.printStackTrace();
271 log.warn("Daily rolling failed on :" + rollingFolder +"/" + workingDay ) ;
272 }
273
274 }
275 }
276
277 catch (NumberFormatException e) {
278 log.debug(ExceptionUtil.getStackTrace(e));
279 }
280
281 }
282 }
283
284 public int run(String[] args) throws Exception {
285 JobConf conf = new JobConf(new ChukwaConfiguration(), DailyChukwaRecordRolling.class);
286
287 conf.setJobName("DailyChukwa-Rolling");
288 conf.setInputFormat(SequenceFileInputFormat.class);
289
290 conf.setMapperClass(IdentityMapper.class);
291 conf.setReducerClass(IdentityReducer.class);
292
293 conf.setOutputKeyClass(ChukwaRecordKey.class);
294 conf.setOutputValueClass(ChukwaRecord.class);
295 conf.setOutputFormat(SequenceFileOutputFormat.class);
296
297 log.info("DailyChukwaRecordRolling input: " + args[0]);
298 log.info("DailyChukwaRecordRolling output: " + args[1]);
299
300 FileInputFormat.setInputPaths(conf, args[0]);
301 FileOutputFormat.setOutputPath(conf, new Path(args[1]));
302 conf.setJobPriority(JobPriority.LOW);
303 conf.setNumReduceTasks(1);
304 JobClient.runJob(conf);
305 return 0;
306 }
307
308 }