LocalWriter xref

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.chukwa.datacollection.writer.localfs;

import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Calendar;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

import org.apache.hadoop.chukwa.ChukwaArchiveKey;
import org.apache.hadoop.chukwa.Chunk;
import org.apache.hadoop.chukwa.ChunkImpl;
import org.apache.hadoop.chukwa.datacollection.writer.ChukwaWriter;
import org.apache.hadoop.chukwa.datacollection.writer.WriterException;
import org.apache.hadoop.chukwa.util.DaemonWatcher;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.log4j.Logger;

/**
 * <p>This class <b>is</b> thread-safe -- add(), rotate() and close() all
 * synchronize on the same lock object.
 * </p>
 * <p>
 * Writes data to the local fileSystem, then moves it to the remote HDFS.
 * <br>
 * Warning:
 * <br>
 * There is no lock/waiting time for the remote client.
 * The connection is released as soon as the last append is done,
 * so there is no guarantee that this class will not lose
 * any data.
 * <br>
 * This class has been designed this way for performance reasons.
 * </p>
 * <p>
 * In order to use this class, you need to define some parameters
 * in chukwa-collector-conf.xml:
 * </p>
 * <br>
 *  &lt;property&gt;<br>
 *   &lt;name&gt;chukwaCollector.localOutputDir&lt;/name&gt;<br>
 *   &lt;value&gt;/grid/0/gs/chukwa/chukwa-0.1.2/dataSink/&lt;/value&gt;<br>
 *   &lt;description&gt;Chukwa data sink directory&lt;/description&gt;<br>
 *  &lt;/property&gt;<br>
 * <br>
 *  &lt;property&gt;<br>
 *    &lt;name&gt;chukwaCollector.writerClass&lt;/name&gt;<br>
 *    &lt;value&gt;org.apache.hadoop.chukwa.datacollection.writer.localfs.LocalWriter&lt;/value&gt;<br>
 *    &lt;description&gt;Local chukwa writer&lt;/description&gt;<br>
 *  &lt;/property&gt;<br>
 * <br>
 */
public class LocalWriter implements ChukwaWriter {

  static Logger log = Logger.getLogger(LocalWriter.class);
  static final int STAT_INTERVAL_SECONDS = 30;
  static String localHostAddr = null;

  private final Object lock = new Object();
  private BlockingQueue<String> fileQueue = null;
  // moves closed .done files from the local sink directory to the remote HDFS
  private LocalToRemoteHdfsMover localToRemoteHdfsMover = null;
  private FileSystem fs = null;
  private Configuration conf = null;

  private String localOutputDir = null;
  private Calendar calendar = Calendar.getInstance();

  private Path currentPath = null;
  private String currentFileName = null;
  private FSDataOutputStream currentOutputStr = null;
  private SequenceFile.Writer seqFileWriter = null;
  private int rotateInterval = 1000 * 60;

  private volatile long dataSize = 0;
  private volatile boolean isRunning = false;

  private Timer rotateTimer = null;
  private Timer statTimer = null;

  private int initWriteChunkRetries = 10;
  private int writeChunkRetries = initWriteChunkRetries;
  private boolean chunksWrittenThisRotate = false;

  private long timePeriod = -1;
  private long nextTimePeriodComputation = -1;
  private int minPercentFreeDisk = 20;

  static {
    try {
      localHostAddr = "_" + InetAddress.getLocalHost().getHostName() + "_";
    } catch (UnknownHostException e) {
      localHostAddr = "-NA-";
    }
  }

  public void init(Configuration conf) throws WriterException {
    this.conf = conf;

    try {
      fs = FileSystem.getLocal(conf);
      localOutputDir = conf.get("chukwaCollector.localOutputDir",
          "/chukwa/datasink/");
      if (!localOutputDir.endsWith("/")) {
        localOutputDir += "/";
      }
      Path pLocalOutputDir = new Path(localOutputDir);
      if (!fs.exists(pLocalOutputDir)) {
        boolean exist = fs.mkdirs(pLocalOutputDir);
        if (!exist) {
          throw new WriterException("Cannot create local dataSink dir: "
              + localOutputDir);
        }
      } else {
        FileStatus fsLocalOutputDir = fs.getFileStatus(pLocalOutputDir);
        if (!fsLocalOutputDir.isDir()) {
          throw new WriterException("local dataSink dir is not a directory: "
              + localOutputDir);
        }
      }
    } catch (Throwable e) {
      log.fatal("Cannot initialize LocalWriter", e);
      DaemonWatcher.bailout(-1);
    }

    minPercentFreeDisk = conf.getInt("chukwaCollector.minPercentFreeDisk", 20);

    rotateInterval = conf.getInt("chukwaCollector.rotateInterval",
        1000 * 60 * 5); // defaults to 5 minutes

    initWriteChunkRetries = conf
        .getInt("chukwaCollector.writeChunkRetries", 10);
    writeChunkRetries = initWriteChunkRetries;

    log.info("rotateInterval is " + rotateInterval);
    log.info("outputDir is " + localOutputDir);
    log.info("localFileSystem is " + fs.getUri().toString());
    log.info("minPercentFreeDisk is " + minPercentFreeDisk);

    // Setup everything by rotating
    rotate();

    rotateTimer = new Timer();
    rotateTimer.schedule(new RotateTask(), rotateInterval,
        rotateInterval);

    statTimer = new Timer();
    statTimer.schedule(new StatReportingTask(), 1000,
        STAT_INTERVAL_SECONDS * 1000);

    fileQueue = new LinkedBlockingQueue<String>();
    localToRemoteHdfsMover = new LocalToRemoteHdfsMover(fileQueue, conf);
  }

  private class RotateTask extends TimerTask {
    public void run() {
      rotate();
    }
  }

  private class StatReportingTask extends TimerTask {
    private long lastTs = System.currentTimeMillis();

    public void run() {

      long time = System.currentTimeMillis();
      long currentDs = dataSize;
      dataSize = 0;

      long interval = time - lastTs;
      lastTs = time;

      long dataRate = 1000 * currentDs / interval; // bytes/sec
      log.info("stat:datacollection.writer.local.LocalWriter dataSize="
          + currentDs + " dataRate=" + dataRate);
    }
  }

  protected void computeTimePeriod() {
    synchronized (calendar) {
      calendar.setTimeInMillis(System.currentTimeMillis());
      calendar.set(Calendar.MINUTE, 0);
      calendar.set(Calendar.SECOND, 0);
      calendar.set(Calendar.MILLISECOND, 0);
      timePeriod = calendar.getTimeInMillis();
      calendar.add(Calendar.HOUR, 1);
      nextTimePeriodComputation = calendar.getTimeInMillis();
    }
  }

  /**
   * Best effort; there's no guarantee that chunks
   * have really been written to disk.
   */
  public CommitStatus add(List<Chunk> chunks) throws WriterException {
    if (!isRunning) {
      throw new WriterException("Writer not yet ready");
    }
    long now = System.currentTimeMillis();
    if (chunks != null) {
      try {
        chunksWrittenThisRotate = true;
        ChukwaArchiveKey archiveKey = new ChukwaArchiveKey();

        synchronized (lock) {
          if (System.currentTimeMillis() >= nextTimePeriodComputation) {
            computeTimePeriod();
          }

          for (Chunk chunk : chunks) {
            // skip null entries before the chunk is dereferenced
            if (chunk == null) {
              continue;
            }
            archiveKey.setTimePartition(timePeriod);
            archiveKey.setDataType(chunk.getDataType());
            archiveKey.setStreamName(chunk.getTags() + "/" + chunk.getSource()
                + "/" + chunk.getStreamName());
            archiveKey.setSeqId(chunk.getSeqID());

            seqFileWriter.append(archiveKey, chunk);
            // compute size for stats
            dataSize += chunk.getData().length;
          }
        } // End synchro
        long end = System.currentTimeMillis();
        if (log.isDebugEnabled()) {
          log.debug("duration=" + (end - now) + " size=" + chunks.size());
        }

      } catch (IOException e) {
        writeChunkRetries--;
        log.error("Could not save the chunk. ", e);

        if (writeChunkRetries < 0) {
          log.fatal("Too many IOExceptions when trying to write a chunk, Collector is going to exit!");
          DaemonWatcher.bailout(-1);
        }
        throw new WriterException(e);
      }
    }
    return COMMIT_OK;
  }

  protected void rotate() {
    isRunning = true;
    calendar.setTimeInMillis(System.currentTimeMillis());
    log.info("start Date [" + calendar.getTime() + "]");
    log.info("Rotate from " + Thread.currentThread().getName());

    String newName = new java.text.SimpleDateFormat("yyyyMMddHHmmssSSS")
        .format(calendar.getTime());
    newName += localHostAddr + new java.rmi.server.UID().toString();
    newName = newName.replace("-", "");
    newName = newName.replace(":", "");
    newName = newName.replace(".", "");
    newName = localOutputDir + "/" + newName.trim();

    synchronized (lock) {
      try {
        FSDataOutputStream previousOutputStr = currentOutputStr;
        Path previousPath = currentPath;
        String previousFileName = currentFileName;

        if (previousOutputStr != null) {
          previousOutputStr.close();
          if (chunksWrittenThisRotate) {
            fs.rename(previousPath, new Path(previousFileName + ".done"));
            fileQueue.add(previousFileName + ".done");
          } else {
            log.info("no chunks written to " + previousPath + ", deleting");
            fs.delete(previousPath, false);
          }
        }

        Path newOutputPath = new Path(newName + ".chukwa");
        FSDataOutputStream newOutputStr = fs.create(newOutputPath);

        currentOutputStr = newOutputStr;
        currentPath = newOutputPath;
        currentFileName = newName;
        chunksWrittenThisRotate = false;
        // Uncompressed for now
        seqFileWriter = SequenceFile.createWriter(conf, newOutputStr,
            ChukwaArchiveKey.class, ChunkImpl.class,
            SequenceFile.CompressionType.NONE, null);

      } catch (IOException e) {
        log.fatal("IO Exception in rotate. Exiting!", e);
        // Shutting down the collector
        // Watchdog will re-start it automatically
        DaemonWatcher.bailout(-1);
      }
    }

    // Check for disk space
    File directory4Space = new File(localOutputDir);
    long totalSpace = directory4Space.getTotalSpace();
    long freeSpace = directory4Space.getFreeSpace();
    long minFreeAvailable = (totalSpace * minPercentFreeDisk) / 100;

    if (log.isDebugEnabled()) {
      log.debug("Directory: " + localOutputDir + ", totalSpace: " + totalSpace
          + ", freeSpace: " + freeSpace + ", minFreeAvailable: " + minFreeAvailable
          + ", percentFreeDisk: " + minPercentFreeDisk);
    }

    if (freeSpace < minFreeAvailable) {
      log.fatal("No space left on device, Bail out!");
      DaemonWatcher.bailout(-1);
    }

    log.debug("finished rotate()");
  }

  public void close() {
    synchronized (lock) {

      if (rotateTimer != null) {
        rotateTimer.cancel();
      }

      if (statTimer != null) {
        statTimer.cancel();
      }

      try {
        if (this.currentOutputStr != null) {
          this.currentOutputStr.close();

          if (seqFileWriter != null) {
            seqFileWriter.close();
          }
        }
        if (localToRemoteHdfsMover != null) {
          localToRemoteHdfsMover.shutdown();
        }

        fs.rename(currentPath, new Path(currentFileName + ".done"));
      } catch (IOException e) {
        log.error("failed to close and rename stream", e);
      }
    }
  }
}
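
A minimal usage sketch, assuming the calling pattern implied by the ChukwaWriter interface (init, add, close) and the chukwa-collector-conf.xml keys shown in the class Javadoc. In a real deployment the collector instantiates the writer named by chukwaCollector.writerClass rather than constructing it directly, and the ChunkImpl constructor arguments shown here are illustrative rather than prescriptive.

import java.util.Collections;

import org.apache.hadoop.chukwa.Chunk;
import org.apache.hadoop.chukwa.ChunkImpl;
import org.apache.hadoop.chukwa.datacollection.writer.ChukwaWriter;
import org.apache.hadoop.chukwa.datacollection.writer.localfs.LocalWriter;
import org.apache.hadoop.conf.Configuration;

public class LocalWriterUsageSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Same keys as the chukwa-collector-conf.xml properties in the class Javadoc.
    conf.set("chukwaCollector.localOutputDir", "/tmp/chukwa/dataSink/");
    conf.setInt("chukwaCollector.rotateInterval", 1000 * 60 * 5);

    ChukwaWriter writer = new LocalWriter();
    writer.init(conf);   // creates the local sink dir and starts the rotate/stat timers

    byte[] data = "sample log line\n".getBytes();
    // Assumed ChunkImpl(dataType, streamName, seqId, data, adaptor) constructor;
    // the adaptor may be null for a hand-built chunk like this one.
    Chunk chunk = new ChunkImpl("SomeDataType", "someStream", data.length, data, null);
    writer.add(Collections.singletonList(chunk));  // best effort, see add() Javadoc

    writer.close();      // closes the current .chukwa file and renames it to .done
  }
}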