This project has been retired. For details, please refer to its
Apache Attic page.
DemuxManager xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.chukwa.extraction.demux;
20
21 import java.io.IOException;
22 import java.net.URI;
23 import java.net.URISyntaxException;
24 import java.text.SimpleDateFormat;
25 import java.util.Date;
26
27 import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
28 import org.apache.hadoop.chukwa.extraction.CHUKWA_CONSTANT;
29 import org.apache.hadoop.chukwa.util.NagiosHelper;
30 import org.apache.hadoop.conf.Configuration;
31 import org.apache.hadoop.fs.FileStatus;
32 import org.apache.hadoop.fs.FileSystem;
33 import org.apache.hadoop.fs.Path;
34 import org.apache.hadoop.fs.PathFilter;
35 import org.apache.hadoop.util.ToolRunner;
36 import org.apache.log4j.Logger;
37
38 public class DemuxManager implements CHUKWA_CONSTANT {
39 static Logger log = Logger.getLogger(DemuxManager.class);
40
41 int globalErrorcounter = 0;
42 Date firstErrorTime = null;
43
44 protected int ERROR_SLEEP_TIME = 60;
45 protected int NO_DATASINK_SLEEP_TIME = 20;
46
47 protected int DEFAULT_MAX_ERROR_COUNT = 6;
48 protected int DEFAULT_MAX_FILES_PER_DEMUX = 500;
49 protected int DEFAULT_REDUCER_COUNT = 8;
50
51 protected int maxPermittedErrorCount = DEFAULT_MAX_ERROR_COUNT;
52 protected int demuxReducerCount = 0;
53 protected ChukwaConfiguration conf = null;
54 protected FileSystem fs = null;
55 protected int reprocess = 0;
56 protected boolean sendAlert = true;
57
58 protected SimpleDateFormat dayTextFormat = new java.text.SimpleDateFormat("yyyyMMdd");
59 protected volatile boolean isRunning = true;
60
61 final private static PathFilter DATA_SINK_FILTER = new PathFilter() {
62 public boolean accept(Path file) {
63 return file.getName().endsWith(".done");
64 }
65 };
66
67
68 public static void main(String[] args) throws Exception {
69
70 DemuxManager manager = new DemuxManager();
71 manager.start();
72
73 }
74
75 public DemuxManager() throws Exception {
76 this.conf = new ChukwaConfiguration();
77 init();
78 }
79
80 public DemuxManager(ChukwaConfiguration conf) throws Exception {
81 this.conf = conf;
82 init();
83 }
84
85 protected void init() throws IOException, URISyntaxException {
86 String fsName = conf.get(HDFS_DEFAULT_NAME_FIELD);
87 fs = FileSystem.get(new URI(fsName), conf);
88 }
89
90 public void shutdown() {
91 this.isRunning = false;
92 }
93
94
95 public int getReprocess() {
96 return reprocess;
97 }
98
99
100
101
102
103 public void start() throws Exception {
104
105 String chukwaRootDir = conf.get(CHUKWA_ROOT_DIR_FIELD, DEFAULT_CHUKWA_ROOT_DIR_NAME);
106 if ( ! chukwaRootDir.endsWith("/") ) {
107 chukwaRootDir += "/";
108 }
109 log.info("chukwaRootDir:" + chukwaRootDir);
110
111 String demuxRootDir = chukwaRootDir + DEFAULT_DEMUX_PROCESSING_DIR_NAME;
112 String demuxErrorDir = demuxRootDir + DEFAULT_DEMUX_IN_ERROR_DIR_NAME;
113 String demuxInputDir = demuxRootDir + DEFAULT_DEMUX_MR_INPUT_DIR_NAME;
114 String demuxOutputDir = demuxRootDir + DEFAULT_DEMUX_MR_OUTPUT_DIR_NAME;
115
116 String dataSinkDir = conf.get(CHUKWA_DATA_SINK_DIR_FIELD, chukwaRootDir +DEFAULT_CHUKWA_LOGS_DIR_NAME);
117 if ( ! dataSinkDir.endsWith("/") ) {
118 dataSinkDir += "/";
119 }
120 log.info("dataSinkDir:" + dataSinkDir);
121
122 String postProcessDir = conf.get(CHUKWA_POST_PROCESS_DIR_FIELD, chukwaRootDir +DEFAULT_CHUKWA_POSTPROCESS_DIR_NAME);
123 if ( ! postProcessDir.endsWith("/") ) {
124 postProcessDir += "/";
125 }
126 log.info("postProcessDir:" + postProcessDir);
127
128 String archiveRootDir = conf.get(CHUKWA_ARCHIVE_DIR_FIELD, chukwaRootDir +DEFAULT_CHUKWA_DATASINK_DIR_NAME);
129 if ( ! archiveRootDir.endsWith("/") ) {
130 archiveRootDir += "/";
131 }
132 log.info("archiveRootDir:" + archiveRootDir);
133
134 maxPermittedErrorCount = conf.getInt(CHUKWA_DEMUX_MAX_ERROR_COUNT_FIELD,
135 DEFAULT_MAX_ERROR_COUNT);
136 demuxReducerCount = conf.getInt(CHUKWA_DEMUX_REDUCER_COUNT_FIELD, DEFAULT_REDUCER_COUNT);
137 log.info("demuxReducerCount:" + demuxReducerCount);
138
139 String nagiosHost = conf.get(CHUKWA_NAGIOS_HOST_FIELD);
140 int nagiosPort = conf.getInt(CHUKWA_NAGIOS_PORT_FIELD,0);
141 String reportingHost = conf.get(CHUKWA_REPORTING_HOST_FIELD);
142
143 log.info("Nagios information: nagiosHost:" + nagiosHost + ", nagiosPort:"
144 + nagiosPort + ", reportingHost:" + reportingHost);
145
146
147 if (nagiosHost == null || nagiosHost.length() == 0 || nagiosPort == 0 || reportingHost == null || reportingHost.length() == 0) {
148 sendAlert = false;
149 log.warn("Alerting is OFF");
150 }
151
152 boolean demuxReady = false;
153
154
155 while (isRunning) {
156 try {
157 demuxReady = false;
158
159 if (maxPermittedErrorCount != -1 && globalErrorcounter >= maxPermittedErrorCount) {
160 log.warn("==================\nToo many errors (" + globalErrorcounter +
161 "), Bail out!\n==================");
162 break;
163 }
164
165
166 if (checkDemuxOutputDir(demuxOutputDir) == true) {
167
168 if ( deleteDemuxOutputDir(demuxOutputDir) == false ) {
169 log.warn("Cannot delete an existing demux output directory!");
170 throw new IOException("Cannot move demuxOutput to postProcess!");
171 }
172 continue;
173 } else if (checkDemuxInputDir(demuxInputDir) == true) {
174 reprocess++;
175
176
177 if (reprocess > 3) {
178 if (moveDataSinkFilesToDemuxErrorDirectory(demuxInputDir,demuxErrorDir) == false) {
179 log.warn("Cannot move dataSink files to DemuxErrorDir!");
180 throw new IOException("Cannot move dataSink files to DemuxErrorDir!");
181 }
182 reprocess = 0;
183 continue;
184 }
185
186 log.error("Demux inputDir aready contains some dataSink files,"
187 + " going to reprocess, reprocessCount=" + reprocess);
188 demuxReady = true;
189 } else {
190 reprocess = 0;
191
192 if (moveDataSinkFilesToDemuxInputDirectory(dataSinkDir, demuxInputDir) == true) {
193 demuxReady = true;
194 } else {
195 demuxReady = false;
196 }
197 }
198
199
200 if (demuxReady == true) {
201 boolean demuxStatus = processData(dataSinkDir, demuxInputDir, demuxOutputDir,
202 postProcessDir, archiveRootDir);
203 sendDemuxStatusToNagios(nagiosHost,nagiosPort,reportingHost,demuxErrorDir,demuxStatus,null);
204
205
206 if (demuxStatus) {
207 globalErrorcounter = 0;
208 firstErrorTime = null;
209 }
210 } else {
211 log.info("Demux not ready so going to sleep ...");
212 Thread.sleep(NO_DATASINK_SLEEP_TIME * 1000);
213 }
214 }catch(Throwable e) {
215 globalErrorcounter ++;
216 if (firstErrorTime == null) firstErrorTime = new Date();
217
218 log.warn("Consecutive error number " + globalErrorcounter +
219 " encountered since " + firstErrorTime, e);
220 sendDemuxStatusToNagios(nagiosHost,nagiosPort,reportingHost,demuxErrorDir,false, e.getMessage());
221 try { Thread.sleep(ERROR_SLEEP_TIME * 1000); }
222 catch (InterruptedException e1) {
223 init();
224 }
225 }
226 }
227
228
229
230
231
232
233
234
235
236
237
238 protected void sendDemuxStatusToNagios(String nagiosHost,int nagiosPort,String reportingHost,
239 String demuxInErrorDir,boolean demuxStatus,String demuxException) {
240
241 if (sendAlert == false) {
242 return;
243 }
244
245 boolean demuxInErrorStatus = true;
246 String demuxInErrorMsg = "";
247 try {
248 Path pDemuxInErrorDir = new Path(demuxInErrorDir);
249 if ( fs.exists(pDemuxInErrorDir)) {
250 FileStatus[] demuxInErrorDirs = fs.listStatus(pDemuxInErrorDir);
251 if (demuxInErrorDirs.length == 0) {
252 demuxInErrorStatus = false;
253 }
254 }
255 } catch (Throwable e) {
256 demuxInErrorMsg = e.getMessage();
257 log.warn(e);
258 }
259
260
261 if (demuxStatus == true) {
262 NagiosHelper.sendNsca("Demux OK",NagiosHelper.NAGIOS_OK);
263 } else {
264 NagiosHelper.sendNsca("Demux failed. " + demuxException,NagiosHelper.NAGIOS_CRITICAL);
265 }
266
267
268 if (demuxInErrorStatus == false) {
269 NagiosHelper.sendNsca("DemuxInError OK",NagiosHelper.NAGIOS_OK);
270 } else {
271 NagiosHelper.sendNsca("DemuxInError not empty -" + demuxInErrorMsg,NagiosHelper.NAGIOS_CRITICAL);
272 }
273
274 }
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290 protected boolean processData(String dataSinkDir, String demuxInputDir,
291 String demuxOutputDir, String postProcessDir, String archiveDir) throws IOException {
292
293 boolean demuxStatus = false;
294
295 long startTime = System.currentTimeMillis();
296 demuxStatus = runDemux(demuxInputDir, demuxOutputDir);
297 log.info("Demux Duration: " + (System.currentTimeMillis() - startTime));
298
299 if (demuxStatus == false) {
300 log.warn("Demux failed!");
301 } else {
302
303
304 if (checkDemuxOutputDir(demuxOutputDir)) {
305 if (moveDemuxOutputDirToPostProcessDirectory(demuxOutputDir, postProcessDir) == false) {
306 log.warn("Cannot move demuxOutput to postProcess! bail out!");
307 throw new IOException("Cannot move demuxOutput to postProcess! bail out!");
308 }
309 } else {
310 log.warn("Demux processing OK but no output");
311 }
312
313
314 if (moveDataSinkFilesToArchiveDirectory(demuxInputDir, archiveDir) == false) {
315 log.warn("Cannot move datasinkFile to archive! bail out!");
316 throw new IOException("Cannot move datasinkFile to archive! bail out!");
317 }
318 }
319
320 return demuxStatus;
321 }
322
323
324
325
326
327
328
329
330 protected boolean runDemux(String demuxInputDir, String demuxOutputDir) {
331
332 Configuration tempConf = new Configuration(conf);
333 tempConf.reloadConfiguration();
334 demuxReducerCount = tempConf.getInt(CHUKWA_DEMUX_REDUCER_COUNT_FIELD, DEFAULT_REDUCER_COUNT);
335 String[] demuxParams;
336 int i=0;
337 Demux.addParsers(tempConf);
338 demuxParams = new String[4];
339 demuxParams[i++] = "-r";
340 demuxParams[i++] = "" + demuxReducerCount;
341 demuxParams[i++] = demuxInputDir;
342 demuxParams[i++] = demuxOutputDir;
343 try {
344 return ( 0 == ToolRunner.run(tempConf,new Demux(), demuxParams) );
345 } catch (Throwable e) {
346 e.printStackTrace();
347 globalErrorcounter ++;
348 if (firstErrorTime == null) firstErrorTime = new Date();
349 log.error("Failed to run demux. Consecutive error number " +
350 globalErrorcounter + " encountered since " + firstErrorTime, e);
351 }
352 return false;
353 }
354
355
356
357
358
359
360
361
362
363
364 protected boolean moveDataSinkFilesToDemuxInputDirectory(
365 String dataSinkDir, String demuxInputDir) throws IOException {
366 Path pDataSinkDir = new Path(dataSinkDir);
367 Path pDemuxInputDir = new Path(demuxInputDir);
368 log.info("dataSinkDir: " + dataSinkDir);
369 log.info("demuxInputDir: " + demuxInputDir);
370
371
372 boolean containsFile = false;
373
374 FileStatus[] dataSinkFiles = fs.listStatus(pDataSinkDir,DATA_SINK_FILTER);
375 if (dataSinkFiles.length > 0) {
376 setup(pDemuxInputDir);
377 }
378
379 int maxFilesPerDemux = 0;
380 for (FileStatus fstatus : dataSinkFiles) {
381 boolean rename = fs.rename(fstatus.getPath(),pDemuxInputDir);
382 log.info("Moving " + fstatus.getPath() + " to " + pDemuxInputDir +", status is:" + rename);
383 maxFilesPerDemux ++;
384 containsFile = true;
385 if (maxFilesPerDemux >= DEFAULT_MAX_FILES_PER_DEMUX) {
386 log.info("Max File per Demux reached:" + maxFilesPerDemux);
387 break;
388 }
389 }
390 return containsFile;
391 }
392
393
394
395
396
397
398
399
400
401
402
403 protected boolean moveDataSinkFilesToDemuxErrorDirectory(
404 String dataSinkDir, String demuxErrorDir) throws IOException {
405 demuxErrorDir += "/" + dayTextFormat.format(System.currentTimeMillis());
406 return moveFolder(dataSinkDir,demuxErrorDir,"demuxInputDir");
407 }
408
409
410
411
412
413
414
415
416 protected boolean moveDataSinkFilesToArchiveDirectory(
417 String demuxInputDir, String archiveDirectory) throws IOException {
418 archiveDirectory += "/" + dayTextFormat.format(System.currentTimeMillis());
419 return moveFolder(demuxInputDir,archiveDirectory,"dataSinkDir");
420 }
421
422
423
424
425
426
427
428
429 protected boolean moveDemuxOutputDirToPostProcessDirectory(
430 String demuxOutputDir, String postProcessDirectory) throws IOException {
431 return moveFolder(demuxOutputDir,postProcessDirectory,"demuxOutputDir");
432 }
433
434
435
436
437
438
439
440
441 protected boolean checkDemuxInputDir(String demuxInputDir)
442 throws IOException {
443 return dirExists(demuxInputDir);
444 }
445
446
447
448
449
450
451
452 protected boolean checkDemuxOutputDir(String demuxOutputDir)
453 throws IOException {
454 return dirExists(demuxOutputDir);
455 }
456
457
458
459
460
461
462
463
464 protected boolean deleteDemuxOutputDir(String demuxOutputDir) throws IOException
465 {
466 return fs.delete(new Path(demuxOutputDir), true);
467 }
468
469
470
471
472
473
474 protected void setup(Path directory) throws IOException {
475 if ( ! fs.exists(directory)) {
476 fs.mkdirs(directory);
477 }
478 }
479
480
481
482
483
484 protected boolean dirExists(String directory) throws IOException {
485 Path pDirectory = new Path(directory);
486 return (fs.exists(pDirectory) && fs.getFileStatus(pDirectory).isDir());
487 }
488
489
490
491
492
493
494
495
496 protected boolean moveFolder(String srcDir,String destDir, String prefix) throws IOException {
497 if (!destDir.endsWith("/")) {
498 destDir +="/";
499 }
500 Path pSrcDir = new Path(srcDir);
501 Path pDestDir = new Path(destDir );
502 setup(pDestDir);
503 destDir += prefix +"_" +System.currentTimeMillis();
504 Path pFinalDestDir = new Path(destDir );
505
506 return fs.rename(pSrcDir, pFinalDestDir);
507 }
508 }