1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.chukwa.dataloader;
19
20 import java.io.DataInputStream;
21 import java.io.DataOutputStream;
22 import java.io.IOException;
23 import java.net.Socket;
24 import java.net.SocketException;
25 import java.nio.charset.Charset;
26 import java.util.Collection;
27 import java.util.Collections;
28 import java.util.Iterator;
29 import java.util.LinkedList;
30 import java.util.NoSuchElementException;
31 import java.util.Queue;
32 import java.util.regex.Matcher;
33 import java.util.regex.Pattern;
34
35 import org.apache.hadoop.chukwa.Chunk;
36 import org.apache.hadoop.chukwa.ChunkImpl;
37 import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
38 import org.apache.hadoop.chukwa.datacollection.DataFactory;
39 import org.apache.hadoop.chukwa.datacollection.writer.SocketTeeWriter;
40 import org.apache.hadoop.chukwa.util.ExceptionUtil;
41 import org.apache.log4j.Logger;
42
43
44
45
46
47
48
49 public class SocketDataLoader implements Runnable {
50 private String hostname = "localhost";
51 private int port = 9094;
52 private static Logger log = Logger.getLogger(SocketDataLoader.class);
53 private Socket s = null;
54 private DataInputStream dis = null;
55 private DataOutputStream dos = null;
56 private Queue<Chunk> q = new LinkedList<Chunk>();
57 private String recordType = null;
58 private boolean running = false;
59 private static final int QUEUE_MAX = 10;
60 private Iterator<String> collectors = null;
61 private static Pattern pattern = Pattern.compile("(.+?)\\://(.+?)\\:(.+?)");
62
63
64
65
66
67 public SocketDataLoader(String recordType) {
68 this.recordType = recordType;
69 try {
70 collectors = DataFactory.getInstance().getCollectorURLs(new ChukwaConfiguration());
71 } catch (IOException e) {
72 log.error(ExceptionUtil.getStackTrace(e));
73 }
74 Matcher m = pattern.matcher(collectors.next());
75
76
77
78 if(m.matches()) {
79 hostname = m.group(2);
80 }
81 start();
82 }
83
84
85
86
87
88 public synchronized void start() {
89 try {
90 running = true;
91 s = new Socket(hostname, port);
92 try {
93 s.setSoTimeout(120000);
94 dos = new DataOutputStream (s.getOutputStream());
95 StringBuilder output = new StringBuilder();
96 output.append(SocketTeeWriter.WRITABLE);
97 if(recordType.toLowerCase().intern()!="all".intern()) {
98 output.append(" datatype=");
99 output.append(recordType);
100 } else {
101 output.append(" all");
102 }
103 output.append("\n");
104 dos.write((output.toString()).getBytes(Charset.forName("UTF-8")));
105 } catch (SocketException e) {
106 log.warn("Error while settin soTimeout to 120000");
107 }
108 dis = new DataInputStream(s
109 .getInputStream());
110 dis.readFully(new byte[3]);
111 StringBuilder sb = new StringBuilder();
112 sb.append("Subscribe to ");
113 sb.append(hostname);
114 sb.append(":");
115 sb.append(port);
116 sb.append(" for record type: ");
117 sb.append(recordType);
118 log.info(sb.toString());
119 Thread t=new Thread (this);
120 t.start();
121 } catch (IOException e) {
122 log.error(ExceptionUtil.getStackTrace(e));
123 stop();
124 }
125 }
126
127
128
129
130
131 public synchronized Collection<Chunk> read() throws NoSuchElementException {
132 Collection<Chunk> list = Collections.synchronizedCollection(q);
133 return list;
134 }
135
136
137
138
139 public synchronized void stop() {
140 if(s!=null) {
141 try {
142 dis.close();
143 dos.close();
144 s.close();
145 StringBuilder sb = new StringBuilder();
146 sb.append("Unsubscribe from ");
147 sb.append(hostname);
148 sb.append(":");
149 sb.append(port);
150 sb.append(" for data type: ");
151 sb.append(recordType);
152 log.info(sb.toString());
153 running = false;
154 } catch (IOException e) {
155 log.debug("Unable to close Socket Tee client socket.");
156 }
157 }
158 }
159
160
161
162
163
164 public boolean running() {
165 return running;
166 }
167
168
169
170
171
172 @Override
173 public synchronized void run() {
174 try {
175 Chunk c;
176 while ((c = ChunkImpl.read(dis)) != null) {
177 StringBuilder sb = new StringBuilder();
178 sb.append("Chunk received, recordType:");
179 sb.append(c.getDataType());
180 log.debug(sb);
181 if(q.size()>QUEUE_MAX) {
182 q.poll();
183 }
184 q.offer(c);
185 }
186 } catch (IOException e) {
187 log.error(ExceptionUtil.getStackTrace(e));
188 stop();
189 }
190 }
191 }