This project has retired. For details please refer to its Attic page.
AgentControlSocketListener xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.chukwa.datacollection.agent;
20  
21  
22  import java.io.BufferedOutputStream;
23  import java.io.BufferedReader;
24  import java.io.IOException;
25  import java.io.InputStream;
26  import java.io.InputStreamReader;
27  import java.io.PrintStream;
28  import java.net.*;
29  import java.nio.charset.Charset;
30  import java.util.Map;
31  
32  import org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorException;
33  import org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorShutdownPolicy;
34  import org.apache.log4j.Logger;
35  import org.apache.hadoop.chukwa.util.ExceptionUtil;
36  
37  /**
38   * Class to handle the agent control protocol. This is a simple line-oriented
39   * ASCII protocol, that is designed to be easy to work with both
40   * programmatically and via telnet.
41   * 
42   * The port to bind to can be specified by setting option
43   * chukwaAgent.agent.control.port. A port of 0 creates a socket on any free
44   * port.
45   */
46  public class AgentControlSocketListener extends Thread {
47  
48    static Logger log = Logger.getLogger(AgentControlSocketListener.class);
49  
50    protected ChukwaAgent agent;
51    protected int portno;
52    protected ServerSocket s = null;
53    volatile boolean closing = false;
54    static final String VERSION = "0.4.0-dev";
55    public boolean ALLOW_REMOTE = true;
56    public static final String REMOTE_ACCESS_OPT = "chukwaAgent.control.remote";
57  
58    private class ListenThread extends Thread {
59      Socket connection;
60  
61      ListenThread(Socket conn) {
62        connection = conn;
63        try {
64          connection.setSoTimeout(60000);
65        } catch (SocketException e) {
66          log.warn("Error while settin soTimeout to 60000");
67          e.printStackTrace();
68        }
69        this.setName("listen thread for " + connection.getRemoteSocketAddress());
70      }
71  
72      public void run() {
73        try {
74          InputStream in = connection.getInputStream();
75          BufferedReader br = new BufferedReader(new InputStreamReader(in, Charset.forName("UTF-8")));
76          PrintStream out = new PrintStream(new BufferedOutputStream(connection
77              .getOutputStream()), true, "UTF-8");
78          String cmd = null;
79          while ((cmd = br.readLine()) != null) {
80            processCommand(cmd, out);
81          }
82          connection.close();
83          if (log.isDebugEnabled()) {
84            log.debug("control connection closed");
85          }
86        } catch (SocketException e) {
87          if (e.getMessage().equals("Socket Closed"))
88            log.info("control socket closed");
89        } catch (IOException e) {
90          log.warn("a control connection broke", e);
91          try {
92            connection.close();
93          } catch(Exception ex) {
94            log.debug(ExceptionUtil.getStackTrace(ex));
95          }
96        }
97      }
98  
99      /**
100      * process a protocol command
101      * 
102      * @param cmd the command given by the user
103      * @param out a PrintStream writing to the socket
104      * @throws IOException
105      */
106     public void processCommand(String cmd, PrintStream out) throws IOException {
107       String[] words = cmd.split("\\s+");
108       if (log.isDebugEnabled()) {
109         log.debug("command from " + connection.getRemoteSocketAddress() + ":"
110             + cmd);
111       }
112 
113       if (words[0].equalsIgnoreCase("help")) {
114         out.println("you're talking to the Chukwa agent.  Commands available: ");
115         out.println("add [adaptorname] [args] [offset] -- start an adaptor");
116         out.println("shutdown [adaptornumber]  -- graceful stop");
117         out.println("stop [adaptornumber]  -- abrupt stop");
118         out.println("list -- list running adaptors");
119         out.println("close -- close this connection");
120         out.println("stopagent -- stop the whole agent process");
121         out.println("stopall -- stop all adaptors");
122         out.println("reloadCollectors -- reload the list of collectors");
123         out.println("help -- print this message");
124         out.println("\t Command names are case-blind.");
125       } else if (words[0].equalsIgnoreCase("close")) {
126         connection.close();
127       } else if (words[0].equalsIgnoreCase("add")) {
128         try {
129           String newID = agent.processAddCommandE(cmd);
130           if (newID != null)
131             out.println("OK add completed; new ID is " + newID);
132           else
133             out.println("failed to start adaptor...check logs for details");
134         } catch(AdaptorException e) {
135           out.println(e);
136         }
137       } else if (words[0].equalsIgnoreCase("shutdown")) {
138         if (words.length < 2) {
139           out.println("need to specify an adaptor to shut down, by number");
140         } else {
141           sanitizeAdaptorName(out, words);
142           long offset = agent.stopAdaptor(words[1], AdaptorShutdownPolicy.GRACEFULLY);
143           if (offset != -1)
144             out.println("OK adaptor " + words[1] + " stopping gracefully at "
145                 + offset);
146           else
147             out.println("FAIL: perhaps adaptor " + words[1] + " does not exist");
148         }
149       } else if (words[0].equalsIgnoreCase("stop")) {
150         if (words.length < 2) {
151           out.println("need to specify an adaptor to shut down, by number");
152         } else {
153           sanitizeAdaptorName(out, words);
154           agent.stopAdaptor(words[1], AdaptorShutdownPolicy.HARD_STOP);
155           out.println("OK adaptor " + words[1] + " stopped");
156         }
157       } else if (words[0].equalsIgnoreCase("reloadCollectors")) {
158         agent.getConnector().reloadConfiguration();
159         out.println("OK reloadCollectors done");
160       } else if (words[0].equalsIgnoreCase("list")) {
161         java.util.Map<String, String> adaptorList = agent.getAdaptorList();
162 
163         if (log.isDebugEnabled()) {
164           log.debug("number of adaptors: " + adaptorList.size());
165         }
166 
167         for (Map.Entry<String, String> a: adaptorList.entrySet()) {
168             out.print(a.getKey());
169             out.print(") ");
170             out.print(" ");
171             out.println(a.getValue());
172           }
173           out.println("");
174         
175       } else if (words[0].equalsIgnoreCase("stopagent")) {
176         out.println("stopping agent process.");
177         connection.close();
178         agent.shutdown(true);
179       } else if(words[0].equalsIgnoreCase("stopall")) {
180         int stopped = 0;
181         for(String id: agent.getAdaptorList().keySet()) {
182           agent.stopAdaptor(id, false);
183           stopped++;
184          }
185         out.println("stopped " + stopped + " adaptors");
186       } else if (words[0].equals("")) {
187         out.println(getStatusLine());
188       } else {
189         log.warn("unknown command " + words[0]);
190         out.println("unknown command " + words[0]);
191         out.println("say 'help' for a list of legal commands");
192       }
193       out.flush();
194     }
195 
196     private void sanitizeAdaptorName(PrintStream out, String[] words) {
197       if(!words[1].startsWith("adaptor_")) {
198         words[1] = "adaptor_" + words[1];
199         out.println("adaptor names should start with adaptor_; "
200             +"assuming you meant"+ words[1] );
201       }
202     }
203 
204   }
205 
206   /**
207    * Initializes listener, but does not bind to socket.
208    * @param agent the agent to control
209    */
210   public AgentControlSocketListener(ChukwaAgent agent) {
211 
212     this.setDaemon(false); // to keep the local agent alive
213     this.agent = agent;
214     this.portno = agent.getConfiguration().getInt("chukwaAgent.control.port",
215         9093);
216     this.ALLOW_REMOTE = agent.getConfiguration().getBoolean(REMOTE_ACCESS_OPT, ALLOW_REMOTE);
217     log.info("AgentControlSocketListerner ask for port: " + portno);
218     this.setName("control socket listener");
219   }
220 
221 
222 
223   /**
224    * Binds to socket, starts looping listening for commands
225    */
226   public void run() {
227     try {
228       if (!isBound())
229         tryToBind();
230     } catch (IOException e) {
231       return;
232     }
233 
234     while (!closing) {
235       try {
236         Socket connection = s.accept();
237         if (log.isDebugEnabled()) {
238           log.debug("new connection from " + connection.getInetAddress());
239         }
240         ListenThread l = new ListenThread(connection);
241         l.setDaemon(true);
242         l.start();
243       } catch (IOException e) {
244         if (!closing)
245           log.warn("control socket error: ", e);
246         else {
247           log.warn("shutting down listen thread due to shutdown() call");
248           break;
249         }
250       }
251     }// end while
252   }
253 
254   /**
255    * Close the control socket, and exit. Triggers graceful thread shutdown.
256    */
257   public void shutdown() {
258     closing = true;
259     try {
260       if (s != null)
261         s.close();
262       s = null;
263     } catch (IOException e) {
264       log.debug(ExceptionUtil.getStackTrace(e));
265     } // ignore exception on close
266   }
267 
268   public boolean isBound() {
269     return s != null && s.isBound();
270   }
271 
272   public void tryToBind() throws IOException {
273     if(ALLOW_REMOTE)
274       s = new ServerSocket(portno);
275     else {  //FIXME: is there a way to allow all local addresses? (including IPv6 local)
276       s = new ServerSocket();
277       s.bind(new InetSocketAddress(InetAddress.getByAddress(new byte[] {127,0,0,1}), portno));
278     }
279     s.setReuseAddress(true);
280     portno = s.getLocalPort();
281     if (s.isBound())
282       log.info("socket bound to " + s.getLocalPort());
283     else
284       log.info("socket isn't bound");
285   }
286 
287   public int getPort() {
288     if (!s.isBound()) {
289       return -1;
290     } else {
291       return portno;
292     }
293   }
294   
295   //FIXME: we also do this in ChunkImpl; should really do it only once
296   //and make it visible everywhere?
297   private static String localHostAddr;
298   static {
299     try {
300       localHostAddr = InetAddress.getLocalHost().getHostName();
301     } catch (UnknownHostException e) {
302       localHostAddr = "localhost";
303     }
304   }
305   
306   public String getStatusLine() {
307     int adaptorCount = agent.adaptorCount();
308     
309     return localHostAddr + ": Chukwa Agent running, version " + VERSION + ", with " + adaptorCount + " adaptors";
310   }
311 }