1/*2 * Licensed to the Apache Software Foundation (ASF) under one3 * or more contributor license agreements. See the NOTICE file4 * distributed with this work for additional information5 * regarding copyright ownership. The ASF licenses this file6 * to you under the Apache License, Version 2.0 (the7 * "License"); you may not use this file except in compliance8 * with the License. You may obtain a copy of the License at9 *10 * http://www.apache.org/licenses/LICENSE-2.011 *12 * Unless required by applicable law or agreed to in writing, software13 * distributed under the License is distributed on an "AS IS" BASIS,14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.15 * See the License for the specific language governing permissions and16 * limitations under the License.17 */18package org.apache.hadoop.chukwa.datacollection.adaptor;
1920import java.io.File;
21import java.io.IOException;
2223import org.apache.log4j.Logger;
24import org.apache.commons.io.FileUtils;
25import org.apache.commons.io.filefilter.FileFilterUtils;
26import org.apache.commons.io.filefilter.IOFileFilter;
27import org.apache.commons.io.filefilter.WildcardFileFilter;
2829/**30 * Explore a whole directory hierarchy, looking for files to tail. 31 * DirTailingAdaptor will not try to start tailing a file more than once,32 * if the file hasn't been modified in the interim. 33 * 34 * Offset param is used to track last finished scan.35 * 36 * Mandatory first parameter is a directory with an optional unix style file37 * filter. Mandatory second parameter38 * is the name of an adaptor to start.39 * 40 * If the specified directory does not exist, the DirTailer will continue41 * running, and will start tailing if the directory is later created.42 *43 */44publicclassDirTailingAdaptorextendsAbstractAdaptorimplements Runnable {
4546static Logger log = Logger.getLogger(DirTailingAdaptor.class);
4748 Thread scanThread = new Thread(this);
49long lastSweepStartTime;
50volatileboolean continueScanning=true;
51 File baseDir;
52 String baseDirName;
53long scanInterval;
5455protected String adaptorName; // name of adaptors to start5657 IOFileFilter fileFilter;
58 String wildCharacter;
5960 @Override
61publicvoid start(long offset) throws AdaptorException {
62 scanInterval = control.getConfiguration().getInt("adaptor.dirscan.intervalMs", 10000);
6364 scanThread.start();
65 lastSweepStartTime = offset;
66try {
67 baseDirName = baseDir.getCanonicalPath();
68 } catch(IOException e) {
69thrownewAdaptorException(e);
70 }
71 }
7273publicvoid run() {
74try {
75 log.debug("dir tailer starting to scan");
76while(continueScanning) {
77try {
78long sweepStartTime = System.currentTimeMillis();
79 scanDirHierarchy(baseDir);
80 lastSweepStartTime=sweepStartTime;
81 control.reportCommit(this, lastSweepStartTime);
82 } catch(IOException e) {
83 log.warn(e);
84 }
85 Thread.sleep(scanInterval);
86 }
87 } catch(InterruptedException e) {
88 }
89 }
9091/*92 * Coded recursively. Base case is a single non-dir file.93 */94privatevoid scanDirHierarchy(File dir) throws IOException {
95if(!dir.exists())
96return;
97if(!dir.isDirectory()) {
98//Don't start tailing if we would have gotten it on the last pass99if(dir.lastModified() >= lastSweepStartTime) {
100 String newAdaptorID = control.processAddCommand(getAdaptorAddCommand(dir));
101102 log.info("DirTailingAdaptor " + adaptorID + " started new adaptor " + newAdaptorID);
103 }
104105 } else {
106 log.info("Scanning directory: " + dir.getName());
107108for(Object f: FileUtils.listFiles(dir, fileFilter, FileFilterUtils.trueFileFilter())) {
109 scanDirHierarchy((File)f);
110 }
111 }
112 }
113114protected String getAdaptorAddCommand(File dir) throws IOException {
115return"add " + adaptorName + " " + type + " " + dir.getCanonicalPath() + " 0";
116 }
117118 @Override
119public String getCurrentStatus() {
120returnthis.wildCharacter == null ? (type + " " + baseDirName + " " + adaptorName)
121 :(type + " " + baseDirName + " " + this.wildCharacter + " " + adaptorName);
122 }
123124 @Override
125public String parseArgs(String status) {
126127 String[] args = status.split(" ");
128129if(args.length == 2){
130 baseDir = new File(args[0]);
131 fileFilter = FileFilterUtils.trueFileFilter();
132 adaptorName = args[1];
133 }elseif(args.length == 3){
134 baseDir = new File(args[0]);
135this.wildCharacter = args[ 1 ];
136 fileFilter = getFileFilter();
137 adaptorName = args[2];
138 }else{
139 log.warn("bad syntax in DirTailingAdaptor args");
140returnnull;
141 }
142143return (args.length == 2)? baseDir + " " + adaptorName : baseDir + " " + this.wildCharacter + " " + adaptorName; //both params mandatory144 }
145146 @Override
147publiclong shutdown(AdaptorShutdownPolicy shutdownPolicy)
148throwsAdaptorException {
149 continueScanning = false;
150return lastSweepStartTime;
151 }
152153/**154 * Returns {@link IOFileFilter} constructed using the wild character. Subclasses can override this method 155 * return their own version of {@link IOFileFilter} instance.156 * 157 * @return {@link IOFileFilter} constructed using the wild character. Subclasses can override this method 158 * return their own version of {@link IOFileFilter} instance.159 */160protected IOFileFilter getFileFilter() {
161returnnew WildcardFileFilter( this.wildCharacter );
162 }
163 }