This project has retired. For details please refer to its Attic page.
FileAdaptor xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.chukwa.datacollection.adaptor;
20  
21  import java.io.File;
22  import java.io.RandomAccessFile;
23  import java.util.List;
24  import java.util.concurrent.CopyOnWriteArrayList;
25  
26  import org.apache.hadoop.chukwa.ChunkImpl;
27  import org.apache.hadoop.chukwa.datacollection.ChunkReceiver;
28  import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
29  import org.apache.hadoop.conf.Configuration;
30  import org.apache.hadoop.chukwa.util.ExceptionUtil;
31  import org.apache.log4j.Level;
32  import org.apache.log4j.Logger;
33  
34  
35  class FileAdaptorTailer extends Thread {
36    static Logger log = Logger.getLogger(FileAdaptorTailer.class);
37    private List<FileAdaptor> adaptors = null;
38    private static Configuration conf = null;
39    private Object lock = new Object();
40    
41    /**
42     * How often to call each adaptor.
43     */
44    int DEFAULT_SAMPLE_PERIOD_MS = 1000 * 10;
45    int SAMPLE_PERIOD_MS = DEFAULT_SAMPLE_PERIOD_MS;
46  
47    
48    public FileAdaptorTailer() {
49      
50      if (conf == null) {
51        ChukwaAgent agent = ChukwaAgent.getAgent();
52        if (agent != null) {
53          conf = agent.getConfiguration();
54          if (conf != null) {
55            SAMPLE_PERIOD_MS = conf.getInt(
56                "chukwaAgent.adaptor.context.switch.time",
57                DEFAULT_SAMPLE_PERIOD_MS);
58          }
59        }
60      }
61      
62      // iterations are much more common than adding a new adaptor
63      adaptors = new CopyOnWriteArrayList<FileAdaptor>();
64  
65      setDaemon(true);
66      start();// start the FileAdaptorTailer thread
67    }
68    @Override
69    public void run() {
70      while(true) {
71        try {
72  
73          while (adaptors.size() == 0) {
74            synchronized (lock) {
75              try {
76                log.info("Waiting queue is empty");
77                lock.wait();
78              } catch (InterruptedException e) {
79                // do nothing
80              }
81            }
82          }
83          
84          long startTime = System.currentTimeMillis();
85          for (FileAdaptor adaptor: adaptors) {
86            log.info("calling sendFile for " + adaptor.toWatch.getCanonicalPath());
87            adaptor.sendFile(); 
88          }
89          
90          long timeToReadFiles = System.currentTimeMillis() - startTime;
91          if (timeToReadFiles < SAMPLE_PERIOD_MS) {
92            Thread.sleep(SAMPLE_PERIOD_MS);
93          }
94          
95        }catch (Throwable e) {
96          log.warn("Exception in FileAdaptorTailer:",e);
97        }
98      }
99    }
100   
101   public void addFileAdaptor(FileAdaptor adaptor) {
102     adaptors.add(adaptor);
103     synchronized (lock) {
104       lock.notifyAll();
105     }
106   }
107   
108   public void removeFileAdaptor(FileAdaptor adaptor) {
109     adaptors.remove(adaptor);
110   }
111 }
112 
113 /**
114  * File Adaptor push small size file in one chunk to collector
115  */
116 public class FileAdaptor extends AbstractAdaptor {
117 
118   static Logger log = Logger.getLogger(FileAdaptor.class);
119   static FileAdaptorTailer tailer = null;
120   
121   static final int DEFAULT_TIMEOUT_PERIOD = 5*60*1000;
122   static int TIMEOUT_PERIOD = DEFAULT_TIMEOUT_PERIOD;
123   
124   static {
125     tailer = new FileAdaptorTailer();
126   }
127   
128   private long startTime = 0;
129   private long timeOut = 0;
130   
131   protected volatile boolean finished = false;
132   File toWatch;
133   protected RandomAccessFile reader = null;
134   protected long fileReadOffset;
135   protected boolean deleteFileOnClose = false;
136   protected boolean shutdownCalled = false;
137   
138   /**
139    * The logical offset of the first byte of the file
140    */
141   private long offsetOfFirstByte = 0;
142 
143   public void start(long bytes) {
144     // in this case params = filename
145     log.info("adaptor id: " + adaptorID + " started file adaptor on file "
146         + toWatch);
147     this.startTime = System.currentTimeMillis();
148     TIMEOUT_PERIOD = control.getConfiguration().getInt(
149         "chukwaAgent.adaptor.fileadaptor.timeoutperiod",
150         DEFAULT_TIMEOUT_PERIOD);
151     this.timeOut = startTime + TIMEOUT_PERIOD;
152     
153     tailer.addFileAdaptor(this);
154   }
155 
156   void sendFile() {
157     long now = System.currentTimeMillis() ;
158     long oneMinAgo = now - (60*1000);
159     if (toWatch.exists()) {
160      if (toWatch.lastModified() > oneMinAgo && now < timeOut) {
161        log.info("Last modified time less than one minute, keep waiting");
162        return;
163      } else {
164        try {
165          
166          long bufSize = toWatch.length();
167          byte[] buf = new byte[(int) bufSize];
168          
169          reader = new RandomAccessFile(toWatch, "r");
170          reader.read(buf);
171          reader.close();
172          reader = null;
173          
174          long fileTime = toWatch.lastModified();
175          int bytesUsed = extractRecords(dest, 0, buf, fileTime);
176          this.fileReadOffset = bytesUsed;
177          finished = true;
178          deregisterAndStop();
179          cleanUp();
180        } catch(Exception e) {
181          log.warn("Exception while trying to read: " + toWatch.getAbsolutePath(),e);
182        }  finally {
183          if (reader != null) {
184            try {
185              reader.close();
186            } catch (Exception e) {
187              log.debug(ExceptionUtil.getStackTrace(e));
188           }
189            reader = null;
190          }
191        }
192      }
193     } else {
194       if (now > timeOut) {
195         finished = true;
196         log.warn("Couldn't read this file: " + toWatch.getAbsolutePath());
197         deregisterAndStop();
198         cleanUp() ;
199       }
200     }
201   }
202   
203   private void cleanUp() {
204     tailer.removeFileAdaptor(this);
205     if (reader != null) {
206       try {
207         reader.close();
208       } catch (Exception e) {
209         log.debug(ExceptionUtil.getStackTrace(e));
210      }
211       reader = null;
212     } 
213   }
214   
215 
216   @Override
217   public long shutdown(AdaptorShutdownPolicy shutdownPolicy) {
218     log.info("Enter Shutdown:" + shutdownPolicy.name()+ " - ObjectId:" + this);
219     switch(shutdownPolicy) {
220       case GRACEFULLY : {
221         int retry = 0;
222         while (!finished && retry < 60) {
223           try {
224             log.info("GRACEFULLY Retry:" + retry);
225             Thread.sleep(1000);
226             retry++;
227           } catch (InterruptedException ex) {
228           }
229         } 
230       }
231       break;
232       case WAIT_TILL_FINISHED : {
233         int retry = 0;
234         while (!finished) {
235           try {
236             if (retry%100 == 0) {
237               log.info("WAIT_TILL_FINISHED Retry:" + retry);
238             }
239 
240             Thread.sleep(1000);
241             retry++;
242           } catch (InterruptedException ex) {
243           }
244         } 
245       }
246 
247       break;
248       default :
249         cleanUp();
250         break;
251     }
252 
253     if (deleteFileOnClose && toWatch != null) {
254       if (log.isDebugEnabled()) {
255         log.debug("About to delete " + toWatch.getAbsolutePath());
256       }
257       if (toWatch.delete()) {
258         if (log.isInfoEnabled()) {
259           log.debug("Successfully deleted " + toWatch.getAbsolutePath());
260         }
261       } else {
262         if (log.isEnabledFor(Level.WARN)) {
263           log.warn("Could not delete " + toWatch.getAbsolutePath() + " (for unknown reason)");
264         }
265       }
266     }
267     
268     log.info("Exist Shutdown:" + shutdownPolicy.name()+ " - ObjectId:" + this);
269     return fileReadOffset + offsetOfFirstByte;
270   }
271   
272   public String parseArgs(String params) {
273 
274     String[] words = params.split(" ");
275     if (words.length == 2) {
276       if (words[1].equals("delete")) {
277         deleteFileOnClose = true;
278         toWatch = new File(words[0]);
279       } else {
280         offsetOfFirstByte = Long.parseLong(words[0]);
281         toWatch = new File(words[1]);
282       }
283     } else if (words.length == 3) {
284       offsetOfFirstByte = Long.parseLong(words[0]);
285       toWatch = new File(words[1]);
286       deleteFileOnClose = words[2].equals("delete");
287     } else {
288       toWatch = new File(params);
289     }
290     return toWatch.getAbsolutePath();
291   }
292 
293   /**
294    * Extract records from a byte sequence
295    * 
296    * @param eq
297    *          the queue to stick the new chunk[s] in
298    * @param buffOffsetInFile
299    *          the byte offset in the stream at which buf[] begins
300    * @param buf
301    *          the byte buffer to extract records from
302    * @return the number of bytes processed
303    * @throws InterruptedException
304    */
305   protected int extractRecords(final ChunkReceiver eq, long buffOffsetInFile,
306       byte[] buf, long fileTime) throws InterruptedException {
307     final ChunkImpl chunk = new ChunkImpl(type, toWatch.getAbsolutePath(),
308         buffOffsetInFile + buf.length, buf, this);
309     chunk.addTag("time=\"" + fileTime + "\"");
310     log.info("Adding " + toWatch.getAbsolutePath() + " to the queue");
311     eq.add(chunk);
312     log.info( toWatch.getAbsolutePath() + " added to the queue");
313     return buf.length;
314   }
315 
316   @Override
317   public String getCurrentStatus() {
318     return type.trim() + " " + offsetOfFirstByte + " " + toWatch.getPath();
319   }
320 
321 }