
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.chukwa.datacollection.writer.localfs;

import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.util.Calendar;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.chukwa.ChukwaArchiveKey;
import org.apache.hadoop.chukwa.Chunk;
import org.apache.hadoop.chukwa.datacollection.writer.ChukwaWriter;
import org.apache.hadoop.chukwa.datacollection.writer.WriterException;
import org.apache.hadoop.chukwa.datacollection.writer.parquet.ChukwaAvroSchema;
import org.apache.hadoop.chukwa.util.ExceptionUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.Logger;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

/**
 * <p>This class <b>is</b> thread-safe -- rotate() and add() both synchronize on
 * the lock object.
 * </p>
 * <p>
 * Write data to a local fileSystem, then move it to the remote HDFS.
 * <br>
 * Warning:
 * <br>
 * There is no lock/waiting time for the remote client.
 * The connection is released as soon as the last append is done,
 * so there is no guarantee that this class will not lose
 * any data.
 * <br>
 * This class has been designed this way for performance reasons.
 * </p>
 * <p>
 * In order to use this class, you need to define some parameters
 * in chukwa-collector-conf.xml:
 * </p>
 * <br>
 *  &lt;property&gt;<br>
 *   &lt;name&gt;chukwaCollector.localOutputDir&lt;/name&gt;<br>
 *   &lt;value&gt;/grid/0/gs/chukwa/chukwa-0.1.2/dataSink/&lt;/value&gt;<br>
 *   &lt;description&gt;Chukwa data sink directory&lt;/description&gt;<br>
 *  &lt;/property&gt;<br>
 *<br>
 *  &lt;property&gt;<br>
 *    &lt;name&gt;chukwaCollector.writerClass&lt;/name&gt;<br>
 *    &lt;value&gt;org.apache.hadoop.chukwa.datacollection.writer.localfs.LocalWriter&lt;/value&gt;<br>
 *    &lt;description&gt;Local chukwa writer&lt;/description&gt;<br>
 *  &lt;/property&gt;<br>
 * <br>
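 * <p>
 * A minimal usage sketch (illustrative only; assumes a populated Hadoop
 * Configuration and a non-empty list of chunks gathered by a collector):
 * </p>
 * <pre>{@code
 * Configuration conf = new Configuration();
 * conf.set("chukwaCollector.localOutputDir", "/tmp/chukwa/dataSink/");
 * LocalWriter writer = new LocalWriter(conf);
 * // the first scheduled rotate() opens the initial .chukwa sink file and
 * // marks the writer as running; add() throws WriterException before that
 * writer.add(chunks);  // chunks is a List<Chunk> supplied by the caller
 * writer.close();
 * }</pre>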
 */
public class LocalWriter implements ChukwaWriter {

  static Logger log = Logger.getLogger(LocalWriter.class);
  static final int STAT_INTERVAL_SECONDS = 30;
  static String localHostAddr = null;

  // Parquet tuning: row-group (block) size and page size passed to the writer
  private int blockSize = 128 * 1024 * 1024;
  private int pageSize = 1 * 1024 * 1024;

  private final Object lock = new Object();

  // completed .done file names, consumed by the LocalToRemoteHdfsMover thread
  private BlockingQueue<String> fileQueue = null;
  private LocalToRemoteHdfsMover localToRemoteHdfsMover = null;
  private FileSystem fs = null;
  private Configuration conf = null;

  private String localOutputDir = null;
  private Calendar calendar = Calendar.getInstance();

  private Path currentPath = null;
  private String currentFileName = null;
  private AvroParquetWriter<GenericRecord> parquetWriter = null;
  private int rotateInterval = 1000 * 60;

  private volatile long dataSize = 0;
  private volatile boolean isRunning = false;

  private Timer rotateTimer = null;
  private Timer statTimer = null;

  private Schema avroSchema = null;
  private int initWriteChunkRetries = 10;
  private int writeChunkRetries = initWriteChunkRetries;
  private boolean chunksWrittenThisRotate = false;

  private long timePeriod = -1;
  private long nextTimePeriodComputation = -1;
  private int minPercentFreeDisk = 20;

  static {
    try {
      localHostAddr = "_" + InetAddress.getLocalHost().getHostName() + "_";
    } catch (UnknownHostException e) {
      localHostAddr = "-NA-";
    }
  }

  public LocalWriter(Configuration conf) throws WriterException {
    setup(conf);
  }

  public void init(Configuration conf) throws WriterException {
    // no-op: configuration happens in setup(), called from the constructor
  }

  public void setup(Configuration conf) throws WriterException {
    this.conf = conf;

    // load the Chukwa Avro schema
    avroSchema = ChukwaAvroSchema.getSchema();

    try {
      fs = FileSystem.getLocal(conf);
      localOutputDir = conf.get("chukwaCollector.localOutputDir",
          "/chukwa/datasink/");
      if (!localOutputDir.endsWith("/")) {
        localOutputDir += "/";
      }
      Path pLocalOutputDir = new Path(localOutputDir);
      if (!fs.exists(pLocalOutputDir)) {
        boolean exist = fs.mkdirs(pLocalOutputDir);
        if (!exist) {
          throw new WriterException("Cannot create local dataSink dir: "
              + localOutputDir);
        }
      } else {
        FileStatus fsLocalOutputDir = fs.getFileStatus(pLocalOutputDir);
        if (!fsLocalOutputDir.isDirectory()) {
          throw new WriterException("local dataSink dir is not a directory: "
              + localOutputDir);
        }
      }
    } catch (Throwable e) {
      log.fatal("Cannot initialize LocalWriter", e);
      throw new WriterException(e);
    }

    minPercentFreeDisk = conf.getInt("chukwaCollector.minPercentFreeDisk", 20);

    rotateInterval = conf.getInt("chukwaCollector.rotateInterval",
        1000 * 60 * 5); // defaults to 5 minutes

    initWriteChunkRetries = conf
        .getInt("chukwaCollector.writeChunkRetries", 10);
    writeChunkRetries = initWriteChunkRetries;

    log.info("rotateInterval is " + rotateInterval);
    log.info("outputDir is " + localOutputDir);
    log.info("localFileSystem is " + fs.getUri().toString());
    log.info("minPercentFreeDisk is " + minPercentFreeDisk);

    if (rotateTimer == null) {
      rotateTimer = new Timer();
      rotateTimer.schedule(new RotateTask(), 0, rotateInterval);
    }
    if (statTimer == null) {
      statTimer = new Timer();
      statTimer.schedule(new StatReportingTask(), 0,
          STAT_INTERVAL_SECONDS * 1000);
    }
    fileQueue = new LinkedBlockingQueue<String>();
    localToRemoteHdfsMover = new LocalToRemoteHdfsMover(fileQueue, conf);
  }
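
  // Configuration knobs read by setup() above, with their built-in defaults
  // (set in chukwa-collector-conf.xml; values here are only the fallbacks):
  //   chukwaCollector.localOutputDir      /chukwa/datasink/  local sink dir
  //   chukwaCollector.rotateInterval      300000             ms between rotations
  //   chukwaCollector.minPercentFreeDisk  20                 rotate() fails below this
  //   chukwaCollector.writeChunkRetries   10                 IOExceptions tolerated by add()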

  private class RotateTask extends TimerTask {
    public void run() {
      try {
        rotate();
      } catch (WriterException e) {
        log.error(ExceptionUtil.getStackTrace(e));
      }
    }
  }

  private class StatReportingTask extends TimerTask {
    private long lastTs = System.currentTimeMillis();

    public void run() {

      long time = System.currentTimeMillis();
      long currentDs = dataSize;
      dataSize = 0;

      long interval = time - lastTs;
      lastTs = time;

      if (interval <= 0) {
        interval = 1;
      }
      // currentDs is in bytes and interval in ms, so this is bytes/sec
      long dataRate = 1000 * currentDs / interval;
      log.info("stat:datacollection.writer.local.LocalWriter dataSize="
          + currentDs + " dataRate=" + dataRate);
    }
  }

  protected void computeTimePeriod() {
    synchronized (calendar) {
      calendar.setTimeInMillis(System.currentTimeMillis());
      calendar.set(Calendar.MINUTE, 0);
      calendar.set(Calendar.SECOND, 0);
      calendar.set(Calendar.MILLISECOND, 0);
      timePeriod = calendar.getTimeInMillis();
      calendar.add(Calendar.HOUR, 1);
      nextTimePeriodComputation = calendar.getTimeInMillis();
    }
  }
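
  // Worked example of the hourly partitioning above (clock time illustrative):
  // at 10:17:42.500, timePeriod is truncated to 10:00:00.000 and
  // nextTimePeriodComputation is set to 11:00:00.000, so every chunk added
  // during that hour gets the same time partition in its ChukwaArchiveKey.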

  /**
   *  Best effort; there is no guarantee that chunks
   *  have really been written to disk.
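   *  <p>A minimal caller-side sketch (illustrative): COMMIT_OK only means
   *  the chunks were handed to the in-process Parquet writer, not that they
   *  are durable. Durability comes later, when rotate() renames the sink
   *  file and the mover ships it to HDFS.</p>
   *  <pre>{@code
   *  ChukwaWriter.CommitStatus status = writer.add(chunks);
   *  if (status == ChukwaWriter.COMMIT_OK) {
   *    // chunks are buffered locally; accept the small window
   *    // in which a crash can still lose them
   *  }
   *  }</pre>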
   */
  public CommitStatus add(List<Chunk> chunks) throws WriterException {
    if (!isRunning) {
      throw new WriterException("Writer not yet ready");
    }
    long now = System.currentTimeMillis();
    if (chunks != null) {
      try {
        chunksWrittenThisRotate = true;
        ChukwaArchiveKey archiveKey = new ChukwaArchiveKey();

        synchronized (lock) {
          if (System.currentTimeMillis() >= nextTimePeriodComputation) {
            computeTimePeriod();
          }

          for (Chunk chunk : chunks) {
            archiveKey.setTimePartition(timePeriod);
            archiveKey.setDataType(chunk.getDataType());
            archiveKey.setStreamName(chunk.getTags() + "/" + chunk.getSource()
                + "/" + chunk.getStreamName());
            archiveKey.setSeqId(chunk.getSeqID());
            GenericRecord record = new GenericData.Record(avroSchema);
            record.put("dataType", chunk.getDataType());
            record.put("data", ByteBuffer.wrap(chunk.getData()));
            record.put("tags", chunk.getTags());
            record.put("seqId", chunk.getSeqID());
            record.put("source", chunk.getSource());
            record.put("stream", chunk.getStreamName());
            parquetWriter.write(record);
            // compute size for stats
            dataSize += chunk.getData().length;
          }
        } // end synchronized block
        long end = System.currentTimeMillis();
        if (log.isDebugEnabled()) {
          log.debug("duration=" + (end - now) + " size=" + chunks.size());
        }

      } catch (IOException e) {
        writeChunkRetries--;
        log.error("Could not save the chunk. ", e);

        if (writeChunkRetries < 0) {
          log.fatal("Too many IOExceptions when trying to write a chunk, "
              + "Collector is going to exit!");
        }
        throw new WriterException(e);
      }
    }
    return COMMIT_OK;
  }

  protected String getNewFileName() {
    calendar.setTimeInMillis(System.currentTimeMillis());
    // timestamp (year, month, day, time to the millisecond), then host and a
    // unique RMI UID with '-', ':' and '.' stripped out
    String newName = new java.text.SimpleDateFormat("yyyyMMddHHmmssSSS")
        .format(calendar.getTime());
    newName += localHostAddr + new java.rmi.server.UID().toString();
    newName = newName.replace("-", "");
    newName = newName.replace(":", "");
    newName = newName.replace(".", "");
    // localOutputDir already ends with "/" (normalized in setup())
    newName = localOutputDir + newName.trim();
    return newName;
  }
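
  // Illustrative example of a generated name (hypothetical host and UID):
  //   /chukwa/datasink/20160815101730123_myhost_3f2ab115
  // rotate() appends ".chukwa" while the file is open and renames it to
  // ".done" once the rotation completes.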

  protected void rotate() throws WriterException {
    isRunning = true;
    log.info("start Date [" + calendar.getTime() + "]");
    log.info("Rotate from " + Thread.currentThread().getName());

    String newName = getNewFileName();

    synchronized (lock) {
      try {
        if (currentPath != null) {
          Path previousPath = currentPath;
          // close the current writer first so the Parquet footer is flushed
          // before the file is renamed or deleted
          if (parquetWriter != null) {
            parquetWriter.close();
          }
          if (chunksWrittenThisRotate) {
            // swap the .chukwa suffix for .done and hand the file to the
            // mover; keep the full path so the rename stays in the sink
            // directory and the suffix is not appended twice
            String previousFileName =
                previousPath.toString().replace(".chukwa", ".done");
            if (fs.exists(previousPath)) {
              fs.rename(previousPath, new Path(previousFileName));
            }
            fileQueue.add(previousFileName);
          } else {
            log.info("no chunks written to " + previousPath + ", deleting");
            fs.delete(previousPath, false);
          }
        }

        Path newOutputPath = new Path(newName + ".chukwa");
        while (fs.exists(newOutputPath)) {
          newName = getNewFileName();
          newOutputPath = new Path(newName + ".chukwa");
        }

        currentPath = newOutputPath;
        currentFileName = newName;
        chunksWrittenThisRotate = false;
        parquetWriter = new AvroParquetWriter<GenericRecord>(newOutputPath,
            avroSchema, CompressionCodecName.SNAPPY, blockSize, pageSize);

      } catch (IOException e) {
        log.fatal("IO Exception in rotate: ", e);
      }
    }

    // Check for disk space
    File directory4Space = new File(localOutputDir);
    long totalSpace = directory4Space.getTotalSpace();
    long freeSpace = directory4Space.getFreeSpace();
    long minFreeAvailable = (totalSpace * minPercentFreeDisk) / 100;

    if (log.isDebugEnabled()) {
      log.debug("Directory: " + localOutputDir + ", totalSpace: " + totalSpace
          + ", freeSpace: " + freeSpace + ", minFreeAvailable: " + minFreeAvailable
          + ", percentFreeDisk: " + minPercentFreeDisk);
    }

    if (freeSpace < minFreeAvailable) {
      log.fatal("No space left on device.");
      throw new WriterException("No space left on device.");
    }

    log.debug("finished rotate()");
  }
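
  // Sink-file life cycle driven by rotate() above (names illustrative):
  //   <name>.chukwa : open file, receiving records from add()
  //   <name>.done   : rotated, complete file, queued on fileQueue
  // LocalToRemoteHdfsMover consumes *.done entries from fileQueue and moves
  // the files to the remote HDFS (see the class Javadoc).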

  public void close() {
    synchronized (lock) {

      if (rotateTimer != null) {
        rotateTimer.cancel();
      }

      if (statTimer != null) {
        statTimer.cancel();
      }

      try {
        if (parquetWriter != null) {
          parquetWriter.close();
        }
        if (localToRemoteHdfsMover != null) {
          localToRemoteHdfsMover.shutdown();
        }

        // guard against close() being called before the first rotate()
        // has created a sink file
        if (currentPath != null) {
          fs.rename(currentPath, new Path(currentFileName + ".done"));
        }
      } catch (IOException e) {
        log.error("failed to close and rename stream", e);
      }
    }
  }
}