This project has been retired. For details, please refer to its
Attic page.
DemuxManager xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.chukwa.extraction.demux;
20
21 import java.io.IOException;
22 import java.net.URI;
23 import java.net.URISyntaxException;
24 import java.text.SimpleDateFormat;
25 import java.util.Date;
26
27 import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
28 import org.apache.hadoop.chukwa.extraction.CHUKWA_CONSTANT;
29 import org.apache.hadoop.chukwa.util.NagiosHelper;
30 import org.apache.hadoop.chukwa.util.DaemonWatcher;
31 import org.apache.hadoop.conf.Configuration;
32 import org.apache.hadoop.fs.FileStatus;
33 import org.apache.hadoop.fs.FileSystem;
34 import org.apache.hadoop.fs.Path;
35 import org.apache.hadoop.fs.PathFilter;
36 import org.apache.hadoop.util.ToolRunner;
37 import org.apache.log4j.Logger;
38
39 public class DemuxManager implements CHUKWA_CONSTANT {
40 static Logger log = Logger.getLogger(DemuxManager.class);
41
42 static int globalErrorcounter = 0;
43 static Date firstErrorTime = null;
44
45 protected int ERROR_SLEEP_TIME = 60;
46 protected int NO_DATASINK_SLEEP_TIME = 20;
47
48 protected int DEFAULT_MAX_ERROR_COUNT = 6;
49 protected int DEFAULT_MAX_FILES_PER_DEMUX = 500;
50 protected int DEFAULT_REDUCER_COUNT = 8;
51
52 protected int maxPermittedErrorCount = DEFAULT_MAX_ERROR_COUNT;
53 protected int demuxReducerCount = 0;
54 protected ChukwaConfiguration conf = null;
55 protected FileSystem fs = null;
56 protected int reprocess = 0;
57 protected boolean sendAlert = true;
58
59 protected SimpleDateFormat dayTextFormat = new java.text.SimpleDateFormat("yyyyMMdd");
60 protected volatile boolean isRunning = true;
61
62 final private static PathFilter DATA_SINK_FILTER = new PathFilter() {
63 public boolean accept(Path file) {
64 return file.getName().endsWith(".done");
65 }
66 };
67
68
69 public static void main(String[] args) throws Exception {
70 DaemonWatcher.createInstance("DemuxManager");
71
72 DemuxManager manager = new DemuxManager();
73 manager.start();
74
75 }
76
/**
 * Creates a manager backed by a freshly constructed {@link ChukwaConfiguration}.
 *
 * @throws Exception if the file system handle cannot be created
 */
public DemuxManager() throws Exception {
  this(new ChukwaConfiguration());
}

/**
 * Creates a manager backed by the supplied configuration and opens the file system.
 *
 * @param conf configuration used for all lookups made by this manager
 * @throws Exception if the file system handle cannot be created
 */
public DemuxManager(ChukwaConfiguration conf) throws Exception {
  this.conf = conf;
  init();
}
86
87 protected void init() throws IOException, URISyntaxException {
88 String fsName = conf.get(HDFS_DEFAULT_NAME_FIELD);
89 fs = FileSystem.get(new URI(fsName), conf);
90 }
91
92 public void shutdown() {
93 this.isRunning = false;
94 }
95
96
97 public int getReprocess() {
98 return reprocess;
99 }
100
101
102
103
104
105 public void start() throws Exception {
106
107 String chukwaRootDir = conf.get(CHUKWA_ROOT_DIR_FIELD, DEFAULT_CHUKWA_ROOT_DIR_NAME);
108 if ( ! chukwaRootDir.endsWith("/") ) {
109 chukwaRootDir += "/";
110 }
111 log.info("chukwaRootDir:" + chukwaRootDir);
112
113 String demuxRootDir = chukwaRootDir + DEFAULT_DEMUX_PROCESSING_DIR_NAME;
114 String demuxErrorDir = demuxRootDir + DEFAULT_DEMUX_IN_ERROR_DIR_NAME;
115 String demuxInputDir = demuxRootDir + DEFAULT_DEMUX_MR_INPUT_DIR_NAME;
116 String demuxOutputDir = demuxRootDir + DEFAULT_DEMUX_MR_OUTPUT_DIR_NAME;
117
118 String dataSinkDir = conf.get(CHUKWA_DATA_SINK_DIR_FIELD, chukwaRootDir +DEFAULT_CHUKWA_LOGS_DIR_NAME);
119 if ( ! dataSinkDir.endsWith("/") ) {
120 dataSinkDir += "/";
121 }
122 log.info("dataSinkDir:" + dataSinkDir);
123
124 String postProcessDir = conf.get(CHUKWA_POST_PROCESS_DIR_FIELD, chukwaRootDir +DEFAULT_CHUKWA_POSTPROCESS_DIR_NAME);
125 if ( ! postProcessDir.endsWith("/") ) {
126 postProcessDir += "/";
127 }
128 log.info("postProcessDir:" + postProcessDir);
129
130 String archiveRootDir = conf.get(CHUKWA_ARCHIVE_DIR_FIELD, chukwaRootDir +DEFAULT_CHUKWA_DATASINK_DIR_NAME);
131 if ( ! archiveRootDir.endsWith("/") ) {
132 archiveRootDir += "/";
133 }
134 log.info("archiveRootDir:" + archiveRootDir);
135
136 maxPermittedErrorCount = conf.getInt(CHUKWA_DEMUX_MAX_ERROR_COUNT_FIELD,
137 DEFAULT_MAX_ERROR_COUNT);
138 demuxReducerCount = conf.getInt(CHUKWA_DEMUX_REDUCER_COUNT_FIELD, DEFAULT_REDUCER_COUNT);
139 log.info("demuxReducerCount:" + demuxReducerCount);
140
141 String nagiosHost = conf.get(CHUKWA_NAGIOS_HOST_FIELD);
142 int nagiosPort = conf.getInt(CHUKWA_NAGIOS_PORT_FIELD,0);
143 String reportingHost = conf.get(CHUKWA_REPORTING_HOST_FIELD);
144
145 log.info("Nagios information: nagiosHost:" + nagiosHost + ", nagiosPort:"
146 + nagiosPort + ", reportingHost:" + reportingHost);
147
148
149 if (nagiosHost == null || nagiosHost.length() == 0 || nagiosPort == 0 || reportingHost.length() == 0 || reportingHost == null) {
150 sendAlert = false;
151 log.warn("Alerting is OFF");
152 }
153
154 boolean demuxReady = false;
155
156
157 while (isRunning) {
158 try {
159 demuxReady = false;
160
161 if (maxPermittedErrorCount != -1 && globalErrorcounter >= maxPermittedErrorCount) {
162 log.warn("==================\nToo many errors (" + globalErrorcounter +
163 "), Bail out!\n==================");
164 DaemonWatcher.bailout(-1);
165 }
166
167
168 if (checkDemuxOutputDir(demuxOutputDir) == true) {
169
170 if ( deleteDemuxOutputDir(demuxOutputDir) == false ) {
171 log.warn("Cannot delete an existing demux output directory!");
172 throw new IOException("Cannot move demuxOutput to postProcess!");
173 }
174 continue;
175 } else if (checkDemuxInputDir(demuxInputDir) == true) {
176 reprocess++;
177
178
179 if (reprocess > 3) {
180 if (moveDataSinkFilesToDemuxErrorDirectory(demuxInputDir,demuxErrorDir) == false) {
181 log.warn("Cannot move dataSink files to DemuxErrorDir!");
182 throw new IOException("Cannot move dataSink files to DemuxErrorDir!");
183 }
184 reprocess = 0;
185 continue;
186 }
187
188 log.error("Demux inputDir aready contains some dataSink files,"
189 + " going to reprocess, reprocessCount=" + reprocess);
190 demuxReady = true;
191 } else {
192 reprocess = 0;
193
194 if (moveDataSinkFilesToDemuxInputDirectory(dataSinkDir, demuxInputDir) == true) {
195 demuxReady = true;
196 } else {
197 demuxReady = false;
198 }
199 }
200
201
202 if (demuxReady == true) {
203 boolean demuxStatus = processData(dataSinkDir, demuxInputDir, demuxOutputDir,
204 postProcessDir, archiveRootDir);
205 sendDemuxStatusToNagios(nagiosHost,nagiosPort,reportingHost,demuxErrorDir,demuxStatus,null);
206
207
208 if (demuxStatus) {
209 globalErrorcounter = 0;
210 firstErrorTime = null;
211 }
212 } else {
213 log.info("Demux not ready so going to sleep ...");
214 Thread.sleep(NO_DATASINK_SLEEP_TIME * 1000);
215 }
216 }catch(Throwable e) {
217 globalErrorcounter ++;
218 if (firstErrorTime == null) firstErrorTime = new Date();
219
220 log.warn("Consecutive error number " + globalErrorcounter +
221 " encountered since " + firstErrorTime, e);
222 sendDemuxStatusToNagios(nagiosHost,nagiosPort,reportingHost,demuxErrorDir,false, e.getMessage());
223 try { Thread.sleep(ERROR_SLEEP_TIME * 1000); }
224 catch (InterruptedException e1) {
225 init();
226 }
227 }
228 }
229
230
231
232
233
234
235
236
237
238
239
240 protected void sendDemuxStatusToNagios(String nagiosHost,int nagiosPort,String reportingHost,
241 String demuxInErrorDir,boolean demuxStatus,String demuxException) {
242
243 if (sendAlert == false) {
244 return;
245 }
246
247 boolean demuxInErrorStatus = true;
248 String demuxInErrorMsg = "";
249 try {
250 Path pDemuxInErrorDir = new Path(demuxInErrorDir);
251 if ( fs.exists(pDemuxInErrorDir)) {
252 FileStatus[] demuxInErrorDirs = fs.listStatus(pDemuxInErrorDir);
253 if (demuxInErrorDirs.length == 0) {
254 demuxInErrorStatus = false;
255 }
256 }
257 } catch (Throwable e) {
258 demuxInErrorMsg = e.getMessage();
259 log.warn(e);
260 }
261
262
263 if (demuxStatus == true) {
264 NagiosHelper.sendNsca(nagiosHost,nagiosPort,reportingHost,"DemuxProcessing","Demux OK",NagiosHelper.NAGIOS_OK);
265 } else {
266 NagiosHelper.sendNsca(nagiosHost,nagiosPort,reportingHost,"DemuxProcessing","Demux failed. " + demuxException,NagiosHelper.NAGIOS_CRITICAL);
267 }
268
269
270 if (demuxInErrorStatus == false) {
271 NagiosHelper.sendNsca(nagiosHost,nagiosPort,reportingHost,"DemuxInErrorDirectory","DemuxInError OK",NagiosHelper.NAGIOS_OK);
272 } else {
273 NagiosHelper.sendNsca(nagiosHost,nagiosPort,reportingHost,"DemuxInErrorDirectory","DemuxInError not empty -" + demuxInErrorMsg,NagiosHelper.NAGIOS_CRITICAL);
274 }
275
276 }
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292 protected boolean processData(String dataSinkDir, String demuxInputDir,
293 String demuxOutputDir, String postProcessDir, String archiveDir) throws IOException {
294
295 boolean demuxStatus = false;
296
297 long startTime = System.currentTimeMillis();
298 demuxStatus = runDemux(demuxInputDir, demuxOutputDir);
299 log.info("Demux Duration: " + (System.currentTimeMillis() - startTime));
300
301 if (demuxStatus == false) {
302 log.warn("Demux failed!");
303 } else {
304
305
306 if (checkDemuxOutputDir(demuxOutputDir)) {
307 if (moveDemuxOutputDirToPostProcessDirectory(demuxOutputDir, postProcessDir) == false) {
308 log.warn("Cannot move demuxOutput to postProcess! bail out!");
309 throw new IOException("Cannot move demuxOutput to postProcess! bail out!");
310 }
311 } else {
312 log.warn("Demux processing OK but no output");
313 }
314
315
316 if (moveDataSinkFilesToArchiveDirectory(demuxInputDir, archiveDir) == false) {
317 log.warn("Cannot move datasinkFile to archive! bail out!");
318 throw new IOException("Cannot move datasinkFile to archive! bail out!");
319 }
320 }
321
322 return demuxStatus;
323 }
324
325
326
327
328
329
330
331
332 protected boolean runDemux(String demuxInputDir, String demuxOutputDir) {
333
334 Configuration tempConf = new Configuration(conf);
335 tempConf.reloadConfiguration();
336 demuxReducerCount = tempConf.getInt(CHUKWA_DEMUX_REDUCER_COUNT_FIELD, DEFAULT_REDUCER_COUNT);
337 String[] demuxParams;
338 int i=0;
339 Demux.addParsers(tempConf);
340 demuxParams = new String[4];
341 demuxParams[i++] = "-r";
342 demuxParams[i++] = "" + demuxReducerCount;
343 demuxParams[i++] = demuxInputDir;
344 demuxParams[i++] = demuxOutputDir;
345 try {
346 return ( 0 == ToolRunner.run(tempConf,new Demux(), demuxParams) );
347 } catch (Throwable e) {
348 e.printStackTrace();
349 globalErrorcounter ++;
350 if (firstErrorTime == null) firstErrorTime = new Date();
351 log.error("Failed to run demux. Consecutive error number " +
352 globalErrorcounter + " encountered since " + firstErrorTime, e);
353 }
354 return false;
355 }
356
357
358
359
360
361
362
363
364
365
366 protected boolean moveDataSinkFilesToDemuxInputDirectory(
367 String dataSinkDir, String demuxInputDir) throws IOException {
368 Path pDataSinkDir = new Path(dataSinkDir);
369 Path pDemuxInputDir = new Path(demuxInputDir);
370 log.info("dataSinkDir: " + dataSinkDir);
371 log.info("demuxInputDir: " + demuxInputDir);
372
373
374 boolean containsFile = false;
375
376 FileStatus[] dataSinkFiles = fs.listStatus(pDataSinkDir,DATA_SINK_FILTER);
377 if (dataSinkFiles.length > 0) {
378 setup(pDemuxInputDir);
379 }
380
381 int maxFilesPerDemux = 0;
382 for (FileStatus fstatus : dataSinkFiles) {
383 boolean rename = fs.rename(fstatus.getPath(),pDemuxInputDir);
384 log.info("Moving " + fstatus.getPath() + " to " + pDemuxInputDir +", status is:" + rename);
385 maxFilesPerDemux ++;
386 containsFile = true;
387 if (maxFilesPerDemux >= DEFAULT_MAX_FILES_PER_DEMUX) {
388 log.info("Max File per Demux reached:" + maxFilesPerDemux);
389 break;
390 }
391 }
392 return containsFile;
393 }
394
395
396
397
398
399
400
401
402
403
404
405 protected boolean moveDataSinkFilesToDemuxErrorDirectory(
406 String dataSinkDir, String demuxErrorDir) throws IOException {
407 demuxErrorDir += "/" + dayTextFormat.format(System.currentTimeMillis());
408 return moveFolder(dataSinkDir,demuxErrorDir,"demuxInputDir");
409 }
410
411
412
413
414
415
416
417
418 protected boolean moveDataSinkFilesToArchiveDirectory(
419 String demuxInputDir, String archiveDirectory) throws IOException {
420 archiveDirectory += "/" + dayTextFormat.format(System.currentTimeMillis());
421 return moveFolder(demuxInputDir,archiveDirectory,"dataSinkDir");
422 }
423
424
425
426
427
428
429
430
431 protected boolean moveDemuxOutputDirToPostProcessDirectory(
432 String demuxOutputDir, String postProcessDirectory) throws IOException {
433 return moveFolder(demuxOutputDir,postProcessDirectory,"demuxOutputDir");
434 }
435
436
437
438
439
440
441
442
443 protected boolean checkDemuxInputDir(String demuxInputDir)
444 throws IOException {
445 return dirExists(demuxInputDir);
446 }
447
448
449
450
451
452
453
454 protected boolean checkDemuxOutputDir(String demuxOutputDir)
455 throws IOException {
456 return dirExists(demuxOutputDir);
457 }
458
459
460
461
462
463
464
465
466 protected boolean deleteDemuxOutputDir(String demuxOutputDir) throws IOException
467 {
468 return fs.delete(new Path(demuxOutputDir), true);
469 }
470
471
472
473
474
475
476 protected void setup(Path directory) throws IOException {
477 if ( ! fs.exists(directory)) {
478 fs.mkdirs(directory);
479 }
480 }
481
482
483
484
485
486 protected boolean dirExists(String directory) throws IOException {
487 Path pDirectory = new Path(directory);
488 return (fs.exists(pDirectory) && fs.getFileStatus(pDirectory).isDir());
489 }
490
491
492
493
494
495
496
497
498 protected boolean moveFolder(String srcDir,String destDir, String prefix) throws IOException {
499 if (!destDir.endsWith("/")) {
500 destDir +="/";
501 }
502 Path pSrcDir = new Path(srcDir);
503 Path pDestDir = new Path(destDir );
504 setup(pDestDir);
505 destDir += prefix +"_" +System.currentTimeMillis();
506 Path pFinalDestDir = new Path(destDir );
507
508 return fs.rename(pSrcDir, pFinalDestDir);
509 }
510 }