This project has retired. For details please refer to its
Attic page.
SocketTeeWriter xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.chukwa.datacollection.writer;
19
20 import java.util.*;
21 import java.util.concurrent.BlockingQueue;
22 import java.util.concurrent.ArrayBlockingQueue;
23 import org.apache.hadoop.chukwa.Chunk;
24 import org.apache.hadoop.chukwa.util.Filter;
25 import org.apache.hadoop.chukwa.util.RegexUtil.CheckedPatternSyntaxException;
26 import org.apache.hadoop.conf.Configuration;
27 import org.apache.log4j.Logger;
28 import java.net.ServerSocket;
29 import java.net.Socket;
30 import java.io.*;
31 import org.apache.hadoop.chukwa.util.ExceptionUtil;
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58 public class SocketTeeWriter extends PipelineableWriter {
59
60 public static final String WRITABLE = "WRITABLE";
61 public static final String RAW = "RAW";
62 public static final String ASCII_HEADER = "HEADER";
63
64 static enum DataFormat {Raw, Writable, Header};
65
66 static boolean USE_KEEPALIVE = true;
67 public static final int DEFAULT_PORT = 9094;
68 static int QUEUE_LENGTH = 1000;
69
70 static Logger log = Logger.getLogger(SocketTeeWriter.class);
71 volatile boolean running = true;
72 int timeout;
73
74
75
76
77
78 class SocketListenThread extends Thread {
79 ServerSocket s;
80 public SocketListenThread(Configuration conf) throws IOException {
81 int portno = conf.getInt("chukwa.tee.port", DEFAULT_PORT);
82 USE_KEEPALIVE = conf.getBoolean("chukwa.tee.keepalive", true);
83 s = new ServerSocket(portno);
84 setDaemon(true);
85 }
86
87 public void run() {
88 log.info("listen thread started");
89 try{
90 while(running) {
91 Socket sock = s.accept();
92 log.info("got connection from " + sock.getInetAddress());
93 new Tee(sock);
94 }
95 } catch(IOException e) {
96 log.debug(ExceptionUtil.getStackTrace(e));
97 }
98 }
99
100 public void shutdown() {
101 try{
102
103 s.close();
104 this.interrupt();
105 } catch(IOException e) {
106 log.debug(ExceptionUtil.getStackTrace(e));
107 }
108 }
109 }
110
111
112
113
114
115 class Tee implements Runnable {
116 Socket sock;
117 BufferedReader in;
118 DataOutputStream out;
119 Filter rules;
120 DataFormat fmt;
121 final BlockingQueue<Chunk> sendQ;
122
123 public Tee(Socket s) throws IOException {
124 sock = s;
125
126 sendQ = new ArrayBlockingQueue<Chunk>(QUEUE_LENGTH);
127
128 Thread t = new Thread(this);
129 t.setDaemon(true);
130 t.start();
131 }
132
133 public void run() {
134 setup();
135 try {
136 while(sock.isConnected()) {
137 Chunk c = sendQ.take();
138
139 if(fmt == DataFormat.Raw) {
140 byte[] data = c.getData();
141 out.writeInt(data.length);
142 out.write(data);
143 } else if(fmt == DataFormat.Writable)
144 c.write(out);
145 else {
146 byte[] data = c.getData();
147 byte[] header = (c.getSource()+ " " + c.getDataType() + " " + c.getStreamName()+ " "+
148 c.getSeqID()+"\n").getBytes();
149 out.writeInt(data.length+ header.length);
150 out.write(header);
151 out.write(data);
152 }
153 }
154 out.flush();
155 } catch(IOException e) {
156 log.info("lost tee: "+ e.toString());
157 synchronized(tees) {
158 tees.remove(this);
159 }
160 } catch(InterruptedException e) {
161
162 }
163 }
164
165
166
167
168 public void setup() {
169 try {
170 try {
171 sock.setSoTimeout(timeout);
172 sock.setKeepAlive(USE_KEEPALIVE);
173 in = new BufferedReader(new InputStreamReader(sock.getInputStream()));
174 out = new DataOutputStream(sock.getOutputStream());
175 String cmd = in.readLine();
176 if(!cmd.contains(" ")) {
177
178 throw new IllegalArgumentException(
179 "command should be keyword pattern, but no ' ' seen: " + cmd);
180 }
181 String uppercased = cmd.substring(0, cmd.indexOf(' ')).toUpperCase();
182 if(RAW.equals(uppercased))
183 fmt = DataFormat.Raw;
184 else if(WRITABLE.equals(uppercased))
185 fmt = DataFormat.Writable;
186 else if(ASCII_HEADER.equals(uppercased))
187 fmt = DataFormat.Header;
188 else {
189 throw new IllegalArgumentException("bad command '" + uppercased+
190 "' -- starts with neither '"+ RAW+ "' nor '"+ WRITABLE + " nor "
191 + ASCII_HEADER+"':" + cmd);
192 }
193
194 String cmdAfterSpace = cmd.substring(cmd.indexOf(' ')+1);
195 if(cmdAfterSpace.toLowerCase().equals("all"))
196 rules = Filter.ALL;
197 else
198 try {
199 rules = new Filter(cmdAfterSpace);
200 } catch (CheckedPatternSyntaxException pse) {
201 out.write("Error parsing command as a regex: ".getBytes());
202 out.write(pse.getMessage().getBytes());
203 out.writeByte('\n');
204 out.close();
205 in.close();
206 sock.close();
207 log.warn(pse);
208 return;
209 }
210
211
212 synchronized(tees) {
213 tees.add(this);
214 }
215 out.write("OK\n".getBytes());
216 log.info("tee to " + sock.getInetAddress() + " established");
217 } catch(IllegalArgumentException e) {
218 out.write(e.toString().getBytes());
219 out.writeByte('\n');
220 out.close();
221 in.close();
222 sock.close();
223 log.warn(e);
224 }
225 } catch(IOException e) {
226 log.warn(e);
227 }
228 }
229
230 public void close() {
231 try {
232 out.close();
233 in.close();
234 } catch(Exception e) {
235 log.debug(ExceptionUtil.getStackTrace(e));
236 }
237 }
238
239 public void handle(Chunk c) {
240
241
242 if(rules.matches(c))
243 sendQ.offer(c);
244 }
245 }
246
247
248
249
250 SocketListenThread listenThread;
251 List<Tee> tees;
252 ChukwaWriter next;
253
254 @Override
255 public void setNextStage(ChukwaWriter next) {
256 this.next = next;
257 }
258
259 @Override
260 public CommitStatus add(List<Chunk> chunks) throws WriterException {
261 CommitStatus rv = ChukwaWriter.COMMIT_OK;
262 if (next != null)
263 rv = next.add(chunks);
264 synchronized(tees) {
265 Iterator<Tee> loop = tees.iterator();
266 while(loop.hasNext()) {
267 Tee t = loop.next();
268 for(Chunk c: chunks) {
269 t.handle(c);
270 }
271 }
272 }
273 return rv;
274 }
275
276 @Override
277 public void close() throws WriterException {
278 if (next != null)
279 next.close();
280 running = false;
281 listenThread.shutdown();
282 }
283
284 @Override
285 public void init(Configuration c) throws WriterException {
286 try {
287 listenThread = new SocketListenThread(c);
288 listenThread.start();
289 } catch (IOException e) {
290 throw new WriterException(e);
291 }
292 tees = new ArrayList<Tee>();
293 }
294
295 }