This project has retired. For details please refer to its Attic page.
AgentControlSocketListener xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.chukwa.datacollection.agent;
20  
21  
22  import java.io.BufferedOutputStream;
23  import java.io.BufferedReader;
24  import java.io.IOException;
25  import java.io.InputStream;
26  import java.io.InputStreamReader;
27  import java.io.PrintStream;
28  import java.net.*;
29  import java.util.Map;
30  import org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorException;
31  import org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorShutdownPolicy;
32  import org.apache.log4j.Logger;
33  import org.apache.hadoop.chukwa.util.ExceptionUtil;
34  
35  /**
36   * Class to handle the agent control protocol. This is a simple line-oriented
37   * ASCII protocol, that is designed to be easy to work with both
38   * programmatically and via telnet.
39   * 
40   * The port to bind to can be specified by setting option
41   * chukwaAgent.agent.control.port. A port of 0 creates a socket on any free
42   * port.
43   */
44  public class AgentControlSocketListener extends Thread {
45  
46    static Logger log = Logger.getLogger(AgentControlSocketListener.class);
47  
48    protected ChukwaAgent agent;
49    protected int portno;
50    protected ServerSocket s = null;
51    volatile boolean closing = false;
52    static final String VERSION = "0.4.0-dev";
53    public boolean ALLOW_REMOTE = true;
54    public static final String REMOTE_ACCESS_OPT = "chukwaAgent.control.remote";
55  
56    private class ListenThread extends Thread {
57      Socket connection;
58  
59      ListenThread(Socket conn) {
60        connection = conn;
61        try {
62          connection.setSoTimeout(60000);
63        } catch (SocketException e) {
64          log.warn("Error while settin soTimeout to 60000");
65          e.printStackTrace();
66        }
67        this.setName("listen thread for " + connection.getRemoteSocketAddress());
68      }
69  
70      public void run() {
71        try {
72          InputStream in = connection.getInputStream();
73          BufferedReader br = new BufferedReader(new InputStreamReader(in));
74          PrintStream out = new PrintStream(new BufferedOutputStream(connection
75              .getOutputStream()));
76          String cmd = null;
77          while ((cmd = br.readLine()) != null) {
78            processCommand(cmd, out);
79          }
80          connection.close();
81          if (log.isDebugEnabled()) {
82            log.debug("control connection closed");
83          }
84        } catch (SocketException e) {
85          if (e.getMessage().equals("Socket Closed"))
86            log.info("control socket closed");
87        } catch (IOException e) {
88          log.warn("a control connection broke", e);
89          try {
90            connection.close();
91          } catch(Exception ex) {
92            log.debug(ExceptionUtil.getStackTrace(ex));
93          }
94        }
95      }
96  
97      /**
98       * process a protocol command
99       * 
100      * @param cmd the command given by the user
101      * @param out a PrintStream writing to the socket
102      * @throws IOException
103      */
104     public void processCommand(String cmd, PrintStream out) throws IOException {
105       String[] words = cmd.split("\\s+");
106       if (log.isDebugEnabled()) {
107         log.debug("command from " + connection.getRemoteSocketAddress() + ":"
108             + cmd);
109       }
110 
111       if (words[0].equalsIgnoreCase("help")) {
112         out.println("you're talking to the Chukwa agent.  Commands available: ");
113         out.println("add [adaptorname] [args] [offset] -- start an adaptor");
114         out.println("shutdown [adaptornumber]  -- graceful stop");
115         out.println("stop [adaptornumber]  -- abrupt stop");
116         out.println("list -- list running adaptors");
117         out.println("close -- close this connection");
118         out.println("stopagent -- stop the whole agent process");
119         out.println("stopall -- stop all adaptors");
120         out.println("reloadCollectors -- reload the list of collectors");
121         out.println("help -- print this message");
122         out.println("\t Command names are case-blind.");
123       } else if (words[0].equalsIgnoreCase("close")) {
124         connection.close();
125       } else if (words[0].equalsIgnoreCase("add")) {
126         try {
127           String newID = agent.processAddCommandE(cmd);
128           if (newID != null)
129             out.println("OK add completed; new ID is " + newID);
130           else
131             out.println("failed to start adaptor...check logs for details");
132         } catch(AdaptorException e) {
133           out.println(e);
134         }
135       } else if (words[0].equalsIgnoreCase("shutdown")) {
136         if (words.length < 2) {
137           out.println("need to specify an adaptor to shut down, by number");
138         } else {
139           sanitizeAdaptorName(out, words);
140           long offset = agent.stopAdaptor(words[1], AdaptorShutdownPolicy.GRACEFULLY);
141           if (offset != -1)
142             out.println("OK adaptor " + words[1] + " stopping gracefully at "
143                 + offset);
144           else
145             out.println("FAIL: perhaps adaptor " + words[1] + " does not exist");
146         }
147       } else if (words[0].equalsIgnoreCase("stop")) {
148         if (words.length < 2) {
149           out.println("need to specify an adaptor to shut down, by number");
150         } else {
151           sanitizeAdaptorName(out, words);
152           agent.stopAdaptor(words[1], AdaptorShutdownPolicy.HARD_STOP);
153           out.println("OK adaptor " + words[1] + " stopped");
154         }
155       } else if (words[0].equalsIgnoreCase("reloadCollectors")) {
156         agent.getConnector().reloadConfiguration();
157         out.println("OK reloadCollectors done");
158       } else if (words[0].equalsIgnoreCase("list")) {
159         java.util.Map<String, String> adaptorList = agent.getAdaptorList();
160 
161         if (log.isDebugEnabled()) {
162           log.debug("number of adaptors: " + adaptorList.size());
163         }
164 
165         for (Map.Entry<String, String> a: adaptorList.entrySet()) {
166             out.print(a.getKey());
167             out.print(") ");
168             out.print(" ");
169             out.println(a.getValue());
170           }
171           out.println("");
172         
173       } else if (words[0].equalsIgnoreCase("stopagent")) {
174         out.println("stopping agent process.");
175         connection.close();
176         agent.shutdown(true);
177       } else if(words[0].equalsIgnoreCase("stopall")) {
178         int stopped = 0;
179         for(String id: agent.getAdaptorList().keySet()) {
180           agent.stopAdaptor(id, false);
181           stopped++;
182          }
183         out.println("stopped " + stopped + " adaptors");
184       } else if (words[0].equals("")) {
185         out.println(getStatusLine());
186       } else {
187         log.warn("unknown command " + words[0]);
188         out.println("unknown command " + words[0]);
189         out.println("say 'help' for a list of legal commands");
190       }
191       out.flush();
192     }
193 
194     private void sanitizeAdaptorName(PrintStream out, String[] words) {
195       if(!words[1].startsWith("adaptor_")) {
196         words[1] = "adaptor_" + words[1];
197         out.println("adaptor names should start with adaptor_; "
198             +"assuming you meant"+ words[1] );
199       }
200     }
201 
202   }
203 
204   /**
205    * Initializes listener, but does not bind to socket.
206    * 
207    * @param a the agent to control
208    */
209   public AgentControlSocketListener(ChukwaAgent agent) {
210 
211     this.setDaemon(false); // to keep the local agent alive
212     this.agent = agent;
213     this.portno = agent.getConfiguration().getInt("chukwaAgent.control.port",
214         9093);
215     this.ALLOW_REMOTE = agent.getConfiguration().getBoolean(REMOTE_ACCESS_OPT, ALLOW_REMOTE);
216     log.info("AgentControlSocketListerner ask for port: " + portno);
217     this.setName("control socket listener");
218   }
219 
220 
221 
222   /**
223    * Binds to socket, starts looping listening for commands
224    */
225   public void run() {
226     try {
227       if (!isBound())
228         tryToBind();
229     } catch (IOException e) {
230       return;
231     }
232 
233     while (!closing) {
234       try {
235         Socket connection = s.accept();
236         if (log.isDebugEnabled()) {
237           log.debug("new connection from " + connection.getInetAddress());
238         }
239         ListenThread l = new ListenThread(connection);
240         l.setDaemon(true);
241         l.start();
242       } catch (IOException e) {
243         if (!closing)
244           log.warn("control socket error: ", e);
245         else {
246           log.warn("shutting down listen thread due to shutdown() call");
247           break;
248         }
249       }
250     }// end while
251   }
252 
253   /**
254    * Close the control socket, and exit. Triggers graceful thread shutdown.
255    */
256   public void shutdown() {
257     closing = true;
258     try {
259       if (s != null)
260         s.close();
261       s = null;
262     } catch (IOException e) {
263       log.debug(ExceptionUtil.getStackTrace(e));
264     } // ignore exception on close
265   }
266 
267   public boolean isBound() {
268     return s != null && s.isBound();
269   }
270 
271   public void tryToBind() throws IOException {
272     if(ALLOW_REMOTE)
273       s = new ServerSocket(portno);
274     else {  //FIXME: is there a way to allow all local addresses? (including IPv6 local)
275       s = new ServerSocket();
276       s.bind(new InetSocketAddress(InetAddress.getByAddress(new byte[] {127,0,0,1}), portno));
277     }
278     s.setReuseAddress(true);
279     portno = s.getLocalPort();
280     if (s.isBound())
281       log.info("socket bound to " + s.getLocalPort());
282     else
283       log.info("socket isn't bound");
284   }
285 
286   public int getPort() {
287     if (!s.isBound()) {
288       return -1;
289     } else {
290       return portno;
291     }
292   }
293   
294   //FIXME: we also do this in ChunkImpl; should really do it only once
295   //and make it visible everywhere?
296   private static String localHostAddr;
297   static {
298     try {
299       localHostAddr = InetAddress.getLocalHost().getHostName();
300     } catch (UnknownHostException e) {
301       localHostAddr = "localhost";
302     }
303   }
304   
305   public String getStatusLine() {
306     int adaptorCount = agent.adaptorCount();
307     
308     return localHostAddr + ": Chukwa Agent running, version " + VERSION + ", with " + adaptorCount + " adaptors";
309   }
310 }