This project has retired. For details please refer to its Attic page.
FileTailingAdaptor xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.chukwa.datacollection.adaptor.filetailer;
20  
21  import java.io.IOException;
22  import java.io.RandomAccessFile;
23  import java.io.File;
24  import org.apache.hadoop.chukwa.datacollection.adaptor.*;
25  import org.apache.hadoop.chukwa.util.ExceptionUtil;
26  
27  /**
28   * An adaptor that repeatedly tails a specified file, sending the new bytes.
29   * This class does not split out records, but just sends everything up to end of
30   * file. Subclasses can alter this behavior by overriding extractRecords().
31   * 
32   */
33  public class FileTailingAdaptor extends LWFTAdaptor {
34  
35  
36    public static int MAX_RETRIES = 300;
37    public static int GRACEFUL_PERIOD = 3 * 60 * 1000; // 3 minutes
38  
39    private int attempts = 0;
40    private long gracefulPeriodExpired = 0l;
41    private boolean adaptorInError = false;
42  
43    protected RandomAccessFile reader = null;
44  
45    public void start(long bytes) {
46      super.start(bytes);
47      log.info("chukwaAgent.fileTailingAdaptor.maxReadSize: " + MAX_READ_SIZE);
48      this.attempts = 0;
49  
50      log.info("started file tailer " + adaptorID +  "  on file " + toWatch
51          + " with first byte at offset " + offsetOfFirstByte);
52    }
53   
54    
55    @Override
56    public long shutdown(AdaptorShutdownPolicy shutdownPolicy) {
57      
58      log.info("Enter Shutdown:" + shutdownPolicy.name() + " - ObjectId:" + this);
59      
60      switch(shutdownPolicy) {
61        case GRACEFULLY : 
62        case WAIT_TILL_FINISHED :{
63          if (toWatch.exists()) {
64            int retry = 0;
65            tailer.stopWatchingFile(this);
66            TerminatorThread lastTail = new TerminatorThread(this);
67            lastTail.setDaemon(true);
68            lastTail.start();
69            
70            if (shutdownPolicy.ordinal() == AdaptorShutdownPolicy.GRACEFULLY.ordinal()) {
71              while (lastTail.isAlive() && retry < 60) {
72                try {
73                  log.info("GRACEFULLY Retry:" + retry);
74                  Thread.sleep(1000);
75                  retry++;
76                } catch (InterruptedException ex) {
77                }
78              }
79            } else {
80              while (lastTail.isAlive()) {
81                try {
82                  if (retry%100 == 0) {
83                    log.info("WAIT_TILL_FINISHED Retry:" + retry);
84                  }
85                  Thread.sleep(1000);
86                  retry++;
87                } catch (InterruptedException ex) {
88                }
89              } 
90            }          
91          }
92        }
93        break;
94        
95        case HARD_STOP:
96        default:
97          tailer.stopWatchingFile(this);
98          try {
99            if (reader != null) {
100             reader.close();
101           }
102           reader = null;
103         } catch(Throwable e) {
104          log.warn("Exception while closing reader:",e);
105         }
106         break;
107     }
108     log.info("Exit Shutdown:" + shutdownPolicy.name()+ " - ObjectId:" + this);
109     return fileReadOffset + offsetOfFirstByte;
110   }
111   
112 
113   /**
114    * Looks at the tail of the associated file, adds some of it to event queue
115    * This method is not thread safe. Returns true if there's more data in the
116    * file
117    * 
118    * @param eq the queue to write Chunks into
119    */
120   @Override
121   public synchronized boolean tailFile()
122       throws InterruptedException {
123     boolean hasMoreData = false;
124 
125     try {
126       if ((adaptorInError == true)
127           && (System.currentTimeMillis() > gracefulPeriodExpired)) {
128         if (!toWatch.exists()) {
129           log.warn("Adaptor|" + adaptorID + "|attempts=" + attempts
130               + "| File does not exist: " + toWatch.getAbsolutePath()
131               + ", streaming policy expired.  File removed from streaming.");
132         } else if (!toWatch.canRead()) {
133           log.warn("Adaptor|" + adaptorID + "|attempts=" + attempts
134               + "| File cannot be read: " + toWatch.getAbsolutePath()
135               + ", streaming policy expired.  File removed from streaming.");
136         } else {
137           // Should have never been there
138           adaptorInError = false;
139           gracefulPeriodExpired = 0L;
140           attempts = 0;
141           return false;
142         }
143 
144         deregisterAndStop();
145         return false;
146       } else if (!toWatch.exists() || !toWatch.canRead()) {
147         if (adaptorInError == false) {
148           long now = System.currentTimeMillis();
149           gracefulPeriodExpired = now + GRACEFUL_PERIOD;
150           adaptorInError = true;
151           attempts = 0;
152           log.warn("failed to stream data for: " + toWatch.getAbsolutePath()
153               + ", graceful period will Expire at now:" + now + " + "
154               + GRACEFUL_PERIOD + " secs, i.e:" + gracefulPeriodExpired);
155         } else if (attempts % 10 == 0) {
156           log.info("failed to stream data for: " + toWatch.getAbsolutePath()
157               + ", attempt: " + attempts);
158         }
159 
160         attempts++;
161         return false; // no more data
162       }
163 
164       if (reader == null) {
165         reader = new RandomAccessFile(toWatch, "r");
166         log.info("Adaptor|" + adaptorID
167             + "|Opening the file for the first time|seek|" + fileReadOffset);
168       }
169 
170       long len = 0L;
171       try {
172         len = reader.length();
173         if (lastSlurpTime == 0) {
174           lastSlurpTime = System.currentTimeMillis();
175         }
176         if (offsetOfFirstByte > fileReadOffset) {
177           // If the file rotated, the recorded offsetOfFirstByte is greater than
178           // file size,reset the first byte position to beginning of the file.
179           fileReadOffset = 0;
180           offsetOfFirstByte = 0L;
181           log.warn("offsetOfFirstByte>fileReadOffset, resetting offset to 0");
182         }
183         if (len == fileReadOffset) {
184           File fixedNameFile = new File(toWatch.getAbsolutePath());
185           long fixedNameLastModified = fixedNameFile.lastModified();
186           if (fixedNameLastModified > lastSlurpTime) {
187             // If len == fileReadOffset,the file stops rolling log or the file
188             // has rotated.
189             // But fixedNameLastModified > lastSlurpTime , this means after the
190             // last slurping,the file has been written ,
191             // so the file has been rotated.
192             boolean hasLeftData = true;
193             while (hasLeftData) {// read the possiblly generated log
194               hasLeftData = slurp(len, reader);
195             }
196             RandomAccessFile newReader = new RandomAccessFile(toWatch, "r");
197             if (reader != null) {
198               reader.close();
199             }
200             reader = newReader;
201             fileReadOffset = 0L;
202             len = reader.length();
203             log.debug("Adaptor|" + adaptorID 
204                 + "| File size mismatched, rotating: " 
205                 + toWatch.getAbsolutePath());
206             hasMoreData = slurp(len, reader);
207           } 
208         } else if (len < fileReadOffset) {
209           // file has rotated and no detection
210           if (reader != null) {
211             reader.close();
212           }
213           reader = null;
214           fileReadOffset = 0L;
215           offsetOfFirstByte = 0L;
216           hasMoreData = true;
217           log.warn("Adaptor|" + adaptorID + "| file: " + toWatch.getPath() 
218               + ", has rotated and no detection - reset counters to 0L");
219         } else {
220           hasMoreData = slurp(len, reader);
221         }
222       } catch (IOException e) {
223         // do nothing, if file doesn't exist.
224       }       
225     } catch (IOException e) {
226       log.warn("failure reading " + toWatch, e);
227     }
228     attempts = 0;
229     adaptorInError = false;
230     return hasMoreData;
231   }
232 
233 
234 }