This project has retired. For details please refer to its
Attic page.
SocketAdaptor xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.chukwa.datacollection.adaptor;
19
20 import java.io.BufferedInputStream;
21 import java.io.IOException;
22 import java.io.InterruptedIOException;
23 import java.io.ObjectInputStream;
24 import java.net.*;
25 import java.nio.charset.Charset;
26
27 import org.apache.hadoop.chukwa.*;
28 import org.apache.hadoop.chukwa.util.ExceptionUtil;
29 import org.apache.log4j.Logger;
30 import org.apache.log4j.PatternLayout;
31 import org.apache.log4j.spi.LoggingEvent;
32
33
34
35
36
37
38
39
40 public class SocketAdaptor extends AbstractAdaptor {
41 PatternLayout layout = new PatternLayout("%d{ISO8601} %p %c: %m%n");
42
43 private final static Logger log = Logger.getLogger(SocketAdaptor.class);
44 volatile boolean running = true;
45 volatile long bytesReceived = 0;
46 private int port = 9095;
47
48 class Dispatcher extends Thread {
49 private int port;
50 private ServerSocket listener;
51
52 public Dispatcher(int port) {
53 this.port = port;
54 }
55
56 public void run() {
57 try{
58 listener = new ServerSocket();
59 listener.setReuseAddress(true);
60 bindWithExponentialBackoff(listener, port, 12000);
61 log.info("SocketAdaptor bound successfully to port:" + port);
62
63 Socket server;
64
65 while(running){
66 server = listener.accept();
67 Worker connection = new Worker(server);
68 Thread t = new Thread(connection);
69 t.start();
70 }
71 } catch (IOException ioe) {
72 log.error("SocketAdaptor Dispatcher problem:", ioe);
73 } finally {
74 try {
75 listener.close();
76 } catch (IOException e) {
77 log.warn("IOException closing socket on port:" + port);
78 }
79 }
80 }
81
82 public void shutdown() {
83 try {
84 listener.close();
85 } catch (IOException e) {
86 log.debug(ExceptionUtil.getStackTrace(e));
87 }
88 }
89
90 protected void bindWithExponentialBackoff(ServerSocket ss, int p,
91 int maxDelay) throws IOException {
92 int backoff = 1000;
93 int waitedTime = 0;
94 while (!ss.isBound()) {
95 try {
96 ss.bind(new InetSocketAddress(p));
97 } catch (IOException bindEx) {
98 backoff *= 2;
99 log.warn("IOException in bind:" + bindEx);
100 log.warn("Retrying bind to port " + p + " in milliseconds:" + backoff);
101 try {
102 Thread.sleep(backoff);
103 } catch (InterruptedException e) {
104 throw new IOException(
105 "Interrupted while trying to connect to port:" + p);
106 }
107 }
108 waitedTime += backoff;
109 if (waitedTime > maxDelay) {
110 throw new IOException("Could not bind to port:" + p
111 + " after waiting " + waitedTime
112 + " milliseconds. Abandoning this SocketAdaptor.");
113 }
114 }
115 }
116 }
117
118 class Worker implements Runnable {
119 private ObjectInputStream ois;
120 private Socket server;
121
122 public Worker(Socket server) {
123 this.server = server;
124 }
125
126 public void run() {
127 LoggingEvent event;
128
129 try {
130 ois = new ObjectInputStream(
131 new BufferedInputStream(server.getInputStream()));
132 if (ois != null) {
133 while(running) {
134
135 event = (LoggingEvent) ois.readObject();
136 byte[] bytes = layout.format(event).getBytes(Charset.forName("UTF-8"));
137 bytesReceived=bytes.length;
138 Chunk c = new ChunkImpl(type, java.net.InetAddress.getLocalHost().getHostName(), bytesReceived, bytes, SocketAdaptor.this);
139 dest.add(c);
140 }
141 }
142 } catch(java.io.EOFException e) {
143 log.debug("Caught java.io.EOFException:", e);
144 } catch(java.net.SocketException e) {
145 log.debug("Caught java.net.SocketException:", e);
146 } catch(InterruptedIOException e) {
147 Thread.currentThread().interrupt();
148 log.debug("Caught java.io.InterruptedIOException: ", e);
149 } catch(IOException e) {
150 log.debug("Caught java.io.IOException: "+e);
151 } catch(Exception e) {
152 log.error("Unexpected exception. Closing conneciton.", e);
153 } finally {
154 if (ois != null) {
155 try {
156 ois.close();
157 } catch(Exception e) {
158 log.info("Could not close connection.", e);
159 }
160 }
161 if (server != null) {
162 try {
163 server.close();
164 } catch(InterruptedIOException e) {
165 Thread.currentThread().interrupt();
166 } catch(IOException ex) {
167 log.debug(ExceptionUtil.getStackTrace(ex));
168 }
169 }
170 }
171 }
172
173 public void shutdown() {
174 try {
175 ois.close();
176 server.close();
177 } catch (IOException e) {
178 log.debug(ExceptionUtil.getStackTrace(e));
179 }
180 }
181 }
182
183 Dispatcher disp;
184
185 @Override
186 public String parseArgs(String s) {
187 port = Integer.parseInt(s);
188 return s;
189 }
190
191 @Override
192 public void start(long offset) throws AdaptorException {
193 try {
194 disp = new Dispatcher(port);
195 disp.setDaemon(true);
196 disp.start();
197 } catch (Exception e) {
198 throw new AdaptorException(ExceptionUtil.getStackTrace(e));
199 }
200 }
201
202 @Override
203 public String getCurrentStatus() {
204 return type + " " + port;
205 }
206
207 @Override
208 public long shutdown(AdaptorShutdownPolicy shutdownPolicy)
209 throws AdaptorException {
210 try {
211 running = false;
212 disp.shutdown();
213 } catch(Exception e) {
214 log.debug(ExceptionUtil.getStackTrace(e));
215 }
216 return 0;
217 }
218
219 }