This project has retired. For details please refer to its Attic page.
RCheckFTAdaptor xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.chukwa.datacollection.adaptor.filetailer;
19  
20  import java.io.File;
21  import java.io.FileFilter;
22  import java.io.IOException;
23  import java.io.RandomAccessFile;
24  import java.util.regex.Matcher;
25  import java.util.regex.Pattern;
26  import java.util.Collections;
27  import java.util.LinkedList;
28  
29  import org.apache.commons.lang3.builder.HashCodeBuilder;
30  
31  /**
32   * Checkpoint state:
33   *   date modified of most-recently tailed file, offset of first byte of that file,
34   *   then regular FTA arts 
35   *
36   */
37  public class RCheckFTAdaptor extends LWFTAdaptor implements FileFilter {
38    
39    private static class FPair implements Comparable<FPair> {
40      File f;
41      long mod;
42      FPair(File f) {
43        this.f = f;
44        mod = f.lastModified();
45      }
46      /**
47       * -1   implies this is LESS THAN o
48       */
49      @Override
50      public int compareTo(FPair o) {
51        if(mod < o.mod)
52          return -1;
53        else if (mod > o.mod)
54          return 1;
55        //want toWatch to be last after a rotation; otherwise, this is basically 
56        //just a heuristic that hasn't been tuned yet
57        else return (o.f.getName().compareTo(f.getName()));
58      }
59      
60      @Override
61      public boolean equals(Object o) {
62        if(o instanceof FPair) {
63          return mod == ((FPair) o).mod;
64        } else {
65          return false;
66        }
67      }
68      
69      @Override
70      public int hashCode() {
71        return new HashCodeBuilder(643, 1321).
72            append(this.mod).
73            toHashCode();
74      }
75    }
76    
77    long prevFileLastModDate = 0;
78    LinkedList<FPair> fileQ = new LinkedList<FPair>(); 
79    String fBaseName;
80    File cur; //this is the actual physical file being watched.  
81              // in contrast, toWatch is the path specified by the user
82    boolean caughtUp = false;
83    /**
84     * Check for date-modified and offset; if absent assume we just got a name.
85     */
86    @Override
87    public String parseArgs(String params) { 
88      Pattern cmd = Pattern.compile("d:(\\d+)\\s+(\\d+)\\s+(.+)\\s?");
89      Matcher m = cmd.matcher(params);
90      if (m.matches()) {
91        prevFileLastModDate = Long.parseLong(m.group(1));
92        offsetOfFirstByte = Long.parseLong(m.group(2));
93        toWatch = new File(m.group(3)).getAbsoluteFile();
94      } else {
95        toWatch = new File(params.trim()).getAbsoluteFile();
96      }
97      fBaseName = toWatch.getName();
98      return toWatch.getAbsolutePath();
99    }
100   
101   public String getCurrentStatus() {
102     return type.trim() + " d:" + prevFileLastModDate + " "  + offsetOfFirstByte + " " + toWatch.getPath();
103   }
104 
105   @Override
106   public boolean accept(File pathname) {
107     return pathname.getName().startsWith(fBaseName) &&
108      ( pathname.getName().equals(fBaseName) ||
109        pathname.lastModified() > prevFileLastModDate);
110   }
111   
112   
113   protected void mkFileQ() {
114     
115     File toWatchDir = toWatch.getParentFile();
116     File[] candidates = toWatchDir.listFiles(this);
117     if(candidates == null) {
118       log.error(toWatchDir + " is not a directory in "+adaptorID);
119     } else {
120       log.debug("saw " + candidates.length + " files matching pattern");
121       fileQ = new LinkedList<FPair>();
122       for(File f:candidates)
123         fileQ.add(new FPair(f));
124       Collections.sort(fileQ);
125     } 
126   }
127   
128   protected void advanceQ() {
129     FPair next = fileQ.poll();
130     if(next != null) {
131       cur = next.f;
132       caughtUp = toWatch.equals(cur);
133       if(caughtUp && !fileQ.isEmpty()) 
134         log.warn("expected rotation queue to be empty when caught up...");
135     }
136     else {
137       cur = null;
138       caughtUp = true;
139     }
140   }
141   
142   @Override
143   public void start(long offset) {
144     mkFileQ(); //figure out what to watch
145     advanceQ();
146     super.start(offset);
147   }
148   
149   @Override
150   public boolean tailFile()
151   throws InterruptedException {
152     boolean hasMoreData = false;
153     try {
154       
155       if(caughtUp) {
156         //we're caught up and watching an unrotated file
157         mkFileQ(); //figure out what to watch
158         advanceQ();
159       }
160       if(cur == null) //file we're watching doesn't exist
161         return false;
162       
163       
164       long len = cur.length();
165       long tsPreTail = cur.exists() ? cur.lastModified() : prevFileLastModDate;
166 
167       if(log.isDebugEnabled())
168         log.debug(adaptorID + " treating " + cur + " as " + toWatch + " len = " + len);
169       
170       if(len < fileReadOffset) {
171         log.info("file "+ cur +" shrank from " + fileReadOffset + " to " + len);
172         //no unseen changes to prev version, since mod date is older than last scan.
173         offsetOfFirstByte += fileReadOffset;
174         fileReadOffset = 0;
175       } else if(len > fileReadOffset) {
176         log.debug("slurping from " + cur+ " at offset " + fileReadOffset);
177         RandomAccessFile reader = new RandomAccessFile(cur, "r");
178         slurp(len, reader);
179         reader.close();
180       } else {
181         //we're either caught up or at EOF
182         if (!caughtUp) {
183           prevFileLastModDate = cur.lastModified();
184           //Hit EOF on an already-rotated file. Move on!
185           offsetOfFirstByte += fileReadOffset;
186           fileReadOffset = 0;
187           advanceQ();
188           log.debug("not caught up, and hit EOF.  Moving forward in queue to " + cur);
189         } else 
190           prevFileLastModDate = tsPreTail;
191 
192       }
193         
194     } catch(IOException e) {
195       log.warn("IOException in "+adaptorID, e);
196       deregisterAndStop();
197     }
198     
199     return hasMoreData;
200   }
201   
202 
203   public String toString() {
204     return "Rotation-aware Tailer on " + toWatch;
205   }
206 }