This project has retired. For details please refer to its
Attic page.
RCheckFTAdaptor xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.chukwa.datacollection.adaptor.filetailer;
19
20 import java.io.File;
21 import java.io.FileFilter;
22 import java.io.IOException;
23 import java.io.RandomAccessFile;
24 import java.util.regex.Matcher;
25 import java.util.regex.Pattern;
26 import java.util.Collections;
27 import java.util.LinkedList;
28
29 import org.apache.commons.lang3.builder.HashCodeBuilder;
30
31
32
33
34
35
36
37 public class RCheckFTAdaptor extends LWFTAdaptor implements FileFilter {
38
39 private static class FPair implements Comparable<FPair> {
40 File f;
41 long mod;
42 FPair(File f) {
43 this.f = f;
44 mod = f.lastModified();
45 }
46
47
48
49 @Override
50 public int compareTo(FPair o) {
51 if(mod < o.mod)
52 return -1;
53 else if (mod > o.mod)
54 return 1;
55
56
57 else return (o.f.getName().compareTo(f.getName()));
58 }
59
60 @Override
61 public boolean equals(Object o) {
62 if(o instanceof FPair) {
63 return mod == ((FPair) o).mod;
64 } else {
65 return false;
66 }
67 }
68
69 @Override
70 public int hashCode() {
71 return new HashCodeBuilder(643, 1321).
72 append(this.mod).
73 toHashCode();
74 }
75 }
76
77 long prevFileLastModDate = 0;
78 LinkedList<FPair> fileQ = new LinkedList<FPair>();
79 String fBaseName;
80 File cur;
81
82 boolean caughtUp = false;
83
84
85
86 @Override
87 public String parseArgs(String params) {
88 Pattern cmd = Pattern.compile("d:(\\d+)\\s+(\\d+)\\s+(.+)\\s?");
89 Matcher m = cmd.matcher(params);
90 if (m.matches()) {
91 prevFileLastModDate = Long.parseLong(m.group(1));
92 offsetOfFirstByte = Long.parseLong(m.group(2));
93 toWatch = new File(m.group(3)).getAbsoluteFile();
94 } else {
95 toWatch = new File(params.trim()).getAbsoluteFile();
96 }
97 fBaseName = toWatch.getName();
98 return toWatch.getAbsolutePath();
99 }
100
101 public String getCurrentStatus() {
102 return type.trim() + " d:" + prevFileLastModDate + " " + offsetOfFirstByte + " " + toWatch.getPath();
103 }
104
105 @Override
106 public boolean accept(File pathname) {
107 return pathname.getName().startsWith(fBaseName) &&
108 ( pathname.getName().equals(fBaseName) ||
109 pathname.lastModified() > prevFileLastModDate);
110 }
111
112
113 protected void mkFileQ() {
114
115 File toWatchDir = toWatch.getParentFile();
116 File[] candidates = toWatchDir.listFiles(this);
117 if(candidates == null) {
118 log.error(toWatchDir + " is not a directory in "+adaptorID);
119 } else {
120 log.debug("saw " + candidates.length + " files matching pattern");
121 fileQ = new LinkedList<FPair>();
122 for(File f:candidates)
123 fileQ.add(new FPair(f));
124 Collections.sort(fileQ);
125 }
126 }
127
128 protected void advanceQ() {
129 FPair next = fileQ.poll();
130 if(next != null) {
131 cur = next.f;
132 caughtUp = toWatch.equals(cur);
133 if(caughtUp && !fileQ.isEmpty())
134 log.warn("expected rotation queue to be empty when caught up...");
135 }
136 else {
137 cur = null;
138 caughtUp = true;
139 }
140 }
141
142 @Override
143 public void start(long offset) {
144 mkFileQ();
145 advanceQ();
146 super.start(offset);
147 }
148
149 @Override
150 public boolean tailFile()
151 throws InterruptedException {
152 boolean hasMoreData = false;
153 try {
154
155 if(caughtUp) {
156
157 mkFileQ();
158 advanceQ();
159 }
160 if(cur == null)
161 return false;
162
163
164 long len = cur.length();
165 long tsPreTail = cur.exists() ? cur.lastModified() : prevFileLastModDate;
166
167 if(log.isDebugEnabled())
168 log.debug(adaptorID + " treating " + cur + " as " + toWatch + " len = " + len);
169
170 if(len < fileReadOffset) {
171 log.info("file "+ cur +" shrank from " + fileReadOffset + " to " + len);
172
173 offsetOfFirstByte += fileReadOffset;
174 fileReadOffset = 0;
175 } else if(len > fileReadOffset) {
176 log.debug("slurping from " + cur+ " at offset " + fileReadOffset);
177 RandomAccessFile reader = new RandomAccessFile(cur, "r");
178 slurp(len, reader);
179 reader.close();
180 } else {
181
182 if (!caughtUp) {
183 prevFileLastModDate = cur.lastModified();
184
185 offsetOfFirstByte += fileReadOffset;
186 fileReadOffset = 0;
187 advanceQ();
188 log.debug("not caught up, and hit EOF. Moving forward in queue to " + cur);
189 } else
190 prevFileLastModDate = tsPreTail;
191
192 }
193
194 } catch(IOException e) {
195 log.warn("IOException in "+adaptorID, e);
196 deregisterAndStop();
197 }
198
199 return hasMoreData;
200 }
201
202
203 public String toString() {
204 return "Rotation-aware Tailer on " + toWatch;
205 }
206 }