This project has been retired. For details, please refer to its
Attic page.
LocalWriter xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.chukwa.datacollection.writer.localfs;
20
21 import java.io.File;
22 import java.io.IOException;
23 import java.net.InetAddress;
24 import java.net.UnknownHostException;
25 import java.nio.ByteBuffer;
26 import java.util.Calendar;
27 import java.util.List;
28 import java.util.Timer;
29 import java.util.TimerTask;
30 import java.util.concurrent.BlockingQueue;
31 import java.util.concurrent.LinkedBlockingQueue;
32
33 import org.apache.avro.Schema;
34 import org.apache.avro.generic.GenericData;
35 import org.apache.avro.generic.GenericRecord;
36 import org.apache.hadoop.chukwa.ChukwaArchiveKey;
37 import org.apache.hadoop.chukwa.Chunk;
38 import org.apache.hadoop.chukwa.datacollection.writer.ChukwaWriter;
39 import org.apache.hadoop.chukwa.datacollection.writer.WriterException;
40 import org.apache.hadoop.chukwa.datacollection.writer.parquet.ChukwaAvroSchema;
41 import org.apache.hadoop.chukwa.util.ExceptionUtil;
42 import org.apache.hadoop.conf.Configuration;
43 import org.apache.hadoop.fs.FileStatus;
44 import org.apache.hadoop.fs.FileSystem;
45 import org.apache.hadoop.fs.Path;
46 import org.apache.log4j.Logger;
47 import org.apache.parquet.avro.AvroParquetWriter;
48 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84 public class LocalWriter implements ChukwaWriter {
85
86 static Logger log = Logger.getLogger(LocalWriter.class);
87 static final int STAT_INTERVAL_SECONDS = 30;
88 static String localHostAddr = null;
89 private int blockSize = 128 * 1024 * 1024;
90 private int pageSize = 1 * 1024 * 1024;
91
92 private final Object lock = new Object();
93 private BlockingQueue<String> fileQueue = null;
94 @SuppressWarnings("unused")
95 private LocalToRemoteHdfsMover localToRemoteHdfsMover = null;
96 private FileSystem fs = null;
97 private Configuration conf = null;
98
99 private String localOutputDir = null;
100 private Calendar calendar = Calendar.getInstance();
101
102 private Path currentPath = null;
103 private String currentFileName = null;
104 private AvroParquetWriter<GenericRecord> parquetWriter = null;
105 private int rotateInterval = 1000 * 60;
106
107
108 private volatile long dataSize = 0;
109 private volatile boolean isRunning = false;
110
111 private Timer rotateTimer = null;
112 private Timer statTimer = null;
113
114 private Schema avroSchema = null;
115 private int initWriteChunkRetries = 10;
116 private int writeChunkRetries = initWriteChunkRetries;
117 private boolean chunksWrittenThisRotate = false;
118
119 private long timePeriod = -1;
120 private long nextTimePeriodComputation = -1;
121 private int minPercentFreeDisk = 20;
122
123 static {
124 try {
125 localHostAddr = "_" + InetAddress.getLocalHost().getHostName() + "_";
126 } catch (UnknownHostException e) {
127 localHostAddr = "-NA-";
128 }
129 }
130
131 public LocalWriter(Configuration conf) throws WriterException {
132 setup(conf);
133 }
134
135 public void init(Configuration conf) throws WriterException {
136 }
137
138 public void setup(Configuration conf) throws WriterException {
139 this.conf = conf;
140
141
142 avroSchema = ChukwaAvroSchema.getSchema();
143
144 try {
145 fs = FileSystem.getLocal(conf);
146 localOutputDir = conf.get("chukwaCollector.localOutputDir",
147 "/chukwa/datasink/");
148 if (!localOutputDir.endsWith("/")) {
149 localOutputDir += "/";
150 }
151 Path pLocalOutputDir = new Path(localOutputDir);
152 if (!fs.exists(pLocalOutputDir)) {
153 boolean exist = fs.mkdirs(pLocalOutputDir);
154 if (!exist) {
155 throw new WriterException("Cannot create local dataSink dir: "
156 + localOutputDir);
157 }
158 } else {
159 FileStatus fsLocalOutputDir = fs.getFileStatus(pLocalOutputDir);
160 if (!fsLocalOutputDir.isDir()) {
161 throw new WriterException("local dataSink dir is not a directory: "
162 + localOutputDir);
163 }
164 }
165 } catch (Throwable e) {
166 log.fatal("Cannot initialize LocalWriter", e);
167 throw new WriterException(e);
168 }
169
170
171 minPercentFreeDisk = conf.getInt("chukwaCollector.minPercentFreeDisk",20);
172
173 rotateInterval = conf.getInt("chukwaCollector.rotateInterval",
174 1000 * 60 * 5);
175
176 initWriteChunkRetries = conf
177 .getInt("chukwaCollector.writeChunkRetries", 10);
178 writeChunkRetries = initWriteChunkRetries;
179
180 log.info("rotateInterval is " + rotateInterval);
181 log.info("outputDir is " + localOutputDir);
182 log.info("localFileSystem is " + fs.getUri().toString());
183 log.info("minPercentFreeDisk is " + minPercentFreeDisk);
184
185 if(rotateTimer==null) {
186 rotateTimer = new Timer();
187 rotateTimer.schedule(new RotateTask(), 0,
188 rotateInterval);
189 }
190 if(statTimer==null) {
191 statTimer = new Timer();
192 statTimer.schedule(new StatReportingTask(), 0,
193 STAT_INTERVAL_SECONDS * 1000);
194 }
195 fileQueue = new LinkedBlockingQueue<String>();
196 localToRemoteHdfsMover = new LocalToRemoteHdfsMover(fileQueue, conf);
197
198 }
199
200 private class RotateTask extends TimerTask {
201 public void run() {
202 try {
203 rotate();
204 } catch(WriterException e) {
205 log.error(ExceptionUtil.getStackTrace(e));
206 }
207 };
208 }
209
210 private class StatReportingTask extends TimerTask {
211 private long lastTs = System.currentTimeMillis();
212
213 public void run() {
214
215 long time = System.currentTimeMillis();
216 long currentDs = dataSize;
217 dataSize = 0;
218
219 long interval = time - lastTs;
220 lastTs = time;
221
222 if(interval <= 0) {
223 interval = 1;
224 }
225 long dataRate = 1000 * currentDs / interval;
226 log.info("stat:datacollection.writer.local.LocalWriter dataSize="
227 + currentDs + " dataRate=" + dataRate);
228 }
229 };
230
231 protected void computeTimePeriod() {
232 synchronized (calendar) {
233 calendar.setTimeInMillis(System.currentTimeMillis());
234 calendar.set(Calendar.MINUTE, 0);
235 calendar.set(Calendar.SECOND, 0);
236 calendar.set(Calendar.MILLISECOND, 0);
237 timePeriod = calendar.getTimeInMillis();
238 calendar.add(Calendar.HOUR, 1);
239 nextTimePeriodComputation = calendar.getTimeInMillis();
240 }
241 }
242
243
244
245
246
247
248 public CommitStatus add(List<Chunk> chunks) throws WriterException {
249 if (!isRunning) {
250 throw new WriterException("Writer not yet ready");
251 }
252 long now = System.currentTimeMillis();
253 if (chunks != null) {
254 try {
255 chunksWrittenThisRotate = true;
256 ChukwaArchiveKey archiveKey = new ChukwaArchiveKey();
257
258 synchronized (lock) {
259 if (System.currentTimeMillis() >= nextTimePeriodComputation) {
260 computeTimePeriod();
261 }
262
263 for (Chunk chunk : chunks) {
264 archiveKey.setTimePartition(timePeriod);
265 archiveKey.setDataType(chunk.getDataType());
266 archiveKey.setStreamName(chunk.getTags() + "/" + chunk.getSource()
267 + "/" + chunk.getStreamName());
268 archiveKey.setSeqId(chunk.getSeqID());
269 GenericRecord record = new GenericData.Record(avroSchema);
270 record.put("dataType", chunk.getDataType());
271 record.put("data", ByteBuffer.wrap(chunk.getData()));
272 record.put("tags", chunk.getTags());
273 record.put("seqId", chunk.getSeqID());
274 record.put("source", chunk.getSource());
275 record.put("stream", chunk.getStreamName());
276 parquetWriter.write(record);
277
278 dataSize += chunk.getData().length;
279 }
280 }
281 long end = System.currentTimeMillis();
282 if (log.isDebugEnabled()) {
283 log.debug("duration=" + (end-now) + " size=" + chunks.size());
284 }
285
286 } catch (IOException e) {
287 writeChunkRetries--;
288 log.error("Could not save the chunk. ", e);
289
290 if (writeChunkRetries < 0) {
291 log
292 .fatal("Too many IOException when trying to write a chunk, Collector is going to exit!");
293 }
294 throw new WriterException(e);
295 }
296 }
297 return COMMIT_OK;
298 }
299
300 protected String getNewFileName() {
301 calendar.setTimeInMillis(System.currentTimeMillis());
302 String newName = new java.text.SimpleDateFormat("yyyyddHHmmssSSS")
303 .format(calendar.getTime());
304 newName += localHostAddr + new java.rmi.server.UID().toString();
305 newName = newName.replace("-", "");
306 newName = newName.replace(":", "");
307 newName = newName.replace(".", "");
308 newName = localOutputDir + "/" + newName.trim();
309 return newName;
310 }
311
312 protected void rotate() throws WriterException {
313 isRunning = true;
314 log.info("start Date [" + calendar.getTime() + "]");
315 log.info("Rotate from " + Thread.currentThread().getName());
316
317 String newName = getNewFileName();
318
319 synchronized (lock) {
320 try {
321 if (currentPath != null) {
322 Path previousPath = currentPath;
323 if (chunksWrittenThisRotate) {
324 String previousFileName = previousPath.getName().replace(".chukwa", ".done");
325 if(fs.exists(previousPath)) {
326 fs.rename(previousPath, new Path(previousFileName + ".done"));
327 }
328 fileQueue.add(previousFileName + ".done");
329 } else {
330 log.info("no chunks written to " + previousPath + ", deleting");
331 fs.delete(previousPath, false);
332 }
333 }
334
335 Path newOutputPath = new Path(newName + ".chukwa");
336 while(fs.exists(newOutputPath)) {
337 newName = getNewFileName();
338 newOutputPath = new Path(newName + ".chukwa");
339 }
340
341 currentPath = newOutputPath;
342 currentFileName = newName;
343 chunksWrittenThisRotate = false;
344 parquetWriter = new AvroParquetWriter<GenericRecord>(newOutputPath, avroSchema, CompressionCodecName.SNAPPY, blockSize, pageSize);
345
346 } catch (IOException e) {
347 log.fatal("IO Exception in rotate: ", e);
348 }
349 }
350
351
352 File directory4Space = new File(localOutputDir);
353 long totalSpace = directory4Space.getTotalSpace();
354 long freeSpace = directory4Space.getFreeSpace();
355 long minFreeAvailable = (totalSpace * minPercentFreeDisk) /100;
356
357 if (log.isDebugEnabled()) {
358 log.debug("Directory: " + localOutputDir + ", totalSpace: " + totalSpace
359 + ", freeSpace: " + freeSpace + ", minFreeAvailable: " + minFreeAvailable
360 + ", percentFreeDisk: " + minPercentFreeDisk);
361 }
362
363 if (freeSpace < minFreeAvailable) {
364 log.fatal("No space left on device.");
365 throw new WriterException("No space left on device.");
366 }
367
368 log.debug("finished rotate()");
369 }
370
371 public void close() {
372 synchronized (lock) {
373
374 if (rotateTimer != null) {
375 rotateTimer.cancel();
376 }
377
378 if (statTimer != null) {
379 statTimer.cancel();
380 }
381
382 try {
383 if (parquetWriter != null) {
384 parquetWriter.close();
385 }
386 if (localToRemoteHdfsMover != null) {
387 localToRemoteHdfsMover.shutdown();
388 }
389
390 fs.rename(currentPath, new Path(currentFileName + ".done"));
391 } catch (IOException e) {
392 log.error("failed to close and rename stream", e);
393 }
394 }
395 }
396 }