This project has retired. For details please refer to its Attic page.
SeqFileWriter xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.chukwa.datacollection.writer;
20  
21  
22  import java.net.InetAddress;
23  import java.net.URI;
24  import java.net.UnknownHostException;
25  import java.util.Calendar;
26  import java.util.List;
27  import java.util.Timer;
28  import java.util.TimerTask;
29  import java.util.concurrent.Semaphore;
30  import java.util.concurrent.TimeUnit;
31  import java.io.IOException;
32  
33  import org.apache.hadoop.chukwa.ChukwaArchiveKey;
34  import org.apache.hadoop.chukwa.Chunk;
35  import org.apache.hadoop.chukwa.ChunkImpl;
36  import org.apache.hadoop.conf.Configuration;
37  import org.apache.hadoop.fs.FSDataOutputStream;
38  import org.apache.hadoop.fs.FileSystem;
39  import org.apache.hadoop.fs.Path;
40  import org.apache.hadoop.io.SequenceFile;
41  import org.apache.log4j.Logger;
42  
43  /**
44   * This class <b>is</b> thread-safe -- rotate() and save() both synchronize on
45   * this object.
46   * 
47   */
48  public class SeqFileWriter extends PipelineableWriter implements ChukwaWriter {
49    static Logger log = Logger.getLogger(SeqFileWriter.class);
50    private static boolean ENABLE_ROTATION_ON_CLOSE = true;
51  
52    protected int STAT_INTERVAL_SECONDS = 30;
53    private int rotateInterval = 1000 * 60 * 5;
54    private int offsetInterval = 1000 * 30;
55    private boolean if_fixed_interval = false;
56    static final int ACQ_WAIT_ON_TERM = 500; //ms to wait for lock on a SIGTERM before aborting
57    
58    public static final String STAT_PERIOD_OPT = "chukwaCollector.stats.period";
59    public static final String ROTATE_INTERVAL_OPT = "chukwaCollector.rotateInterval";
60    public static final String IF_FIXED_INTERVAL_OPT = "chukwaCollector.isFixedTimeRotatorScheme";
61    public static final String FIXED_INTERVAL_OFFSET_OPT = "chukwaCollector.fixedTimeIntervalOffset";
62    public static final String OUTPUT_DIR_OPT= "chukwaCollector.outputDir";
63    public String localHostAddr = null;
64    
65    protected final Semaphore lock = new Semaphore(1, true);
66    
67    protected FileSystem fs = null;
68    protected Configuration conf = null;
69  
70    protected String outputDir = null;
71    private Calendar calendar = Calendar.getInstance();
72  
73    protected Path currentPath = null;
74    protected String currentFileName = null;
75    protected FSDataOutputStream currentOutputStr = null;
76    protected SequenceFile.Writer seqFileWriter = null;
77  
78    protected long timePeriod = -1;
79    protected long nextTimePeriodComputation = -1;
80    
81    protected Timer rotateTimer = null;  
82    protected Timer statTimer = null;
83    
84    protected volatile long dataSize = 0;
85    protected volatile long bytesThisRotate = 0;
86    protected volatile boolean isRunning = false;
87  
88    public SeqFileWriter() {
89      try {
90        localHostAddr = "_" + InetAddress.getLocalHost().getHostName() + "_";
91      } catch (UnknownHostException e) {
92        localHostAddr = "-NA-";
93      }
94    }
95    
96    public long getBytesWritten() {
97      return dataSize;
98    }
99    
100   public void init(Configuration conf) throws WriterException {
101     outputDir = conf.get(OUTPUT_DIR_OPT, "/chukwa");
102 
103     this.conf = conf;
104 
105     rotateInterval = conf.getInt(ROTATE_INTERVAL_OPT,rotateInterval);
106     if_fixed_interval = conf.getBoolean(IF_FIXED_INTERVAL_OPT,if_fixed_interval);
107     offsetInterval = conf.getInt(FIXED_INTERVAL_OFFSET_OPT,offsetInterval);
108     
109     STAT_INTERVAL_SECONDS = conf.getInt(STAT_PERIOD_OPT, STAT_INTERVAL_SECONDS);
110 
111     // check if they've told us the file system to use
112     String fsname = conf.get("writer.hdfs.filesystem");
113     if (fsname == null || fsname.equals("")) {
114       // otherwise try to get the filesystem from hadoop
115       fsname = conf.get("fs.defaultFS");
116     }
117 
118     log.info("rotateInterval is " + rotateInterval);
119     if(if_fixed_interval)
120       log.info("using fixed time interval scheme, " +
121               "offsetInterval is " + offsetInterval);
122     else
123       log.info("not using fixed time interval scheme");
124     log.info("outputDir is " + outputDir);
125     log.info("fsname is " + fsname);
126     log.info("filesystem type from core-default.xml is "
127         + conf.get("fs.hdfs.impl"));
128 
129     if (fsname == null) {
130       log.error("no filesystem name");
131       throw new WriterException("no filesystem");
132     }
133     try {
134       fs = FileSystem.get(new URI(fsname), conf);
135       if (fs == null) {
136         log.error("can't connect to HDFS.");
137       }
138     } catch (Throwable e) {
139       log.error(
140           "can't connect to HDFS, trying default file system instead (likely to be local)",
141           e);
142     }
143 
144     // Setup everything by rotating
145 
146     isRunning = true;
147     rotate();
148 
149     statTimer = new Timer();
150     statTimer.schedule(new StatReportingTask(), 1000,
151         STAT_INTERVAL_SECONDS * 1000);
152 
153   }
154 
155   public class StatReportingTask extends TimerTask {
156     private long lastTs = System.currentTimeMillis();
157 
158     public void run() {
159 
160       long time = System.currentTimeMillis();
161       long currentDs = dataSize;
162       dataSize = 0;
163 
164       long interval = time - lastTs;
165       lastTs = time;
166 
167       long dataRate = 1000 * currentDs / interval; // kb/sec
168       log.info("stat:datacollection.writer.hdfs dataSize=" + currentDs
169           + " dataRate=" + dataRate);
170     }
171     
172     public StatReportingTask() {}
173   };
174 
175   void rotate() {
176      if (rotateTimer != null) {
177       rotateTimer.cancel();
178     } 
179      
180     if(!isRunning)
181       return;
182     
183     calendar.setTimeInMillis(System.currentTimeMillis());
184 
185     String newName = new java.text.SimpleDateFormat("yyyyMMddHHmmssSSS")
186         .format(calendar.getTime());
187     newName += localHostAddr + new java.rmi.server.UID().toString();
188     newName = newName.replace("-", "");
189     newName = newName.replace(":", "");
190     newName = newName.replace(".", "");
191     newName = outputDir + "/" + newName.trim();
192 
193     try {
194       lock.acquire();
195 
196       FSDataOutputStream previousOutputStr = currentOutputStr;
197       Path previousPath = currentPath;
198       String previousFileName = currentFileName;
199 
200       if (previousOutputStr != null) {
201         boolean closed = false;
202         try {
203           log.info("closing sink file" + previousFileName);
204           previousOutputStr.close();
205           closed = true;
206         }catch (Throwable e) {
207           log.error("couldn't close file" + previousFileName, e);
208           //we probably have an orphaned 0 byte file at this point due to an
209           //intermitant HDFS outage. Once HDFS comes up again we'll be able to
210           //close it, although it will be empty.
211         }
212 
213         if (bytesThisRotate > 0) {
214           if (closed) {
215             log.info("rotating sink file " + previousPath);
216             fs.rename(previousPath, new Path(previousFileName + ".done"));
217           }
218           else {
219             log.warn(bytesThisRotate + " bytes potentially lost, since " +
220                     previousPath + " could not be closed.");
221           }
222         } else {
223           log.info("no chunks written to " + previousPath + ", deleting");
224           fs.delete(previousPath, false);
225         }
226       }
227 
228       Path newOutputPath = new Path(newName + ".chukwa");
229       FSDataOutputStream newOutputStr = fs.create(newOutputPath);
230       // Uncompressed for now
231       seqFileWriter = SequenceFile.createWriter(conf, newOutputStr,
232           ChukwaArchiveKey.class, ChunkImpl.class,
233           SequenceFile.CompressionType.NONE, null);
234 
235       // reset these once we know that seqFileWriter was created
236       currentOutputStr = newOutputStr;
237       currentPath = newOutputPath;
238       currentFileName = newName;
239       bytesThisRotate = 0;
240     } catch (Throwable e) {
241       log.warn("Got an exception trying to rotate. Will try again in " +
242               rotateInterval/1000 + " seconds." ,e);
243     } finally {
244       lock.release();
245     }
246     
247     // Schedule the next timer
248     scheduleNextRotation();
249 
250   }
251 
252   /**
253    * Schedules the rotate task using either a fixed time interval scheme or a
254    * relative time interval scheme as specified by the
255    * chukwaCollector.isFixedTimeRotatorScheme configuration parameter. If the
256    * value of this parameter is true then next rotation will be scheduled at a
257    * fixed offset from the current rotateInterval. This fixed offset is
258    * provided by chukwaCollector.fixedTimeIntervalOffset configuration
259    * parameter.
260    */
261   void scheduleNextRotation(){
262     long delay = rotateInterval;
263     if (if_fixed_interval) {
264       long currentTime = System.currentTimeMillis();
265       delay = getDelayForFixedInterval(currentTime, rotateInterval, offsetInterval);
266     }
267     rotateTimer = new Timer();
268     rotateTimer.schedule(new TimerTask() {
269       public void run() {
270         rotate();
271       }
272     }, delay);
273   }
274 
275   /**
276    * Calculates delay for scheduling the next rotation in case of
277    * FixedTimeRotatorScheme. This delay is the time difference between the
278    * currentTimestamp (t1) and the next time the collector should rotate the
279    * sequence files (t2). t2 is the time when the current rotateInterval ends
280    * plus an offset (as set by chukwaCollector.FixedTimeIntervalOffset).
281    * So, delay = t2 - t1
282    *
283    * @param currentTime - the current timestamp
284    * @param rotateInterval - chukwaCollector.rotateInterval
285    * @param offsetInterval - chukwaCollector.fixedTimeIntervalOffset
286    * @return delay for scheduling next rotation
287    */
288   long getDelayForFixedInterval(long currentTime, long rotateInterval, long offsetInterval){
289     // time since last rounded interval
290     long remainder = (currentTime % rotateInterval);
291     long prevRoundedInterval = currentTime - remainder;
292     long nextRoundedInterval = prevRoundedInterval + rotateInterval;
293     long delay = nextRoundedInterval - currentTime + offsetInterval;
294 
295     if (log.isInfoEnabled()) {
296      log.info("currentTime="+currentTime+" prevRoundedInterval="+
297              prevRoundedInterval+" nextRoundedInterval" +
298             "="+nextRoundedInterval+" delay="+delay);
299     }
300 
301     return delay;
302   }
303 
304 
305   protected void computeTimePeriod() {
306     synchronized (calendar) {
307       calendar.setTimeInMillis(System.currentTimeMillis());
308       calendar.set(Calendar.MINUTE, 0);
309       calendar.set(Calendar.SECOND, 0);
310       calendar.set(Calendar.MILLISECOND, 0);
311       timePeriod = calendar.getTimeInMillis();
312       calendar.add(Calendar.HOUR, 1);
313       nextTimePeriodComputation = calendar.getTimeInMillis();
314     }
315   }
316   
317   @Override
318   public CommitStatus add(List<Chunk> chunks) throws WriterException {
319     COMMIT_PENDING result = new COMMIT_PENDING(chunks.size());
320     if (!isRunning) {
321       log.info("Collector not ready");
322       throw new WriterException("Collector not ready");
323     }
324 
325     ChukwaArchiveKey archiveKey = new ChukwaArchiveKey();
326     
327     if (System.currentTimeMillis() >= nextTimePeriodComputation) {
328       computeTimePeriod();
329     }
330     try {
331       lock.acquire();
332       for (Chunk chunk : chunks) {
333         archiveKey.setTimePartition(timePeriod);
334         archiveKey.setDataType(chunk.getDataType());
335         archiveKey.setStreamName(chunk.getTags() + "/" + chunk.getSource()
336             + "/" + chunk.getStreamName());
337         archiveKey.setSeqId(chunk.getSeqID());
338 
339         seqFileWriter.append(archiveKey, chunk);
340 
341         // compute size for stats only if append succeeded. Note though that
342         // seqFileWriter.append can continue taking data for quite some time
343         // after HDFS goes down while the client is trying to reconnect. Hence
344         // these stats might not reflect reality during an HDFS outage.
345         dataSize += chunk.getData().length;
346         bytesThisRotate += chunk.getData().length;
347 
348         String futureName = currentPath.getName().replace(".chukwa", ".done");
349         result.addPend(futureName, currentOutputStr.getPos());
350 
351       }
352     }
353     catch (IOException e) {
354       log.error("IOException when trying to write a chunk, Collector will return error and keep running.", e);
355       return COMMIT_FAIL;
356     }
357     catch (Throwable e) {
358       // We don't want to loose anything
359       log.fatal("IOException when trying to write a chunk, Collector is going to exit!", e);
360       isRunning = false;
361     } finally {
362       lock.release();
363     }
364     return result;
365   }
366 
367   public void close() {
368     
369     isRunning = false;
370 
371     if (statTimer != null) {
372       statTimer.cancel();
373     }
374 
375     if (rotateTimer != null) {
376       rotateTimer.cancel();
377     }
378 
379     // If we are here it's either because of an HDFS exception
380     // or Collector has received a kill -TERM
381     boolean gotLock = false;
382     try {
383       gotLock = lock.tryAcquire(ACQ_WAIT_ON_TERM, TimeUnit.MILLISECONDS);
384       if(gotLock) {
385         
386         if (this.currentOutputStr != null) {
387           this.currentOutputStr.close();
388         }
389         if(ENABLE_ROTATION_ON_CLOSE)
390           if(bytesThisRotate > 0)
391             fs.rename(currentPath, new Path(currentFileName + ".done"));
392           else
393             fs.delete(currentPath, false);
394       }
395     } catch (Throwable e) {
396      log.warn("cannot rename dataSink file:" + currentPath,e);
397     } finally {
398       if(gotLock)
399         lock.release();
400     }
401   }
402   
403   public static void setEnableRotationOnClose(boolean b) {
404     ENABLE_ROTATION_ON_CLOSE = b;
405   }
406 
407 }