This project has retired. For details please refer to its Attic page.
RCheckFTAdaptor xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.chukwa.datacollection.adaptor.filetailer;
19  
20  import java.io.File;
21  import java.io.FileFilter;
22  import java.io.IOException;
23  import java.io.RandomAccessFile;
24  import java.util.regex.Matcher;
25  import java.util.regex.Pattern;
26  import java.util.Collections;
27  import java.util.LinkedList;
28  
29  /**
30   * Checkpoint state:
31   *   date modified of most-recently tailed file, offset of first byte of that file,
32   *   then regular FTA args
33   *
34   */
35  public class RCheckFTAdaptor extends LWFTAdaptor implements FileFilter {
36    
37    private static class FPair implements Comparable<FPair> {
38      File f;
39      long mod;
40      FPair(File f) {
41        this.f = f;
42        mod = f.lastModified();
43      }
44      /**
45       * -1   implies this is LESS THAN o
46       */
47      @Override
48      public int compareTo(FPair o) {
49        if(mod < o.mod)
50          return -1;
51        else if (mod > o.mod)
52          return 1;
53        //want toWatch to be last after a rotation; otherwise, this is basically 
54        //just a heuristic that hasn't been tuned yet
55        else return (o.f.getName().compareTo(f.getName()));
56      }
57    }
58    
59    long prevFileLastModDate = 0;
60    LinkedList<FPair> fileQ = new LinkedList<FPair>(); 
61    String fBaseName;
62    File cur; //this is the actual physical file being watched.  
63              // in contrast, toWatch is the path specified by the user
64    boolean caughtUp = false;
65    /**
66     * Check for date-modified and offset; if absent assume we just got a name.
67     */
68    @Override
69    public String parseArgs(String params) { 
70      Pattern cmd = Pattern.compile("d:(\\d+)\\s+(\\d+)\\s+(.+)\\s?");
71      Matcher m = cmd.matcher(params);
72      if (m.matches()) {
73        prevFileLastModDate = Long.parseLong(m.group(1));
74        offsetOfFirstByte = Long.parseLong(m.group(2));
75        toWatch = new File(m.group(3)).getAbsoluteFile();
76      } else {
77        toWatch = new File(params.trim()).getAbsoluteFile();
78      }
79      fBaseName = toWatch.getName();
80      return toWatch.getAbsolutePath();
81    }
82    
83    public String getCurrentStatus() {
84      return type.trim() + " d:" + prevFileLastModDate + " "  + offsetOfFirstByte + " " + toWatch.getPath();
85    }
86  
87    @Override
88    public boolean accept(File pathname) {
89      return pathname.getName().startsWith(fBaseName) &&
90       ( pathname.getName().equals(fBaseName) ||
91         pathname.lastModified() > prevFileLastModDate);
92    }
93    
94    
95    protected void mkFileQ() {
96      
97      File toWatchDir = toWatch.getParentFile();
98      File[] candidates = toWatchDir.listFiles(this);
99      if(candidates == null) {
100       log.error(toWatchDir + " is not a directory in "+adaptorID);
101     } else {
102       log.debug("saw " + candidates.length + " files matching pattern");
103       fileQ = new LinkedList<FPair>();
104       for(File f:candidates)
105         fileQ.add(new FPair(f));
106       Collections.sort(fileQ);
107     } 
108   }
109   
110   protected void advanceQ() {
111     FPair next = fileQ.poll();
112     if(next != null) {
113       cur = next.f;
114       caughtUp = toWatch.equals(cur);
115       if(caughtUp && !fileQ.isEmpty()) 
116         log.warn("expected rotation queue to be empty when caught up...");
117     }
118     else {
119       cur = null;
120       caughtUp = true;
121     }
122   }
123   
124   @Override
125   public void start(long offset) {
126     mkFileQ(); //figure out what to watch
127     advanceQ();
128     super.start(offset);
129   }
130   
131   @Override
132   public synchronized boolean tailFile()
133   throws InterruptedException {
134     boolean hasMoreData = false;
135     try {
136       
137       if(caughtUp) {
138         //we're caught up and watching an unrotated file
139         mkFileQ(); //figure out what to watch
140         advanceQ();
141       }
142       if(cur == null) //file we're watching doesn't exist
143         return false;
144       
145       
146       long len = cur.length();
147       long tsPreTail = cur.exists() ? cur.lastModified() : prevFileLastModDate;
148 
149       if(log.isDebugEnabled())
150         log.debug(adaptorID + " treating " + cur + " as " + toWatch + " len = " + len);
151       
152       if(len < fileReadOffset) {
153         log.info("file "+ cur +" shrank from " + fileReadOffset + " to " + len);
154         //no unseen changes to prev version, since mod date is older than last scan.
155         offsetOfFirstByte += fileReadOffset;
156         fileReadOffset = 0;
157       } else if(len > fileReadOffset) {
158         log.debug("slurping from " + cur+ " at offset " + fileReadOffset);
159         RandomAccessFile reader = new RandomAccessFile(cur, "r");
160         slurp(len, reader);
161         reader.close();
162       } else {
163         //we're either caught up or at EOF
164         if (!caughtUp) {
165           prevFileLastModDate = cur.lastModified();
166           //Hit EOF on an already-rotated file. Move on!
167           offsetOfFirstByte += fileReadOffset;
168           fileReadOffset = 0;
169           advanceQ();
170           log.debug("not caught up, and hit EOF.  Moving forward in queue to " + cur);
171         } else 
172           prevFileLastModDate = tsPreTail;
173 
174       }
175         
176     } catch(IOException e) {
177       log.warn("IOException in "+adaptorID, e);
178       deregisterAndStop();
179     }
180     
181     return hasMoreData;
182   }
183   
184 
185   public String toString() {
186     return "Rotation-aware Tailer on " + toWatch;
187   }
188 }