1/*2 * Licensed to the Apache Software Foundation (ASF) under one3 * or more contributor license agreements. See the NOTICE file4 * distributed with this work for additional information5 * regarding copyright ownership. The ASF licenses this file6 * to you under the Apache License, Version 2.0 (the7 * "License"); you may not use this file except in compliance8 * with the License. You may obtain a copy of the License at9 *10 * http://www.apache.org/licenses/LICENSE-2.011 *12 * Unless required by applicable law or agreed to in writing, software13 * distributed under the License is distributed on an "AS IS" BASIS,14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.15 * See the License for the specific language governing permissions and16 * limitations under the License.17 */18package org.apache.hadoop.chukwa.dataloader;

import java.io.Closeable;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.Socket;
import java.net.SocketException;
import java.nio.charset.Charset;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.hadoop.chukwa.Chunk;
import org.apache.hadoop.chukwa.ChunkImpl;
import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
import org.apache.hadoop.chukwa.datacollection.DataFactory;
import org.apache.hadoop.chukwa.datacollection.writer.SocketTeeWriter;
import org.apache.hadoop.chukwa.util.ExceptionUtil;
import org.apache.log4j.Logger;

4243/**44 * Socket Data Loader, also known as the SDL, is a framework for allowing direct45 * access to log data under the Chukwa Collector in a safe and efficient manner.46 * Subscribe to chukwaCollector.tee.port for data streaming.47 * Defaults socket tee port is 9094.48 */49publicclassSocketDataLoaderimplements Runnable {
50private String hostname = "localhost";
51privateint port = 9094;
52privatestatic Logger log = Logger.getLogger(SocketDataLoader.class);
53private Socket s = null;
54private DataInputStream dis = null;
55private DataOutputStream dos = null;
56private Queue<Chunk> q = new LinkedList<Chunk>();
57private String recordType = null;
58privateboolean running = false;
59privatestaticfinalint QUEUE_MAX = 10;
60private Iterator<String> collectors = null;
61privatestatic Pattern pattern = Pattern.compile("(.+?)\\://(.+?)\\:(.+?)");
6263/*64 * Create and start an instance of SocketDataLoader.65 * @param Record Type66 */67publicSocketDataLoader(String recordType) {
68this.recordType = recordType;
69try {
70 collectors = DataFactory.getInstance().getCollectorURLs(new ChukwaConfiguration());
71 } catch (IOException e) {
72 log.error(ExceptionUtil.getStackTrace(e));
73 }
74 Matcher m = pattern.matcher(collectors.next());
75// Socket data loader only supports to stream data from a single collector. 76// For large deployment, it may require to setup multi-tiers of collectors to77// channel data into a single collector for display.78if(m.matches()) {
79 hostname = m.group(2);
80 }
81 start();
82 }
8384/*85 * Establish a connection to chukwa collector and filter data stream86 * base on record type.87 */88publicsynchronizedvoid start() {
89try {
90 running = true;
91 s = new Socket(hostname, port);
92try {
93 s.setSoTimeout(120000);
94 dos = new DataOutputStream (s.getOutputStream());
95 StringBuilder output = new StringBuilder();
96 output.append(SocketTeeWriter.WRITABLE);
97if(recordType.toLowerCase().intern()!="all".intern()) {
98 output.append(" datatype=");
99 output.append(recordType);
100 } else {
101 output.append(" all");
102 }
103 output.append("\n");
104 dos.write((output.toString()).getBytes(Charset.forName("UTF-8")));
105 } catch (SocketException e) {
106 log.warn("Error while settin soTimeout to 120000");
107 }
108 dis = new DataInputStream(s
109 .getInputStream());
110 dis.readFully(new byte[3]); //read "OK\n"111 StringBuilder sb = new StringBuilder();
112 sb.append("Subscribe to ");
113 sb.append(hostname);
114 sb.append(":");
115 sb.append(port);
116 sb.append(" for record type: ");
117 sb.append(recordType);
118 log.info(sb.toString());
119 Thread t=new Thread (this);
120 t.start();
121 } catch (IOException e) {
122 log.error(ExceptionUtil.getStackTrace(e));
123 stop();
124 }
125 }
126127/*128 * Read the current chunks in the SDL queue.129 * @return List of chunks in the SDL queue.130 */131publicsynchronized Collection<Chunk> read() throws NoSuchElementException {
132 Collection<Chunk> list = Collections.synchronizedCollection(q);
133return list;
134 }
135136/*137 * Unsubscribe from Chukwa collector and stop streaming.138 */139publicsynchronizedvoid stop() {
140if(s!=null) {
141try {
142 dis.close();
143 dos.close();
144 s.close();
145 StringBuilder sb = new StringBuilder();
146 sb.append("Unsubscribe from ");
147 sb.append(hostname);
148 sb.append(":");
149 sb.append(port);
150 sb.append(" for data type: ");
151 sb.append(recordType);
152 log.info(sb.toString());
153 running = false;
154 } catch (IOException e) {
155 log.debug("Unable to close Socket Tee client socket.");
156 }
157 }
158 }
159160/*161 * Check if streaming is currently happening for the current instance of SDL.162 * @return running state of the SDL,163 */164publicboolean running() {
165return running;
166 }
167168/*169 * Background thread for reading data from SocketTeeWriter, and add new data170 * into SDL queue.171 */172 @Override
173publicsynchronizedvoid run() {
174try {
175Chunk c;
176while ((c = ChunkImpl.read(dis)) != null) {
177 StringBuilder sb = new StringBuilder();
178 sb.append("Chunk received, recordType:");
179 sb.append(c.getDataType());
180 log.debug(sb);
181if(q.size()>QUEUE_MAX) {
182 q.poll();
183 }
184 q.offer(c);
185 }
186 } catch (IOException e) {
187 log.error(ExceptionUtil.getStackTrace(e));
188 stop();
189 }
190 }
191 }