This project has retired. For details please refer to its
Attic page.
SocketTeeWriter xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.chukwa.datacollection.writer;
19
20 import java.util.*;
21 import java.util.concurrent.BlockingQueue;
22 import java.util.concurrent.ArrayBlockingQueue;
23
24 import org.apache.hadoop.chukwa.Chunk;
25 import org.apache.hadoop.chukwa.util.Filter;
26 import org.apache.hadoop.chukwa.util.RegexUtil.CheckedPatternSyntaxException;
27 import org.apache.hadoop.conf.Configuration;
28 import org.apache.log4j.Logger;
29
30 import java.net.ServerSocket;
31 import java.net.Socket;
32 import java.nio.charset.Charset;
33 import java.io.*;
34
35 import org.apache.hadoop.chukwa.util.ExceptionUtil;
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62 public class SocketTeeWriter extends PipelineableWriter {
63
64 public static final String WRITABLE = "WRITABLE";
65 public static final String RAW = "RAW";
66 public static final String ASCII_HEADER = "HEADER";
67
68 static enum DataFormat {Raw, Writable, Header};
69
70 static boolean USE_KEEPALIVE = true;
71 public static final int DEFAULT_PORT = 9094;
72 static int QUEUE_LENGTH = 1000;
73
74 static Logger log = Logger.getLogger(SocketTeeWriter.class);
75 volatile boolean running = true;
76 int timeout;
77
78
79
80
81
82 class SocketListenThread extends Thread {
83 ServerSocket s;
84 public SocketListenThread(Configuration conf) throws IOException {
85 int portno = conf.getInt("chukwa.tee.port", DEFAULT_PORT);
86 USE_KEEPALIVE = conf.getBoolean("chukwa.tee.keepalive", true);
87 s = new ServerSocket(portno);
88 setDaemon(true);
89 }
90
91 public void run() {
92 log.info("listen thread started");
93 try{
94 while(running) {
95 Socket sock = s.accept();
96 log.info("got connection from " + sock.getInetAddress());
97 new Tee(sock);
98 }
99 } catch(IOException e) {
100 log.debug(ExceptionUtil.getStackTrace(e));
101 }
102 }
103
104 public void shutdown() {
105 try{
106
107 s.close();
108 this.interrupt();
109 } catch(IOException e) {
110 log.debug(ExceptionUtil.getStackTrace(e));
111 }
112 }
113 }
114
115
116
117
118
119 class Tee implements Runnable {
120 Socket sock;
121 BufferedReader in;
122 DataOutputStream out;
123 Filter rules;
124 DataFormat fmt;
125 final BlockingQueue<Chunk> sendQ;
126
127 public Tee(Socket s) throws IOException {
128 sock = s;
129
130 sendQ = new ArrayBlockingQueue<Chunk>(QUEUE_LENGTH);
131
132 Thread t = new Thread(this);
133 t.setDaemon(true);
134 t.start();
135 }
136
137 public void run() {
138 setup();
139 try {
140 while(sock.isConnected()) {
141 Chunk c = sendQ.take();
142
143 if(fmt == DataFormat.Raw) {
144 byte[] data = c.getData();
145 out.writeInt(data.length);
146 out.write(data);
147 } else if(fmt == DataFormat.Writable)
148 c.write(out);
149 else {
150 byte[] data = c.getData();
151 byte[] header = (c.getSource()+ " " + c.getDataType() + " " + c.getStreamName()+ " "+
152 c.getSeqID()+"\n").getBytes(Charset.forName("UTF-8"));
153 out.writeInt(data.length+ header.length);
154 out.write(header);
155 out.write(data);
156 }
157 }
158 out.flush();
159 } catch(IOException e) {
160 log.info("lost tee: "+ e.toString());
161 synchronized(tees) {
162 tees.remove(this);
163 }
164 } catch(InterruptedException e) {
165
166 }
167 }
168
169
170
171
172 public void setup() {
173 try {
174 try {
175 sock.setSoTimeout(timeout);
176 sock.setKeepAlive(USE_KEEPALIVE);
177 in = new BufferedReader(new InputStreamReader(sock.getInputStream(), Charset.forName("UTF-8")));
178 out = new DataOutputStream(sock.getOutputStream());
179 String cmd = in.readLine();
180 if(cmd==null) {
181 throw new IllegalArgumentException("No input found.");
182 }
183 if(!cmd.contains(" ")) {
184
185 throw new IllegalArgumentException(
186 "command should be keyword pattern, but no ' ' seen: " + cmd);
187 }
188 String uppercased = cmd.substring(0, cmd.indexOf(' ')).toUpperCase();
189 if(RAW.equals(uppercased))
190 fmt = DataFormat.Raw;
191 else if(WRITABLE.equals(uppercased))
192 fmt = DataFormat.Writable;
193 else if(ASCII_HEADER.equals(uppercased))
194 fmt = DataFormat.Header;
195 else {
196 throw new IllegalArgumentException("bad command '" + uppercased+
197 "' -- starts with neither '"+ RAW+ "' nor '"+ WRITABLE + " nor "
198 + ASCII_HEADER+"':" + cmd);
199 }
200
201 String cmdAfterSpace = cmd.substring(cmd.indexOf(' ')+1);
202 if(cmdAfterSpace.toLowerCase().equals("all"))
203 rules = Filter.ALL;
204 else
205 try {
206 rules = new Filter(cmdAfterSpace);
207 } catch (CheckedPatternSyntaxException pse) {
208 out.write("Error parsing command as a regex: ".getBytes(Charset.forName("UTF-8")));
209 out.write(pse.getMessage().getBytes(Charset.forName("UTF-8")));
210 out.writeByte('\n');
211 out.close();
212 in.close();
213 sock.close();
214 log.warn(pse);
215 return;
216 }
217
218
219 synchronized(tees) {
220 tees.add(this);
221 }
222 out.write("OK\n".getBytes(Charset.forName("UTF-8")));
223 log.info("tee to " + sock.getInetAddress() + " established");
224 } catch(IllegalArgumentException e) {
225 out.write(e.toString().getBytes(Charset.forName("UTF-8")));
226 out.writeByte('\n');
227 out.close();
228 in.close();
229 sock.close();
230 log.warn(e);
231 }
232 } catch(IOException e) {
233 log.warn(e);
234 }
235 }
236
237 public void close() {
238 try {
239 out.close();
240 in.close();
241 } catch(Exception e) {
242 log.debug(ExceptionUtil.getStackTrace(e));
243 }
244 }
245
246 public void handle(Chunk c) {
247
248
249 if(rules.matches(c)) {
250 if(!sendQ.offer(c)) {
251 log.debug("Queue is full.");
252 }
253 }
254 }
255 }
256
257
258
259
260 SocketListenThread listenThread;
261 List<Tee> tees;
262
263 @Override
264 public void setNextStage(ChukwaWriter next) {
265 this.next = next;
266 }
267
268 @Override
269 public CommitStatus add(List<Chunk> chunks) throws WriterException {
270 CommitStatus rv = ChukwaWriter.COMMIT_OK;
271 if (next != null)
272 rv = next.add(chunks);
273 synchronized(tees) {
274 Iterator<Tee> loop = tees.iterator();
275 while(loop.hasNext()) {
276 Tee t = loop.next();
277 for(Chunk c: chunks) {
278 t.handle(c);
279 }
280 }
281 }
282 return rv;
283 }
284
285 @Override
286 public void close() throws WriterException {
287 if (next != null)
288 next.close();
289 running = false;
290 listenThread.shutdown();
291 }
292
293 @Override
294 public void init(Configuration c) throws WriterException {
295 try {
296 listenThread = new SocketListenThread(c);
297 listenThread.start();
298 } catch (IOException e) {
299 throw new WriterException(e);
300 }
301 tees = new ArrayList<Tee>();
302 }
303
304 }