This project has retired. For details please refer to its
Attic page.
SeqFileWriter xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.chukwa.datacollection.writer;
20
21
22 import java.net.InetAddress;
23 import java.net.URI;
24 import java.net.UnknownHostException;
25 import java.util.Calendar;
26 import java.util.List;
27 import java.util.Timer;
28 import java.util.TimerTask;
29 import java.util.concurrent.Semaphore;
30 import java.util.concurrent.TimeUnit;
31 import java.io.IOException;
32
33 import org.apache.hadoop.chukwa.ChukwaArchiveKey;
34 import org.apache.hadoop.chukwa.Chunk;
35 import org.apache.hadoop.chukwa.ChunkImpl;
36 import org.apache.hadoop.conf.Configuration;
37 import org.apache.hadoop.fs.FSDataOutputStream;
38 import org.apache.hadoop.fs.FileSystem;
39 import org.apache.hadoop.fs.Path;
40 import org.apache.hadoop.io.SequenceFile;
41 import org.apache.log4j.Logger;
42
43
44
45
46
47
48 public class SeqFileWriter extends PipelineableWriter implements ChukwaWriter {
49 static Logger log = Logger.getLogger(SeqFileWriter.class);
50 private static boolean ENABLE_ROTATION_ON_CLOSE = true;
51
52 protected int STAT_INTERVAL_SECONDS = 30;
53 private int rotateInterval = 1000 * 60 * 5;
54 private int offsetInterval = 1000 * 30;
55 private boolean if_fixed_interval = false;
56 static final int ACQ_WAIT_ON_TERM = 500;
57
58 public static final String STAT_PERIOD_OPT = "chukwaCollector.stats.period";
59 public static final String ROTATE_INTERVAL_OPT = "chukwaCollector.rotateInterval";
60 public static final String IF_FIXED_INTERVAL_OPT = "chukwaCollector.isFixedTimeRotatorScheme";
61 public static final String FIXED_INTERVAL_OFFSET_OPT = "chukwaCollector.fixedTimeIntervalOffset";
62 public static final String OUTPUT_DIR_OPT= "chukwaCollector.outputDir";
63 public String localHostAddr = null;
64
65 protected final Semaphore lock = new Semaphore(1, true);
66
67 protected FileSystem fs = null;
68 protected Configuration conf = null;
69
70 protected String outputDir = null;
71 private Calendar calendar = Calendar.getInstance();
72
73 protected Path currentPath = null;
74 protected String currentFileName = null;
75 protected FSDataOutputStream currentOutputStr = null;
76 protected SequenceFile.Writer seqFileWriter = null;
77
78 protected long timePeriod = -1;
79 protected long nextTimePeriodComputation = -1;
80
81 protected Timer rotateTimer = null;
82 protected Timer statTimer = null;
83
84 protected volatile long dataSize = 0;
85 protected volatile long bytesThisRotate = 0;
86 protected volatile boolean isRunning = false;
87
88 public SeqFileWriter() {
89 try {
90 localHostAddr = "_" + InetAddress.getLocalHost().getHostName() + "_";
91 } catch (UnknownHostException e) {
92 localHostAddr = "-NA-";
93 }
94 }
95
96 public long getBytesWritten() {
97 return dataSize;
98 }
99
100 public void init(Configuration conf) throws WriterException {
101 outputDir = conf.get(OUTPUT_DIR_OPT, "/chukwa");
102
103 this.conf = conf;
104
105 rotateInterval = conf.getInt(ROTATE_INTERVAL_OPT,rotateInterval);
106 if_fixed_interval = conf.getBoolean(IF_FIXED_INTERVAL_OPT,if_fixed_interval);
107 offsetInterval = conf.getInt(FIXED_INTERVAL_OFFSET_OPT,offsetInterval);
108
109 STAT_INTERVAL_SECONDS = conf.getInt(STAT_PERIOD_OPT, STAT_INTERVAL_SECONDS);
110
111
112 String fsname = conf.get("writer.hdfs.filesystem");
113 if (fsname == null || fsname.equals("")) {
114
115 fsname = conf.get("fs.defaultFS");
116 }
117
118 log.info("rotateInterval is " + rotateInterval);
119 if(if_fixed_interval)
120 log.info("using fixed time interval scheme, " +
121 "offsetInterval is " + offsetInterval);
122 else
123 log.info("not using fixed time interval scheme");
124 log.info("outputDir is " + outputDir);
125 log.info("fsname is " + fsname);
126 log.info("filesystem type from core-default.xml is "
127 + conf.get("fs.hdfs.impl"));
128
129 if (fsname == null) {
130 log.error("no filesystem name");
131 throw new WriterException("no filesystem");
132 }
133 try {
134 fs = FileSystem.get(new URI(fsname), conf);
135 if (fs == null) {
136 log.error("can't connect to HDFS.");
137 }
138 } catch (Throwable e) {
139 log.error(
140 "can't connect to HDFS, trying default file system instead (likely to be local)",
141 e);
142 }
143
144
145
146 isRunning = true;
147 rotate();
148
149 statTimer = new Timer();
150 statTimer.schedule(new StatReportingTask(), 1000,
151 STAT_INTERVAL_SECONDS * 1000);
152
153 }
154
155 public class StatReportingTask extends TimerTask {
156 private long lastTs = System.currentTimeMillis();
157
158 public void run() {
159
160 long time = System.currentTimeMillis();
161 long currentDs = dataSize;
162 dataSize = 0;
163
164 long interval = time - lastTs;
165 lastTs = time;
166
167 long dataRate = 1000 * currentDs / interval;
168 log.info("stat:datacollection.writer.hdfs dataSize=" + currentDs
169 + " dataRate=" + dataRate);
170 }
171
172 public StatReportingTask() {}
173 };
174
175 void rotate() {
176 if (rotateTimer != null) {
177 rotateTimer.cancel();
178 }
179
180 if(!isRunning)
181 return;
182
183 calendar.setTimeInMillis(System.currentTimeMillis());
184
185 String newName = new java.text.SimpleDateFormat("yyyyMMddHHmmssSSS")
186 .format(calendar.getTime());
187 newName += localHostAddr + new java.rmi.server.UID().toString();
188 newName = newName.replace("-", "");
189 newName = newName.replace(":", "");
190 newName = newName.replace(".", "");
191 newName = outputDir + "/" + newName.trim();
192
193 try {
194 lock.acquire();
195
196 FSDataOutputStream previousOutputStr = currentOutputStr;
197 Path previousPath = currentPath;
198 String previousFileName = currentFileName;
199
200 if (previousOutputStr != null) {
201 boolean closed = false;
202 try {
203 log.info("closing sink file" + previousFileName);
204 previousOutputStr.close();
205 closed = true;
206 }catch (Throwable e) {
207 log.error("couldn't close file" + previousFileName, e);
208
209
210
211 }
212
213 if (bytesThisRotate > 0) {
214 if (closed) {
215 log.info("rotating sink file " + previousPath);
216 fs.rename(previousPath, new Path(previousFileName + ".done"));
217 }
218 else {
219 log.warn(bytesThisRotate + " bytes potentially lost, since " +
220 previousPath + " could not be closed.");
221 }
222 } else {
223 log.info("no chunks written to " + previousPath + ", deleting");
224 fs.delete(previousPath, false);
225 }
226 }
227
228 Path newOutputPath = new Path(newName + ".chukwa");
229 FSDataOutputStream newOutputStr = fs.create(newOutputPath);
230
231 seqFileWriter = SequenceFile.createWriter(conf, newOutputStr,
232 ChukwaArchiveKey.class, ChunkImpl.class,
233 SequenceFile.CompressionType.NONE, null);
234
235
236 currentOutputStr = newOutputStr;
237 currentPath = newOutputPath;
238 currentFileName = newName;
239 bytesThisRotate = 0;
240 } catch (Throwable e) {
241 log.warn("Got an exception trying to rotate. Will try again in " +
242 rotateInterval/1000 + " seconds." ,e);
243 } finally {
244 lock.release();
245 }
246
247
248 scheduleNextRotation();
249
250 }
251
252
253
254
255
256
257
258
259
260
261 void scheduleNextRotation(){
262 long delay = rotateInterval;
263 if (if_fixed_interval) {
264 long currentTime = System.currentTimeMillis();
265 delay = getDelayForFixedInterval(currentTime, rotateInterval, offsetInterval);
266 }
267 rotateTimer = new Timer();
268 rotateTimer.schedule(new TimerTask() {
269 public void run() {
270 rotate();
271 }
272 }, delay);
273 }
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288 long getDelayForFixedInterval(long currentTime, long rotateInterval, long offsetInterval){
289
290 long remainder = (currentTime % rotateInterval);
291 long prevRoundedInterval = currentTime - remainder;
292 long nextRoundedInterval = prevRoundedInterval + rotateInterval;
293 long delay = nextRoundedInterval - currentTime + offsetInterval;
294
295 if (log.isInfoEnabled()) {
296 log.info("currentTime="+currentTime+" prevRoundedInterval="+
297 prevRoundedInterval+" nextRoundedInterval" +
298 "="+nextRoundedInterval+" delay="+delay);
299 }
300
301 return delay;
302 }
303
304
305 protected void computeTimePeriod() {
306 synchronized (calendar) {
307 calendar.setTimeInMillis(System.currentTimeMillis());
308 calendar.set(Calendar.MINUTE, 0);
309 calendar.set(Calendar.SECOND, 0);
310 calendar.set(Calendar.MILLISECOND, 0);
311 timePeriod = calendar.getTimeInMillis();
312 calendar.add(Calendar.HOUR, 1);
313 nextTimePeriodComputation = calendar.getTimeInMillis();
314 }
315 }
316
317 @Override
318 public CommitStatus add(List<Chunk> chunks) throws WriterException {
319 COMMIT_PENDING result = new COMMIT_PENDING(chunks.size());
320 if (!isRunning) {
321 log.info("Collector not ready");
322 throw new WriterException("Collector not ready");
323 }
324
325 ChukwaArchiveKey archiveKey = new ChukwaArchiveKey();
326
327 if (System.currentTimeMillis() >= nextTimePeriodComputation) {
328 computeTimePeriod();
329 }
330 try {
331 lock.acquire();
332 for (Chunk chunk : chunks) {
333 archiveKey.setTimePartition(timePeriod);
334 archiveKey.setDataType(chunk.getDataType());
335 archiveKey.setStreamName(chunk.getTags() + "/" + chunk.getSource()
336 + "/" + chunk.getStreamName());
337 archiveKey.setSeqId(chunk.getSeqID());
338
339 seqFileWriter.append(archiveKey, chunk);
340
341
342
343
344
345 dataSize += chunk.getData().length;
346 bytesThisRotate += chunk.getData().length;
347
348 String futureName = currentPath.getName().replace(".chukwa", ".done");
349 result.addPend(futureName, currentOutputStr.getPos());
350
351 }
352 }
353 catch (IOException e) {
354 log.error("IOException when trying to write a chunk, Collector will return error and keep running.", e);
355 return COMMIT_FAIL;
356 }
357 catch (Throwable e) {
358
359 log.fatal("IOException when trying to write a chunk, Collector is going to exit!", e);
360 isRunning = false;
361 } finally {
362 lock.release();
363 }
364 return result;
365 }
366
367 public void close() {
368
369 isRunning = false;
370
371 if (statTimer != null) {
372 statTimer.cancel();
373 }
374
375 if (rotateTimer != null) {
376 rotateTimer.cancel();
377 }
378
379
380
381 boolean gotLock = false;
382 try {
383 gotLock = lock.tryAcquire(ACQ_WAIT_ON_TERM, TimeUnit.MILLISECONDS);
384 if(gotLock) {
385
386 if (this.currentOutputStr != null) {
387 this.currentOutputStr.close();
388 }
389 if(ENABLE_ROTATION_ON_CLOSE)
390 if(bytesThisRotate > 0)
391 fs.rename(currentPath, new Path(currentFileName + ".done"));
392 else
393 fs.delete(currentPath, false);
394 }
395 } catch (Throwable e) {
396 log.warn("cannot rename dataSink file:" + currentPath,e);
397 } finally {
398 if(gotLock)
399 lock.release();
400 }
401 }
402
403 public static void setEnableRotationOnClose(boolean b) {
404 ENABLE_ROTATION_ON_CLOSE = b;
405 }
406
407 }