This project has been retired. For details, please refer to its
Attic page.
SeqFileWriter xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.chukwa.datacollection.writer;
20
21
22 import java.net.InetAddress;
23 import java.net.URI;
24 import java.net.UnknownHostException;
25 import java.util.Calendar;
26 import java.util.List;
27 import java.util.Timer;
28 import java.util.TimerTask;
29 import java.util.concurrent.Semaphore;
30 import java.util.concurrent.TimeUnit;
31 import java.io.IOException;
32
33 import org.apache.hadoop.chukwa.ChukwaArchiveKey;
34 import org.apache.hadoop.chukwa.Chunk;
35 import org.apache.hadoop.chukwa.ChunkImpl;
36 import org.apache.hadoop.chukwa.util.DaemonWatcher;
37 import org.apache.hadoop.conf.Configuration;
38 import org.apache.hadoop.fs.FSDataOutputStream;
39 import org.apache.hadoop.fs.FileSystem;
40 import org.apache.hadoop.fs.Path;
41 import org.apache.hadoop.io.SequenceFile;
42 import org.apache.log4j.Logger;
43
44
45
46
47
48
49 public class SeqFileWriter extends PipelineableWriter implements ChukwaWriter {
/** Logger shared by this writer and its inner tasks. */
static Logger log = Logger.getLogger(SeqFileWriter.class);

/**
 * When true, {@link #close()} finalizes the current sink file: it is renamed
 * to {@code .done} if bytes were written to it, and deleted otherwise.
 */
public static boolean ENABLE_ROTATION_ON_CLOSE = true;

/** Seconds between statistics log lines; overridable through {@link #STAT_PERIOD_OPT}. */
protected int STAT_INTERVAL_SECONDS = 30;
/** Milliseconds between sink file rotations (five minutes unless configured). */
private int rotateInterval = 5 * 60 * 1000;
/** Milliseconds added after each interval boundary in the fixed-interval scheme. */
private int offsetInterval = 30 * 1000;
/** Whether rotations are aligned to multiples of {@link #rotateInterval}. */
private boolean if_fixed_interval;
/** Milliseconds {@link #close()} waits for the lock before giving up. */
static final int ACQ_WAIT_ON_TERM = 500;

// Configuration keys read by init().
public static final String STAT_PERIOD_OPT = "chukwaCollector.stats.period";
public static final String ROTATE_INTERVAL_OPT = "chukwaCollector.rotateInterval";
public static final String IF_FIXED_INTERVAL_OPT = "chukwaCollector.isFixedTimeRotatorScheme";
public static final String FIXED_INTERVAL_OFFSET_OPT = "chukwaCollector.fixedTimeIntervalOffset";
public static final String OUTPUT_DIR_OPT = "chukwaCollector.outputDir";

/** Host tag embedded in sink file names; set once by the static initializer below. */
protected static String localHostAddr;

/** Fair, single-permit semaphore guarding the current sink file and its stream. */
protected final Semaphore lock = new Semaphore(1, true);

protected FileSystem fs;
protected Configuration conf;

/** Directory in which sink files are created. */
protected String outputDir;
/** Scratch calendar used for file naming and hour-boundary computation. */
private final Calendar calendar = Calendar.getInstance();

/** Path of the sink file currently being written (ends in {@code .chukwa}). */
protected Path currentPath;
/** Same location as {@link #currentPath} but without the {@code .chukwa} suffix. */
protected String currentFileName;
protected FSDataOutputStream currentOutputStr;
protected SequenceFile.Writer seqFileWriter;

/** Start of the current hour in epoch milliseconds; -1 until first computed. */
protected long timePeriod = -1;
/** Start of the next hour in epoch milliseconds; -1 until first computed. */
protected long nextTimePeriodComputation = -1;

protected Timer rotateTimer;
protected Timer statTimer;

/** Bytes appended since the last statistics report (the report resets it to zero). */
protected volatile long dataSize;
/** Bytes appended to the current sink file since the last rotation. */
protected volatile long bytesThisRotate;
/** True between a completed {@code init} and {@code close} (or a fatal write error). */
protected volatile boolean isRunning;

static {
  // Resolve the local host name once; fall back to a fixed tag if it is unknown.
  String hostTag;
  try {
    hostTag = "_" + InetAddress.getLocalHost().getHostName() + "_";
  } catch (UnknownHostException e) {
    hostTag = "-NA-";
  }
  localHostAddr = hostTag;
}
96
97 public SeqFileWriter() {}
98
99 public long getBytesWritten() {
100 return dataSize;
101 }
102
103 public void init(Configuration conf) throws WriterException {
104 outputDir = conf.get(OUTPUT_DIR_OPT, "/chukwa");
105
106 this.conf = conf;
107
108 rotateInterval = conf.getInt(ROTATE_INTERVAL_OPT,rotateInterval);
109 if_fixed_interval = conf.getBoolean(IF_FIXED_INTERVAL_OPT,if_fixed_interval);
110 offsetInterval = conf.getInt(FIXED_INTERVAL_OFFSET_OPT,offsetInterval);
111
112 STAT_INTERVAL_SECONDS = conf.getInt(STAT_PERIOD_OPT, STAT_INTERVAL_SECONDS);
113
114
115 String fsname = conf.get("writer.hdfs.filesystem");
116 if (fsname == null || fsname.equals("")) {
117
118 fsname = conf.get("fs.default.name");
119 }
120
121 log.info("rotateInterval is " + rotateInterval);
122 if(if_fixed_interval)
123 log.info("using fixed time interval scheme, " +
124 "offsetInterval is " + offsetInterval);
125 else
126 log.info("not using fixed time interval scheme");
127 log.info("outputDir is " + outputDir);
128 log.info("fsname is " + fsname);
129 log.info("filesystem type from core-default.xml is "
130 + conf.get("fs.hdfs.impl"));
131
132 if (fsname == null) {
133 log.error("no filesystem name");
134 throw new WriterException("no filesystem");
135 }
136 try {
137 fs = FileSystem.get(new URI(fsname), conf);
138 if (fs == null) {
139 log.error("can't connect to HDFS at " + fs.getUri() + " bail out!");
140 DaemonWatcher.bailout(-1);
141 }
142 } catch (Throwable e) {
143 log.error(
144 "can't connect to HDFS, trying default file system instead (likely to be local)",
145 e);
146 DaemonWatcher.bailout(-1);
147 }
148
149
150
151 isRunning = true;
152 rotate();
153
154 statTimer = new Timer();
155 statTimer.schedule(new StatReportingTask(), 1000,
156 STAT_INTERVAL_SECONDS * 1000);
157
158 }
159
160 public class StatReportingTask extends TimerTask {
161 private long lastTs = System.currentTimeMillis();
162
163 public void run() {
164
165 long time = System.currentTimeMillis();
166 long currentDs = dataSize;
167 dataSize = 0;
168
169 long interval = time - lastTs;
170 lastTs = time;
171
172 long dataRate = 1000 * currentDs / interval;
173 log.info("stat:datacollection.writer.hdfs dataSize=" + currentDs
174 + " dataRate=" + dataRate);
175 }
176
177 public StatReportingTask() {}
178 };
179
180 void rotate() {
181 if (rotateTimer != null) {
182 rotateTimer.cancel();
183 }
184
185 if(!isRunning)
186 return;
187
188 calendar.setTimeInMillis(System.currentTimeMillis());
189
190 String newName = new java.text.SimpleDateFormat("yyyyMMddHHmmssSSS")
191 .format(calendar.getTime());
192 newName += localHostAddr + new java.rmi.server.UID().toString();
193 newName = newName.replace("-", "");
194 newName = newName.replace(":", "");
195 newName = newName.replace(".", "");
196 newName = outputDir + "/" + newName.trim();
197
198 try {
199 lock.acquire();
200
201 FSDataOutputStream previousOutputStr = currentOutputStr;
202 Path previousPath = currentPath;
203 String previousFileName = currentFileName;
204
205 if (previousOutputStr != null) {
206 boolean closed = false;
207 try {
208 log.info("closing sink file" + previousFileName);
209 previousOutputStr.close();
210 closed = true;
211 }catch (Throwable e) {
212 log.error("couldn't close file" + previousFileName, e);
213
214
215
216 }
217
218 if (bytesThisRotate > 0) {
219 if (closed) {
220 log.info("rotating sink file " + previousPath);
221 fs.rename(previousPath, new Path(previousFileName + ".done"));
222 }
223 else {
224 log.warn(bytesThisRotate + " bytes potentially lost, since " +
225 previousPath + " could not be closed.");
226 }
227 } else {
228 log.info("no chunks written to " + previousPath + ", deleting");
229 fs.delete(previousPath, false);
230 }
231 }
232
233 Path newOutputPath = new Path(newName + ".chukwa");
234 FSDataOutputStream newOutputStr = fs.create(newOutputPath);
235
236 seqFileWriter = SequenceFile.createWriter(conf, newOutputStr,
237 ChukwaArchiveKey.class, ChunkImpl.class,
238 SequenceFile.CompressionType.NONE, null);
239
240
241 currentOutputStr = newOutputStr;
242 currentPath = newOutputPath;
243 currentFileName = newName;
244 bytesThisRotate = 0;
245 } catch (Throwable e) {
246 log.warn("Got an exception trying to rotate. Will try again in " +
247 rotateInterval/1000 + " seconds." ,e);
248 } finally {
249 lock.release();
250 }
251
252
253 scheduleNextRotation();
254
255 }
256
257
258
259
260
261
262
263
264
265
266 void scheduleNextRotation(){
267 long delay = rotateInterval;
268 if (if_fixed_interval) {
269 long currentTime = System.currentTimeMillis();
270 delay = getDelayForFixedInterval(currentTime, rotateInterval, offsetInterval);
271 }
272 rotateTimer = new Timer();
273 rotateTimer.schedule(new TimerTask() {
274 public void run() {
275 rotate();
276 }
277 }, delay);
278 }
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293 long getDelayForFixedInterval(long currentTime, long rotateInterval, long offsetInterval){
294
295 long remainder = (currentTime % rotateInterval);
296 long prevRoundedInterval = currentTime - remainder;
297 long nextRoundedInterval = prevRoundedInterval + rotateInterval;
298 long delay = nextRoundedInterval - currentTime + offsetInterval;
299
300 if (log.isInfoEnabled()) {
301 log.info("currentTime="+currentTime+" prevRoundedInterval="+
302 prevRoundedInterval+" nextRoundedInterval" +
303 "="+nextRoundedInterval+" delay="+delay);
304 }
305
306 return delay;
307 }
308
309
310 protected void computeTimePeriod() {
311 synchronized (calendar) {
312 calendar.setTimeInMillis(System.currentTimeMillis());
313 calendar.set(Calendar.MINUTE, 0);
314 calendar.set(Calendar.SECOND, 0);
315 calendar.set(Calendar.MILLISECOND, 0);
316 timePeriod = calendar.getTimeInMillis();
317 calendar.add(Calendar.HOUR, 1);
318 nextTimePeriodComputation = calendar.getTimeInMillis();
319 }
320 }
321
322 @Override
323 public CommitStatus add(List<Chunk> chunks) throws WriterException {
324 COMMIT_PENDING result = new COMMIT_PENDING(chunks.size());
325 if (!isRunning) {
326 log.info("Collector not ready");
327 throw new WriterException("Collector not ready");
328 }
329
330 if (chunks != null) {
331 ChukwaArchiveKey archiveKey = new ChukwaArchiveKey();
332
333 if (System.currentTimeMillis() >= nextTimePeriodComputation) {
334 computeTimePeriod();
335 }
336 try {
337 lock.acquire();
338 for (Chunk chunk : chunks) {
339 archiveKey.setTimePartition(timePeriod);
340 archiveKey.setDataType(chunk.getDataType());
341 archiveKey.setStreamName(chunk.getTags() + "/" + chunk.getSource()
342 + "/" + chunk.getStreamName());
343 archiveKey.setSeqId(chunk.getSeqID());
344
345 if (chunk != null) {
346 seqFileWriter.append(archiveKey, chunk);
347
348
349
350
351
352 dataSize += chunk.getData().length;
353 bytesThisRotate += chunk.getData().length;
354
355 String futureName = currentPath.getName().replace(".chukwa", ".done");
356 result.addPend(futureName, currentOutputStr.getPos());
357 }
358
359 }
360 }
361 catch (IOException e) {
362 log.error("IOException when trying to write a chunk, Collector will return error and keep running.", e);
363 return COMMIT_FAIL;
364 }
365 catch (Throwable e) {
366
367 log.fatal("IOException when trying to write a chunk, Collector is going to exit!", e);
368 DaemonWatcher.bailout(-1);
369 isRunning = false;
370 } finally {
371 lock.release();
372 }
373 }
374 return result;
375 }
376
377 public void close() {
378
379 isRunning = false;
380
381 if (statTimer != null) {
382 statTimer.cancel();
383 }
384
385 if (rotateTimer != null) {
386 rotateTimer.cancel();
387 }
388
389
390
391 boolean gotLock = false;
392 try {
393 gotLock = lock.tryAcquire(ACQ_WAIT_ON_TERM, TimeUnit.MILLISECONDS);
394 if(gotLock) {
395
396 if (this.currentOutputStr != null) {
397 this.currentOutputStr.close();
398 }
399 if(ENABLE_ROTATION_ON_CLOSE)
400 if(bytesThisRotate > 0)
401 fs.rename(currentPath, new Path(currentFileName + ".done"));
402 else
403 fs.delete(currentPath, false);
404 }
405 } catch (Throwable e) {
406 log.warn("cannot rename dataSink file:" + currentPath,e);
407 } finally {
408 if(gotLock)
409 lock.release();
410 }
411 }
412
413 }