This project has retired. For details, please refer to its Attic page.
      
1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  
16  
17  
18  package org.apache.hadoop.chukwa.datacollection.adaptor.filetailer;
19  
20  import java.io.File;
21  import java.io.FileFilter;
22  import java.io.IOException;
23  import java.io.RandomAccessFile;
24  import java.util.regex.Matcher;
25  import java.util.regex.Pattern;
26  import java.util.Collections;
27  import java.util.LinkedList;
28  
29  
30  
31  
32  
33  
34  
35  public class RCheckFTAdaptor extends LWFTAdaptor implements FileFilter {
36    
37    private static class FPair implements Comparable<FPair> {
38      File f;
39      long mod;
40      FPair(File f) {
41        this.f = f;
42        mod = f.lastModified();
43      }
44      
45  
46  
47      @Override
48      public int compareTo(FPair o) {
49        if(mod < o.mod)
50          return -1;
51        else if (mod > o.mod)
52          return 1;
53        
54        
55        else return (o.f.getName().compareTo(f.getName()));
56      }
57    }
58    
59    long prevFileLastModDate = 0;
60    LinkedList<FPair> fileQ = new LinkedList<FPair>(); 
61    String fBaseName;
62    File cur; 
63              
64    boolean caughtUp = false;
65    
66  
67  
68    @Override
69    public String parseArgs(String params) { 
70      Pattern cmd = Pattern.compile("d:(\\d+)\\s+(\\d+)\\s+(.+)\\s?");
71      Matcher m = cmd.matcher(params);
72      if (m.matches()) {
73        prevFileLastModDate = Long.parseLong(m.group(1));
74        offsetOfFirstByte = Long.parseLong(m.group(2));
75        toWatch = new File(m.group(3)).getAbsoluteFile();
76      } else {
77        toWatch = new File(params.trim()).getAbsoluteFile();
78      }
79      fBaseName = toWatch.getName();
80      return toWatch.getAbsolutePath();
81    }
82    
83    public String getCurrentStatus() {
84      return type.trim() + " d:" + prevFileLastModDate + " "  + offsetOfFirstByte + " " + toWatch.getPath();
85    }
86  
87    @Override
88    public boolean accept(File pathname) {
89      return pathname.getName().startsWith(fBaseName) &&
90       ( pathname.getName().equals(fBaseName) ||
91         pathname.lastModified() > prevFileLastModDate);
92    }
93    
94    
95    protected void mkFileQ() {
96      
97      File toWatchDir = toWatch.getParentFile();
98      File[] candidates = toWatchDir.listFiles(this);
99      if(candidates == null) {
100       log.error(toWatchDir + " is not a directory in "+adaptorID);
101     } else {
102       log.debug("saw " + candidates.length + " files matching pattern");
103       fileQ = new LinkedList<FPair>();
104       for(File f:candidates)
105         fileQ.add(new FPair(f));
106       Collections.sort(fileQ);
107     } 
108   }
109   
110   protected void advanceQ() {
111     FPair next = fileQ.poll();
112     if(next != null) {
113       cur = next.f;
114       caughtUp = toWatch.equals(cur);
115       if(caughtUp && !fileQ.isEmpty()) 
116         log.warn("expected rotation queue to be empty when caught up...");
117     }
118     else {
119       cur = null;
120       caughtUp = true;
121     }
122   }
123   
124   @Override
125   public void start(long offset) {
126     mkFileQ(); 
127     advanceQ();
128     super.start(offset);
129   }
130   
131   @Override
132   public synchronized boolean tailFile()
133   throws InterruptedException {
134     boolean hasMoreData = false;
135     try {
136       
137       if(caughtUp) {
138         
139         mkFileQ(); 
140         advanceQ();
141       }
142       if(cur == null) 
143         return false;
144       
145       
146       long len = cur.length();
147       long tsPreTail = cur.exists() ? cur.lastModified() : prevFileLastModDate;
148 
149       if(log.isDebugEnabled())
150         log.debug(adaptorID + " treating " + cur + " as " + toWatch + " len = " + len);
151       
152       if(len < fileReadOffset) {
153         log.info("file "+ cur +" shrank from " + fileReadOffset + " to " + len);
154         
155         offsetOfFirstByte += fileReadOffset;
156         fileReadOffset = 0;
157       } else if(len > fileReadOffset) {
158         log.debug("slurping from " + cur+ " at offset " + fileReadOffset);
159         RandomAccessFile reader = new RandomAccessFile(cur, "r");
160         slurp(len, reader);
161         reader.close();
162       } else {
163         
164         if (!caughtUp) {
165           prevFileLastModDate = cur.lastModified();
166           
167           offsetOfFirstByte += fileReadOffset;
168           fileReadOffset = 0;
169           advanceQ();
170           log.debug("not caught up, and hit EOF.  Moving forward in queue to " + cur);
171         } else 
172           prevFileLastModDate = tsPreTail;
173 
174       }
175         
176     } catch(IOException e) {
177       log.warn("IOException in "+adaptorID, e);
178       deregisterAndStop();
179     }
180     
181     return hasMoreData;
182   }
183   
184 
185   public String toString() {
186     return "Rotation-aware Tailer on " + toWatch;
187   }
188 }