/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.chukwa.datacollection.adaptor.filetailer;

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.chukwa.ChunkImpl;
import org.apache.hadoop.chukwa.datacollection.ChunkReceiver;
import org.apache.hadoop.chukwa.datacollection.adaptor.AbstractAdaptor;
import org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorException;
import org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorShutdownPolicy;
import org.apache.hadoop.conf.Configuration;
import org.apache.log4j.Logger;

/**
 * A base class for file tailing adaptors.
 * Intended to mandate as little policy as possible, and to use as
 * few system resources as possible.
 *
 * If the file does not exist, this class will continue to retry quietly
 * forever and will start tailing if it's eventually created.
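 *
 * For illustration, a registration with the Chukwa agent looks roughly
 * like the following (the exact control-protocol syntax may vary by
 * release; the data type and path here are hypothetical):
 * <pre>
 * add org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.LWFTAdaptor
 *     SysLog /var/log/messages 0
 * </pre>
 * The parameter string handed to {@link #parseArgs(String)} is either a
 * bare path, or an offset-of-first-byte followed by a path.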
 */
public class LWFTAdaptor extends AbstractAdaptor {

  /**
   * This is the maximum amount we'll read from any one file before moving on
   * to the next. This way, we get quick response time for other files if one
   * file is growing rapidly.
   *
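   * The default can be overridden with the
   * {@code chukwaAgent.fileTailingAdaptor.maxReadSize} configuration
   * option (MAX_READ_SIZE_OPT), which is read in {@link #parseArgs(String)}.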
   */
  public static final int DEFAULT_MAX_READ_SIZE = 128 * 1024;
  public static final String MAX_READ_SIZE_OPT =
      "chukwaAgent.fileTailingAdaptor.maxReadSize";

  int MAX_READ_SIZE = DEFAULT_MAX_READ_SIZE;

  static Logger log;
  static FileTailer tailer;

  static {
    tailer = null; // shared tailer thread; created lazily in start()
    log = Logger.getLogger(LWFTAdaptor.class);
  }
  /**
   * Next physical offset to read.
   */
  protected long fileReadOffset;

  /**
   * The logical offset of the first byte of the file.
   */
  protected long offsetOfFirstByte = 0;
  protected Configuration conf = null;

  /**
   * The timestamp of the last slurp.
   */
  protected long lastSlurpTime = 0L;

  File toWatch;

  @Override
  public void start(long offset) {
    synchronized(LWFTAdaptor.class) {
      // All LWFTAdaptor instances share a single FileTailer thread,
      // created lazily on first start.
      if(tailer == null)
        tailer = new FileTailer(control.getConfiguration());
    }
    // 'offset' is a logical stream offset; convert it to a physical one.
    this.fileReadOffset = offset - offsetOfFirstByte;
    tailer.startWatchingFile(this);
  }

  /**
   * @see org.apache.hadoop.chukwa.datacollection.adaptor.Adaptor#getCurrentStatus()
   */
  public String getCurrentStatus() {
    return type.trim() + " " + offsetOfFirstByte + " " + toWatch.getPath();
  }

  @Override
  public String toString() {
    return "Lightweight Tailer on " + toWatch;
  }

  public String getStreamName() {
    return toWatch.getPath();
  }

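  /**
   * Parses the adaptor parameter string. Accepted forms (per the regex
   * below) are {@code "<offsetOfFirstByte> <path>"}, e.g.
   * {@code "1024 /var/log/messages"}, or a bare {@code "<path>"}.
   */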
  @Override
  public String parseArgs(String params) {
    conf = control.getConfiguration();
    MAX_READ_SIZE = conf.getInt(MAX_READ_SIZE_OPT, DEFAULT_MAX_READ_SIZE);

    Pattern cmd = Pattern.compile("(\\d+)\\s+(.+)\\s?");
    Matcher m = cmd.matcher(params);
    if (m.matches()) { // check for first-byte offset; if absent, assume we just got a path
      offsetOfFirstByte = Long.parseLong(m.group(1));
      toWatch = new File(m.group(2));
    } else {
      toWatch = new File(params.trim());
    }
    return toWatch.getAbsolutePath();
  }

  @Override
  public long shutdown(AdaptorShutdownPolicy shutdownPolicy)
      throws AdaptorException {
    tailer.stopWatchingFile(this);
    return fileReadOffset + offsetOfFirstByte;
  }

  /**
   * Extract records from a byte sequence. The base implementation emits the
   * whole buffer as a single chunk; subclasses can override this to impose a
   * record-splitting policy.
   *
   * @param eq the queue to which new chunks are added
   * @param buffOffsetInFile the byte offset in the stream at which buf[] begins
   * @param buf the byte buffer to extract records from
   * @return the number of bytes processed
   * @throws InterruptedException
   */
  protected int extractRecords(ChunkReceiver eq, long buffOffsetInFile,
      byte[] buf) throws InterruptedException {
    if(buf.length == 0)
      return 0;

    ChunkImpl chunk = new ChunkImpl(type, toWatch.getAbsolutePath(),
        buffOffsetInFile + buf.length, buf, this);

    eq.add(chunk);
    return buf.length;
  }

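  /**
   * Reads up to MAX_READ_SIZE bytes from the file starting at
   * fileReadOffset and hands them to extractRecords().
   *
   * @param len the current length of the file
   * @param reader an open reader on the file; it will be seeked
   * @return true if the file still has unread data after this pass
   */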
  protected boolean slurp(long len, RandomAccessFile reader) throws IOException,
      InterruptedException {
    boolean hasMoreData = false;

    log.debug("Adaptor|" + adaptorID + "|seeking|" + fileReadOffset);
    reader.seek(fileReadOffset);

    long bufSize = len - fileReadOffset;

    if (bufSize > MAX_READ_SIZE) {
      bufSize = MAX_READ_SIZE;
      hasMoreData = true;
    }
    byte[] buf = new byte[(int) bufSize];

    long curOffset = fileReadOffset;

    lastSlurpTime = System.currentTimeMillis();
    int bufferRead = reader.read(buf);
    assert reader.getFilePointer() == fileReadOffset + bufSize :
        "event size arithmetic is broken: pointer is "
        + reader.getFilePointer()
        + " but offset is "
        + (fileReadOffset + bufSize);

    int bytesUsed = extractRecords(dest,
        fileReadOffset + offsetOfFirstByte, buf);

    // === WARNING ===
    // If we couldn't find a complete record AND we cannot read more
    // (i.e., bufferRead == MAX_READ_SIZE), the record is too big.
    // So log a warning and drop the current buffer, so we keep moving
    // instead of being stuck at this point forever.
    if (bytesUsed == 0 && bufferRead == MAX_READ_SIZE) {
      log.warn("bufferRead == MAX_READ_SIZE AND bytesUsed == 0, dropping current buffer: startOffset="
              + curOffset
              + ", MAX_READ_SIZE="
              + MAX_READ_SIZE
              + ", for "
              + toWatch.getPath());
      bytesUsed = buf.length;
    }

    fileReadOffset = fileReadOffset + bytesUsed;

    log.debug("Adaptor|" + adaptorID + "|start|" + curOffset + "|end|"
        + fileReadOffset);
    return hasMoreData;
  }

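  /**
   * Looks at the file once: if it grew, slurps the new bytes; if it shrank,
   * resets the read offset via handleShrunkenFile(). A nonexistent file has
   * length zero, so this quietly does nothing until the file appears.
   *
   * @return true if there is more data ready to be read
   */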
  public boolean tailFile()
      throws InterruptedException {
    boolean hasMoreData = false;
    try {
      // If the file doesn't exist, length() == 0 and we just keep waiting for it.
      //if(!toWatch.exists())
      //  deregisterAndStop(false);

      long len = toWatch.length();
      if(len < fileReadOffset) {
        // File shrank; probably some data went missing.
        handleShrunkenFile(len);
      } else if(len > fileReadOffset) {
        RandomAccessFile reader = new RandomAccessFile(toWatch, "r");
        try {
          hasMoreData = slurp(len, reader);
        } finally {
          reader.close(); // don't leak the descriptor if slurp() throws
        }
      }
    } catch(IOException e) {
      log.warn("IOException in tailer", e);
      deregisterAndStop();
    }

    return hasMoreData;
  }

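  /**
   * Called when the file's length drops below the current read offset
   * (e.g., the file was truncated in place). Restarts physical reads from
   * the beginning of the file, treating the new length as the logical
   * offset of its first byte.
   */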
  private void handleShrunkenFile(long measuredLen) {
    log.info("file " + toWatch + " shrank from " + fileReadOffset + " to "
        + measuredLen);
    offsetOfFirstByte = measuredLen;
    fileReadOffset = 0;
  }

}