This project has retired. For details please refer to its Attic page.
FileTailingAdaptor xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.chukwa.datacollection.adaptor.filetailer;
20  
21  import java.io.IOException;
22  import java.io.RandomAccessFile;
23  import java.io.File;
24  import org.apache.hadoop.chukwa.datacollection.adaptor.*;
25  
26  /**
27   * An adaptor that repeatedly tails a specified file, sending the new bytes.
28   * This class does not split out records, but just sends everything up to end of
29   * file. Subclasses can alter this behavior by overriding extractRecords().
30   * 
31   */
32  public class FileTailingAdaptor extends LWFTAdaptor {
33  
34  
35    public static int MAX_RETRIES = 300;
36    static int GRACEFUL_PERIOD = 3 * 60 * 1000; // 3 minutes
37  
38    private int attempts = 0;
39    private long gracefulPeriodExpired = 0l;
40    private boolean adaptorInError = false;
41  
42    protected RandomAccessFile reader = null;
43  
44    public void start(long bytes) {
45      super.start(bytes);
46      log.info("chukwaAgent.fileTailingAdaptor.maxReadSize: " + MAX_READ_SIZE);
47      this.attempts = 0;
48  
49      log.info("started file tailer " + adaptorID +  "  on file " + toWatch
50          + " with first byte at offset " + offsetOfFirstByte);
51    }
52   
53    
54    @Override
55    public long shutdown(AdaptorShutdownPolicy shutdownPolicy) {
56      
57      log.info("Enter Shutdown:" + shutdownPolicy.name() + " - ObjectId:" + this);
58      
59      switch(shutdownPolicy) {
60        case GRACEFULLY : 
61        case WAIT_TILL_FINISHED :{
62          if (toWatch.exists()) {
63            int retry = 0;
64            tailer.stopWatchingFile(this);
65            TerminatorThread lastTail = new TerminatorThread(this);
66            lastTail.setDaemon(true);
67            lastTail.start();
68            
69            if (shutdownPolicy.ordinal() == AdaptorShutdownPolicy.GRACEFULLY.ordinal()) {
70              while (lastTail.isAlive() && retry < 60) {
71                try {
72                  log.info("GRACEFULLY Retry:" + retry);
73                  Thread.sleep(1000);
74                  retry++;
75                } catch (InterruptedException ex) {
76                }
77              }
78            } else {
79              while (lastTail.isAlive()) {
80                try {
81                  if (retry%100 == 0) {
82                    log.info("WAIT_TILL_FINISHED Retry:" + retry);
83                  }
84                  Thread.sleep(1000);
85                  retry++;
86                } catch (InterruptedException ex) {
87                }
88              } 
89            }          
90          }
91        }
92        break;
93        
94        case HARD_STOP:
95        default:
96          tailer.stopWatchingFile(this);
97          try {
98            if (reader != null) {
99              reader.close();
100           }
101           reader = null;
102         } catch(Throwable e) {
103          log.warn("Exception while closing reader:",e);
104         }
105         break;
106     }
107     log.info("Exit Shutdown:" + shutdownPolicy.name()+ " - ObjectId:" + this);
108     return fileReadOffset + offsetOfFirstByte;
109   }
110   
111 
112   /**
113    * Looks at the tail of the associated file, adds some of it to event queue
114    * This method is not thread safe. Returns true if there's more data in the
115    * file
116    * 
117    */
118   @Override
119   public boolean tailFile()
120       throws InterruptedException {
121     boolean hasMoreData = false;
122 
123     try {
124       if ((adaptorInError == true)
125           && (System.currentTimeMillis() > gracefulPeriodExpired)) {
126         if (!toWatch.exists()) {
127           log.warn("Adaptor|" + adaptorID + "|attempts=" + attempts
128               + "| File does not exist: " + toWatch.getAbsolutePath()
129               + ", streaming policy expired.  File removed from streaming.");
130         } else if (!toWatch.canRead()) {
131           log.warn("Adaptor|" + adaptorID + "|attempts=" + attempts
132               + "| File cannot be read: " + toWatch.getAbsolutePath()
133               + ", streaming policy expired.  File removed from streaming.");
134         } else {
135           // Should have never been there
136           adaptorInError = false;
137           gracefulPeriodExpired = 0L;
138           attempts = 0;
139           return false;
140         }
141 
142         deregisterAndStop();
143         return false;
144       } else if (!toWatch.exists() || !toWatch.canRead()) {
145         if (adaptorInError == false) {
146           long now = System.currentTimeMillis();
147           gracefulPeriodExpired = now + GRACEFUL_PERIOD;
148           adaptorInError = true;
149           attempts = 0;
150           log.warn("failed to stream data for: " + toWatch.getAbsolutePath()
151               + ", graceful period will Expire at now:" + now + " + "
152               + GRACEFUL_PERIOD + " secs, i.e:" + gracefulPeriodExpired);
153         } else if (attempts % 10 == 0) {
154           log.info("failed to stream data for: " + toWatch.getAbsolutePath()
155               + ", attempt: " + attempts);
156         }
157 
158         attempts++;
159         return false; // no more data
160       }
161 
162       if (reader == null) {
163         reader = new RandomAccessFile(toWatch, "r");
164         log.info("Adaptor|" + adaptorID
165             + "|Opening the file for the first time|seek|" + fileReadOffset);
166       }
167 
168       long len = 0L;
169       try {
170         len = reader.length();
171         if (lastSlurpTime == 0) {
172           lastSlurpTime = System.currentTimeMillis();
173         }
174         if (offsetOfFirstByte > fileReadOffset) {
175           // If the file rotated, the recorded offsetOfFirstByte is greater than
176           // file size,reset the first byte position to beginning of the file.
177           fileReadOffset = 0;
178           offsetOfFirstByte = 0L;
179           log.warn("offsetOfFirstByte>fileReadOffset, resetting offset to 0");
180         }
181         if (len == fileReadOffset) {
182           File fixedNameFile = new File(toWatch.getAbsolutePath());
183           long fixedNameLastModified = fixedNameFile.lastModified();
184           if (fixedNameLastModified > lastSlurpTime) {
185             // If len == fileReadOffset,the file stops rolling log or the file
186             // has rotated.
187             // But fixedNameLastModified > lastSlurpTime , this means after the
188             // last slurping,the file has been written ,
189             // so the file has been rotated.
190             boolean hasLeftData = true;
191             while (hasLeftData) {// read the possiblly generated log
192               hasLeftData = slurp(len, reader);
193             }
194             RandomAccessFile newReader = new RandomAccessFile(toWatch, "r");
195             if (reader != null) {
196               reader.close();
197             }
198             reader = newReader;
199             fileReadOffset = 0L;
200             len = reader.length();
201             log.debug("Adaptor|" + adaptorID 
202                 + "| File size mismatched, rotating: " 
203                 + toWatch.getAbsolutePath());
204             hasMoreData = slurp(len, reader);
205           } 
206         } else if (len < fileReadOffset) {
207           // file has rotated and no detection
208           if (reader != null) {
209             reader.close();
210           }
211           reader = null;
212           fileReadOffset = 0L;
213           offsetOfFirstByte = 0L;
214           hasMoreData = true;
215           log.warn("Adaptor|" + adaptorID + "| file: " + toWatch.getPath() 
216               + ", has rotated and no detection - reset counters to 0L");
217         } else {
218           hasMoreData = slurp(len, reader);
219         }
220       } catch (IOException e) {
221         // do nothing, if file doesn't exist.
222       }       
223     } catch (IOException e) {
224       log.warn("failure reading " + toWatch, e);
225     }
226     attempts = 0;
227     adaptorInError = false;
228     return hasMoreData;
229   }
230 
231 
232 }