This project has retired. For details please refer to its Attic page.
DemuxManager xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.chukwa.extraction.demux;
20  
21  import java.io.IOException;
22  import java.net.URI;
23  import java.net.URISyntaxException;
24  import java.text.SimpleDateFormat;
25  import java.util.Date;
26  
27  import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
28  import org.apache.hadoop.chukwa.extraction.CHUKWA_CONSTANT;
29  import org.apache.hadoop.chukwa.util.NagiosHelper;
30  import org.apache.hadoop.conf.Configuration;
31  import org.apache.hadoop.fs.FileStatus;
32  import org.apache.hadoop.fs.FileSystem;
33  import org.apache.hadoop.fs.Path;
34  import org.apache.hadoop.fs.PathFilter;
35  import org.apache.hadoop.util.ToolRunner;
36  import org.apache.log4j.Logger;
37  
38  public class DemuxManager implements CHUKWA_CONSTANT {  
39    static Logger log = Logger.getLogger(DemuxManager.class);
40  
41    int globalErrorcounter = 0;
42    Date firstErrorTime = null;
43  
44    protected int ERROR_SLEEP_TIME = 60;
45    protected int NO_DATASINK_SLEEP_TIME = 20;
46  
47    protected int DEFAULT_MAX_ERROR_COUNT = 6;
48    protected int DEFAULT_MAX_FILES_PER_DEMUX = 500;
49    protected int DEFAULT_REDUCER_COUNT = 8;
50    
51    protected int maxPermittedErrorCount = DEFAULT_MAX_ERROR_COUNT;
52    protected int demuxReducerCount = 0;
53    protected ChukwaConfiguration conf = null;
54    protected FileSystem fs = null;
55    protected int reprocess = 0;
56    protected boolean sendAlert = true;
57    
58    protected SimpleDateFormat dayTextFormat = new java.text.SimpleDateFormat("yyyyMMdd");
59    protected volatile boolean isRunning = true;
60  
61    final private static PathFilter DATA_SINK_FILTER = new PathFilter() {
62      public boolean accept(Path file) {
63        return file.getName().endsWith(".done");
64      }     
65    };
66  
67  
68    public static void main(String[] args) throws Exception {
69      
70      DemuxManager manager = new DemuxManager();
71      manager.start();
72  
73    }
74  
75    public DemuxManager() throws Exception {
76      this.conf = new ChukwaConfiguration();
77      init();
78    }
79  
80    public DemuxManager(ChukwaConfiguration conf) throws Exception {
81      this.conf = conf;
82      init();
83    }
84  
85    protected void init() throws IOException, URISyntaxException {
86      String fsName = conf.get(HDFS_DEFAULT_NAME_FIELD);
87      fs = FileSystem.get(new URI(fsName), conf);
88    }
89  
90    public void shutdown() {
91      this.isRunning = false;
92    }
93  
94  
95    public int getReprocess() {
96      return reprocess;
97    }
98  
99    /**
100    * Start the Demux Manager daemon
101    * @throws Exception if error in processing data
102    */
103   public void start() throws Exception {
104 
105      String chukwaRootDir = conf.get(CHUKWA_ROOT_DIR_FIELD, DEFAULT_CHUKWA_ROOT_DIR_NAME);
106      if ( ! chukwaRootDir.endsWith("/") ) {
107        chukwaRootDir += "/";
108      }
109      log.info("chukwaRootDir:" + chukwaRootDir);
110 
111      String demuxRootDir = chukwaRootDir + DEFAULT_DEMUX_PROCESSING_DIR_NAME;
112      String demuxErrorDir = demuxRootDir + DEFAULT_DEMUX_IN_ERROR_DIR_NAME;
113      String demuxInputDir = demuxRootDir + DEFAULT_DEMUX_MR_INPUT_DIR_NAME;
114      String demuxOutputDir = demuxRootDir + DEFAULT_DEMUX_MR_OUTPUT_DIR_NAME;
115 
116      String dataSinkDir = conf.get(CHUKWA_DATA_SINK_DIR_FIELD, chukwaRootDir +DEFAULT_CHUKWA_LOGS_DIR_NAME);
117      if ( ! dataSinkDir.endsWith("/") ) {
118        dataSinkDir += "/";
119      }
120      log.info("dataSinkDir:" + dataSinkDir);
121      
122      String postProcessDir = conf.get(CHUKWA_POST_PROCESS_DIR_FIELD, chukwaRootDir +DEFAULT_CHUKWA_POSTPROCESS_DIR_NAME);
123      if ( ! postProcessDir.endsWith("/") ) {
124        postProcessDir += "/";
125      }
126      log.info("postProcessDir:" + postProcessDir);
127      
128      String archiveRootDir = conf.get(CHUKWA_ARCHIVE_DIR_FIELD, chukwaRootDir +DEFAULT_CHUKWA_DATASINK_DIR_NAME);
129      if ( ! archiveRootDir.endsWith("/") ) {
130        archiveRootDir += "/";
131      }
132      log.info("archiveRootDir:" + archiveRootDir);
133      
134      maxPermittedErrorCount = conf.getInt(CHUKWA_DEMUX_MAX_ERROR_COUNT_FIELD,
135                                           DEFAULT_MAX_ERROR_COUNT);
136      demuxReducerCount = conf.getInt(CHUKWA_DEMUX_REDUCER_COUNT_FIELD, DEFAULT_REDUCER_COUNT);
137      log.info("demuxReducerCount:" + demuxReducerCount);
138      
139      String nagiosHost = conf.get(CHUKWA_NAGIOS_HOST_FIELD);
140      int nagiosPort = conf.getInt(CHUKWA_NAGIOS_PORT_FIELD,0);
141      String reportingHost = conf.get(CHUKWA_REPORTING_HOST_FIELD);
142      
143      log.info("Nagios information: nagiosHost:" + nagiosHost + ", nagiosPort:" 
144          + nagiosPort + ", reportingHost:" + reportingHost);
145      
146      
147      if (nagiosHost == null || nagiosHost.length() == 0 || nagiosPort == 0 || reportingHost == null || reportingHost.length() == 0) {
148        sendAlert = false;
149        log.warn("Alerting is OFF");
150      }
151      
152      boolean demuxReady = false;
153 
154      
155      while (isRunning) {
156        try {
157          demuxReady = false;
158 
159          if (maxPermittedErrorCount != -1 && globalErrorcounter >= maxPermittedErrorCount) {
160            log.warn("==================\nToo many errors (" + globalErrorcounter +
161                     "), Bail out!\n==================");
162            break;
163          }
164          
165          // Check for anomalies
166          if (checkDemuxOutputDir(demuxOutputDir) == true) {
167            // delete current demux output dir
168            if ( deleteDemuxOutputDir(demuxOutputDir) == false ) {
169              log.warn("Cannot delete an existing demux output directory!");
170              throw new IOException("Cannot move demuxOutput to postProcess!");
171            }
172            continue;
173          } else if (checkDemuxInputDir(demuxInputDir) == true) { // dataSink already there
174            reprocess++;
175 
176            // Data has been processed more than 3 times ... move to InError directory
177            if (reprocess > 3) {
178              if (moveDataSinkFilesToDemuxErrorDirectory(demuxInputDir,demuxErrorDir) == false) {
179                log.warn("Cannot move dataSink files to DemuxErrorDir!");
180                throw new IOException("Cannot move dataSink files to DemuxErrorDir!");
181              }
182              reprocess = 0;
183              continue;
184            }
185 
186            log.error("Demux inputDir aready contains some dataSink files,"
187                + " going to reprocess, reprocessCount=" + reprocess);
188            demuxReady = true;
189          } else { // standard code path
190            reprocess = 0;
191            // Move new dataSink Files
192            if (moveDataSinkFilesToDemuxInputDirectory(dataSinkDir, demuxInputDir) == true) {
193              demuxReady = true; // if any are available
194            } else {
195              demuxReady = false; // if none
196            }
197          }
198 
199          // start a new demux ?
200          if (demuxReady == true) {
201           boolean demuxStatus = processData(dataSinkDir, demuxInputDir, demuxOutputDir,
202                postProcessDir, archiveRootDir);
203           sendDemuxStatusToNagios(nagiosHost,nagiosPort,reportingHost,demuxErrorDir,demuxStatus,null);
204 
205           // if demux suceeds, then we reset these.
206           if (demuxStatus) {
207            globalErrorcounter = 0;
208            firstErrorTime = null;
209           }
210          } else {
211            log.info("Demux not ready so going to sleep ...");
212            Thread.sleep(NO_DATASINK_SLEEP_TIME * 1000);
213          }
214        }catch(Throwable e) {
215          globalErrorcounter ++;
216          if (firstErrorTime == null) firstErrorTime = new Date();
217 
218          log.warn("Consecutive error number " + globalErrorcounter +
219                   " encountered since " + firstErrorTime, e);
220          sendDemuxStatusToNagios(nagiosHost,nagiosPort,reportingHost,demuxErrorDir,false, e.getMessage());
221          try { Thread.sleep(ERROR_SLEEP_TIME * 1000); } 
222          catch (InterruptedException e1) {/*do nothing*/ }
223          init();
224        }
225      }
226    }
227 
228 
229    /**
230     * Send NSCA status to Nagios
231     * @param nagiosHost
232     * @param nagiosPort
233     * @param reportingHost
234     * @param demuxInErrorDir
235     * @param demuxStatus
236     * @param exception
237     */
238   protected void sendDemuxStatusToNagios(String nagiosHost,int nagiosPort,String reportingHost,
239         String demuxInErrorDir,boolean demuxStatus,String demuxException) {
240       
241      if (sendAlert == false) {
242        return;
243      }
244       
245      boolean demuxInErrorStatus = true;
246      String demuxInErrorMsg = "";
247      try {
248        Path pDemuxInErrorDir = new Path(demuxInErrorDir);
249        if ( fs.exists(pDemuxInErrorDir)) {
250          FileStatus[] demuxInErrorDirs = fs.listStatus(pDemuxInErrorDir);
251          if (demuxInErrorDirs.length == 0) {
252            demuxInErrorStatus = false;
253          }          
254        }
255      } catch (Throwable e) {
256        demuxInErrorMsg = e.getMessage();
257        log.warn(e);
258      }
259      
260      // send Demux status
261      if (demuxStatus == true) {
262        NagiosHelper.sendNsca("Demux OK",NagiosHelper.NAGIOS_OK);
263      } else {
264        NagiosHelper.sendNsca("Demux failed. " + demuxException,NagiosHelper.NAGIOS_CRITICAL);
265      }
266      
267      // send DemuxInErrorStatus
268      if (demuxInErrorStatus == false) {
269        NagiosHelper.sendNsca("DemuxInError OK",NagiosHelper.NAGIOS_OK);
270      } else {
271        NagiosHelper.sendNsca("DemuxInError not empty -" + demuxInErrorMsg,NagiosHelper.NAGIOS_CRITICAL);
272      }
273      
274    }
275    
276    /**
277     * Process Data, i.e. 
278     * - run demux
279     * - move demux output to postProcessDir
280     * - move dataSink file to archiveDir
281     * 
282     * @param dataSinkDir
283     * @param demuxInputDir
284     * @param demuxOutputDir
285     * @param postProcessDir
286     * @param archiveDir
287     * @return True iff succeed
288     * @throws IOException
289     */
290     protected boolean processData(String dataSinkDir, String demuxInputDir,
291        String demuxOutputDir, String postProcessDir, String archiveDir) throws IOException {
292 
293      boolean demuxStatus = false;
294 
295      long startTime = System.currentTimeMillis();
296      demuxStatus = runDemux(demuxInputDir, demuxOutputDir);
297      log.info("Demux Duration: " + (System.currentTimeMillis() - startTime));
298 
299      if (demuxStatus == false) {
300        log.warn("Demux failed!");
301      } else {
302 
303        // Move demux output to postProcessDir 
304        if (checkDemuxOutputDir(demuxOutputDir)) {
305          if (moveDemuxOutputDirToPostProcessDirectory(demuxOutputDir, postProcessDir) == false) {
306            log.warn("Cannot move demuxOutput to postProcess! bail out!");
307            throw new IOException("Cannot move demuxOutput to postProcess! bail out!");
308          } 
309        } else {
310          log.warn("Demux processing OK but no output");
311        }
312 
313        // Move DataSink Files to archiveDir
314        if (moveDataSinkFilesToArchiveDirectory(demuxInputDir, archiveDir) == false) {
315          log.warn("Cannot move datasinkFile to archive! bail out!");
316          throw new IOException("Cannot move datasinkFile to archive! bail out!");
317        }
318      }
319      
320      return demuxStatus;
321    }
322 
323 
324    /**
325     * Submit and Run demux Job 
326     * @param demuxInputDir
327     * @param demuxOutputDir
328     * @return true id Demux succeed
329     */
330    protected boolean runDemux(String demuxInputDir, String demuxOutputDir) { 
331      // to reload the configuration, and demux's reduce number
332      Configuration tempConf = new Configuration(conf);
333      tempConf.reloadConfiguration();
334      demuxReducerCount = tempConf.getInt(CHUKWA_DEMUX_REDUCER_COUNT_FIELD, DEFAULT_REDUCER_COUNT);
335      String[] demuxParams;
336      int i=0;
337      Demux.addParsers(tempConf);
338      demuxParams = new String[4];
339      demuxParams[i++] = "-r";
340      demuxParams[i++] = "" + demuxReducerCount;
341      demuxParams[i++] = demuxInputDir;
342      demuxParams[i++] = demuxOutputDir;
343      try {
344        return ( 0 == ToolRunner.run(tempConf,new Demux(), demuxParams) );
345      } catch (Throwable e) {
346        e.printStackTrace();
347        globalErrorcounter ++;
348        if (firstErrorTime == null) firstErrorTime = new Date();
349        log.error("Failed to run demux. Consecutive error number " +
350                globalErrorcounter + " encountered since " + firstErrorTime, e);
351      }
352      return false;
353    }
354 
355 
356 
357    /**
358     * Move dataSink files to Demux input directory
359     * @param dataSinkDir
360     * @param demuxInputDir
361     * @return true if there's any dataSink files ready to be processed
362     * @throws IOException
363     */
364    protected boolean moveDataSinkFilesToDemuxInputDirectory(
365        String dataSinkDir, String demuxInputDir) throws IOException {
366      Path pDataSinkDir = new Path(dataSinkDir);
367      Path pDemuxInputDir = new Path(demuxInputDir);
368      log.info("dataSinkDir: " + dataSinkDir);
369      log.info("demuxInputDir: " + demuxInputDir);
370 
371 
372      boolean containsFile = false;
373 
374      FileStatus[] dataSinkFiles = fs.listStatus(pDataSinkDir,DATA_SINK_FILTER);
375      if (dataSinkFiles.length > 0) {
376        setup(pDemuxInputDir);
377      }
378 
379      int maxFilesPerDemux = 0;
380      for (FileStatus fstatus : dataSinkFiles) {
381        boolean rename = fs.rename(fstatus.getPath(),pDemuxInputDir);
382        log.info("Moving " + fstatus.getPath() + " to " + pDemuxInputDir +", status is:" + rename);
383        maxFilesPerDemux ++;
384        containsFile = true;
385        if (maxFilesPerDemux >= DEFAULT_MAX_FILES_PER_DEMUX) {
386          log.info("Max File per Demux reached:" + maxFilesPerDemux);
387          break;
388        }
389      }
390      return containsFile;
391    }
392 
393 
394 
395 
396    /**
397     * Move sourceFolder inside destFolder
398     * @param dataSinkDir : ex chukwa/demux/inputDir
399     * @param demuxErrorDir : ex /chukwa/demux/inError
400     * @return true if able to move chukwa/demux/inputDir to /chukwa/demux/inError/<YYYYMMDD>/demuxInputDirXXX
401     * @throws IOException
402     */
403    protected boolean moveDataSinkFilesToDemuxErrorDirectory(
404        String dataSinkDir, String demuxErrorDir) throws IOException {
405      demuxErrorDir += "/" + dayTextFormat.format(System.currentTimeMillis());
406      return moveFolder(dataSinkDir,demuxErrorDir,"demuxInputDir");
407    }
408 
409    /**
410     * Move sourceFolder inside destFolder
411     * @param demuxInputDir: ex chukwa/demux/inputDir
412     * @param archiveDirectory: ex /chukwa/archives
413     * @return true if able to move chukwa/demux/inputDir to /chukwa/archives/raw/<YYYYMMDD>/dataSinkDirXXX
414     * @throws IOException
415     */
416    protected boolean moveDataSinkFilesToArchiveDirectory(
417        String demuxInputDir, String archiveDirectory) throws IOException {
418      archiveDirectory += "/" + dayTextFormat.format(System.currentTimeMillis());
419      return moveFolder(demuxInputDir,archiveDirectory,"dataSinkDir");
420    }
421 
422    /**
423     * Move sourceFolder inside destFolder
424     * @param demuxOutputDir: ex chukwa/demux/outputDir 
425     * @param postProcessDirectory: ex /chukwa/postProcess
426     * @return true if able to move chukwa/demux/outputDir to /chukwa/postProcess/demuxOutputDirXXX
427     * @throws IOException 
428     */
429    protected  boolean moveDemuxOutputDirToPostProcessDirectory(
430        String demuxOutputDir, String postProcessDirectory) throws IOException {
431      return moveFolder(demuxOutputDir,postProcessDirectory,"demuxOutputDir");
432    }
433 
434 
435    /**
436     * Test if demuxInputDir exists
437     * @param demuxInputDir
438     * @return true if demuxInputDir exists
439     * @throws IOException
440     */
441    protected boolean checkDemuxInputDir(String demuxInputDir)
442    throws IOException {
443      return dirExists(demuxInputDir);
444    }
445 
446    /**
447     * Test if demuxOutputDir exists
448     * @param demuxOutputDir
449     * @return true if demuxOutputDir exists
450     * @throws IOException
451     */
452    protected boolean checkDemuxOutputDir(String demuxOutputDir)
453    throws IOException {
454      return dirExists(demuxOutputDir);
455    }
456 
457 
458    /**
459     * Delete DemuxOutput directory
460     * @param demuxOutputDir
461     * @return true if succeed
462     * @throws IOException
463     */
464    protected boolean deleteDemuxOutputDir(String demuxOutputDir) throws IOException
465    {
466      return fs.delete(new Path(demuxOutputDir), true);
467    }
468 
469    /**
470     * Create directory if !exists
471     * @param directory
472     * @throws IOException
473     */
474    protected void setup(Path directory) throws IOException {
475       if ( ! fs.exists(directory)) {
476         fs.mkdirs(directory);
477       }
478     }
479 
480     /** 
481      * Check if source exists and if source is a directory
482      * @param f source file
483      */
484    protected boolean dirExists(String directory) throws IOException {
485       Path pDirectory = new Path(directory);
486       return (fs.exists(pDirectory) && fs.getFileStatus(pDirectory).isDir());
487     }
488 
489     /**
490      * Move sourceFolder inside destFolder
491      * @param srcDir
492      * @param destDir
493      * @return
494      * @throws IOException
495      */ 
496    protected boolean moveFolder(String srcDir,String destDir, String prefix) throws IOException {
497       if (!destDir.endsWith("/")) {
498         destDir +="/";
499       }
500       Path pSrcDir = new Path(srcDir);
501       Path pDestDir = new Path(destDir );
502       setup(pDestDir);
503       destDir += prefix +"_" +System.currentTimeMillis();
504       Path pFinalDestDir = new Path(destDir );
505 
506       return fs.rename(pSrcDir, pFinalDestDir);
507     }
508 }