This project has retired. For details please refer to its Attic page.
DemuxManager xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.chukwa.extraction.demux;
20  
21  import java.io.IOException;
22  import java.net.URI;
23  import java.net.URISyntaxException;
24  import java.text.SimpleDateFormat;
25  import java.util.Date;
26  
27  import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
28  import org.apache.hadoop.chukwa.extraction.CHUKWA_CONSTANT;
29  import org.apache.hadoop.chukwa.util.NagiosHelper;
30  import org.apache.hadoop.chukwa.util.DaemonWatcher;
31  import org.apache.hadoop.conf.Configuration;
32  import org.apache.hadoop.fs.FileStatus;
33  import org.apache.hadoop.fs.FileSystem;
34  import org.apache.hadoop.fs.Path;
35  import org.apache.hadoop.fs.PathFilter;
36  import org.apache.hadoop.util.ToolRunner;
37  import org.apache.log4j.Logger;
38  
39  public class DemuxManager implements CHUKWA_CONSTANT {  
40    static Logger log = Logger.getLogger(DemuxManager.class);
41  
42    static int globalErrorcounter = 0;
43    static Date firstErrorTime = null;
44  
45    protected int ERROR_SLEEP_TIME = 60;
46    protected int NO_DATASINK_SLEEP_TIME = 20;
47  
48    protected int DEFAULT_MAX_ERROR_COUNT = 6;
49    protected int DEFAULT_MAX_FILES_PER_DEMUX = 500;
50    protected int DEFAULT_REDUCER_COUNT = 8;
51    
52    protected int maxPermittedErrorCount = DEFAULT_MAX_ERROR_COUNT;
53    protected int demuxReducerCount = 0;
54    protected ChukwaConfiguration conf = null;
55    protected FileSystem fs = null;
56    protected int reprocess = 0;
57    protected boolean sendAlert = true;
58    
59    protected SimpleDateFormat dayTextFormat = new java.text.SimpleDateFormat("yyyyMMdd");
60    protected volatile boolean isRunning = true;
61  
62    final private static PathFilter DATA_SINK_FILTER = new PathFilter() {
63      public boolean accept(Path file) {
64        return file.getName().endsWith(".done");
65      }     
66    };
67  
68  
69    public static void main(String[] args) throws Exception {
70      DaemonWatcher.createInstance("DemuxManager");
71      
72      DemuxManager manager = new DemuxManager();
73      manager.start();
74  
75    }
76  
77    public DemuxManager() throws Exception {
78      this.conf = new ChukwaConfiguration();
79      init();
80    }
81  
82    public DemuxManager(ChukwaConfiguration conf) throws Exception {
83      this.conf = conf;
84      init();
85    }
86  
87    protected void init() throws IOException, URISyntaxException {
88      String fsName = conf.get(HDFS_DEFAULT_NAME_FIELD);
89      fs = FileSystem.get(new URI(fsName), conf);
90    }
91  
92    public void shutdown() {
93      this.isRunning = false;
94    }
95  
96  
97    public int getReprocess() {
98      return reprocess;
99    }
100 
101   /**
102    * Start the Demux Manager daemon
103    * @throws Exception
104    */
105   public void start() throws Exception {
106 
107      String chukwaRootDir = conf.get(CHUKWA_ROOT_DIR_FIELD, DEFAULT_CHUKWA_ROOT_DIR_NAME);
108      if ( ! chukwaRootDir.endsWith("/") ) {
109        chukwaRootDir += "/";
110      }
111      log.info("chukwaRootDir:" + chukwaRootDir);
112 
113      String demuxRootDir = chukwaRootDir + DEFAULT_DEMUX_PROCESSING_DIR_NAME;
114      String demuxErrorDir = demuxRootDir + DEFAULT_DEMUX_IN_ERROR_DIR_NAME;
115      String demuxInputDir = demuxRootDir + DEFAULT_DEMUX_MR_INPUT_DIR_NAME;
116      String demuxOutputDir = demuxRootDir + DEFAULT_DEMUX_MR_OUTPUT_DIR_NAME;
117 
118      String dataSinkDir = conf.get(CHUKWA_DATA_SINK_DIR_FIELD, chukwaRootDir +DEFAULT_CHUKWA_LOGS_DIR_NAME);
119      if ( ! dataSinkDir.endsWith("/") ) {
120        dataSinkDir += "/";
121      }
122      log.info("dataSinkDir:" + dataSinkDir);
123      
124      String postProcessDir = conf.get(CHUKWA_POST_PROCESS_DIR_FIELD, chukwaRootDir +DEFAULT_CHUKWA_POSTPROCESS_DIR_NAME);
125      if ( ! postProcessDir.endsWith("/") ) {
126        postProcessDir += "/";
127      }
128      log.info("postProcessDir:" + postProcessDir);
129      
130      String archiveRootDir = conf.get(CHUKWA_ARCHIVE_DIR_FIELD, chukwaRootDir +DEFAULT_CHUKWA_DATASINK_DIR_NAME);
131      if ( ! archiveRootDir.endsWith("/") ) {
132        archiveRootDir += "/";
133      }
134      log.info("archiveRootDir:" + archiveRootDir);
135      
136      maxPermittedErrorCount = conf.getInt(CHUKWA_DEMUX_MAX_ERROR_COUNT_FIELD,
137                                           DEFAULT_MAX_ERROR_COUNT);
138      demuxReducerCount = conf.getInt(CHUKWA_DEMUX_REDUCER_COUNT_FIELD, DEFAULT_REDUCER_COUNT);
139      log.info("demuxReducerCount:" + demuxReducerCount);
140      
141      String nagiosHost = conf.get(CHUKWA_NAGIOS_HOST_FIELD);
142      int nagiosPort = conf.getInt(CHUKWA_NAGIOS_PORT_FIELD,0);
143      String reportingHost = conf.get(CHUKWA_REPORTING_HOST_FIELD);
144      
145      log.info("Nagios information: nagiosHost:" + nagiosHost + ", nagiosPort:" 
146          + nagiosPort + ", reportingHost:" + reportingHost);
147      
148      
149      if (nagiosHost == null || nagiosHost.length() == 0 || nagiosPort == 0 || reportingHost.length() == 0 || reportingHost == null) {
150        sendAlert = false;
151        log.warn("Alerting is OFF");
152      }
153      
154      boolean demuxReady = false;
155 
156      
157      while (isRunning) {
158        try {
159          demuxReady = false;
160 
161          if (maxPermittedErrorCount != -1 && globalErrorcounter >= maxPermittedErrorCount) {
162            log.warn("==================\nToo many errors (" + globalErrorcounter +
163                     "), Bail out!\n==================");
164            DaemonWatcher.bailout(-1);
165          }
166          
167          // Check for anomalies
168          if (checkDemuxOutputDir(demuxOutputDir) == true) {
169            // delete current demux output dir
170            if ( deleteDemuxOutputDir(demuxOutputDir) == false ) {
171              log.warn("Cannot delete an existing demux output directory!");
172              throw new IOException("Cannot move demuxOutput to postProcess!");
173            }
174            continue;
175          } else if (checkDemuxInputDir(demuxInputDir) == true) { // dataSink already there
176            reprocess++;
177 
178            // Data has been processed more than 3 times ... move to InError directory
179            if (reprocess > 3) {
180              if (moveDataSinkFilesToDemuxErrorDirectory(demuxInputDir,demuxErrorDir) == false) {
181                log.warn("Cannot move dataSink files to DemuxErrorDir!");
182                throw new IOException("Cannot move dataSink files to DemuxErrorDir!");
183              }
184              reprocess = 0;
185              continue;
186            }
187 
188            log.error("Demux inputDir aready contains some dataSink files,"
189                + " going to reprocess, reprocessCount=" + reprocess);
190            demuxReady = true;
191          } else { // standard code path
192            reprocess = 0;
193            // Move new dataSink Files
194            if (moveDataSinkFilesToDemuxInputDirectory(dataSinkDir, demuxInputDir) == true) {
195              demuxReady = true; // if any are available
196            } else {
197              demuxReady = false; // if none
198            }
199          }
200 
201          // start a new demux ?
202          if (demuxReady == true) {
203           boolean demuxStatus = processData(dataSinkDir, demuxInputDir, demuxOutputDir,
204                postProcessDir, archiveRootDir);
205           sendDemuxStatusToNagios(nagiosHost,nagiosPort,reportingHost,demuxErrorDir,demuxStatus,null);
206 
207           // if demux suceeds, then we reset these.
208           if (demuxStatus) {
209            globalErrorcounter = 0;
210            firstErrorTime = null;
211           }
212          } else {
213            log.info("Demux not ready so going to sleep ...");
214            Thread.sleep(NO_DATASINK_SLEEP_TIME * 1000);
215          }
216        }catch(Throwable e) {
217          globalErrorcounter ++;
218          if (firstErrorTime == null) firstErrorTime = new Date();
219 
220          log.warn("Consecutive error number " + globalErrorcounter +
221                   " encountered since " + firstErrorTime, e);
222          sendDemuxStatusToNagios(nagiosHost,nagiosPort,reportingHost,demuxErrorDir,false, e.getMessage());
223          try { Thread.sleep(ERROR_SLEEP_TIME * 1000); } 
224          catch (InterruptedException e1) {/*do nothing*/ }
225          init();
226        }
227      }
228    }
229 
230 
231    /**
232     * Send NSCA status to Nagios
233     * @param nagiosHost
234     * @param nagiosPort
235     * @param reportingHost
236     * @param demuxInErrorDir
237     * @param demuxStatus
238     * @param exception
239     */
240   protected void sendDemuxStatusToNagios(String nagiosHost,int nagiosPort,String reportingHost,
241         String demuxInErrorDir,boolean demuxStatus,String demuxException) {
242       
243      if (sendAlert == false) {
244        return;
245      }
246       
247      boolean demuxInErrorStatus = true;
248      String demuxInErrorMsg = "";
249      try {
250        Path pDemuxInErrorDir = new Path(demuxInErrorDir);
251        if ( fs.exists(pDemuxInErrorDir)) {
252          FileStatus[] demuxInErrorDirs = fs.listStatus(pDemuxInErrorDir);
253          if (demuxInErrorDirs.length == 0) {
254            demuxInErrorStatus = false;
255          }          
256        }
257      } catch (Throwable e) {
258        demuxInErrorMsg = e.getMessage();
259        log.warn(e);
260      }
261      
262      // send Demux status
263      if (demuxStatus == true) {
264        NagiosHelper.sendNsca(nagiosHost,nagiosPort,reportingHost,"DemuxProcessing","Demux OK",NagiosHelper.NAGIOS_OK);
265      } else {
266        NagiosHelper.sendNsca(nagiosHost,nagiosPort,reportingHost,"DemuxProcessing","Demux failed. " + demuxException,NagiosHelper.NAGIOS_CRITICAL);
267      }
268      
269      // send DemuxInErrorStatus
270      if (demuxInErrorStatus == false) {
271        NagiosHelper.sendNsca(nagiosHost,nagiosPort,reportingHost,"DemuxInErrorDirectory","DemuxInError OK",NagiosHelper.NAGIOS_OK);
272      } else {
273        NagiosHelper.sendNsca(nagiosHost,nagiosPort,reportingHost,"DemuxInErrorDirectory","DemuxInError not empty -" + demuxInErrorMsg,NagiosHelper.NAGIOS_CRITICAL);
274      }
275      
276    }
277    
278    /**
279     * Process Data, i.e. 
280     * - run demux
281     * - move demux output to postProcessDir
282     * - move dataSink file to archiveDir
283     * 
284     * @param dataSinkDir
285     * @param demuxInputDir
286     * @param demuxOutputDir
287     * @param postProcessDir
288     * @param archiveDir
289     * @return True iff succeed
290     * @throws IOException
291     */
292     protected boolean processData(String dataSinkDir, String demuxInputDir,
293        String demuxOutputDir, String postProcessDir, String archiveDir) throws IOException {
294 
295      boolean demuxStatus = false;
296 
297      long startTime = System.currentTimeMillis();
298      demuxStatus = runDemux(demuxInputDir, demuxOutputDir);
299      log.info("Demux Duration: " + (System.currentTimeMillis() - startTime));
300 
301      if (demuxStatus == false) {
302        log.warn("Demux failed!");
303      } else {
304 
305        // Move demux output to postProcessDir 
306        if (checkDemuxOutputDir(demuxOutputDir)) {
307          if (moveDemuxOutputDirToPostProcessDirectory(demuxOutputDir, postProcessDir) == false) {
308            log.warn("Cannot move demuxOutput to postProcess! bail out!");
309            throw new IOException("Cannot move demuxOutput to postProcess! bail out!");
310          } 
311        } else {
312          log.warn("Demux processing OK but no output");
313        }
314 
315        // Move DataSink Files to archiveDir
316        if (moveDataSinkFilesToArchiveDirectory(demuxInputDir, archiveDir) == false) {
317          log.warn("Cannot move datasinkFile to archive! bail out!");
318          throw new IOException("Cannot move datasinkFile to archive! bail out!");
319        }
320      }
321      
322      return demuxStatus;
323    }
324 
325 
326    /**
327     * Submit and Run demux Job 
328     * @param demuxInputDir
329     * @param demuxOutputDir
330     * @return true id Demux succeed
331     */
332    protected boolean runDemux(String demuxInputDir, String demuxOutputDir) { 
333      // to reload the configuration, and demux's reduce number
334      Configuration tempConf = new Configuration(conf);
335      tempConf.reloadConfiguration();
336      demuxReducerCount = tempConf.getInt(CHUKWA_DEMUX_REDUCER_COUNT_FIELD, DEFAULT_REDUCER_COUNT);
337      String[] demuxParams;
338      int i=0;
339      Demux.addParsers(tempConf);
340      demuxParams = new String[4];
341      demuxParams[i++] = "-r";
342      demuxParams[i++] = "" + demuxReducerCount;
343      demuxParams[i++] = demuxInputDir;
344      demuxParams[i++] = demuxOutputDir;
345      try {
346        return ( 0 == ToolRunner.run(tempConf,new Demux(), demuxParams) );
347      } catch (Throwable e) {
348        e.printStackTrace();
349        globalErrorcounter ++;
350        if (firstErrorTime == null) firstErrorTime = new Date();
351        log.error("Failed to run demux. Consecutive error number " +
352                globalErrorcounter + " encountered since " + firstErrorTime, e);
353      }
354      return false;
355    }
356 
357 
358 
359    /**
360     * Move dataSink files to Demux input directory
361     * @param dataSinkDir
362     * @param demuxInputDir
363     * @return true if there's any dataSink files ready to be processed
364     * @throws IOException
365     */
366    protected boolean moveDataSinkFilesToDemuxInputDirectory(
367        String dataSinkDir, String demuxInputDir) throws IOException {
368      Path pDataSinkDir = new Path(dataSinkDir);
369      Path pDemuxInputDir = new Path(demuxInputDir);
370      log.info("dataSinkDir: " + dataSinkDir);
371      log.info("demuxInputDir: " + demuxInputDir);
372 
373 
374      boolean containsFile = false;
375 
376      FileStatus[] dataSinkFiles = fs.listStatus(pDataSinkDir,DATA_SINK_FILTER);
377      if (dataSinkFiles.length > 0) {
378        setup(pDemuxInputDir);
379      }
380 
381      int maxFilesPerDemux = 0;
382      for (FileStatus fstatus : dataSinkFiles) {
383        boolean rename = fs.rename(fstatus.getPath(),pDemuxInputDir);
384        log.info("Moving " + fstatus.getPath() + " to " + pDemuxInputDir +", status is:" + rename);
385        maxFilesPerDemux ++;
386        containsFile = true;
387        if (maxFilesPerDemux >= DEFAULT_MAX_FILES_PER_DEMUX) {
388          log.info("Max File per Demux reached:" + maxFilesPerDemux);
389          break;
390        }
391      }
392      return containsFile;
393    }
394 
395 
396 
397 
398    /**
399     * Move sourceFolder inside destFolder
400     * @param dataSinkDir : ex chukwa/demux/inputDir
401     * @param demuxErrorDir : ex /chukwa/demux/inError
402     * @return true if able to move chukwa/demux/inputDir to /chukwa/demux/inError/<YYYYMMDD>/demuxInputDirXXX
403     * @throws IOException
404     */
405    protected boolean moveDataSinkFilesToDemuxErrorDirectory(
406        String dataSinkDir, String demuxErrorDir) throws IOException {
407      demuxErrorDir += "/" + dayTextFormat.format(System.currentTimeMillis());
408      return moveFolder(dataSinkDir,demuxErrorDir,"demuxInputDir");
409    }
410 
411    /**
412     * Move sourceFolder inside destFolder
413     * @param demuxInputDir: ex chukwa/demux/inputDir
414     * @param archiveDirectory: ex /chukwa/archives
415     * @return true if able to move chukwa/demux/inputDir to /chukwa/archives/raw/<YYYYMMDD>/dataSinkDirXXX
416     * @throws IOException
417     */
418    protected boolean moveDataSinkFilesToArchiveDirectory(
419        String demuxInputDir, String archiveDirectory) throws IOException {
420      archiveDirectory += "/" + dayTextFormat.format(System.currentTimeMillis());
421      return moveFolder(demuxInputDir,archiveDirectory,"dataSinkDir");
422    }
423 
424    /**
425     * Move sourceFolder inside destFolder
426     * @param demuxOutputDir: ex chukwa/demux/outputDir 
427     * @param postProcessDirectory: ex /chukwa/postProcess
428     * @return true if able to move chukwa/demux/outputDir to /chukwa/postProcess/demuxOutputDirXXX
429     * @throws IOException 
430     */
431    protected  boolean moveDemuxOutputDirToPostProcessDirectory(
432        String demuxOutputDir, String postProcessDirectory) throws IOException {
433      return moveFolder(demuxOutputDir,postProcessDirectory,"demuxOutputDir");
434    }
435 
436 
437    /**
438     * Test if demuxInputDir exists
439     * @param demuxInputDir
440     * @return true if demuxInputDir exists
441     * @throws IOException
442     */
443    protected boolean checkDemuxInputDir(String demuxInputDir)
444    throws IOException {
445      return dirExists(demuxInputDir);
446    }
447 
448    /**
449     * Test if demuxOutputDir exists
450     * @param demuxOutputDir
451     * @return true if demuxOutputDir exists
452     * @throws IOException
453     */
454    protected boolean checkDemuxOutputDir(String demuxOutputDir)
455    throws IOException {
456      return dirExists(demuxOutputDir);
457    }
458 
459 
460    /**
461     * Delete DemuxOutput directory
462     * @param demuxOutputDir
463     * @return true if succeed
464     * @throws IOException
465     */
466    protected boolean deleteDemuxOutputDir(String demuxOutputDir) throws IOException
467    {
468      return fs.delete(new Path(demuxOutputDir), true);
469    }
470 
471    /**
472     * Create directory if !exists
473     * @param directory
474     * @throws IOException
475     */
476    protected void setup(Path directory) throws IOException {
477       if ( ! fs.exists(directory)) {
478         fs.mkdirs(directory);
479       }
480     }
481 
482     /** 
483      * Check if source exists and if source is a directory
484      * @param f source file
485      */
486    protected boolean dirExists(String directory) throws IOException {
487       Path pDirectory = new Path(directory);
488       return (fs.exists(pDirectory) && fs.getFileStatus(pDirectory).isDir());
489     }
490 
491     /**
492      * Move sourceFolder inside destFolder
493      * @param srcDir
494      * @param destDir
495      * @return
496      * @throws IOException
497      */ 
498    protected boolean moveFolder(String srcDir,String destDir, String prefix) throws IOException {
499       if (!destDir.endsWith("/")) {
500         destDir +="/";
501       }
502       Path pSrcDir = new Path(srcDir);
503       Path pDestDir = new Path(destDir );
504       setup(pDestDir);
505       destDir += prefix +"_" +System.currentTimeMillis();
506       Path pFinalDestDir = new Path(destDir );
507 
508       return fs.rename(pSrcDir, pFinalDestDir);
509     }
510 }