This project has retired. For details please refer to its
Attic page.
FileTailingAdaptor xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.chukwa.datacollection.adaptor.filetailer;
20
21 import java.io.IOException;
22 import java.io.RandomAccessFile;
23 import java.io.File;
24 import org.apache.hadoop.chukwa.datacollection.adaptor.*;
25
26
27
28
29
30
31
32 public class FileTailingAdaptor extends LWFTAdaptor {
33
34
35 public static int MAX_RETRIES = 300;
36 static int GRACEFUL_PERIOD = 3 * 60 * 1000;
37
38 private int attempts = 0;
39 private long gracefulPeriodExpired = 0l;
40 private boolean adaptorInError = false;
41
42 protected RandomAccessFile reader = null;
43
44 public void start(long bytes) {
45 super.start(bytes);
46 log.info("chukwaAgent.fileTailingAdaptor.maxReadSize: " + MAX_READ_SIZE);
47 this.attempts = 0;
48
49 log.info("started file tailer " + adaptorID + " on file " + toWatch
50 + " with first byte at offset " + offsetOfFirstByte);
51 }
52
53
54 @Override
55 public long shutdown(AdaptorShutdownPolicy shutdownPolicy) {
56
57 log.info("Enter Shutdown:" + shutdownPolicy.name() + " - ObjectId:" + this);
58
59 switch(shutdownPolicy) {
60 case GRACEFULLY :
61 case WAIT_TILL_FINISHED :{
62 if (toWatch.exists()) {
63 int retry = 0;
64 tailer.stopWatchingFile(this);
65 TerminatorThread lastTail = new TerminatorThread(this);
66 lastTail.setDaemon(true);
67 lastTail.start();
68
69 if (shutdownPolicy.ordinal() == AdaptorShutdownPolicy.GRACEFULLY.ordinal()) {
70 while (lastTail.isAlive() && retry < 60) {
71 try {
72 log.info("GRACEFULLY Retry:" + retry);
73 Thread.sleep(1000);
74 retry++;
75 } catch (InterruptedException ex) {
76 }
77 }
78 } else {
79 while (lastTail.isAlive()) {
80 try {
81 if (retry%100 == 0) {
82 log.info("WAIT_TILL_FINISHED Retry:" + retry);
83 }
84 Thread.sleep(1000);
85 retry++;
86 } catch (InterruptedException ex) {
87 }
88 }
89 }
90 }
91 }
92 break;
93
94 case HARD_STOP:
95 default:
96 tailer.stopWatchingFile(this);
97 try {
98 if (reader != null) {
99 reader.close();
100 }
101 reader = null;
102 } catch(Throwable e) {
103 log.warn("Exception while closing reader:",e);
104 }
105 break;
106 }
107 log.info("Exit Shutdown:" + shutdownPolicy.name()+ " - ObjectId:" + this);
108 return fileReadOffset + offsetOfFirstByte;
109 }
110
111
112
113
114
115
116
117 @Override
118 public boolean tailFile()
119 throws InterruptedException {
120 boolean hasMoreData = false;
121
122 try {
123 if ((adaptorInError == true)
124 && (System.currentTimeMillis() > gracefulPeriodExpired)) {
125 if (!toWatch.exists()) {
126 log.warn("Adaptor|" + adaptorID + "|attempts=" + attempts
127 + "| File does not exist: " + toWatch.getAbsolutePath()
128 + ", streaming policy expired. File removed from streaming.");
129 } else if (!toWatch.canRead()) {
130 log.warn("Adaptor|" + adaptorID + "|attempts=" + attempts
131 + "| File cannot be read: " + toWatch.getAbsolutePath()
132 + ", streaming policy expired. File removed from streaming.");
133 } else {
134
135 adaptorInError = false;
136 gracefulPeriodExpired = 0L;
137 attempts = 0;
138 return false;
139 }
140
141 deregisterAndStop();
142 return false;
143 } else if (!toWatch.exists() || !toWatch.canRead()) {
144 if (adaptorInError == false) {
145 long now = System.currentTimeMillis();
146 gracefulPeriodExpired = now + GRACEFUL_PERIOD;
147 adaptorInError = true;
148 attempts = 0;
149 log.warn("failed to stream data for: " + toWatch.getAbsolutePath()
150 + ", graceful period will Expire at now:" + now + " + "
151 + GRACEFUL_PERIOD + " secs, i.e:" + gracefulPeriodExpired);
152 } else if (attempts % 10 == 0) {
153 log.info("failed to stream data for: " + toWatch.getAbsolutePath()
154 + ", attempt: " + attempts);
155 }
156
157 attempts++;
158 return false;
159 }
160
161 if (reader == null) {
162 reader = new RandomAccessFile(toWatch, "r");
163 log.info("Adaptor|" + adaptorID
164 + "|Opening the file for the first time|seek|" + fileReadOffset);
165 }
166
167 long len = 0L;
168 try {
169 len = reader.length();
170 if (lastSlurpTime == 0) {
171 lastSlurpTime = System.currentTimeMillis();
172 }
173 if (offsetOfFirstByte > fileReadOffset) {
174
175
176 fileReadOffset = 0;
177 offsetOfFirstByte = 0L;
178 log.warn("offsetOfFirstByte>fileReadOffset, resetting offset to 0");
179 }
180 if (len == fileReadOffset) {
181 File fixedNameFile = new File(toWatch.getAbsolutePath());
182 long fixedNameLastModified = fixedNameFile.lastModified();
183 if (fixedNameLastModified > lastSlurpTime) {
184
185
186
187
188
189 boolean hasLeftData = true;
190 while (hasLeftData) {
191 hasLeftData = slurp(len, reader);
192 }
193 RandomAccessFile newReader = new RandomAccessFile(toWatch, "r");
194 if (reader != null) {
195 reader.close();
196 }
197 reader = newReader;
198 fileReadOffset = 0L;
199 len = reader.length();
200 log.debug("Adaptor|" + adaptorID
201 + "| File size mismatched, rotating: "
202 + toWatch.getAbsolutePath());
203 hasMoreData = slurp(len, reader);
204 }
205 } else if (len < fileReadOffset) {
206
207 if (reader != null) {
208 reader.close();
209 }
210 reader = null;
211 fileReadOffset = 0L;
212 offsetOfFirstByte = 0L;
213 hasMoreData = true;
214 log.warn("Adaptor|" + adaptorID + "| file: " + toWatch.getPath()
215 + ", has rotated and no detection - reset counters to 0L");
216 } else {
217 hasMoreData = slurp(len, reader);
218 }
219 } catch (IOException e) {
220
221 }
222 } catch (IOException e) {
223 log.warn("failure reading " + toWatch, e);
224 }
225 attempts = 0;
226 adaptorInError = false;
227 return hasMoreData;
228 }
229
230
231 }