This project has retired. For details please refer to its Attic page.
WriteaheadBuffered xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.chukwa.datacollection.adaptor;
19  
20  import java.util.*;
21  import java.io.*;
22  import org.apache.hadoop.chukwa.Chunk;
23  import org.apache.hadoop.chukwa.ChunkImpl;
24  import org.apache.hadoop.chukwa.datacollection.ChunkReceiver;
25  import org.apache.log4j.Logger;
26  import static org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorShutdownPolicy.*;
27  
28  public class WriteaheadBuffered extends AbstractWrapper {
29    Logger log = Logger.getLogger(WriteaheadBuffered.class);
30    static final String BUF_DIR_OPT = "adaptor.writeaheadWrapper.dir";
31    static  String BUF_DIR = "/tmp"; //1 MB
32    static long COMPACT_AT = 1024 * 1024; //compact when it can free at least this much storage
33    
34    File outBuf;
35    DataOutputStream outToDisk;
36    long fSize, highestSentOffset;
37    
38    
39    @Override
40    public synchronized void add(Chunk event) throws InterruptedException {
41      try {
42        event.write(outToDisk);
43        outToDisk.flush();
44        fSize += event.getData().length;
45        long seq = event.getSeqID();
46        if(seq > highestSentOffset)
47          highestSentOffset = seq;
48      } catch(IOException e) {
49        log.error(e);
50      }
51      dest.add(event);
52    }
53    
54    @Override
55    public void start(String adaptorID, String type, long offset,
56        ChunkReceiver dest) throws AdaptorException {
57      try {
58        String dummyAdaptorID = adaptorID;
59        this.dest = dest;
60        
61        outBuf = new File(BUF_DIR, adaptorID);
62        long newOffset = offset;
63        if(outBuf.length() > 0) {
64          DataInputStream dis = new DataInputStream(new FileInputStream(outBuf));
65          while(dis.available() > 0) {
66            Chunk c = ChunkImpl.read(dis);
67            fSize += c.getData().length;
68            long seq = c.getSeqID();
69            if(seq >offset) {
70              dest.add(c);
71              newOffset = seq;
72            }
73          }
74          //send chunks that are outstanding        
75          dis.close();
76        }
77        outToDisk = new DataOutputStream(new FileOutputStream(outBuf, true));
78        
79        inner.start(dummyAdaptorID, innerType, newOffset, this);
80      } catch(IOException e) {
81        throw new AdaptorException(e);
82      } catch(InterruptedException e) {
83        throw new AdaptorException(e);
84      }
85    }
86    
87    @Override
88    public synchronized void committed(long l) {
89  
90      try {
91        long bytesOutstanding = highestSentOffset - l;
92        if(fSize - bytesOutstanding > COMPACT_AT) {
93          fSize = 0;
94          outToDisk.close();
95          File outBufTmp = new File(outBuf.getAbsoluteFile(), outBuf.getName() + ".tmp");
96          outBuf.renameTo(outBufTmp);
97          outToDisk = new DataOutputStream(new FileOutputStream(outBuf, false));
98          DataInputStream dis = new DataInputStream(new FileInputStream(outBufTmp));
99          while(dis.available() > 0) {
100           Chunk c = ChunkImpl.read(dis);
101           if(c.getSeqID() > l) { //not yet committed
102             c.write(outToDisk);
103             fSize += c.getData().length;
104           }
105         }
106         dis.close();
107         outBufTmp.delete();
108       }
109     } catch(IOException e) {
110       log.error(e);
111       //should this be fatal?
112     }
113   }
114   
115   @Override
116   public long shutdown(AdaptorShutdownPolicy p) throws AdaptorException {
117     if(p != RESTARTING)
118       outBuf.delete();    
119     return inner.shutdown(p);
120   }
121 
122 }