This project has retired. For details please refer to its Attic page.
WriteaheadBuffered xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.chukwa.datacollection.adaptor;
19  
20  import java.io.*;
21  import org.apache.hadoop.chukwa.Chunk;
22  import org.apache.hadoop.chukwa.ChunkImpl;
23  import org.apache.hadoop.chukwa.datacollection.ChunkReceiver;
24  import org.apache.log4j.Logger;
25  import static org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorShutdownPolicy.*;
26  
27  public class WriteaheadBuffered extends AbstractWrapper {
28    Logger log = Logger.getLogger(WriteaheadBuffered.class);
29    static final String BUF_DIR_OPT = "adaptor.writeaheadWrapper.dir";
30    static  String BUF_DIR = "/tmp"; //1 MB
31    static long COMPACT_AT = 1024 * 1024; //compact when it can free at least this much storage
32    
33    File outBuf;
34    DataOutputStream outToDisk;
35    long fSize, highestSentOffset;
36    
37    
38    @Override
39    public void add(Chunk event) throws InterruptedException {
40      try {
41        event.write(outToDisk);
42        outToDisk.flush();
43        fSize += event.getData().length;
44        long seq = event.getSeqID();
45        if(seq > highestSentOffset)
46          highestSentOffset = seq;
47      } catch(IOException e) {
48        log.error(e);
49      }
50      dest.add(event);
51    }
52    
53    @Override
54    public void start(String adaptorID, String type, long offset,
55        ChunkReceiver dest) throws AdaptorException {
56      try {
57        String dummyAdaptorID = adaptorID;
58        this.dest = dest;
59        
60        outBuf = new File(BUF_DIR, adaptorID);
61        long newOffset = offset;
62        if(outBuf.length() > 0) {
63          DataInputStream dis = new DataInputStream(new FileInputStream(outBuf));
64          while(dis.available() > 0) {
65            Chunk c = ChunkImpl.read(dis);
66            fSize += c.getData().length;
67            long seq = c.getSeqID();
68            if(seq >offset) {
69              dest.add(c);
70              newOffset = seq;
71            }
72          }
73          //send chunks that are outstanding        
74          dis.close();
75        }
76        outToDisk = new DataOutputStream(new FileOutputStream(outBuf, true));
77        
78        inner.start(dummyAdaptorID, innerType, newOffset, this);
79      } catch(IOException e) {
80        throw new AdaptorException(e);
81      } catch(InterruptedException e) {
82        throw new AdaptorException(e);
83      }
84    }
85    
86    @Override
87    public void committed(long l) {
88  
89      try {
90        long bytesOutstanding = highestSentOffset - l;
91        if(fSize - bytesOutstanding > COMPACT_AT) {
92          fSize = 0;
93          outToDisk.close();
94          File outBufTmp = new File(outBuf.getAbsoluteFile(), outBuf.getName() + ".tmp");
95          if(!outBuf.renameTo(outBufTmp)) {
96            log.warn("Cannot rename temp file "+outBuf.getAbsolutePath()+
97                " to "+outBufTmp.getAbsolutePath());
98          };
99          outToDisk = new DataOutputStream(new FileOutputStream(outBuf, false));
100         DataInputStream dis = new DataInputStream(new FileInputStream(outBufTmp));
101         while(dis.available() > 0) {
102           Chunk c = ChunkImpl.read(dis);
103           if(c.getSeqID() > l) { //not yet committed
104             c.write(outToDisk);
105             fSize += c.getData().length;
106           }
107         }
108         dis.close();
109         if(!outBufTmp.delete()) {
110           log.warn("Can not delete temp file: "+outBufTmp.getAbsolutePath());
111         };
112       }
113     } catch(IOException e) {
114       log.error(e);
115       //should this be fatal?
116     }
117   }
118   
119   @Override
120   public long shutdown(AdaptorShutdownPolicy p) throws AdaptorException {
121     if(p != RESTARTING) {
122       if(outBuf.delete()) {
123         log.warn("Cannot delete output buffer file:"+outBuf.getAbsolutePath());
124       };
125     }
126     return inner.shutdown(p);
127   }
128 
129 }