This project has retired. For details please refer to its
Attic page.
LocalWriter xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.chukwa.datacollection.writer.localfs;
20
21 import java.io.File;
22 import java.io.IOException;
23 import java.net.InetAddress;
24 import java.net.UnknownHostException;
25 import java.util.Calendar;
26 import java.util.List;
27 import java.util.Timer;
28 import java.util.TimerTask;
29 import java.util.concurrent.BlockingQueue;
30 import java.util.concurrent.LinkedBlockingQueue;
31
32 import org.apache.hadoop.chukwa.ChukwaArchiveKey;
33 import org.apache.hadoop.chukwa.Chunk;
34 import org.apache.hadoop.chukwa.ChunkImpl;
35 import org.apache.hadoop.chukwa.datacollection.writer.ChukwaWriter;
36 import org.apache.hadoop.chukwa.datacollection.writer.WriterException;
37 import org.apache.hadoop.chukwa.util.DaemonWatcher;
38 import org.apache.hadoop.conf.Configuration;
39 import org.apache.hadoop.fs.FSDataOutputStream;
40 import org.apache.hadoop.fs.FileStatus;
41 import org.apache.hadoop.fs.FileSystem;
42 import org.apache.hadoop.fs.Path;
43 import org.apache.hadoop.io.SequenceFile;
44 import org.apache.log4j.Logger;
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80 public class LocalWriter implements ChukwaWriter {
81
82 static Logger log = Logger.getLogger(LocalWriter.class);
83 static final int STAT_INTERVAL_SECONDS = 30;
84 static String localHostAddr = null;
85
86 private final Object lock = new Object();
87 private BlockingQueue<String> fileQueue = null;
88 @SuppressWarnings("unused")
89 private LocalToRemoteHdfsMover localToRemoteHdfsMover = null;
90 private FileSystem fs = null;
91 private Configuration conf = null;
92
93 private String localOutputDir = null;
94 private Calendar calendar = Calendar.getInstance();
95
96 private Path currentPath = null;
97 private String currentFileName = null;
98 private FSDataOutputStream currentOutputStr = null;
99 private SequenceFile.Writer seqFileWriter = null;
100 private int rotateInterval = 1000 * 60;
101
102
103 private volatile long dataSize = 0;
104 private volatile boolean isRunning = false;
105
106 private Timer rotateTimer = null;
107 private Timer statTimer = null;
108
109
110 private int initWriteChunkRetries = 10;
111 private int writeChunkRetries = initWriteChunkRetries;
112 private boolean chunksWrittenThisRotate = false;
113
114 private long timePeriod = -1;
115 private long nextTimePeriodComputation = -1;
116 private int minPercentFreeDisk = 20;
117
118 static {
119 try {
120 localHostAddr = "_" + InetAddress.getLocalHost().getHostName() + "_";
121 } catch (UnknownHostException e) {
122 localHostAddr = "-NA-";
123 }
124 }
125
126 public void init(Configuration conf) throws WriterException {
127 this.conf = conf;
128
129 try {
130 fs = FileSystem.getLocal(conf);
131 localOutputDir = conf.get("chukwaCollector.localOutputDir",
132 "/chukwa/datasink/");
133 if (!localOutputDir.endsWith("/")) {
134 localOutputDir += "/";
135 }
136 Path pLocalOutputDir = new Path(localOutputDir);
137 if (!fs.exists(pLocalOutputDir)) {
138 boolean exist = fs.mkdirs(pLocalOutputDir);
139 if (!exist) {
140 throw new WriterException("Cannot create local dataSink dir: "
141 + localOutputDir);
142 }
143 } else {
144 FileStatus fsLocalOutputDir = fs.getFileStatus(pLocalOutputDir);
145 if (!fsLocalOutputDir.isDir()) {
146 throw new WriterException("local dataSink dir is not a directory: "
147 + localOutputDir);
148 }
149 }
150 } catch (Throwable e) {
151 log.fatal("Cannot initialize LocalWriter", e);
152 DaemonWatcher.bailout(-1);
153 }
154
155
156 minPercentFreeDisk = conf.getInt("chukwaCollector.minPercentFreeDisk",20);
157
158 rotateInterval = conf.getInt("chukwaCollector.rotateInterval",
159 1000 * 60 * 5);
160
161 initWriteChunkRetries = conf
162 .getInt("chukwaCollector.writeChunkRetries", 10);
163 writeChunkRetries = initWriteChunkRetries;
164
165 log.info("rotateInterval is " + rotateInterval);
166 log.info("outputDir is " + localOutputDir);
167 log.info("localFileSystem is " + fs.getUri().toString());
168 log.info("minPercentFreeDisk is " + minPercentFreeDisk);
169
170
171 rotate();
172
173 rotateTimer = new Timer();
174 rotateTimer.schedule(new RotateTask(), rotateInterval,
175 rotateInterval);
176
177 statTimer = new Timer();
178 statTimer.schedule(new StatReportingTask(), 1000,
179 STAT_INTERVAL_SECONDS * 1000);
180
181 fileQueue = new LinkedBlockingQueue<String>();
182 localToRemoteHdfsMover = new LocalToRemoteHdfsMover(fileQueue, conf);
183
184 }
185
186 private class RotateTask extends TimerTask {
187 public void run() {
188 rotate();
189 };
190 }
191
192 private class StatReportingTask extends TimerTask {
193 private long lastTs = System.currentTimeMillis();
194
195 public void run() {
196
197 long time = System.currentTimeMillis();
198 long currentDs = dataSize;
199 dataSize = 0;
200
201 long interval = time - lastTs;
202 lastTs = time;
203
204 long dataRate = 1000 * currentDs / interval;
205 log.info("stat:datacollection.writer.local.LocalWriter dataSize="
206 + currentDs + " dataRate=" + dataRate);
207 }
208 };
209
210 protected void computeTimePeriod() {
211 synchronized (calendar) {
212 calendar.setTimeInMillis(System.currentTimeMillis());
213 calendar.set(Calendar.MINUTE, 0);
214 calendar.set(Calendar.SECOND, 0);
215 calendar.set(Calendar.MILLISECOND, 0);
216 timePeriod = calendar.getTimeInMillis();
217 calendar.add(Calendar.HOUR, 1);
218 nextTimePeriodComputation = calendar.getTimeInMillis();
219 }
220 }
221
222
223
224
225
226
227 public CommitStatus add(List<Chunk> chunks) throws WriterException {
228 if (!isRunning) {
229 throw new WriterException("Writer not yet ready");
230 }
231 long now = System.currentTimeMillis();
232 if (chunks != null) {
233 try {
234 chunksWrittenThisRotate = true;
235 ChukwaArchiveKey archiveKey = new ChukwaArchiveKey();
236
237 synchronized (lock) {
238 if (System.currentTimeMillis() >= nextTimePeriodComputation) {
239 computeTimePeriod();
240 }
241
242 for (Chunk chunk : chunks) {
243 archiveKey.setTimePartition(timePeriod);
244 archiveKey.setDataType(chunk.getDataType());
245 archiveKey.setStreamName(chunk.getTags() + "/" + chunk.getSource()
246 + "/" + chunk.getStreamName());
247 archiveKey.setSeqId(chunk.getSeqID());
248
249 if (chunk != null) {
250 seqFileWriter.append(archiveKey, chunk);
251
252 dataSize += chunk.getData().length;
253 }
254 }
255 }
256 long end = System.currentTimeMillis();
257 if (log.isDebugEnabled()) {
258 log.debug("duration=" + (end-now) + " size=" + chunks.size());
259 }
260
261 } catch (IOException e) {
262 writeChunkRetries--;
263 log.error("Could not save the chunk. ", e);
264
265 if (writeChunkRetries < 0) {
266 log
267 .fatal("Too many IOException when trying to write a chunk, Collector is going to exit!");
268 DaemonWatcher.bailout(-1);
269 }
270 throw new WriterException(e);
271 }
272 }
273 return COMMIT_OK;
274 }
275
276 protected void rotate() {
277 isRunning = true;
278 calendar.setTimeInMillis(System.currentTimeMillis());
279 log.info("start Date [" + calendar.getTime() + "]");
280 log.info("Rotate from " + Thread.currentThread().getName());
281
282 String newName = new java.text.SimpleDateFormat("yyyyddHHmmssSSS")
283 .format(calendar.getTime());
284 newName += localHostAddr + new java.rmi.server.UID().toString();
285 newName = newName.replace("-", "");
286 newName = newName.replace(":", "");
287 newName = newName.replace(".", "");
288 newName = localOutputDir + "/" + newName.trim();
289
290 synchronized (lock) {
291 try {
292 FSDataOutputStream previousOutputStr = currentOutputStr;
293 Path previousPath = currentPath;
294 String previousFileName = currentFileName;
295
296 if (previousOutputStr != null) {
297 previousOutputStr.close();
298 if (chunksWrittenThisRotate) {
299 fs.rename(previousPath, new Path(previousFileName + ".done"));
300 fileQueue.add(previousFileName + ".done");
301 } else {
302 log.info("no chunks written to " + previousPath + ", deleting");
303 fs.delete(previousPath, false);
304 }
305 }
306
307 Path newOutputPath = new Path(newName + ".chukwa");
308 FSDataOutputStream newOutputStr = fs.create(newOutputPath);
309
310 currentOutputStr = newOutputStr;
311 currentPath = newOutputPath;
312 currentFileName = newName;
313 chunksWrittenThisRotate = false;
314
315 seqFileWriter = SequenceFile.createWriter(conf, newOutputStr,
316 ChukwaArchiveKey.class, ChunkImpl.class,
317 SequenceFile.CompressionType.NONE, null);
318
319 } catch (IOException e) {
320 log.fatal("IO Exception in rotate. Exiting!", e);
321
322
323 DaemonWatcher.bailout(-1);
324 }
325 }
326
327
328 File directory4Space = new File(localOutputDir);
329 long totalSpace = directory4Space.getTotalSpace();
330 long freeSpace = directory4Space.getFreeSpace();
331 long minFreeAvailable = (totalSpace * minPercentFreeDisk) /100;
332
333 if (log.isDebugEnabled()) {
334 log.debug("Directory: " + localOutputDir + ", totalSpace: " + totalSpace
335 + ", freeSpace: " + freeSpace + ", minFreeAvailable: " + minFreeAvailable
336 + ", percentFreeDisk: " + minPercentFreeDisk);
337 }
338
339 if (freeSpace < minFreeAvailable) {
340 log.fatal("No space left on device, Bail out!");
341 DaemonWatcher.bailout(-1);
342 }
343
344 log.debug("finished rotate()");
345 }
346
347 public void close() {
348 synchronized (lock) {
349
350 if (rotateTimer != null) {
351 rotateTimer.cancel();
352 }
353
354 if (statTimer != null) {
355 statTimer.cancel();
356 }
357
358 try {
359 if (this.currentOutputStr != null) {
360 this.currentOutputStr.close();
361
362 if (seqFileWriter != null) {
363 seqFileWriter.close();
364 }
365 }
366 if (localToRemoteHdfsMover != null) {
367 localToRemoteHdfsMover.shutdown();
368 }
369
370 fs.rename(currentPath, new Path(currentFileName + ".done"));
371 } catch (IOException e) {
372 log.error("failed to close and rename stream", e);
373 }
374 }
375 }
376 }