WriteaheadBuffered xref
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.chukwa.datacollection.adaptor;

import java.util.*;
import java.io.*;
import org.apache.hadoop.chukwa.Chunk;
import org.apache.hadoop.chukwa.ChunkImpl;
import org.apache.hadoop.chukwa.datacollection.ChunkReceiver;
import org.apache.log4j.Logger;
import static org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorShutdownPolicy.*;

/**
 * Wrapper adaptor that journals every chunk to a local write-ahead file
 * before forwarding it, so that data sent but not yet committed can be
 * replayed after a restart. The buffer file is compacted once the
 * committed portion exceeds COMPACT_AT bytes.
 */
public class WriteaheadBuffered extends AbstractWrapper {
  Logger log = Logger.getLogger(WriteaheadBuffered.class);
  static final String BUF_DIR_OPT = "adaptor.writeaheadWrapper.dir";
  static String BUF_DIR = "/tmp"; // default location of buffer files
  static long COMPACT_AT = 1024 * 1024; // compaction threshold, in bytes

  File outBuf;                // on-disk write-ahead buffer for this adaptor
  DataOutputStream outToDisk; // append stream to outBuf
  long fSize, highestSentOffset;

  @Override
  public synchronized void add(Chunk event) throws InterruptedException {
    try {
      // Journal the chunk to disk before forwarding it downstream.
      event.write(outToDisk);
      outToDisk.flush();
      fSize += event.getData().length;
      long seq = event.getSeqID();
      if (seq > highestSentOffset)
        highestSentOffset = seq;
    } catch (IOException e) {
      log.error(e);
    }
    dest.add(event);
  }

  @Override
  public void start(String adaptorID, String type, long offset,
      ChunkReceiver dest) throws AdaptorException {
    try {
      this.dest = dest;

      // Replay any buffered chunks from a previous run that lie beyond the
      // committed offset, then resume the inner adaptor past the replayed data.
      outBuf = new File(BUF_DIR, adaptorID);
      long newOffset = offset;
      if (outBuf.length() > 0) {
        DataInputStream dis = new DataInputStream(new FileInputStream(outBuf));
        while (dis.available() > 0) {
          Chunk c = ChunkImpl.read(dis);
          fSize += c.getData().length;
          long seq = c.getSeqID();
          if (seq > offset) {
            dest.add(c);
            newOffset = seq;
          }
        }
        dis.close();
      }
      outToDisk = new DataOutputStream(new FileOutputStream(outBuf, true));

      inner.start(adaptorID, innerType, newOffset, this);
    } catch (IOException e) {
      throw new AdaptorException(e);
    } catch (InterruptedException e) {
      throw new AdaptorException(e);
    }
  }

  @Override
  public synchronized void committed(long l) {
    try {
      // Compact once the buffer holds substantially more committed data than
      // outstanding data: move the buffer aside, then copy back only the
      // chunks that have not yet been committed.
      long bytesOutstanding = highestSentOffset - l;
      if (fSize - bytesOutstanding > COMPACT_AT) {
        fSize = 0;
        outToDisk.close();
        File outBufTmp = new File(outBuf.getAbsolutePath() + ".tmp");
        outBuf.renameTo(outBufTmp);
        outToDisk = new DataOutputStream(new FileOutputStream(outBuf, false));
        DataInputStream dis = new DataInputStream(new FileInputStream(outBufTmp));
        while (dis.available() > 0) {
          Chunk c = ChunkImpl.read(dis);
          if (c.getSeqID() > l) { // keep only uncommitted chunks
            c.write(outToDisk);
            fSize += c.getData().length;
          }
        }
        dis.close();
        outBufTmp.delete();
      }
    } catch (IOException e) {
      log.error(e);
    }
  }

  @Override
  public long shutdown(AdaptorShutdownPolicy p) throws AdaptorException {
    // The buffer is only needed again on restart; otherwise remove it.
    if (p != RESTARTING)
      outBuf.delete();
    return inner.shutdown(p);
  }

}
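
The buffer file is nothing more than serialized chunks written back to back, which is why both the replay loop in start() and the compaction loop in committed() reduce to repeated calls to ChunkImpl.read(). The sketch below illustrates that format with a standalone dump tool built on the same loop; the class name WahDump and its command-line handling are hypothetical, not part of Chukwa.

import java.io.*;
import org.apache.hadoop.chukwa.Chunk;
import org.apache.hadoop.chukwa.ChunkImpl;

/** Illustrative sketch: prints the chunks stored in a write-ahead buffer file. */
public class WahDump {
  public static void main(String[] args) throws IOException {
    // args[0] is a buffer file, e.g. /tmp/<adaptorID> (hypothetical usage)
    DataInputStream dis = new DataInputStream(
        new BufferedInputStream(new FileInputStream(new File(args[0]))));
    try {
      // Same framing as WriteaheadBuffered: chunks are concatenated back to back.
      while (dis.available() > 0) {
        Chunk c = ChunkImpl.read(dis);
        System.out.println("seqID=" + c.getSeqID()
            + " bytes=" + c.getData().length);
      }
    } finally {
      dis.close();
    }
  }
}

Run against a buffer file while the agent is stopped, this lists each journaled chunk's sequence ID and payload size, the same two values WriteaheadBuffered consults when deciding what to replay and what to compact away.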