This project has retired. For details please refer to its
Attic page.
FileTailingAdaptor xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.chukwa.datacollection.adaptor.filetailer;
20
21 import java.io.IOException;
22 import java.io.RandomAccessFile;
23 import java.io.File;
24 import org.apache.hadoop.chukwa.datacollection.adaptor.*;
25 import org.apache.hadoop.chukwa.util.ExceptionUtil;
26
27
28
29
30
31
32
33 public class FileTailingAdaptor extends LWFTAdaptor {
34
35
36 public static int MAX_RETRIES = 300;
37 public static int GRACEFUL_PERIOD = 3 * 60 * 1000;
38
39 private int attempts = 0;
40 private long gracefulPeriodExpired = 0l;
41 private boolean adaptorInError = false;
42
43 protected RandomAccessFile reader = null;
44
45 public void start(long bytes) {
46 super.start(bytes);
47 log.info("chukwaAgent.fileTailingAdaptor.maxReadSize: " + MAX_READ_SIZE);
48 this.attempts = 0;
49
50 log.info("started file tailer " + adaptorID + " on file " + toWatch
51 + " with first byte at offset " + offsetOfFirstByte);
52 }
53
54
55 @Override
56 public long shutdown(AdaptorShutdownPolicy shutdownPolicy) {
57
58 log.info("Enter Shutdown:" + shutdownPolicy.name() + " - ObjectId:" + this);
59
60 switch(shutdownPolicy) {
61 case GRACEFULLY :
62 case WAIT_TILL_FINISHED :{
63 if (toWatch.exists()) {
64 int retry = 0;
65 tailer.stopWatchingFile(this);
66 TerminatorThread lastTail = new TerminatorThread(this);
67 lastTail.setDaemon(true);
68 lastTail.start();
69
70 if (shutdownPolicy.ordinal() == AdaptorShutdownPolicy.GRACEFULLY.ordinal()) {
71 while (lastTail.isAlive() && retry < 60) {
72 try {
73 log.info("GRACEFULLY Retry:" + retry);
74 Thread.sleep(1000);
75 retry++;
76 } catch (InterruptedException ex) {
77 }
78 }
79 } else {
80 while (lastTail.isAlive()) {
81 try {
82 if (retry%100 == 0) {
83 log.info("WAIT_TILL_FINISHED Retry:" + retry);
84 }
85 Thread.sleep(1000);
86 retry++;
87 } catch (InterruptedException ex) {
88 }
89 }
90 }
91 }
92 }
93 break;
94
95 case HARD_STOP:
96 default:
97 tailer.stopWatchingFile(this);
98 try {
99 if (reader != null) {
100 reader.close();
101 }
102 reader = null;
103 } catch(Throwable e) {
104 log.warn("Exception while closing reader:",e);
105 }
106 break;
107 }
108 log.info("Exit Shutdown:" + shutdownPolicy.name()+ " - ObjectId:" + this);
109 return fileReadOffset + offsetOfFirstByte;
110 }
111
112
113
114
115
116
117
118
119
120 @Override
121 public synchronized boolean tailFile()
122 throws InterruptedException {
123 boolean hasMoreData = false;
124
125 try {
126 if ((adaptorInError == true)
127 && (System.currentTimeMillis() > gracefulPeriodExpired)) {
128 if (!toWatch.exists()) {
129 log.warn("Adaptor|" + adaptorID + "|attempts=" + attempts
130 + "| File does not exist: " + toWatch.getAbsolutePath()
131 + ", streaming policy expired. File removed from streaming.");
132 } else if (!toWatch.canRead()) {
133 log.warn("Adaptor|" + adaptorID + "|attempts=" + attempts
134 + "| File cannot be read: " + toWatch.getAbsolutePath()
135 + ", streaming policy expired. File removed from streaming.");
136 } else {
137
138 adaptorInError = false;
139 gracefulPeriodExpired = 0L;
140 attempts = 0;
141 return false;
142 }
143
144 deregisterAndStop();
145 return false;
146 } else if (!toWatch.exists() || !toWatch.canRead()) {
147 if (adaptorInError == false) {
148 long now = System.currentTimeMillis();
149 gracefulPeriodExpired = now + GRACEFUL_PERIOD;
150 adaptorInError = true;
151 attempts = 0;
152 log.warn("failed to stream data for: " + toWatch.getAbsolutePath()
153 + ", graceful period will Expire at now:" + now + " + "
154 + GRACEFUL_PERIOD + " secs, i.e:" + gracefulPeriodExpired);
155 } else if (attempts % 10 == 0) {
156 log.info("failed to stream data for: " + toWatch.getAbsolutePath()
157 + ", attempt: " + attempts);
158 }
159
160 attempts++;
161 return false;
162 }
163
164 if (reader == null) {
165 reader = new RandomAccessFile(toWatch, "r");
166 log.info("Adaptor|" + adaptorID
167 + "|Opening the file for the first time|seek|" + fileReadOffset);
168 }
169
170 long len = 0L;
171 try {
172 len = reader.length();
173 if (lastSlurpTime == 0) {
174 lastSlurpTime = System.currentTimeMillis();
175 }
176 if (offsetOfFirstByte > fileReadOffset) {
177
178
179 fileReadOffset = 0;
180 offsetOfFirstByte = 0L;
181 log.warn("offsetOfFirstByte>fileReadOffset, resetting offset to 0");
182 }
183 if (len == fileReadOffset) {
184 File fixedNameFile = new File(toWatch.getAbsolutePath());
185 long fixedNameLastModified = fixedNameFile.lastModified();
186 if (fixedNameLastModified > lastSlurpTime) {
187
188
189
190
191
192 boolean hasLeftData = true;
193 while (hasLeftData) {
194 hasLeftData = slurp(len, reader);
195 }
196 RandomAccessFile newReader = new RandomAccessFile(toWatch, "r");
197 if (reader != null) {
198 reader.close();
199 }
200 reader = newReader;
201 fileReadOffset = 0L;
202 len = reader.length();
203 log.debug("Adaptor|" + adaptorID
204 + "| File size mismatched, rotating: "
205 + toWatch.getAbsolutePath());
206 hasMoreData = slurp(len, reader);
207 }
208 } else if (len < fileReadOffset) {
209
210 if (reader != null) {
211 reader.close();
212 }
213 reader = null;
214 fileReadOffset = 0L;
215 offsetOfFirstByte = 0L;
216 hasMoreData = true;
217 log.warn("Adaptor|" + adaptorID + "| file: " + toWatch.getPath()
218 + ", has rotated and no detection - reset counters to 0L");
219 } else {
220 hasMoreData = slurp(len, reader);
221 }
222 } catch (IOException e) {
223
224 }
225 } catch (IOException e) {
226 log.warn("failure reading " + toWatch, e);
227 }
228 attempts = 0;
229 adaptorInError = false;
230 return hasMoreData;
231 }
232
233
234 }