SeqFileWriter xref

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.chukwa.datacollection.writer;


import java.io.IOException;
import java.net.InetAddress;
import java.net.URI;
import java.net.UnknownHostException;
import java.util.Calendar;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.chukwa.ChukwaArchiveKey;
import org.apache.hadoop.chukwa.Chunk;
import org.apache.hadoop.chukwa.ChunkImpl;
import org.apache.hadoop.chukwa.util.DaemonWatcher;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.log4j.Logger;

/**
 * This class <b>is</b> thread-safe -- rotate(), add(), and close() all
 * coordinate through the same semaphore (lock), so only one of them
 * manipulates the underlying sequence file at a time.
 */
public class SeqFileWriter extends PipelineableWriter implements ChukwaWriter {
  static Logger log = Logger.getLogger(SeqFileWriter.class);
  public static boolean ENABLE_ROTATION_ON_CLOSE = true;

  protected int STAT_INTERVAL_SECONDS = 30;
  private int rotateInterval = 1000 * 60 * 5;
  private int offsetInterval = 1000 * 30;
  private boolean if_fixed_interval = false;
  static final int ACQ_WAIT_ON_TERM = 500; // ms to wait for lock on a SIGTERM before aborting

  public static final String STAT_PERIOD_OPT = "chukwaCollector.stats.period";
  public static final String ROTATE_INTERVAL_OPT = "chukwaCollector.rotateInterval";
  public static final String IF_FIXED_INTERVAL_OPT = "chukwaCollector.isFixedTimeRotatorScheme";
  public static final String FIXED_INTERVAL_OFFSET_OPT = "chukwaCollector.fixedTimeIntervalOffset";
  public static final String OUTPUT_DIR_OPT = "chukwaCollector.outputDir";
  protected static String localHostAddr = null;

  protected final Semaphore lock = new Semaphore(1, true);

  protected FileSystem fs = null;
  protected Configuration conf = null;

  protected String outputDir = null;
  private Calendar calendar = Calendar.getInstance();

  protected Path currentPath = null;
  protected String currentFileName = null;
  protected FSDataOutputStream currentOutputStr = null;
  protected SequenceFile.Writer seqFileWriter = null;

  protected long timePeriod = -1;
  protected long nextTimePeriodComputation = -1;

  protected Timer rotateTimer = null;
  protected Timer statTimer = null;

  protected volatile long dataSize = 0;
  protected volatile long bytesThisRotate = 0;
  protected volatile boolean isRunning = false;

  static {
    try {
      localHostAddr = "_" + InetAddress.getLocalHost().getHostName() + "_";
    } catch (UnknownHostException e) {
      localHostAddr = "-NA-";
    }
  }

  public SeqFileWriter() {}

  public long getBytesWritten() {
    return dataSize;
  }
  public void init(Configuration conf) throws WriterException {
    outputDir = conf.get(OUTPUT_DIR_OPT, "/chukwa");

    this.conf = conf;

    rotateInterval = conf.getInt(ROTATE_INTERVAL_OPT, rotateInterval);
    if_fixed_interval = conf.getBoolean(IF_FIXED_INTERVAL_OPT, if_fixed_interval);
    offsetInterval = conf.getInt(FIXED_INTERVAL_OFFSET_OPT, offsetInterval);

    STAT_INTERVAL_SECONDS = conf.getInt(STAT_PERIOD_OPT, STAT_INTERVAL_SECONDS);

    // check if they've told us the file system to use
    String fsname = conf.get("writer.hdfs.filesystem");
    if (fsname == null || fsname.equals("")) {
      // otherwise try to get the filesystem from hadoop
      fsname = conf.get("fs.default.name");
    }

    log.info("rotateInterval is " + rotateInterval);
    if (if_fixed_interval)
      log.info("using fixed time interval scheme, " +
              "offsetInterval is " + offsetInterval);
    else
      log.info("not using fixed time interval scheme");
    log.info("outputDir is " + outputDir);
    log.info("fsname is " + fsname);
    log.info("filesystem type from core-default.xml is "
        + conf.get("fs.hdfs.impl"));

    if (fsname == null) {
      log.error("no filesystem name");
      throw new WriterException("no filesystem");
    }
    try {
      fs = FileSystem.get(new URI(fsname), conf);
      if (fs == null) {
        // log fsname here; fs is null, so calling fs.getUri() would NPE
        log.error("can't connect to HDFS at " + fsname + ", bailing out!");
        DaemonWatcher.bailout(-1);
      }
    } catch (Throwable e) {
      log.error(
          "can't connect to HDFS, trying default file system instead (likely to be local)",
          e);
      DaemonWatcher.bailout(-1);
    }

    // Set up everything by rotating

    isRunning = true;
    rotate();

    statTimer = new Timer();
    statTimer.schedule(new StatReportingTask(), 1000,
        STAT_INTERVAL_SECONDS * 1000);
  }
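
  // Reference for the configuration keys read by init() above. The defaults
  // come from the field initializers and init() itself; the sample filesystem
  // URI is illustrative only, not taken from this file.
  //
  //   chukwaCollector.outputDir                 default "/chukwa"
  //   chukwaCollector.rotateInterval            default 300000 ms (5 min)
  //   chukwaCollector.isFixedTimeRotatorScheme  default false
  //   chukwaCollector.fixedTimeIntervalOffset   default 30000 ms (30 s)
  //   chukwaCollector.stats.period              default 30 s
  //   writer.hdfs.filesystem                    e.g. "hdfs://namenode:9000/"
  //     (falls back to fs.default.name when unset)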

  public class StatReportingTask extends TimerTask {
    private long lastTs = System.currentTimeMillis();

    public void run() {

      long time = System.currentTimeMillis();
      long currentDs = dataSize;
      dataSize = 0;

      long interval = time - lastTs;
      lastTs = time;

      long dataRate = 1000 * currentDs / interval; // bytes/sec (currentDs in bytes, interval in ms)
      log.info("stat:datacollection.writer.hdfs dataSize=" + currentDs
          + " dataRate=" + dataRate);
    }

    public StatReportingTask() {}
  }

  void rotate() {
    if (rotateTimer != null) {
      rotateTimer.cancel();
    }

    if (!isRunning)
      return;

    calendar.setTimeInMillis(System.currentTimeMillis());

    String newName = new java.text.SimpleDateFormat("yyyyMMddHHmmssSSS")
        .format(calendar.getTime());
    newName += localHostAddr + new java.rmi.server.UID().toString();
    newName = newName.replace("-", "");
    newName = newName.replace(":", "");
    newName = newName.replace(".", "");
    newName = outputDir + "/" + newName.trim();

    try {
      // acquire uninterruptibly so the release in the finally block always
      // matches a successful acquire
      lock.acquireUninterruptibly();

      FSDataOutputStream previousOutputStr = currentOutputStr;
      Path previousPath = currentPath;
      String previousFileName = currentFileName;

      if (previousOutputStr != null) {
        boolean closed = false;
        try {
          log.info("closing sink file " + previousFileName);
          previousOutputStr.close();
          closed = true;
        } catch (Throwable e) {
          log.error("couldn't close file " + previousFileName, e);
          // we probably have an orphaned 0 byte file at this point due to an
          // intermittent HDFS outage. Once HDFS comes up again we'll be able
          // to close it, although it will be empty.
        }

        if (bytesThisRotate > 0) {
          if (closed) {
            log.info("rotating sink file " + previousPath);
            fs.rename(previousPath, new Path(previousFileName + ".done"));
          } else {
            log.warn(bytesThisRotate + " bytes potentially lost, since " +
                    previousPath + " could not be closed.");
          }
        } else {
          log.info("no chunks written to " + previousPath + ", deleting");
          fs.delete(previousPath, false);
        }
      }

      Path newOutputPath = new Path(newName + ".chukwa");
      FSDataOutputStream newOutputStr = fs.create(newOutputPath);
      // Uncompressed for now
      seqFileWriter = SequenceFile.createWriter(conf, newOutputStr,
          ChukwaArchiveKey.class, ChunkImpl.class,
          SequenceFile.CompressionType.NONE, null);

      // reset these once we know that seqFileWriter was created
      currentOutputStr = newOutputStr;
      currentPath = newOutputPath;
      currentFileName = newName;
      bytesThisRotate = 0;
    } catch (Throwable e) {
      log.warn("Got an exception trying to rotate. Will try again in " +
              rotateInterval / 1000 + " seconds.", e);
    } finally {
      lock.release();
    }

    // Schedule the next timer
    scheduleNextRotation();
  }
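
  // Sink-file lifecycle implemented by rotate(), shown with an illustrative
  // name (the hostname and UID portions vary per machine and JVM):
  //
  //   /chukwa/20240101120000000_myhost_<uid>.chukwa   open sink file
  //   /chukwa/20240101120000000_myhost_<uid>.done     renamed here once closed,
  //                                                   if any bytes were written
  //   (the .chukwa file is deleted instead when no chunks arrived)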

  /**
   * Schedules the rotate task using either a fixed time interval scheme or a
   * relative time interval scheme, as specified by the
   * chukwaCollector.isFixedTimeRotatorScheme configuration parameter. If the
   * value of this parameter is true, the next rotation is scheduled at a fixed
   * offset past the end of the current rotateInterval; that offset is given by
   * the chukwaCollector.fixedTimeIntervalOffset configuration parameter.
   */
  void scheduleNextRotation() {
    long delay = rotateInterval;
    if (if_fixed_interval) {
      long currentTime = System.currentTimeMillis();
      delay = getDelayForFixedInterval(currentTime, rotateInterval, offsetInterval);
    }
    rotateTimer = new Timer();
    rotateTimer.schedule(new TimerTask() {
      public void run() {
        rotate();
      }
    }, delay);
  }

  /**
   * Calculates the delay for scheduling the next rotation under the
   * FixedTimeRotatorScheme. This delay is the time difference between the
   * current timestamp (t1) and the next time the collector should rotate the
   * sequence files (t2). t2 is the time when the current rotateInterval ends,
   * plus an offset (as set by chukwaCollector.fixedTimeIntervalOffset).
   * So, delay = t2 - t1.
   *
   * @param currentTime - the current timestamp
   * @param rotateInterval - chukwaCollector.rotateInterval
   * @param offsetInterval - chukwaCollector.fixedTimeIntervalOffset
   * @return delay for scheduling the next rotation
   */
  long getDelayForFixedInterval(long currentTime, long rotateInterval, long offsetInterval) {
    // time since the last rounded interval boundary
    long remainder = (currentTime % rotateInterval);
    long prevRoundedInterval = currentTime - remainder;
    long nextRoundedInterval = prevRoundedInterval + rotateInterval;
    long delay = nextRoundedInterval - currentTime + offsetInterval;

    if (log.isInfoEnabled()) {
      log.info("currentTime=" + currentTime + " prevRoundedInterval=" +
          prevRoundedInterval + " nextRoundedInterval=" + nextRoundedInterval +
          " delay=" + delay);
    }

    return delay;
  }
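
  // Worked example for getDelayForFixedInterval(), with assumed inputs:
  // rotateInterval = 300000 ms (5 min), offsetInterval = 30000 ms (30 s),
  // currentTime = 1000000000620.
  //   remainder           = 1000000000620 % 300000  = 100620
  //   prevRoundedInterval = 1000000000620 - 100620  = 999999900000
  //   nextRoundedInterval = 999999900000 + 300000   = 1000000200000
  //   delay               = 1000000200000 - 1000000000620 + 30000 = 229380
  // i.e. the next rotation fires about 229 s later, 30 s past the boundary.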

  protected void computeTimePeriod() {
    synchronized (calendar) {
      // round the current time down to the start of the hour, and schedule
      // the next recomputation for the top of the following hour
      calendar.setTimeInMillis(System.currentTimeMillis());
      calendar.set(Calendar.MINUTE, 0);
      calendar.set(Calendar.SECOND, 0);
      calendar.set(Calendar.MILLISECOND, 0);
      timePeriod = calendar.getTimeInMillis();
      calendar.add(Calendar.HOUR, 1);
      nextTimePeriodComputation = calendar.getTimeInMillis();
    }
  }

  @Override
  public CommitStatus add(List<Chunk> chunks) throws WriterException {
    if (!isRunning) {
      log.info("Collector not ready");
      throw new WriterException("Collector not ready");
    }

    // guard against a null chunk list before calling size() on it
    if (chunks == null) {
      return new COMMIT_PENDING(0);
    }

    COMMIT_PENDING result = new COMMIT_PENDING(chunks.size());
    ChukwaArchiveKey archiveKey = new ChukwaArchiveKey();

    if (System.currentTimeMillis() >= nextTimePeriodComputation) {
      computeTimePeriod();
    }
    try {
      // see rotate(): acquire uninterruptibly so the release in the finally
      // block always matches a successful acquire
      lock.acquireUninterruptibly();
      for (Chunk chunk : chunks) {
        // skip null entries before touching the chunk
        if (chunk == null)
          continue;

        archiveKey.setTimePartition(timePeriod);
        archiveKey.setDataType(chunk.getDataType());
        archiveKey.setStreamName(chunk.getTags() + "/" + chunk.getSource()
            + "/" + chunk.getStreamName());
        archiveKey.setSeqId(chunk.getSeqID());

        seqFileWriter.append(archiveKey, chunk);

        // compute size for stats only if append succeeded. Note though that
        // seqFileWriter.append can continue taking data for quite some time
        // after HDFS goes down while the client is trying to reconnect. Hence
        // these stats might not reflect reality during an HDFS outage.
        dataSize += chunk.getData().length;
        bytesThisRotate += chunk.getData().length;

        String futureName = currentPath.getName().replace(".chukwa", ".done");
        result.addPend(futureName, currentOutputStr.getPos());
      }
    } catch (IOException e) {
      log.error("IOException when trying to write a chunk, Collector will return error and keep running.", e);
      return COMMIT_FAIL;
    } catch (Throwable e) {
      // We don't want to lose anything
      log.fatal("Unexpected error when trying to write a chunk, Collector is going to exit!", e);
      DaemonWatcher.bailout(-1);
      isRunning = false;
    } finally {
      lock.release();
    }
    return result;
  }

  public void close() {

    isRunning = false;

    if (statTimer != null) {
      statTimer.cancel();
    }

    if (rotateTimer != null) {
      rotateTimer.cancel();
    }

    // If we are here it's either because of an HDFS exception
    // or the Collector has received a kill -TERM
    boolean gotLock = false;
    try {
      gotLock = lock.tryAcquire(ACQ_WAIT_ON_TERM, TimeUnit.MILLISECONDS);
      if (gotLock) {

        if (this.currentOutputStr != null) {
          this.currentOutputStr.close();
        }
        if (ENABLE_ROTATION_ON_CLOSE) {
          if (bytesThisRotate > 0)
            fs.rename(currentPath, new Path(currentFileName + ".done"));
          else
            fs.delete(currentPath, false);
        }
      }
    } catch (Throwable e) {
      log.warn("cannot rename dataSink file: " + currentPath, e);
    } finally {
      if (gotLock)
        lock.release();
    }
  }

}
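
A minimal driver sketch for this writer. Everything below is illustrative and
not part of the file above: the class SeqFileWriterDemo, the property values,
and the local file:// URI are assumptions for demonstration only.

import java.util.Collections;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.chukwa.Chunk;
import org.apache.hadoop.chukwa.datacollection.writer.SeqFileWriter;

public class SeqFileWriterDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // illustrative values; a real collector reads these from chukwa-collector-conf.xml
    conf.set("writer.hdfs.filesystem", "file:///tmp/chukwa-demo");
    conf.set(SeqFileWriter.OUTPUT_DIR_OPT, "/tmp/chukwa-demo/sink");
    conf.setInt(SeqFileWriter.ROTATE_INTERVAL_OPT, 60 * 1000); // rotate every minute

    SeqFileWriter writer = new SeqFileWriter();
    writer.init(conf);                           // connects to the FS, opens the first .chukwa file
    writer.add(Collections.<Chunk>emptyList());  // append a List<Chunk>; returns a CommitStatus
    writer.close();                              // renames or deletes the last sink file
  }
}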