This project has retired. For details please refer to its
Attic page.
RCheckFTAdaptor xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.chukwa.datacollection.adaptor.filetailer;
19
20 import java.io.File;
21 import java.io.FileFilter;
22 import java.io.IOException;
23 import java.io.RandomAccessFile;
24 import java.util.regex.Matcher;
25 import java.util.regex.Pattern;
26 import java.util.Collections;
27 import java.util.LinkedList;
28
29
30
31
32
33
34
35 public class RCheckFTAdaptor extends LWFTAdaptor implements FileFilter {
36
37 private static class FPair implements Comparable<FPair> {
38 File f;
39 long mod;
40 FPair(File f) {
41 this.f = f;
42 mod = f.lastModified();
43 }
44
45
46
47 @Override
48 public int compareTo(FPair o) {
49 if(mod < o.mod)
50 return -1;
51 else if (mod > o.mod)
52 return 1;
53
54
55 else return (o.f.getName().compareTo(f.getName()));
56 }
57 }
58
59 long prevFileLastModDate = 0;
60 LinkedList<FPair> fileQ = new LinkedList<FPair>();
61 String fBaseName;
62 File cur;
63
64 boolean caughtUp = false;
65
66
67
68 @Override
69 public String parseArgs(String params) {
70 Pattern cmd = Pattern.compile("d:(\\d+)\\s+(\\d+)\\s+(.+)\\s?");
71 Matcher m = cmd.matcher(params);
72 if (m.matches()) {
73 prevFileLastModDate = Long.parseLong(m.group(1));
74 offsetOfFirstByte = Long.parseLong(m.group(2));
75 toWatch = new File(m.group(3)).getAbsoluteFile();
76 } else {
77 toWatch = new File(params.trim()).getAbsoluteFile();
78 }
79 fBaseName = toWatch.getName();
80 return toWatch.getAbsolutePath();
81 }
82
83 public String getCurrentStatus() {
84 return type.trim() + " d:" + prevFileLastModDate + " " + offsetOfFirstByte + " " + toWatch.getPath();
85 }
86
87 @Override
88 public boolean accept(File pathname) {
89 return pathname.getName().startsWith(fBaseName) &&
90 ( pathname.getName().equals(fBaseName) ||
91 pathname.lastModified() > prevFileLastModDate);
92 }
93
94
95 protected void mkFileQ() {
96
97 File toWatchDir = toWatch.getParentFile();
98 File[] candidates = toWatchDir.listFiles(this);
99 if(candidates == null) {
100 log.error(toWatchDir + " is not a directory in "+adaptorID);
101 } else {
102 log.debug("saw " + candidates.length + " files matching pattern");
103 fileQ = new LinkedList<FPair>();
104 for(File f:candidates)
105 fileQ.add(new FPair(f));
106 Collections.sort(fileQ);
107 }
108 }
109
110 protected void advanceQ() {
111 FPair next = fileQ.poll();
112 if(next != null) {
113 cur = next.f;
114 caughtUp = toWatch.equals(cur);
115 if(caughtUp && !fileQ.isEmpty())
116 log.warn("expected rotation queue to be empty when caught up...");
117 }
118 else {
119 cur = null;
120 caughtUp = true;
121 }
122 }
123
124 @Override
125 public void start(long offset) {
126 mkFileQ();
127 advanceQ();
128 super.start(offset);
129 }
130
131 @Override
132 public synchronized boolean tailFile()
133 throws InterruptedException {
134 boolean hasMoreData = false;
135 try {
136
137 if(caughtUp) {
138
139 mkFileQ();
140 advanceQ();
141 }
142 if(cur == null)
143 return false;
144
145
146 long len = cur.length();
147 long tsPreTail = cur.exists() ? cur.lastModified() : prevFileLastModDate;
148
149 if(log.isDebugEnabled())
150 log.debug(adaptorID + " treating " + cur + " as " + toWatch + " len = " + len);
151
152 if(len < fileReadOffset) {
153 log.info("file "+ cur +" shrank from " + fileReadOffset + " to " + len);
154
155 offsetOfFirstByte += fileReadOffset;
156 fileReadOffset = 0;
157 } else if(len > fileReadOffset) {
158 log.debug("slurping from " + cur+ " at offset " + fileReadOffset);
159 RandomAccessFile reader = new RandomAccessFile(cur, "r");
160 slurp(len, reader);
161 reader.close();
162 } else {
163
164 if (!caughtUp) {
165 prevFileLastModDate = cur.lastModified();
166
167 offsetOfFirstByte += fileReadOffset;
168 fileReadOffset = 0;
169 advanceQ();
170 log.debug("not caught up, and hit EOF. Moving forward in queue to " + cur);
171 } else
172 prevFileLastModDate = tsPreTail;
173
174 }
175
176 } catch(IOException e) {
177 log.warn("IOException in "+adaptorID, e);
178 deregisterAndStop();
179 }
180
181 return hasMoreData;
182 }
183
184
185 public String toString() {
186 return "Rotation-aware Tailer on " + toWatch;
187 }
188 }