This project has retired. For details please refer to its Attic page.
SocketAdaptor xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.chukwa.datacollection.adaptor;
19  
20  import java.io.BufferedInputStream;
21  import java.io.IOException;
22  import java.io.InterruptedIOException;
23  import java.io.ObjectInputStream;
24  import java.net.*;
25  import java.nio.charset.Charset;
26  
27  import org.apache.hadoop.chukwa.*;
28  import org.apache.hadoop.chukwa.util.ExceptionUtil;
29  import org.apache.log4j.Logger;
30  import org.apache.log4j.PatternLayout;
31  import org.apache.log4j.spi.LoggingEvent;
32  
33  /**
34   * SocketAdaptor reads TCP message from a port and convert the message to Chukwa
35   * Chunk for transport from Chukwa Agent to Chukwa Collector.  Usage:
36   * 
37   * add SocketAdaptor [DataType] [Port] [SequenceNumber]
38   * 
39   */
40  public class SocketAdaptor extends AbstractAdaptor {
41    PatternLayout layout = new PatternLayout("%d{ISO8601} %p %c: %m%n");
42  
43    private final static Logger log = Logger.getLogger(SocketAdaptor.class);
44    volatile boolean running = true;
45    volatile long bytesReceived = 0;
46    private int port = 9095;
47    
48    class Dispatcher extends Thread {
49      private int port;
50      private ServerSocket listener;
51      
52      public Dispatcher(int port) {
53        this.port = port;
54      }
55      
56      public void run() {
57        try{
58          listener = new ServerSocket();
59          listener.setReuseAddress(true);
60          bindWithExponentialBackoff(listener, port, 12000);
61          log.info("SocketAdaptor bound successfully to port:" + port);
62          
63          Socket server;
64  
65          while(running){
66            server = listener.accept();
67            Worker connection = new Worker(server);
68            Thread t = new Thread(connection);
69            t.start();
70          }
71        } catch (IOException ioe) {
72          log.error("SocketAdaptor Dispatcher problem:", ioe);
73        } finally {
74          try {
75            listener.close();
76          } catch (IOException e) {
77            log.warn("IOException closing socket on port:" + port);
78          }
79        }
80      }
81      
82      public void shutdown() {
83        try {
84          listener.close();
85        } catch (IOException e) {
86          log.debug(ExceptionUtil.getStackTrace(e));
87        }
88      }
89      
90      protected void bindWithExponentialBackoff(ServerSocket ss, int p,
91          int maxDelay) throws IOException {
92        int backoff = 1000;
93        int waitedTime = 0;
94        while (!ss.isBound()) {
95          try {
96            ss.bind(new InetSocketAddress(p));
97          } catch (IOException bindEx) {
98            backoff *= 2;
99            log.warn("IOException in bind:" + bindEx);
100           log.warn("Retrying bind to port " + p + " in milliseconds:" + backoff);
101           try {
102             Thread.sleep(backoff);
103           } catch (InterruptedException e) {
104             throw new IOException(
105                 "Interrupted while trying to connect to port:" + p);
106           }
107         }
108         waitedTime += backoff;
109         if (waitedTime > maxDelay) {
110           throw new IOException("Could not bind to port:" + p
111               + " after waiting " + waitedTime
112               + " milliseconds. Abandoning this SocketAdaptor.");
113         }
114       }
115     }
116   }
117   
118   class Worker implements Runnable {
119     private ObjectInputStream ois;
120     private Socket server;
121     
122     public Worker(Socket server) {
123       this.server = server;
124     }
125     
126     public void run() {
127       LoggingEvent event;
128 
129       try {
130         ois = new ObjectInputStream(
131                            new BufferedInputStream(server.getInputStream()));
132         if (ois != null) {
133           while(running) {
134             // read an event from the wire
135             event = (LoggingEvent) ois.readObject();
136             byte[] bytes = layout.format(event).getBytes(Charset.forName("UTF-8"));
137             bytesReceived=bytes.length;
138             Chunk c = new ChunkImpl(type, java.net.InetAddress.getLocalHost().getHostName(), bytesReceived, bytes, SocketAdaptor.this);
139             dest.add(c);
140           }
141         }
142       } catch(java.io.EOFException e) {
143         log.debug("Caught java.io.EOFException:", e);
144       } catch(java.net.SocketException e) {
145         log.debug("Caught java.net.SocketException:", e);
146       } catch(InterruptedIOException e) {
147         Thread.currentThread().interrupt();
148         log.debug("Caught java.io.InterruptedIOException: ", e);
149       } catch(IOException e) {
150         log.debug("Caught java.io.IOException: "+e);
151       } catch(Exception e) {
152         log.error("Unexpected exception. Closing conneciton.", e);
153       } finally {
154         if (ois != null) {
155            try {
156               ois.close();
157            } catch(Exception e) {
158               log.info("Could not close connection.", e);
159            }
160         }
161         if (server != null) {
162           try {
163             server.close();
164           } catch(InterruptedIOException e) {
165             Thread.currentThread().interrupt();
166           } catch(IOException ex) {
167             log.debug(ExceptionUtil.getStackTrace(ex));
168           }
169         }
170       }
171     }
172     
173     public void shutdown() {
174       try {
175         ois.close();
176         server.close();
177       } catch (IOException e) {
178         log.debug(ExceptionUtil.getStackTrace(e));
179       }
180     }
181   }
182   
183   Dispatcher disp;
184   
185   @Override
186   public String parseArgs(String s) {
187     port = Integer.parseInt(s);
188     return s;
189   }
190 
191   @Override
192   public void start(long offset) throws AdaptorException {
193     try {
194       disp = new Dispatcher(port);
195       disp.setDaemon(true);
196       disp.start();      
197     } catch (Exception e) {
198       throw new AdaptorException(ExceptionUtil.getStackTrace(e));
199     }
200   }
201 
202   @Override
203   public String getCurrentStatus() {
204     return type + " " + port;
205   }
206 
207   @Override
208   public long shutdown(AdaptorShutdownPolicy shutdownPolicy)
209       throws AdaptorException {
210     try {
211       running = false;
212       disp.shutdown();
213     } catch(Exception e) {
214       log.debug(ExceptionUtil.getStackTrace(e));
215     }
216     return 0;
217   }
218 
219 }