This project has retired. For details please refer to its
Attic page.
AgentControlSocketListener xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.chukwa.datacollection.agent;
20
21
22 import java.io.BufferedOutputStream;
23 import java.io.BufferedReader;
24 import java.io.IOException;
25 import java.io.InputStream;
26 import java.io.InputStreamReader;
27 import java.io.PrintStream;
28 import java.net.*;
29 import java.nio.charset.Charset;
30 import java.util.Map;
31
32 import org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorException;
33 import org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorShutdownPolicy;
34 import org.apache.log4j.Logger;
35 import org.apache.hadoop.chukwa.util.ExceptionUtil;
36
37
38
39
40
41
42
43
44
45
46 public class AgentControlSocketListener extends Thread {
47
48 static Logger log = Logger.getLogger(AgentControlSocketListener.class);
49
50 protected ChukwaAgent agent;
51 protected int portno;
52 protected ServerSocket s = null;
53 volatile boolean closing = false;
54 static final String VERSION = "0.4.0-dev";
55 public boolean ALLOW_REMOTE = true;
56 public static final String REMOTE_ACCESS_OPT = "chukwaAgent.control.remote";
57
58 private class ListenThread extends Thread {
59 Socket connection;
60
61 ListenThread(Socket conn) {
62 connection = conn;
63 try {
64 connection.setSoTimeout(60000);
65 } catch (SocketException e) {
66 log.warn("Error while settin soTimeout to 60000");
67 e.printStackTrace();
68 }
69 this.setName("listen thread for " + connection.getRemoteSocketAddress());
70 }
71
72 public void run() {
73 try {
74 InputStream in = connection.getInputStream();
75 BufferedReader br = new BufferedReader(new InputStreamReader(in, Charset.forName("UTF-8")));
76 PrintStream out = new PrintStream(new BufferedOutputStream(connection
77 .getOutputStream()), true, "UTF-8");
78 String cmd = null;
79 while ((cmd = br.readLine()) != null) {
80 processCommand(cmd, out);
81 }
82 connection.close();
83 if (log.isDebugEnabled()) {
84 log.debug("control connection closed");
85 }
86 } catch (SocketException e) {
87 if (e.getMessage().equals("Socket Closed"))
88 log.info("control socket closed");
89 } catch (IOException e) {
90 log.warn("a control connection broke", e);
91 try {
92 connection.close();
93 } catch(Exception ex) {
94 log.debug(ExceptionUtil.getStackTrace(ex));
95 }
96 }
97 }
98
99
100
101
102
103
104
105
106 public void processCommand(String cmd, PrintStream out) throws IOException {
107 String[] words = cmd.split("\\s+");
108 if (log.isDebugEnabled()) {
109 log.debug("command from " + connection.getRemoteSocketAddress() + ":"
110 + cmd);
111 }
112
113 if (words[0].equalsIgnoreCase("help")) {
114 out.println("you're talking to the Chukwa agent. Commands available: ");
115 out.println("add [adaptorname] [args] [offset] -- start an adaptor");
116 out.println("shutdown [adaptornumber] -- graceful stop");
117 out.println("stop [adaptornumber] -- abrupt stop");
118 out.println("list -- list running adaptors");
119 out.println("close -- close this connection");
120 out.println("stopagent -- stop the whole agent process");
121 out.println("stopall -- stop all adaptors");
122 out.println("reloadCollectors -- reload the list of collectors");
123 out.println("help -- print this message");
124 out.println("\t Command names are case-blind.");
125 } else if (words[0].equalsIgnoreCase("close")) {
126 connection.close();
127 } else if (words[0].equalsIgnoreCase("add")) {
128 try {
129 String newID = agent.processAddCommandE(cmd);
130 if (newID != null)
131 out.println("OK add completed; new ID is " + newID);
132 else
133 out.println("failed to start adaptor...check logs for details");
134 } catch(AdaptorException e) {
135 out.println(e);
136 }
137 } else if (words[0].equalsIgnoreCase("shutdown")) {
138 if (words.length < 2) {
139 out.println("need to specify an adaptor to shut down, by number");
140 } else {
141 sanitizeAdaptorName(out, words);
142 long offset = agent.stopAdaptor(words[1], AdaptorShutdownPolicy.GRACEFULLY);
143 if (offset != -1)
144 out.println("OK adaptor " + words[1] + " stopping gracefully at "
145 + offset);
146 else
147 out.println("FAIL: perhaps adaptor " + words[1] + " does not exist");
148 }
149 } else if (words[0].equalsIgnoreCase("stop")) {
150 if (words.length < 2) {
151 out.println("need to specify an adaptor to shut down, by number");
152 } else {
153 sanitizeAdaptorName(out, words);
154 agent.stopAdaptor(words[1], AdaptorShutdownPolicy.HARD_STOP);
155 out.println("OK adaptor " + words[1] + " stopped");
156 }
157 } else if (words[0].equalsIgnoreCase("reloadCollectors")) {
158 agent.getConnector().reloadConfiguration();
159 out.println("OK reloadCollectors done");
160 } else if (words[0].equalsIgnoreCase("list")) {
161 java.util.Map<String, String> adaptorList = agent.getAdaptorList();
162
163 if (log.isDebugEnabled()) {
164 log.debug("number of adaptors: " + adaptorList.size());
165 }
166
167 for (Map.Entry<String, String> a: adaptorList.entrySet()) {
168 out.print(a.getKey());
169 out.print(") ");
170 out.print(" ");
171 out.println(a.getValue());
172 }
173 out.println("");
174
175 } else if (words[0].equalsIgnoreCase("stopagent")) {
176 out.println("stopping agent process.");
177 connection.close();
178 agent.shutdown(true);
179 } else if(words[0].equalsIgnoreCase("stopall")) {
180 int stopped = 0;
181 for(String id: agent.getAdaptorList().keySet()) {
182 agent.stopAdaptor(id, false);
183 stopped++;
184 }
185 out.println("stopped " + stopped + " adaptors");
186 } else if (words[0].equals("")) {
187 out.println(getStatusLine());
188 } else {
189 log.warn("unknown command " + words[0]);
190 out.println("unknown command " + words[0]);
191 out.println("say 'help' for a list of legal commands");
192 }
193 out.flush();
194 }
195
196 private void sanitizeAdaptorName(PrintStream out, String[] words) {
197 if(!words[1].startsWith("adaptor_")) {
198 words[1] = "adaptor_" + words[1];
199 out.println("adaptor names should start with adaptor_; "
200 +"assuming you meant"+ words[1] );
201 }
202 }
203
204 }
205
206
207
208
209
210 public AgentControlSocketListener(ChukwaAgent agent) {
211
212 this.setDaemon(false);
213 this.agent = agent;
214 this.portno = agent.getConfiguration().getInt("chukwaAgent.control.port",
215 9093);
216 this.ALLOW_REMOTE = agent.getConfiguration().getBoolean(REMOTE_ACCESS_OPT, ALLOW_REMOTE);
217 log.info("AgentControlSocketListerner ask for port: " + portno);
218 this.setName("control socket listener");
219 }
220
221
222
223
224
225
226 public void run() {
227 try {
228 if (!isBound())
229 tryToBind();
230 } catch (IOException e) {
231 return;
232 }
233
234 while (!closing) {
235 try {
236 Socket connection = s.accept();
237 if (log.isDebugEnabled()) {
238 log.debug("new connection from " + connection.getInetAddress());
239 }
240 ListenThread l = new ListenThread(connection);
241 l.setDaemon(true);
242 l.start();
243 } catch (IOException e) {
244 if (!closing)
245 log.warn("control socket error: ", e);
246 else {
247 log.warn("shutting down listen thread due to shutdown() call");
248 break;
249 }
250 }
251 }
252 }
253
254
255
256
257 public void shutdown() {
258 closing = true;
259 try {
260 if (s != null)
261 s.close();
262 s = null;
263 } catch (IOException e) {
264 log.debug(ExceptionUtil.getStackTrace(e));
265 }
266 }
267
268 public boolean isBound() {
269 return s != null && s.isBound();
270 }
271
272 public void tryToBind() throws IOException {
273 if(ALLOW_REMOTE)
274 s = new ServerSocket(portno);
275 else {
276 s = new ServerSocket();
277 s.bind(new InetSocketAddress(InetAddress.getByAddress(new byte[] {127,0,0,1}), portno));
278 }
279 s.setReuseAddress(true);
280 portno = s.getLocalPort();
281 if (s.isBound())
282 log.info("socket bound to " + s.getLocalPort());
283 else
284 log.info("socket isn't bound");
285 }
286
287 public int getPort() {
288 if (!s.isBound()) {
289 return -1;
290 } else {
291 return portno;
292 }
293 }
294
295
296
297 private static String localHostAddr;
298 static {
299 try {
300 localHostAddr = InetAddress.getLocalHost().getHostName();
301 } catch (UnknownHostException e) {
302 localHostAddr = "localhost";
303 }
304 }
305
306 public String getStatusLine() {
307 int adaptorCount = agent.adaptorCount();
308
309 return localHostAddr + ": Chukwa Agent running, version " + VERSION + ", with " + adaptorCount + " adaptors";
310 }
311 }