This project has retired. For details please refer to its
Attic page.
WriteaheadBuffered xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.chukwa.datacollection.adaptor;
19
20 import java.io.*;
21 import org.apache.hadoop.chukwa.Chunk;
22 import org.apache.hadoop.chukwa.ChunkImpl;
23 import org.apache.hadoop.chukwa.datacollection.ChunkReceiver;
24 import org.apache.log4j.Logger;
25 import static org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorShutdownPolicy.*;
26
27 public class WriteaheadBuffered extends AbstractWrapper {
28 Logger log = Logger.getLogger(WriteaheadBuffered.class);
29 static final String BUF_DIR_OPT = "adaptor.writeaheadWrapper.dir";
30 static String BUF_DIR = "/tmp";
31 static long COMPACT_AT = 1024 * 1024;
32
33 File outBuf;
34 DataOutputStream outToDisk;
35 long fSize, highestSentOffset;
36
37
38 @Override
39 public void add(Chunk event) throws InterruptedException {
40 try {
41 event.write(outToDisk);
42 outToDisk.flush();
43 fSize += event.getData().length;
44 long seq = event.getSeqID();
45 if(seq > highestSentOffset)
46 highestSentOffset = seq;
47 } catch(IOException e) {
48 log.error(e);
49 }
50 dest.add(event);
51 }
52
53 @Override
54 public void start(String adaptorID, String type, long offset,
55 ChunkReceiver dest) throws AdaptorException {
56 try {
57 String dummyAdaptorID = adaptorID;
58 this.dest = dest;
59
60 outBuf = new File(BUF_DIR, adaptorID);
61 long newOffset = offset;
62 if(outBuf.length() > 0) {
63 DataInputStream dis = new DataInputStream(new FileInputStream(outBuf));
64 while(dis.available() > 0) {
65 Chunk c = ChunkImpl.read(dis);
66 fSize += c.getData().length;
67 long seq = c.getSeqID();
68 if(seq >offset) {
69 dest.add(c);
70 newOffset = seq;
71 }
72 }
73
74 dis.close();
75 }
76 outToDisk = new DataOutputStream(new FileOutputStream(outBuf, true));
77
78 inner.start(dummyAdaptorID, innerType, newOffset, this);
79 } catch(IOException e) {
80 throw new AdaptorException(e);
81 } catch(InterruptedException e) {
82 throw new AdaptorException(e);
83 }
84 }
85
86 @Override
87 public void committed(long l) {
88
89 try {
90 long bytesOutstanding = highestSentOffset - l;
91 if(fSize - bytesOutstanding > COMPACT_AT) {
92 fSize = 0;
93 outToDisk.close();
94 File outBufTmp = new File(outBuf.getAbsoluteFile(), outBuf.getName() + ".tmp");
95 if(!outBuf.renameTo(outBufTmp)) {
96 log.warn("Cannot rename temp file "+outBuf.getAbsolutePath()+
97 " to "+outBufTmp.getAbsolutePath());
98 };
99 outToDisk = new DataOutputStream(new FileOutputStream(outBuf, false));
100 DataInputStream dis = new DataInputStream(new FileInputStream(outBufTmp));
101 while(dis.available() > 0) {
102 Chunk c = ChunkImpl.read(dis);
103 if(c.getSeqID() > l) {
104 c.write(outToDisk);
105 fSize += c.getData().length;
106 }
107 }
108 dis.close();
109 if(!outBufTmp.delete()) {
110 log.warn("Can not delete temp file: "+outBufTmp.getAbsolutePath());
111 };
112 }
113 } catch(IOException e) {
114 log.error(e);
115
116 }
117 }
118
119 @Override
120 public long shutdown(AdaptorShutdownPolicy p) throws AdaptorException {
121 if(p != RESTARTING) {
122 if(outBuf.delete()) {
123 log.warn("Cannot delete output buffer file:"+outBuf.getAbsolutePath());
124 };
125 }
126 return inner.shutdown(p);
127 }
128
129 }