This project has retired. For details please refer to its Attic page.
SocketTeeWriter xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.chukwa.datacollection.writer;
19  
20  import java.util.*;
21  import java.util.concurrent.BlockingQueue;
22  import java.util.concurrent.ArrayBlockingQueue;
23  import org.apache.hadoop.chukwa.Chunk;
24  import org.apache.hadoop.chukwa.util.Filter;
25  import org.apache.hadoop.chukwa.util.RegexUtil.CheckedPatternSyntaxException;
26  import org.apache.hadoop.conf.Configuration;
27  import org.apache.log4j.Logger;
28  import java.net.ServerSocket;
29  import java.net.Socket;
30  import java.io.*;
31  import org.apache.hadoop.chukwa.util.ExceptionUtil;
32  
33  /**
34   * Effectively a "Tee" in the writer pipeline.
35   * Accepts incoming connections on port specified by chukwaCollector.tee.port.
36   * Defaults to 9094
37   * 
38   * Protocol is as follows:
39   * Client ---> TeeWriter   "RAW | WRITABLE <filter>" 
40   *                  as per DumpChunks.
41   *                  
42   * TeeWriter ---> Client "OK\n"                 
43   *   In RAW mode               
44   * TeeWriter ---> Client (length(int)  byte[length])*
45   *              An indefinite sequence of length, followed by byte array.
46   *              
47   *  In Writable mode
48   * TeeWriter ---> Client    (Chunk serialized as Writable)*
49   *              An indefinite sequence of serialized chunks
50   *              
51   *  In English: clients should connect and say either "RAW " or "WRITABLE " 
52   *  followed by a filter.  (Note that the keyword is followed by exactly one space.)
53   *  They'll then receive either a sequence of byte arrays or of writable-serialized.
54   *  
55   *  Option chukwaCollector.tee.keepalive controls using TCP keepalive. Defaults to true.
56   *  
57   */
58  public class SocketTeeWriter extends PipelineableWriter {
59  
60    public static final String WRITABLE = "WRITABLE";
61    public static final String RAW = "RAW";
62    public static final String ASCII_HEADER = "HEADER";
63    
64    static enum DataFormat {Raw, Writable, Header};
65    
66    static boolean USE_KEEPALIVE = true;
67    public static final int DEFAULT_PORT = 9094;
68    static int QUEUE_LENGTH = 1000;
69    
70    static Logger log = Logger.getLogger(SocketTeeWriter.class);
71    volatile boolean running = true;
72    int timeout;
73  //  private final ExecutorService pool;
74    
75    /**
76     * Listens for incoming connections, spawns a Tee to deal with each.
77     */
78    class SocketListenThread extends Thread {
79      ServerSocket s;
80      public SocketListenThread(Configuration conf) throws IOException {
81        int portno = conf.getInt("chukwa.tee.port", DEFAULT_PORT);
82        USE_KEEPALIVE = conf.getBoolean("chukwa.tee.keepalive", true);
83        s = new ServerSocket(portno);
84        setDaemon(true);
85      }
86      
87      public void run() {
88        log.info("listen thread started");
89        try{
90          while(running) {
91            Socket sock = s.accept();
92            log.info("got connection from " + sock.getInetAddress());
93            new Tee(sock);
94          }
95        } catch(IOException e) {
96          log.debug(ExceptionUtil.getStackTrace(e)); 
97        }
98      }
99      
100     public void shutdown() {
101       try{
102         //running was set to false by caller.
103         s.close(); //to break out of run loop
104         this.interrupt();
105       } catch(IOException e) {
106         log.debug(ExceptionUtil.getStackTrace(e)); 
107       }
108     }
109   }
110   
111   /////////////////Internal class Tee//////////////////////
112   /**
113    * Manages a single socket connection
114    */
115   class Tee implements Runnable {
116     Socket sock;
117     BufferedReader in;
118     DataOutputStream out;
119     Filter rules;
120     DataFormat fmt;
121     final BlockingQueue<Chunk> sendQ;
122     
123     public Tee(Socket s) throws IOException {
124       sock = s;
125       //now initialize asynchronously
126       sendQ = new ArrayBlockingQueue<Chunk>(QUEUE_LENGTH);
127       
128       Thread t = new Thread(this);
129       t.setDaemon(true);
130       t.start();
131     }
132     
133     public void run() {
134       setup();
135       try {
136         while(sock.isConnected()) {
137           Chunk c = sendQ.take();
138           
139           if(fmt == DataFormat.Raw) {
140             byte[] data = c.getData();
141             out.writeInt(data.length);
142             out.write(data);
143           } else if(fmt == DataFormat.Writable)
144             c.write(out);
145           else {
146             byte[] data = c.getData();
147             byte[] header = (c.getSource()+ " " + c.getDataType() + " " + c.getStreamName()+ " "+  
148                 c.getSeqID()+"\n").getBytes(); 
149             out.writeInt(data.length+ header.length);
150             out.write(header);
151             out.write(data);
152           }
153         }
154         out.flush();
155       } catch(IOException e) {
156         log.info("lost tee: "+ e.toString());
157         synchronized(tees) {
158           tees.remove(this);
159         }
160       } catch(InterruptedException e) {
161         //exit quietly
162       }
163     }
164     
165     /**
166      * initializes the tee.
167      */
168     public void setup() {
169       try {   //outer try catches IOExceptions
170        try { //inner try catches bad command syntax errors
171         sock.setSoTimeout(timeout);
172         sock.setKeepAlive(USE_KEEPALIVE);
173         in = new BufferedReader(new InputStreamReader(sock.getInputStream()));
174         out = new DataOutputStream(sock.getOutputStream());
175         String cmd = in.readLine();
176         if(!cmd.contains(" ")) {
177           
178           throw new IllegalArgumentException(
179               "command should be keyword pattern, but no ' ' seen: " + cmd);
180         }
181         String uppercased = cmd.substring(0, cmd.indexOf(' ')).toUpperCase();
182         if(RAW.equals(uppercased))
183           fmt = DataFormat.Raw;
184         else if(WRITABLE.equals(uppercased))
185           fmt = DataFormat.Writable;
186         else if(ASCII_HEADER.equals(uppercased))
187           fmt = DataFormat.Header;
188         else {
189           throw new IllegalArgumentException("bad command '" + uppercased+
190               "' -- starts with neither '"+ RAW+ "' nor '"+ WRITABLE + " nor " 
191               + ASCII_HEADER+"':" + cmd);
192         }
193         
194         String cmdAfterSpace = cmd.substring(cmd.indexOf(' ')+1);
195         if(cmdAfterSpace.toLowerCase().equals("all"))
196           rules = Filter.ALL;
197         else
198           try {
199             rules = new Filter(cmdAfterSpace);
200           } catch (CheckedPatternSyntaxException pse) {
201             out.write("Error parsing command as a regex: ".getBytes());
202             out.write(pse.getMessage().getBytes());
203             out.writeByte('\n');
204             out.close();
205             in.close();
206             sock.close();
207             log.warn(pse);
208             return;
209           }
210 
211           //now that we read everything OK we can add ourselves to list, and return.
212         synchronized(tees) {
213           tees.add(this);
214         }
215         out.write("OK\n".getBytes());
216         log.info("tee to " + sock.getInetAddress() + " established");
217       } catch(IllegalArgumentException e) {
218           out.write(e.toString().getBytes());
219           out.writeByte('\n');
220           out.close();
221           in.close();
222           sock.close();
223           log.warn(e);
224         }//end inner catch
225       } catch(IOException e) { //end outer catch
226          log.warn(e);
227       }
228     }
229     
230     public void close() {
231       try {
232         out.close();
233         in.close();
234       } catch(Exception e) {
235         log.debug(ExceptionUtil.getStackTrace(e));
236       }
237     }
238 
239     public void handle(Chunk c) {
240       
241       //don't ever block; just ignore this chunk if we don't have room for it.
242       if(rules.matches(c)) 
243         sendQ.offer(c);
244     }
245   }
246 
247   /////////////////Main class SocketTeeWriter//////////////////////
248   
249   
250   SocketListenThread listenThread;
251   List<Tee> tees;
252   ChukwaWriter next;
253   
254   @Override
255   public void setNextStage(ChukwaWriter next) {
256     this.next = next;
257   }
258 
259   @Override
260   public CommitStatus add(List<Chunk> chunks) throws WriterException {
261     CommitStatus rv = ChukwaWriter.COMMIT_OK;
262     if (next != null)
263        rv = next.add(chunks); //pass data through
264     synchronized(tees) {
265       Iterator<Tee> loop = tees.iterator();
266       while(loop.hasNext()) {
267         Tee t = loop.next();
268         for(Chunk c: chunks) {
269           t.handle(c);
270         }
271       }
272     }
273     return rv;
274   }
275 
276   @Override
277   public void close() throws WriterException {
278     if (next != null) 
279 	next.close();
280     running = false;
281     listenThread.shutdown();
282   }
283 
284   @Override
285   public void init(Configuration c) throws WriterException {
286     try {
287       listenThread = new SocketListenThread(c);
288       listenThread.start();
289     } catch (IOException e) {
290       throw new WriterException(e);
291     }
292     tees = new ArrayList<Tee>();
293   }
294 
295 }