This project has been retired. For details, please refer to its Attic page.
SocketDataLoader xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.chukwa.dataloader;
19  
20  import java.io.DataInputStream;
21  import java.io.DataOutputStream;
22  import java.io.IOException;
23  import java.net.Socket;
24  import java.net.SocketException;
25  import java.util.Collection;
26  import java.util.Collections;
27  import java.util.Iterator;
28  import java.util.LinkedList;
29  import java.util.NoSuchElementException;
30  import java.util.Queue;
31  import java.util.regex.Matcher;
32  import java.util.regex.Pattern;
33  
34  import org.apache.hadoop.chukwa.Chunk;
35  import org.apache.hadoop.chukwa.ChunkImpl;
36  import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
37  import org.apache.hadoop.chukwa.datacollection.DataFactory;
38  import org.apache.hadoop.chukwa.datacollection.writer.SocketTeeWriter;
39  import org.apache.hadoop.chukwa.util.ExceptionUtil;
40  import org.apache.log4j.Logger;
41  
42  /**
43   * Socket Data Loader, also known as the SDL, is a framework for allowing direct
44   * access to log data under the Chukwa Collector in a safe and efficient manner.
45   * Subscribe to chukwaCollector.tee.port for data streaming.
46   * Defaults socket tee port is 9094.
47   */
48  public class SocketDataLoader implements Runnable {
49    private String hostname = "localhost";
50    private int port = 9094;
51    private static Logger log = Logger.getLogger(SocketDataLoader.class);
52    private Socket s = null;
53    private DataInputStream dis = null;
54    private DataOutputStream dos = null;
55    private Queue<Chunk> q = new LinkedList<Chunk>();
56    private String recordType = null;
57    private boolean running = false;
58    private static final int QUEUE_MAX = 10;
59    private Iterator<String> collectors = null;
60    private static Pattern pattern = Pattern.compile("(.+?)\\://(.+?)\\:(.+?)");
61  
62    /*
63     * Create and start an instance of SocketDataLoader.
64     * @param Record Type
65     */
66    public SocketDataLoader(String recordType) {
67      this.recordType = recordType;
68      try {
69        collectors = DataFactory.getInstance().getCollectorURLs(new ChukwaConfiguration());
70      } catch (IOException e) {
71        log.error(ExceptionUtil.getStackTrace(e));
72      }
73      Matcher m = pattern.matcher(collectors.next());
74      // Socket data loader only supports to stream data from a single collector.  
75      // For large deployment, it may require to setup multi-tiers of collectors to
76      // channel data into a single collector for display.
77      if(m.matches()) {
78        hostname = m.group(2);
79      }
80      start();
81    }
82    
83    /*
84     * Establish a connection to chukwa collector and filter data stream
85     * base on record type.
86     */
87    public synchronized void start() {
88      try {
89        running = true;
90        s = new Socket(hostname, port);
91        try {
92          s.setSoTimeout(120000);
93          dos = new DataOutputStream (s.getOutputStream());
94          StringBuilder output = new StringBuilder();
95          output.append(SocketTeeWriter.WRITABLE);
96          if(recordType.toLowerCase().intern()!="all".intern()) {
97            output.append(" datatype=");
98            output.append(recordType);
99          } else {
100           output.append(" all");
101         }
102         output.append("\n");
103         dos.write((output.toString()).getBytes());
104       } catch (SocketException e) {
105         log.warn("Error while settin soTimeout to 120000");
106       }
107       dis = new DataInputStream(s
108           .getInputStream());
109       dis.readFully(new byte[3]); //read "OK\n"
110       StringBuilder sb = new StringBuilder();
111       sb.append("Subscribe to ");
112       sb.append(hostname);
113       sb.append(":");
114       sb.append(port);
115       sb.append(" for record type: ");
116       sb.append(recordType);
117       log.info(sb.toString());
118       Thread t=new Thread (this);
119       t.start();
120     } catch (IOException e) {
121       log.error(ExceptionUtil.getStackTrace(e));
122       stop();
123     }    
124   }
125   
126   /*
127    * Read the current chunks in the SDL queue.
128    * @return List of chunks in the SDL queue.
129    */
130   public synchronized Collection<Chunk> read() throws NoSuchElementException {
131     Collection<Chunk> list = Collections.synchronizedCollection(q);
132     return list;
133   }
134   
135   /*
136    * Unsubscribe from Chukwa collector and stop streaming.
137    */
138   public void stop() {
139     if(s!=null) {
140       try {
141         dis.close();
142         dos.close();
143         s.close();
144         StringBuilder sb = new StringBuilder();
145         sb.append("Unsubscribe from ");
146         sb.append(hostname);
147         sb.append(":");
148         sb.append(port);
149         sb.append(" for data type: ");
150         sb.append(recordType);
151         log.info(sb.toString());
152         running = false;
153       } catch (IOException e) {
154         log.debug("Unable to close Socket Tee client socket.");
155       }
156     }
157   }
158 
159   /*
160    * Check if streaming is currently happening for the current instance of SDL.
161    * @return running state of the SDL,
162    */
163   public boolean running() {
164     return running;
165   }
166   
167   /*
168    * Background thread for reading data from SocketTeeWriter, and add new data
169    * into SDL queue.
170    */
171   @Override
172   public void run() {
173     try {
174       Chunk c;
175       while ((c = ChunkImpl.read(dis)) != null) {
176         StringBuilder sb = new StringBuilder();
177         sb.append("Chunk received, recordType:");
178         sb.append(c.getDataType());
179         log.debug(sb);
180         if(q.size()>QUEUE_MAX) {
181           q.poll();
182         }
183         q.offer(c);
184       }
185     } catch (IOException e) {
186       log.error(ExceptionUtil.getStackTrace(e));
187       stop();
188     }
189   }
190 }