This project has retired. For details please refer to its Attic page.
SocketTeeWriter xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.chukwa.datacollection.writer;
19  
20  import java.util.*;
21  import java.util.concurrent.BlockingQueue;
22  import java.util.concurrent.ArrayBlockingQueue;
23  
24  import org.apache.hadoop.chukwa.Chunk;
25  import org.apache.hadoop.chukwa.util.Filter;
26  import org.apache.hadoop.chukwa.util.RegexUtil.CheckedPatternSyntaxException;
27  import org.apache.hadoop.conf.Configuration;
28  import org.apache.log4j.Logger;
29  
30  import java.net.ServerSocket;
31  import java.net.Socket;
32  import java.nio.charset.Charset;
33  import java.io.*;
34  
35  import org.apache.hadoop.chukwa.util.ExceptionUtil;
36  
37  /**
38   * Effectively a "Tee" in the writer pipeline.
39   * Accepts incoming connections on port specified by chukwa.tee.port.
40   * Defaults to 9094
41   * 
42   * Protocol is as follows:
43   * Client ---> TeeWriter   "RAW | WRITABLE | HEADER <filter>" 
44   *                  as per DumpChunks.
45   *                  
46   * TeeWriter ---> Client "OK\n"                 
47   *   In RAW mode               
48   * TeeWriter ---> Client (length(int)  byte[length])*
49   *              An indefinite sequence of length, followed by byte array.
50   *              
51   *  In Writable mode
52   * TeeWriter ---> Client    (Chunk serialized as Writable)*
53   *              An indefinite sequence of serialized chunks
54   *              
55   *  In English: clients should connect and say either "RAW " or "WRITABLE " 
56   *  followed by a filter.  (Note that the keyword is followed by exactly one space.)
57   *  They'll then receive either a sequence of byte arrays or of writable-serialized.
58   *  
59   *  Option chukwa.tee.keepalive controls using TCP keepalive. Defaults to true.
60   *  
61   */
62  public class SocketTeeWriter extends PipelineableWriter {
63  
64    public static final String WRITABLE = "WRITABLE";
65    public static final String RAW = "RAW";
66    public static final String ASCII_HEADER = "HEADER";
67    
68    static enum DataFormat {Raw, Writable, Header};
69    
70    static boolean USE_KEEPALIVE = true;
71    public static final int DEFAULT_PORT = 9094;
72    static int QUEUE_LENGTH = 1000;
73    
74    static Logger log = Logger.getLogger(SocketTeeWriter.class);
75    volatile boolean running = true;
76    int timeout;
77  //  private final ExecutorService pool;
78    
79    /**
80     * Listens for incoming connections, spawns a Tee to deal with each.
81     */
82    class SocketListenThread extends Thread {
83      ServerSocket s;
84      public SocketListenThread(Configuration conf) throws IOException {
85        int portno = conf.getInt("chukwa.tee.port", DEFAULT_PORT);
86        USE_KEEPALIVE = conf.getBoolean("chukwa.tee.keepalive", true);
87        s = new ServerSocket(portno);
88        setDaemon(true);
89      }
90      
91      public void run() {
92        log.info("listen thread started");
93        try{
94          while(running) {
95            Socket sock = s.accept();
96            log.info("got connection from " + sock.getInetAddress());
97            new Tee(sock);
98          }
99        } catch(IOException e) {
100         log.debug(ExceptionUtil.getStackTrace(e)); 
101       }
102     }
103     
104     public void shutdown() {
105       try{
106         //running was set to false by caller.
107         s.close(); //to break out of run loop
108         this.interrupt();
109       } catch(IOException e) {
110         log.debug(ExceptionUtil.getStackTrace(e)); 
111       }
112     }
113   }
114   
115   /////////////////Internal class Tee//////////////////////
116   /**
117    * Manages a single socket connection
118    */
119   class Tee implements Runnable {
120     Socket sock;
121     BufferedReader in;
122     DataOutputStream out;
123     Filter rules;
124     DataFormat fmt;
125     final BlockingQueue<Chunk> sendQ;
126     
127     public Tee(Socket s) throws IOException {
128       sock = s;
129       //now initialize asynchronously
130       sendQ = new ArrayBlockingQueue<Chunk>(QUEUE_LENGTH);
131       
132       Thread t = new Thread(this);
133       t.setDaemon(true);
134       t.start();
135     }
136     
137     public void run() {
138       setup();
139       try {
140         while(sock.isConnected()) {
141           Chunk c = sendQ.take();
142           
143           if(fmt == DataFormat.Raw) {
144             byte[] data = c.getData();
145             out.writeInt(data.length);
146             out.write(data);
147           } else if(fmt == DataFormat.Writable)
148             c.write(out);
149           else {
150             byte[] data = c.getData();
151             byte[] header = (c.getSource()+ " " + c.getDataType() + " " + c.getStreamName()+ " "+  
152                 c.getSeqID()+"\n").getBytes(Charset.forName("UTF-8")); 
153             out.writeInt(data.length+ header.length);
154             out.write(header);
155             out.write(data);
156           }
157         }
158         out.flush();
159       } catch(IOException e) {
160         log.info("lost tee: "+ e.toString());
161         synchronized(tees) {
162           tees.remove(this);
163         }
164       } catch(InterruptedException e) {
165         //exit quietly
166       }
167     }
168     
169     /**
170      * initializes the tee.
171      */
172     public void setup() {
173       try {   //outer try catches IOExceptions
174        try { //inner try catches bad command syntax errors
175         sock.setSoTimeout(timeout);
176         sock.setKeepAlive(USE_KEEPALIVE);
177         in = new BufferedReader(new InputStreamReader(sock.getInputStream(), Charset.forName("UTF-8")));
178         out = new DataOutputStream(sock.getOutputStream());
179         String cmd = in.readLine();
180         if(cmd==null) {
181           throw new IllegalArgumentException("No input found.");
182         }
183         if(!cmd.contains(" ")) {
184           
185           throw new IllegalArgumentException(
186               "command should be keyword pattern, but no ' ' seen: " + cmd);
187         }
188         String uppercased = cmd.substring(0, cmd.indexOf(' ')).toUpperCase();
189         if(RAW.equals(uppercased))
190           fmt = DataFormat.Raw;
191         else if(WRITABLE.equals(uppercased))
192           fmt = DataFormat.Writable;
193         else if(ASCII_HEADER.equals(uppercased))
194           fmt = DataFormat.Header;
195         else {
196           throw new IllegalArgumentException("bad command '" + uppercased+
197               "' -- starts with neither '"+ RAW+ "' nor '"+ WRITABLE + " nor " 
198               + ASCII_HEADER+"':" + cmd);
199         }
200         
201         String cmdAfterSpace = cmd.substring(cmd.indexOf(' ')+1);
202         if(cmdAfterSpace.toLowerCase().equals("all"))
203           rules = Filter.ALL;
204         else
205           try {
206             rules = new Filter(cmdAfterSpace);
207           } catch (CheckedPatternSyntaxException pse) {
208             out.write("Error parsing command as a regex: ".getBytes(Charset.forName("UTF-8")));
209             out.write(pse.getMessage().getBytes(Charset.forName("UTF-8")));
210             out.writeByte('\n');
211             out.close();
212             in.close();
213             sock.close();
214             log.warn(pse);
215             return;
216           }
217 
218           //now that we read everything OK we can add ourselves to list, and return.
219         synchronized(tees) {
220           tees.add(this);
221         }
222         out.write("OK\n".getBytes(Charset.forName("UTF-8")));
223         log.info("tee to " + sock.getInetAddress() + " established");
224       } catch(IllegalArgumentException e) {
225           out.write(e.toString().getBytes(Charset.forName("UTF-8")));
226           out.writeByte('\n');
227           out.close();
228           in.close();
229           sock.close();
230           log.warn(e);
231         }//end inner catch
232       } catch(IOException e) { //end outer catch
233          log.warn(e);
234       }
235     }
236     
237     public void close() {
238       try {
239         out.close();
240         in.close();
241       } catch(Exception e) {
242         log.debug(ExceptionUtil.getStackTrace(e));
243       }
244     }
245 
246     public void handle(Chunk c) {
247       
248       //don't ever block; just ignore this chunk if we don't have room for it.
249       if(rules.matches(c)) {
250         if(!sendQ.offer(c)) {
251           log.debug("Queue is full.");
252         }
253       }
254     }
255   }
256 
257   /////////////////Main class SocketTeeWriter//////////////////////
258   
259   
260   SocketListenThread listenThread;
261   List<Tee> tees;
262   
263   @Override
264   public void setNextStage(ChukwaWriter next) {
265     this.next = next;
266   }
267 
268   @Override
269   public CommitStatus add(List<Chunk> chunks) throws WriterException {
270     CommitStatus rv = ChukwaWriter.COMMIT_OK;
271     if (next != null)
272        rv = next.add(chunks); //pass data through
273     synchronized(tees) {
274       Iterator<Tee> loop = tees.iterator();
275       while(loop.hasNext()) {
276         Tee t = loop.next();
277         for(Chunk c: chunks) {
278           t.handle(c);
279         }
280       }
281     }
282     return rv;
283   }
284 
285   @Override
286   public void close() throws WriterException {
287     if (next != null) 
288 	next.close();
289     running = false;
290     listenThread.shutdown();
291   }
292 
293   @Override
294   public void init(Configuration c) throws WriterException {
295     try {
296       listenThread = new SocketListenThread(c);
297       listenThread.start();
298     } catch (IOException e) {
299       throw new WriterException(e);
300     }
301     tees = new ArrayList<Tee>();
302   }
303 
304 }