This project has retired. For details please refer to its Attic page.
SocketDataLoader xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.chukwa.dataloader;
19  
20  import java.io.DataInputStream;
21  import java.io.DataOutputStream;
22  import java.io.IOException;
23  import java.net.Socket;
24  import java.net.SocketException;
25  import java.nio.charset.Charset;
26  import java.util.Collection;
27  import java.util.Collections;
28  import java.util.Iterator;
29  import java.util.LinkedList;
30  import java.util.NoSuchElementException;
31  import java.util.Queue;
32  import java.util.regex.Matcher;
33  import java.util.regex.Pattern;
34  
35  import org.apache.hadoop.chukwa.Chunk;
36  import org.apache.hadoop.chukwa.ChunkImpl;
37  import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
38  import org.apache.hadoop.chukwa.datacollection.DataFactory;
39  import org.apache.hadoop.chukwa.datacollection.writer.SocketTeeWriter;
40  import org.apache.hadoop.chukwa.util.ExceptionUtil;
41  import org.apache.log4j.Logger;
42  
43  /**
44   * Socket Data Loader, also known as the SDL, is a framework for allowing direct
45   * access to log data under the Chukwa Collector in a safe and efficient manner.
46   * Subscribe to chukwaCollector.tee.port for data streaming.
47   * The default socket tee port is 9094.
48   */
49  public class SocketDataLoader implements Runnable {
50    private String hostname = "localhost";
51    private int port = 9094;
52    private static Logger log = Logger.getLogger(SocketDataLoader.class);
53    private Socket s = null;
54    private DataInputStream dis = null;
55    private DataOutputStream dos = null;
56    private Queue<Chunk> q = new LinkedList<Chunk>();
57    private String recordType = null;
58    private boolean running = false;
59    private static final int QUEUE_MAX = 10;
60    private Iterator<String> collectors = null;
61    private static Pattern pattern = Pattern.compile("(.+?)\\://(.+?)\\:(.+?)");
62  
63    /*
64     * Create and start an instance of SocketDataLoader.
65     * @param Record Type
66     */
67    public SocketDataLoader(String recordType) {
68      this.recordType = recordType;
69      try {
70        collectors = DataFactory.getInstance().getCollectorURLs(new ChukwaConfiguration());
71      } catch (IOException e) {
72        log.error(ExceptionUtil.getStackTrace(e));
73      }
74      Matcher m = pattern.matcher(collectors.next());
75      // Socket data loader only supports to stream data from a single collector.  
76      // For large deployment, it may require to setup multi-tiers of collectors to
77      // channel data into a single collector for display.
78      if(m.matches()) {
79        hostname = m.group(2);
80      }
81      start();
82    }
83    
84    /*
85     * Establish a connection to chukwa collector and filter data stream
86     * base on record type.
87     */
88    public synchronized void start() {
89      try {
90        running = true;
91        s = new Socket(hostname, port);
92        try {
93          s.setSoTimeout(120000);
94          dos = new DataOutputStream (s.getOutputStream());
95          StringBuilder output = new StringBuilder();
96          output.append(SocketTeeWriter.WRITABLE);
97          if(recordType.toLowerCase().intern()!="all".intern()) {
98            output.append(" datatype=");
99            output.append(recordType);
100         } else {
101           output.append(" all");
102         }
103         output.append("\n");
104         dos.write((output.toString()).getBytes(Charset.forName("UTF-8")));
105       } catch (SocketException e) {
106         log.warn("Error while settin soTimeout to 120000");
107       }
108       dis = new DataInputStream(s
109           .getInputStream());
110       dis.readFully(new byte[3]); //read "OK\n"
111       StringBuilder sb = new StringBuilder();
112       sb.append("Subscribe to ");
113       sb.append(hostname);
114       sb.append(":");
115       sb.append(port);
116       sb.append(" for record type: ");
117       sb.append(recordType);
118       log.info(sb.toString());
119       Thread t=new Thread (this);
120       t.start();
121     } catch (IOException e) {
122       log.error(ExceptionUtil.getStackTrace(e));
123       stop();
124     }    
125   }
126   
127   /*
128    * Read the current chunks in the SDL queue.
129    * @return List of chunks in the SDL queue.
130    */
131   public synchronized Collection<Chunk> read() throws NoSuchElementException {
132     Collection<Chunk> list = Collections.synchronizedCollection(q);
133     return list;
134   }
135   
136   /*
137    * Unsubscribe from Chukwa collector and stop streaming.
138    */
139   public synchronized void stop() {
140     if(s!=null) {
141       try {
142         dis.close();
143         dos.close();
144         s.close();
145         StringBuilder sb = new StringBuilder();
146         sb.append("Unsubscribe from ");
147         sb.append(hostname);
148         sb.append(":");
149         sb.append(port);
150         sb.append(" for data type: ");
151         sb.append(recordType);
152         log.info(sb.toString());
153         running = false;
154       } catch (IOException e) {
155         log.debug("Unable to close Socket Tee client socket.");
156       }
157     }
158   }
159 
160   /*
161    * Check if streaming is currently happening for the current instance of SDL.
162    * @return running state of the SDL,
163    */
164   public boolean running() {
165     return running;
166   }
167   
168   /*
169    * Background thread for reading data from SocketTeeWriter, and add new data
170    * into SDL queue.
171    */
172   @Override
173   public synchronized  void run() {
174     try {
175       Chunk c;
176       while ((c = ChunkImpl.read(dis)) != null) {
177         StringBuilder sb = new StringBuilder();
178         sb.append("Chunk received, recordType:");
179         sb.append(c.getDataType());
180         log.debug(sb);
181         if(q.size()>QUEUE_MAX) {
182           q.poll();
183         }
184         q.offer(c);
185       }
186     } catch (IOException e) {
187       log.error(ExceptionUtil.getStackTrace(e));
188       stop();
189     }
190   }
191 }