This project has retired. For details please refer to its
Attic page.
FileTailingAdaptor xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.chukwa.datacollection.adaptor.filetailer;
20
21 import java.io.IOException;
22 import java.io.RandomAccessFile;
23 import java.io.File;
24 import org.apache.hadoop.chukwa.datacollection.adaptor.*;
25
26
27
28
29
30
31
32 public class FileTailingAdaptor extends LWFTAdaptor {
33
34
35 public static int MAX_RETRIES = 300;
36 static int GRACEFUL_PERIOD = 3 * 60 * 1000;
37
38 private int attempts = 0;
39 private long gracefulPeriodExpired = 0l;
40 private boolean adaptorInError = false;
41
42 protected RandomAccessFile reader = null;
43
44 public void start(long bytes) {
45 super.start(bytes);
46 log.info("chukwaAgent.fileTailingAdaptor.maxReadSize: " + MAX_READ_SIZE);
47 this.attempts = 0;
48
49 log.info("started file tailer " + adaptorID + " on file " + toWatch
50 + " with first byte at offset " + offsetOfFirstByte);
51 }
52
53
54 @Override
55 public long shutdown(AdaptorShutdownPolicy shutdownPolicy) {
56
57 log.info("Enter Shutdown:" + shutdownPolicy.name() + " - ObjectId:" + this);
58
59 switch(shutdownPolicy) {
60 case GRACEFULLY :
61 case WAIT_TILL_FINISHED :{
62 if (toWatch.exists()) {
63 int retry = 0;
64 tailer.stopWatchingFile(this);
65 TerminatorThread lastTail = new TerminatorThread(this);
66 lastTail.setDaemon(true);
67 lastTail.start();
68
69 if (shutdownPolicy.ordinal() == AdaptorShutdownPolicy.GRACEFULLY.ordinal()) {
70 while (lastTail.isAlive() && retry < 60) {
71 try {
72 log.info("GRACEFULLY Retry:" + retry);
73 Thread.sleep(1000);
74 retry++;
75 } catch (InterruptedException ex) {
76 }
77 }
78 } else {
79 while (lastTail.isAlive()) {
80 try {
81 if (retry%100 == 0) {
82 log.info("WAIT_TILL_FINISHED Retry:" + retry);
83 }
84 Thread.sleep(1000);
85 retry++;
86 } catch (InterruptedException ex) {
87 }
88 }
89 }
90 }
91 }
92 break;
93
94 case HARD_STOP:
95 default:
96 tailer.stopWatchingFile(this);
97 try {
98 if (reader != null) {
99 reader.close();
100 }
101 reader = null;
102 } catch(Throwable e) {
103 log.warn("Exception while closing reader:",e);
104 }
105 break;
106 }
107 log.info("Exit Shutdown:" + shutdownPolicy.name()+ " - ObjectId:" + this);
108 return fileReadOffset + offsetOfFirstByte;
109 }
110
111
112
113
114
115
116
117
118 @Override
119 public boolean tailFile()
120 throws InterruptedException {
121 boolean hasMoreData = false;
122
123 try {
124 if ((adaptorInError == true)
125 && (System.currentTimeMillis() > gracefulPeriodExpired)) {
126 if (!toWatch.exists()) {
127 log.warn("Adaptor|" + adaptorID + "|attempts=" + attempts
128 + "| File does not exist: " + toWatch.getAbsolutePath()
129 + ", streaming policy expired. File removed from streaming.");
130 } else if (!toWatch.canRead()) {
131 log.warn("Adaptor|" + adaptorID + "|attempts=" + attempts
132 + "| File cannot be read: " + toWatch.getAbsolutePath()
133 + ", streaming policy expired. File removed from streaming.");
134 } else {
135
136 adaptorInError = false;
137 gracefulPeriodExpired = 0L;
138 attempts = 0;
139 return false;
140 }
141
142 deregisterAndStop();
143 return false;
144 } else if (!toWatch.exists() || !toWatch.canRead()) {
145 if (adaptorInError == false) {
146 long now = System.currentTimeMillis();
147 gracefulPeriodExpired = now + GRACEFUL_PERIOD;
148 adaptorInError = true;
149 attempts = 0;
150 log.warn("failed to stream data for: " + toWatch.getAbsolutePath()
151 + ", graceful period will Expire at now:" + now + " + "
152 + GRACEFUL_PERIOD + " secs, i.e:" + gracefulPeriodExpired);
153 } else if (attempts % 10 == 0) {
154 log.info("failed to stream data for: " + toWatch.getAbsolutePath()
155 + ", attempt: " + attempts);
156 }
157
158 attempts++;
159 return false;
160 }
161
162 if (reader == null) {
163 reader = new RandomAccessFile(toWatch, "r");
164 log.info("Adaptor|" + adaptorID
165 + "|Opening the file for the first time|seek|" + fileReadOffset);
166 }
167
168 long len = 0L;
169 try {
170 len = reader.length();
171 if (lastSlurpTime == 0) {
172 lastSlurpTime = System.currentTimeMillis();
173 }
174 if (offsetOfFirstByte > fileReadOffset) {
175
176
177 fileReadOffset = 0;
178 offsetOfFirstByte = 0L;
179 log.warn("offsetOfFirstByte>fileReadOffset, resetting offset to 0");
180 }
181 if (len == fileReadOffset) {
182 File fixedNameFile = new File(toWatch.getAbsolutePath());
183 long fixedNameLastModified = fixedNameFile.lastModified();
184 if (fixedNameLastModified > lastSlurpTime) {
185
186
187
188
189
190 boolean hasLeftData = true;
191 while (hasLeftData) {
192 hasLeftData = slurp(len, reader);
193 }
194 RandomAccessFile newReader = new RandomAccessFile(toWatch, "r");
195 if (reader != null) {
196 reader.close();
197 }
198 reader = newReader;
199 fileReadOffset = 0L;
200 len = reader.length();
201 log.debug("Adaptor|" + adaptorID
202 + "| File size mismatched, rotating: "
203 + toWatch.getAbsolutePath());
204 hasMoreData = slurp(len, reader);
205 }
206 } else if (len < fileReadOffset) {
207
208 if (reader != null) {
209 reader.close();
210 }
211 reader = null;
212 fileReadOffset = 0L;
213 offsetOfFirstByte = 0L;
214 hasMoreData = true;
215 log.warn("Adaptor|" + adaptorID + "| file: " + toWatch.getPath()
216 + ", has rotated and no detection - reset counters to 0L");
217 } else {
218 hasMoreData = slurp(len, reader);
219 }
220 } catch (IOException e) {
221
222 }
223 } catch (IOException e) {
224 log.warn("failure reading " + toWatch, e);
225 }
226 attempts = 0;
227 adaptorInError = false;
228 return hasMoreData;
229 }
230
231
232 }