This project has retired. For details please refer to its Attic page.
DirTailingAdaptor xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.chukwa.datacollection.adaptor;
19  
20  import java.io.File;
21  import java.io.IOException;
22  
23  import org.apache.log4j.Logger;
24  import org.apache.commons.io.FileUtils;
25  import org.apache.commons.io.filefilter.FileFilterUtils;
26  import org.apache.commons.io.filefilter.IOFileFilter;
27  import org.apache.commons.io.filefilter.WildcardFileFilter;
28  
29  /**
30   *  Explore a whole directory hierarchy, looking for files to tail. 
31   *   DirTailingAdaptor will not try to start tailing a file more than once,
32   *   if the file hasn't been modified in the interim.  
33   *   
34   *  Offset param is used to track last finished scan.
35   *  
36   *  Mandatory first parameter is a directory with an optional unix style file
37   *  filter. Mandatory second parameter
38   *  is the name of an adaptor to start.
39   * 
40   *  If the specified directory does not exist, the DirTailer will continue
41   *  running, and will start tailing if the directory is later created.
42   *
43   */
44  public class DirTailingAdaptor extends AbstractAdaptor implements Runnable {
45    
46    static Logger log = Logger.getLogger(DirTailingAdaptor.class); 
47    
48    Thread scanThread = new Thread(this);
49    long lastSweepStartTime;
50    volatile boolean continueScanning=true;
51    File baseDir;
52    String baseDirName; 
53    long scanInterval;
54  
55  	protected String adaptorName; // name of adaptors to start
56  
57    IOFileFilter fileFilter;
58    String wildCharacter;
59  
60    @Override
61    public void start(long offset) throws AdaptorException {
62      scanInterval = control.getConfiguration().getInt("adaptor.dirscan.intervalMs", 10000);
63        
64      scanThread.start();
65      lastSweepStartTime = offset;
66      try {
67        baseDirName = baseDir.getCanonicalPath();
68      } catch(IOException e) {
69        throw new AdaptorException(e);
70      }
71    }
72    
73    public void run() {
74      try {
75        log.debug("dir tailer starting to scan");
76        while(continueScanning) {
77          try {
78            long sweepStartTime = System.currentTimeMillis();
79            scanDirHierarchy(baseDir);
80            lastSweepStartTime=sweepStartTime;
81            control.reportCommit(this, lastSweepStartTime);
82          } catch(IOException e) {
83            log.warn(e);
84          }
85          Thread.sleep(scanInterval);
86        }
87      } catch(InterruptedException e) {
88      }
89    }
90    
91    /*
92     * Coded recursively.  Base case is a single non-dir file.
93     */
94    private void scanDirHierarchy(File dir) throws IOException {
95      if(!dir.exists())
96        return;
97      if(!dir.isDirectory()) {
98        //Don't start tailing if we would have gotten it on the last pass
99        if(dir.lastModified() >= lastSweepStartTime) {
100 				String newAdaptorID = control.processAddCommand(getAdaptorAddCommand(dir));
101 
102             log.info("DirTailingAdaptor " + adaptorID +  "  started new adaptor " + newAdaptorID);
103        }
104       
105       } else {
106         log.info("Scanning directory: " + dir.getName());
107         
108         for(Object f: FileUtils.listFiles(dir, fileFilter, FileFilterUtils.trueFileFilter())) {
109          scanDirHierarchy((File)f);
110         }
111       }
112   }
113   
114 	protected String getAdaptorAddCommand(File dir) throws IOException {
115 		return "add " + adaptorName + " " + type + " " + dir.getCanonicalPath() + " 0";
116 	}
117 
118   @Override
119   public String getCurrentStatus() {
120     return this.wildCharacter == null ? (type + " " + baseDirName + " " + adaptorName)
121     		:(type + " " + baseDirName + " " + this.wildCharacter + " " + adaptorName);
122   }
123 
124   @Override
125   public String parseArgs(String status) {
126     
127     String[] args = status.split(" ");
128      
129     if(args.length == 2){
130      baseDir = new File(args[0]);
131      fileFilter = FileFilterUtils.trueFileFilter();
132      adaptorName = args[1];
133     }else if(args.length == 3){
134      baseDir = new File(args[0]);
135      this.wildCharacter = args[ 1 ];
136      fileFilter = getFileFilter();
137      adaptorName = args[2]; 
138     }else{
139      log.warn("bad syntax in DirTailingAdaptor args");
140      return null;
141     }
142     
143     return (args.length == 2)? baseDir + " " + adaptorName : baseDir + " " + this.wildCharacter + " " + adaptorName;  //both params mandatory
144   }
145 
146   @Override
147   public long shutdown(AdaptorShutdownPolicy shutdownPolicy)
148       throws AdaptorException {
149     continueScanning = false;
150     return lastSweepStartTime;
151   }
152 
153   /**
154    * Returns {@link IOFileFilter} constructed using the wild character. Subclasses can override this method 
155    * return their own version of {@link IOFileFilter} instance.
156    *  
157    * @return {@link IOFileFilter} constructed using the wild character. Subclasses can override this method 
158    * return their own version of {@link IOFileFilter} instance.
159    */
160   protected IOFileFilter getFileFilter() {
161   	return new WildcardFileFilter( this.wildCharacter );
162   }
163 }