This project has retired. For details please refer to its
Attic page.
FileAdaptor xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.chukwa.datacollection.adaptor;
20
21 import java.io.File;
22 import java.io.RandomAccessFile;
23 import java.util.List;
24 import java.util.concurrent.CopyOnWriteArrayList;
25
26 import org.apache.hadoop.chukwa.ChunkImpl;
27 import org.apache.hadoop.chukwa.datacollection.ChunkReceiver;
28 import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
29 import org.apache.hadoop.conf.Configuration;
30 import org.apache.hadoop.chukwa.util.ExceptionUtil;
31 import org.apache.log4j.Level;
32 import org.apache.log4j.Logger;
33
34
35 class FileAdaptorTailer extends Thread {
36 static Logger log = Logger.getLogger(FileAdaptorTailer.class);
37 private List<FileAdaptor> adaptors = null;
38 private static Configuration conf = null;
39 private Object lock = new Object();
40
41
42
43
44 int DEFAULT_SAMPLE_PERIOD_MS = 1000 * 10;
45 int SAMPLE_PERIOD_MS = DEFAULT_SAMPLE_PERIOD_MS;
46
47
48 public FileAdaptorTailer() {
49
50 if (conf == null) {
51 ChukwaAgent agent = ChukwaAgent.getAgent();
52 if (agent != null) {
53 conf = agent.getConfiguration();
54 if (conf != null) {
55 SAMPLE_PERIOD_MS = conf.getInt(
56 "chukwaAgent.adaptor.context.switch.time",
57 DEFAULT_SAMPLE_PERIOD_MS);
58 }
59 }
60 }
61
62
63 adaptors = new CopyOnWriteArrayList<FileAdaptor>();
64
65 setDaemon(true);
66 start();
67 }
68 @Override
69 public void run() {
70 while(true) {
71 try {
72
73 while (adaptors.size() == 0) {
74 synchronized (lock) {
75 try {
76 log.info("Waiting queue is empty");
77 lock.wait();
78 } catch (InterruptedException e) {
79
80 }
81 }
82 }
83
84 long startTime = System.currentTimeMillis();
85 for (FileAdaptor adaptor: adaptors) {
86 log.info("calling sendFile for " + adaptor.toWatch.getCanonicalPath());
87 adaptor.sendFile();
88 }
89
90 long timeToReadFiles = System.currentTimeMillis() - startTime;
91 if (timeToReadFiles < SAMPLE_PERIOD_MS) {
92 Thread.sleep(SAMPLE_PERIOD_MS);
93 }
94
95 }catch (Throwable e) {
96 log.warn("Exception in FileAdaptorTailer:",e);
97 }
98 }
99 }
100
101 public void addFileAdaptor(FileAdaptor adaptor) {
102 adaptors.add(adaptor);
103 synchronized (lock) {
104 lock.notifyAll();
105 }
106 }
107
108 public void removeFileAdaptor(FileAdaptor adaptor) {
109 adaptors.remove(adaptor);
110 }
111 }
112
113
114
115
116 public class FileAdaptor extends AbstractAdaptor {
117
118 static Logger log = Logger.getLogger(FileAdaptor.class);
119 static FileAdaptorTailer tailer = null;
120
121 static final int DEFAULT_TIMEOUT_PERIOD = 5*60*1000;
122 static int TIMEOUT_PERIOD = DEFAULT_TIMEOUT_PERIOD;
123
124 static {
125 tailer = new FileAdaptorTailer();
126 }
127
128 private long startTime = 0;
129 private long timeOut = 0;
130
131 protected volatile boolean finished = false;
132 File toWatch;
133 protected RandomAccessFile reader = null;
134 protected long fileReadOffset;
135 protected boolean deleteFileOnClose = false;
136 protected boolean shutdownCalled = false;
137
138
139
140
141 private long offsetOfFirstByte = 0;
142
143 public void start(long bytes) {
144
145 log.info("adaptor id: " + adaptorID + " started file adaptor on file "
146 + toWatch);
147 this.startTime = System.currentTimeMillis();
148 TIMEOUT_PERIOD = control.getConfiguration().getInt(
149 "chukwaAgent.adaptor.fileadaptor.timeoutperiod",
150 DEFAULT_TIMEOUT_PERIOD);
151 this.timeOut = startTime + TIMEOUT_PERIOD;
152
153 tailer.addFileAdaptor(this);
154 }
155
156 void sendFile() {
157 long now = System.currentTimeMillis() ;
158 long oneMinAgo = now - (60*1000);
159 if (toWatch.exists()) {
160 if (toWatch.lastModified() > oneMinAgo && now < timeOut) {
161 log.info("Last modified time less than one minute, keep waiting");
162 return;
163 } else {
164 try {
165
166 long bufSize = toWatch.length();
167 byte[] buf = new byte[(int) bufSize];
168
169 reader = new RandomAccessFile(toWatch, "r");
170 reader.read(buf);
171 reader.close();
172 reader = null;
173
174 long fileTime = toWatch.lastModified();
175 int bytesUsed = extractRecords(dest, 0, buf, fileTime);
176 this.fileReadOffset = bytesUsed;
177 finished = true;
178 deregisterAndStop();
179 cleanUp();
180 } catch(Exception e) {
181 log.warn("Exception while trying to read: " + toWatch.getAbsolutePath(),e);
182 } finally {
183 if (reader != null) {
184 try {
185 reader.close();
186 } catch (Exception e) {
187 log.debug(ExceptionUtil.getStackTrace(e));
188 }
189 reader = null;
190 }
191 }
192 }
193 } else {
194 if (now > timeOut) {
195 finished = true;
196 log.warn("Couldn't read this file: " + toWatch.getAbsolutePath());
197 deregisterAndStop();
198 cleanUp() ;
199 }
200 }
201 }
202
203 private void cleanUp() {
204 tailer.removeFileAdaptor(this);
205 if (reader != null) {
206 try {
207 reader.close();
208 } catch (Exception e) {
209 log.debug(ExceptionUtil.getStackTrace(e));
210 }
211 reader = null;
212 }
213 }
214
215
216 @Override
217 public long shutdown(AdaptorShutdownPolicy shutdownPolicy) {
218 log.info("Enter Shutdown:" + shutdownPolicy.name()+ " - ObjectId:" + this);
219 switch(shutdownPolicy) {
220 case GRACEFULLY : {
221 int retry = 0;
222 while (!finished && retry < 60) {
223 try {
224 log.info("GRACEFULLY Retry:" + retry);
225 Thread.sleep(1000);
226 retry++;
227 } catch (InterruptedException ex) {
228 }
229 }
230 }
231 break;
232 case WAIT_TILL_FINISHED : {
233 int retry = 0;
234 while (!finished) {
235 try {
236 if (retry%100 == 0) {
237 log.info("WAIT_TILL_FINISHED Retry:" + retry);
238 }
239
240 Thread.sleep(1000);
241 retry++;
242 } catch (InterruptedException ex) {
243 }
244 }
245 }
246
247 break;
248 default :
249 cleanUp();
250 break;
251 }
252
253 if (deleteFileOnClose && toWatch != null) {
254 if (log.isDebugEnabled()) {
255 log.debug("About to delete " + toWatch.getAbsolutePath());
256 }
257 if (toWatch.delete()) {
258 if (log.isInfoEnabled()) {
259 log.debug("Successfully deleted " + toWatch.getAbsolutePath());
260 }
261 } else {
262 if (log.isEnabledFor(Level.WARN)) {
263 log.warn("Could not delete " + toWatch.getAbsolutePath() + " (for unknown reason)");
264 }
265 }
266 }
267
268 log.info("Exist Shutdown:" + shutdownPolicy.name()+ " - ObjectId:" + this);
269 return fileReadOffset + offsetOfFirstByte;
270 }
271
272 public String parseArgs(String params) {
273
274 String[] words = params.split(" ");
275 if (words.length == 2) {
276 if (words[1].equals("delete")) {
277 deleteFileOnClose = true;
278 toWatch = new File(words[0]);
279 } else {
280 offsetOfFirstByte = Long.parseLong(words[0]);
281 toWatch = new File(words[1]);
282 }
283 } else if (words.length == 3) {
284 offsetOfFirstByte = Long.parseLong(words[0]);
285 toWatch = new File(words[1]);
286 deleteFileOnClose = words[2].equals("delete");
287 } else {
288 toWatch = new File(params);
289 }
290 return toWatch.getAbsolutePath();
291 }
292
293
294
295
296
297
298
299
300
301
302
303
304
305 protected int extractRecords(final ChunkReceiver eq, long buffOffsetInFile,
306 byte[] buf, long fileTime) throws InterruptedException {
307 final ChunkImpl chunk = new ChunkImpl(type, toWatch.getAbsolutePath(),
308 buffOffsetInFile + buf.length, buf, this);
309 chunk.addTag("time=\"" + fileTime + "\"");
310 log.info("Adding " + toWatch.getAbsolutePath() + " to the queue");
311 eq.add(chunk);
312 log.info( toWatch.getAbsolutePath() + " added to the queue");
313 return buf.length;
314 }
315
316 @Override
317 public String getCurrentStatus() {
318 return type.trim() + " " + offsetOfFirstByte + " " + toWatch.getPath();
319 }
320
321 }