FilePerPostWriter xref
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.chukwa.datacollection.test;

import java.io.IOException;
import java.net.URI;
import java.util.List;
import java.util.Timer;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.hadoop.chukwa.ChukwaArchiveKey;
import org.apache.hadoop.chukwa.Chunk;
import org.apache.hadoop.chukwa.ChunkImpl;
import org.apache.hadoop.chukwa.datacollection.writer.*;
import org.apache.hadoop.chukwa.datacollection.writer.ChukwaWriter.CommitStatus;
import org.apache.hadoop.chukwa.datacollection.writer.SeqFileWriter.StatReportingTask;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;

/**
 * Test writer that puts each posted batch of chunks into its own new
 * sequence file, named "baseName_counter.done", instead of appending
 * to a single rotating file the way SeqFileWriter does.
 */
public class FilePerPostWriter extends SeqFileWriter {

  String baseName;                        // common prefix for every output file
  AtomicLong counter = new AtomicLong(0); // per-post suffix, makes file names unique

  protected FileSystem fs = null;
  protected Configuration conf = null;

  protected String outputDir = null;

  protected Path currentPath = null;      // path of the most recently written file
  protected String currentFileName = null;
  @Override
  public synchronized CommitStatus add(List<Chunk> chunks) throws WriterException {

    try {
      // Every post gets its own uniquely numbered ".done" file.
      String newName = baseName + "_" + counter.incrementAndGet();
      Path newOutputPath = new Path(newName + ".done");
      FSDataOutputStream currentOutputStr = fs.create(newOutputPath);
      currentPath = newOutputPath;
      currentFileName = newName;

      SequenceFile.Writer seqFileWriter = SequenceFile.createWriter(conf, currentOutputStr,
          ChukwaArchiveKey.class, ChunkImpl.class,
          SequenceFile.CompressionType.NONE, null);

      ChukwaArchiveKey archiveKey = new ChukwaArchiveKey();

      if (System.currentTimeMillis() >= nextTimePeriodComputation) {
        computeTimePeriod();
      }

      for (Chunk chunk : chunks) {
        // Skip null entries before touching the chunk.
        if (chunk == null) {
          continue;
        }

        archiveKey.setTimePartition(timePeriod);
        archiveKey.setDataType(chunk.getDataType());
        archiveKey.setStreamName(chunk.getTags() + "/" + chunk.getSource()
            + "/" + chunk.getStreamName());
        archiveKey.setSeqId(chunk.getSeqID());

        dataSize += chunk.getData().length;
        bytesThisRotate += chunk.getData().length;
        seqFileWriter.append(archiveKey, chunk);
      }

      seqFileWriter.close();
      currentOutputStr.close();
    } catch (IOException e) {
      throw new WriterException(e);
    }
    return COMMIT_OK;
  }

  @Override
  public void close() {
    // No-op: each add() opens and closes its own file.
  }

  @Override
  public void init(Configuration conf) throws WriterException {
    try {
      this.conf = conf;
      outputDir = conf.get(SeqFileWriter.OUTPUT_DIR_OPT, "/chukwa");
      // Shared prefix for this writer instance: outputDir/startTime_hostHash
      baseName = outputDir + "/" + System.currentTimeMillis() + "_" + localHostAddr.hashCode();

      String fsname = conf.get("writer.hdfs.filesystem");
      if (fsname == null || fsname.equals("")) {
        // Fall back to the default filesystem when none is configured explicitly.
        fsname = conf.get("fs.default.name");
      }

      fs = FileSystem.get(new URI(fsname), conf);
      isRunning = true;

      statTimer = new Timer();
      statTimer.schedule(new StatReportingTask(), 1000,
          STAT_INTERVAL_SECONDS * 1000);

      // Force the first add() to recompute the time partition.
      nextTimePeriodComputation = 0;
    } catch (Exception e) {
      throw new WriterException(e);
    }
  }

}
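
For reference, a minimal driver sketch follows. It is not part of the original source: it assumes SeqFileWriter.OUTPUT_DIR_OPT is visible from outside the package and points the writer at the local filesystem. Posting an empty chunk list is enough to show the one-file-per-add() naming scheme; constructing real Chunk instances via ChunkImpl is left out.

// Hypothetical driver, not part of the Chukwa sources: exercises
// FilePerPostWriter against the local filesystem. Assumes
// SeqFileWriter.OUTPUT_DIR_OPT is accessible from this class.
import java.util.Collections;

import org.apache.hadoop.chukwa.Chunk;
import org.apache.hadoop.chukwa.datacollection.test.FilePerPostWriter;
import org.apache.hadoop.chukwa.datacollection.writer.SeqFileWriter;
import org.apache.hadoop.conf.Configuration;

public class FilePerPostWriterDriver {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    conf.set("writer.hdfs.filesystem", "file:///");   // read first by init()
    conf.set(SeqFileWriter.OUTPUT_DIR_OPT, "/tmp/chukwa");

    FilePerPostWriter writer = new FilePerPostWriter();
    writer.init(conf);

    // Each add() produces its own file: /tmp/chukwa/startTime_hostHash_n.done
    writer.add(Collections.<Chunk>emptyList());
    writer.add(Collections.<Chunk>emptyList());

    writer.close();
  }
}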