/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.chukwa.dataloader;
1920import java.io.DataInputStream;
21import java.io.DataOutputStream;
22import java.io.IOException;
23import java.net.Socket;
24import java.net.SocketException;
25import java.util.Collection;
26import java.util.Collections;
27import java.util.Iterator;
28import java.util.LinkedList;
29import java.util.NoSuchElementException;
30import java.util.Queue;
31import java.util.regex.Matcher;
32import java.util.regex.Pattern;
3334import org.apache.hadoop.chukwa.Chunk;
35import org.apache.hadoop.chukwa.ChunkImpl;
36import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
37import org.apache.hadoop.chukwa.datacollection.DataFactory;
38import org.apache.hadoop.chukwa.datacollection.writer.SocketTeeWriter;
39import org.apache.hadoop.chukwa.util.ExceptionUtil;
40import org.apache.log4j.Logger;
4142/**43 * Socket Data Loader, also known as the SDL, is a framework for allowing direct44 * access to log data under the Chukwa Collector in a safe and efficient manner.45 * Subscribe to chukwaCollector.tee.port for data streaming.46 * Defaults socket tee port is 9094.47 */48publicclassSocketDataLoaderimplements Runnable {
49private String hostname = "localhost";
50privateint port = 9094;
51privatestatic Logger log = Logger.getLogger(SocketDataLoader.class);
52private Socket s = null;
53private DataInputStream dis = null;
54private DataOutputStream dos = null;
55private Queue<Chunk> q = new LinkedList<Chunk>();
56private String recordType = null;
57privateboolean running = false;
58privatestaticfinalint QUEUE_MAX = 10;
59private Iterator<String> collectors = null;
60privatestatic Pattern pattern = Pattern.compile("(.+?)\\://(.+?)\\:(.+?)");
6162/*63 * Create and start an instance of SocketDataLoader.64 * @param Record Type65 */66publicSocketDataLoader(String recordType) {
67this.recordType = recordType;
68try {
69 collectors = DataFactory.getInstance().getCollectorURLs(new ChukwaConfiguration());
70 } catch (IOException e) {
71 log.error(ExceptionUtil.getStackTrace(e));
72 }
73 Matcher m = pattern.matcher(collectors.next());
74// Socket data loader only supports to stream data from a single collector. 75// For large deployment, it may require to setup multi-tiers of collectors to76// channel data into a single collector for display.77if(m.matches()) {
78 hostname = m.group(2);
79 }
80 start();
81 }
8283/*84 * Establish a connection to chukwa collector and filter data stream85 * base on record type.86 */87publicsynchronizedvoid start() {
88try {
89 running = true;
90 s = new Socket(hostname, port);
91try {
92 s.setSoTimeout(120000);
93 dos = new DataOutputStream (s.getOutputStream());
94 StringBuilder output = new StringBuilder();
95 output.append(SocketTeeWriter.WRITABLE);
96if(recordType.toLowerCase().intern()!="all".intern()) {
97 output.append(" datatype=");
98 output.append(recordType);
99 } else {
100 output.append(" all");
101 }
102 output.append("\n");
103 dos.write((output.toString()).getBytes());
104 } catch (SocketException e) {
105 log.warn("Error while settin soTimeout to 120000");
106 }
107 dis = new DataInputStream(s
108 .getInputStream());
109 dis.readFully(new byte[3]); //read "OK\n"110 StringBuilder sb = new StringBuilder();
111 sb.append("Subscribe to ");
112 sb.append(hostname);
113 sb.append(":");
114 sb.append(port);
115 sb.append(" for record type: ");
116 sb.append(recordType);
117 log.info(sb.toString());
118 Thread t=new Thread (this);
119 t.start();
120 } catch (IOException e) {
121 log.error(ExceptionUtil.getStackTrace(e));
122 stop();
123 }
124 }
125126/*127 * Read the current chunks in the SDL queue.128 * @return List of chunks in the SDL queue.129 */130publicsynchronized Collection<Chunk> read() throws NoSuchElementException {
131 Collection<Chunk> list = Collections.synchronizedCollection(q);
132return list;
133 }
134135/*136 * Unsubscribe from Chukwa collector and stop streaming.137 */138publicvoid stop() {
139if(s!=null) {
140try {
141 dis.close();
142 dos.close();
143 s.close();
144 StringBuilder sb = new StringBuilder();
145 sb.append("Unsubscribe from ");
146 sb.append(hostname);
147 sb.append(":");
148 sb.append(port);
149 sb.append(" for data type: ");
150 sb.append(recordType);
151 log.info(sb.toString());
152 running = false;
153 } catch (IOException e) {
154 log.debug("Unable to close Socket Tee client socket.");
155 }
156 }
157 }
158159/*160 * Check if streaming is currently happening for the current instance of SDL.161 * @return running state of the SDL,162 */163publicboolean running() {
164return running;
165 }
166167/*168 * Background thread for reading data from SocketTeeWriter, and add new data169 * into SDL queue.170 */171 @Override
172publicvoid run() {
173try {
174Chunk c;
175while ((c = ChunkImpl.read(dis)) != null) {
176 StringBuilder sb = new StringBuilder();
177 sb.append("Chunk received, recordType:");
178 sb.append(c.getDataType());
179 log.debug(sb);
180if(q.size()>QUEUE_MAX) {
181 q.poll();
182 }
183 q.offer(c);
184 }
185 } catch (IOException e) {
186 log.error(ExceptionUtil.getStackTrace(e));
187 stop();
188 }
189 }
190 }