This project has retired. For details please refer to its
Attic page.
AgentControlSocketListener xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.chukwa.datacollection.agent;
20
21
22 import java.io.BufferedOutputStream;
23 import java.io.BufferedReader;
24 import java.io.IOException;
25 import java.io.InputStream;
26 import java.io.InputStreamReader;
27 import java.io.PrintStream;
28 import java.net.*;
29 import java.nio.charset.Charset;
30 import java.util.Map;
31
32 import org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorException;
33 import org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorShutdownPolicy;
34 import org.apache.log4j.Logger;
35 import org.apache.hadoop.chukwa.util.ExceptionUtil;
36
37
38
39
40
41
42
43
44
45
46 public class AgentControlSocketListener extends Thread {
47
48 static Logger log = Logger.getLogger(AgentControlSocketListener.class);
49
50 protected ChukwaAgent agent;
51 protected int portno;
52 protected ServerSocket s = null;
53 volatile boolean closing = false;
54 static final String VERSION = "0.4.0-dev";
55 public boolean ALLOW_REMOTE = true;
56 public static final String REMOTE_ACCESS_OPT = "chukwaAgent.control.remote";
57
58 private class ListenThread extends Thread {
59 Socket connection;
60
61 ListenThread(Socket conn) {
62 connection = conn;
63 try {
64 connection.setSoTimeout(60000);
65 } catch (SocketException e) {
66 log.warn("Error while settin soTimeout to 60000");
67 e.printStackTrace();
68 }
69 this.setName("listen thread for " + connection.getRemoteSocketAddress());
70 }
71
72 public void run() {
73 try {
74 InputStream in = connection.getInputStream();
75 BufferedReader br = new BufferedReader(new InputStreamReader(in, Charset.forName("UTF-8")));
76 PrintStream out = new PrintStream(new BufferedOutputStream(connection
77 .getOutputStream()), true, "UTF-8");
78 String cmd = null;
79 while ((cmd = br.readLine()) != null) {
80 processCommand(cmd, out);
81 }
82 connection.close();
83 if (log.isDebugEnabled()) {
84 log.debug("control connection closed");
85 }
86 } catch (SocketException e) {
87 if (e.getMessage().equals("Socket Closed"))
88 log.info("control socket closed");
89 } catch (IOException e) {
90 log.warn("a control connection broke", e);
91 try {
92 connection.close();
93 } catch(Exception ex) {
94 log.debug(ExceptionUtil.getStackTrace(ex));
95 }
96 }
97 }
98
99
100
101
102
103
104
105
106 public void processCommand(String cmd, PrintStream out) throws IOException {
107 String[] words = cmd.split("\\s+");
108 if (log.isDebugEnabled()) {
109 log.debug("command from " + connection.getRemoteSocketAddress() + ":"
110 + cmd);
111 }
112
113 if (words[0].equalsIgnoreCase("help")) {
114 out.println("you're talking to the Chukwa agent. Commands available: ");
115 out.println("add [adaptorname] [args] [offset] -- start an adaptor");
116 out.println("shutdown [adaptornumber] -- graceful stop");
117 out.println("stop [adaptornumber] -- abrupt stop");
118 out.println("list -- list running adaptors");
119 out.println("close -- close this connection");
120 out.println("stopagent -- stop the whole agent process");
121 out.println("stopall -- stop all adaptors");
122 out.println("reloadCollectors -- reload the list of collectors");
123 out.println("help -- print this message");
124 out.println("\t Command names are case-blind.");
125 } else if (words[0].equalsIgnoreCase("close")) {
126 connection.close();
127 } else if (words[0].equalsIgnoreCase("add")) {
128 try {
129 String newID = agent.processAddCommandE(cmd);
130 if (newID != null)
131 out.println("OK add completed; new ID is " + newID);
132 else
133 out.println("failed to start adaptor...check logs for details");
134 } catch(AdaptorException e) {
135 out.println(e);
136 }
137 } else if (words[0].equalsIgnoreCase("shutdown")) {
138 if (words.length < 2) {
139 out.println("need to specify an adaptor to shut down, by number");
140 } else {
141 sanitizeAdaptorName(out, words);
142 long offset = agent.stopAdaptor(words[1], AdaptorShutdownPolicy.GRACEFULLY);
143 if (offset != -1)
144 out.println("OK adaptor " + words[1] + " stopping gracefully at "
145 + offset);
146 else
147 out.println("FAIL: perhaps adaptor " + words[1] + " does not exist");
148 }
149 } else if (words[0].equalsIgnoreCase("stop")) {
150 if (words.length < 2) {
151 out.println("need to specify an adaptor to shut down, by number");
152 } else {
153 sanitizeAdaptorName(out, words);
154 agent.stopAdaptor(words[1], AdaptorShutdownPolicy.HARD_STOP);
155 out.println("OK adaptor " + words[1] + " stopped");
156 }
157 } else if (words[0].equalsIgnoreCase("reloadCollectors")) {
158 agent.getConnector().reloadConfiguration();
159 out.println("OK reloadCollectors done");
160 } else if (words[0].equalsIgnoreCase("list")) {
161 java.util.Map<String, String> adaptorList = agent.getAdaptorList();
162
163 if (log.isDebugEnabled()) {
164 log.debug("number of adaptors: " + adaptorList.size());
165 }
166
167 for (Map.Entry<String, String> a: adaptorList.entrySet()) {
168 out.print(a.getKey());
169 out.print(") ");
170 out.print(" ");
171 out.println(a.getValue());
172 }
173 out.println("");
174
175 } else if (words[0].equalsIgnoreCase("stopagent")) {
176 out.println("stopping agent process.");
177 connection.close();
178 agent.shutdown(true);
179 } else if(words[0].equalsIgnoreCase("stopall")) {
180 int stopped = 0;
181 for(String id: agent.getAdaptorList().keySet()) {
182 agent.stopAdaptor(id, false);
183 stopped++;
184 }
185 out.println("stopped " + stopped + " adaptors");
186 } else if (words[0].equals("")) {
187 out.println(getStatusLine());
188 } else {
189 log.warn("unknown command " + words[0]);
190 out.println("unknown command " + words[0]);
191 out.println("say 'help' for a list of legal commands");
192 }
193 out.flush();
194 }
195
196 private void sanitizeAdaptorName(PrintStream out, String[] words) {
197 if(!words[1].startsWith("adaptor_")) {
198 words[1] = "adaptor_" + words[1];
199 out.println("adaptor names should start with adaptor_; "
200 +"assuming you meant"+ words[1] );
201 }
202 }
203
204 }
205
206
207
208
209
210
211 public AgentControlSocketListener(ChukwaAgent agent) {
212
213 this.setDaemon(false);
214 this.agent = agent;
215 this.portno = agent.getConfiguration().getInt("chukwaAgent.control.port",
216 9093);
217 this.ALLOW_REMOTE = agent.getConfiguration().getBoolean(REMOTE_ACCESS_OPT, ALLOW_REMOTE);
218 log.info("AgentControlSocketListerner ask for port: " + portno);
219 this.setName("control socket listener");
220 }
221
222
223
224
225
226
227 public void run() {
228 try {
229 if (!isBound())
230 tryToBind();
231 } catch (IOException e) {
232 return;
233 }
234
235 while (!closing) {
236 try {
237 Socket connection = s.accept();
238 if (log.isDebugEnabled()) {
239 log.debug("new connection from " + connection.getInetAddress());
240 }
241 ListenThread l = new ListenThread(connection);
242 l.setDaemon(true);
243 l.start();
244 } catch (IOException e) {
245 if (!closing)
246 log.warn("control socket error: ", e);
247 else {
248 log.warn("shutting down listen thread due to shutdown() call");
249 break;
250 }
251 }
252 }
253 }
254
255
256
257
258 public void shutdown() {
259 closing = true;
260 try {
261 if (s != null)
262 s.close();
263 s = null;
264 } catch (IOException e) {
265 log.debug(ExceptionUtil.getStackTrace(e));
266 }
267 }
268
269 public boolean isBound() {
270 return s != null && s.isBound();
271 }
272
273 public void tryToBind() throws IOException {
274 if(ALLOW_REMOTE)
275 s = new ServerSocket(portno);
276 else {
277 s = new ServerSocket();
278 s.bind(new InetSocketAddress(InetAddress.getByAddress(new byte[] {127,0,0,1}), portno));
279 }
280 s.setReuseAddress(true);
281 portno = s.getLocalPort();
282 if (s.isBound())
283 log.info("socket bound to " + s.getLocalPort());
284 else
285 log.info("socket isn't bound");
286 }
287
288 public int getPort() {
289 if (!s.isBound()) {
290 return -1;
291 } else {
292 return portno;
293 }
294 }
295
296
297
298 private static String localHostAddr;
299 static {
300 try {
301 localHostAddr = InetAddress.getLocalHost().getHostName();
302 } catch (UnknownHostException e) {
303 localHostAddr = "localhost";
304 }
305 }
306
307 public String getStatusLine() {
308 int adaptorCount = agent.adaptorCount();
309
310 return localHostAddr + ": Chukwa Agent running, version " + VERSION + ", with " + adaptorCount + " adaptors";
311 }
312 }