This project has retired. For details please refer to its Attic page.
ChukwaAgentController xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.chukwa.datacollection.controller;
20  
21  
22  import java.io.BufferedReader;
23  import java.io.File;
24  import java.io.IOException;
25  import java.io.InputStreamReader;
26  import java.io.OutputStreamWriter;
27  import java.io.PrintWriter;
28  import java.net.Socket;
29  import java.net.SocketException;
30  import java.util.ArrayList;
31  import java.util.Collection;
32  import java.util.HashMap;
33  import java.util.Iterator;
34  import java.util.List;
35  import java.util.Map;
36  import java.util.Timer;
37  import java.util.TimerTask;
38  import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
39  import org.apache.log4j.Logger;
40  
41  /**
42   * A convenience library for applications to communicate to the
43   * {@link ChukwaAgent}. Can be used to register and unregister new
44   * {@link Adaptor}s. Also contains functions for applications to use for
45   * handling log rotations.
46   */
47  public class ChukwaAgentController {
48    static Logger log = Logger.getLogger(ChukwaAgentController.class); // log4j logger shared by the controller and its inner classes
49     
50    public class AddAdaptorTask extends TimerTask {
51  
52      String adaptorName;
53      String type;
54      String params;
55      private long offset;
56      long numRetries;
57      long retryInterval;
58  
59      AddAdaptorTask(String adaptorName, String type, String params, long offset,
60                     long numRetries, long retryInterval) {
61        this.adaptorName = adaptorName;
62        this.type = type;
63        this.params = params;
64        this.offset = offset;
65        this.numRetries = numRetries;
66        this.retryInterval = retryInterval;
67      }
68  
69      @Override
70      public void run() {
71        try {
72          log.info("Trying to resend the add command [" + adaptorName + "]["
73              + offset + "][" + params + "] [" + numRetries + "]");
74          addByName(null, adaptorName, type, params, offset, numRetries, retryInterval);
75        } catch (Exception e) {
76          log.warn("Exception in AddAdaptorTask.run", e);
77          e.printStackTrace();
78        }
79      }
80    }
81  
82    // our default adaptors, provided here for convenience (fully qualified adaptor class names)
83    public static final String CharFileTailUTF8 = "org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.CharFileTailingAdaptorUTF8";
84    public static final String CharFileTailUTF8NewLineEscaped = "org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.CharFileTailingAdaptorUTF8NewLineEscaped";
85  
86    static String DEFAULT_FILE_TAILER = CharFileTailUTF8NewLineEscaped; // adaptor class used by addFile/pauseFile/resumeFile/removeFile
87    static int DEFAULT_PORT = 9093; // agent port used by the no-arg constructor
88    static String DEFAULT_HOST = "localhost"; // agent host used by the no-arg constructor
89    static int numArgs = 0; // command-line arg count minus -h/-p flags and their values; set by getClient, read by main
90  
91    class Adaptor {
92      public String id;
93      final public String className;
94      final public String params;
95      final public String appType;
96      public long offset;
97  
98      Adaptor(String className, String appType, String params, long offset) {
99        this.className = className;
100       this.appType = appType;
101       this.params = params;
102       this.offset = offset;
103     }
104 
105     Adaptor(String id, String className, String appType, String params,
106             long offset) {
107       this.id = id;
108       this.className = className;
109       this.appType = appType;
110       this.params = params;
111       this.offset = offset;
112     }
113 
114     /**
115      * Registers this {@link Adaptor} with the agent running at the specified
116      * hostname and portno
117      * 
118      * @return The id of the this {@link Adaptor}, assigned by the agent
119      *         upon successful registration
120      * @throws IOException
121      */
122     String register() throws IOException {
123       Socket s = new Socket(hostname, portno);
124       try {
125         s.setSoTimeout(60000);
126       } catch (SocketException e) {
127         log.warn("Error while settin soTimeout to 60000");
128         e.printStackTrace();
129       }
130       PrintWriter bw = new PrintWriter(new OutputStreamWriter(s
131           .getOutputStream()));
132       if(id != null)
133         bw.println("ADD " + id + " = " + className + " " + appType + " " + params + " " + offset);
134       else
135         bw.println("ADD " + className + " " + appType + " " + params + " " + offset);
136       bw.flush();
137       BufferedReader br = new BufferedReader(new InputStreamReader(s
138           .getInputStream()));
139       String resp = br.readLine();
140       if (resp != null) {
141         String[] fields = resp.split(" ");
142         if (fields[0].equals("OK")) {
143             id = fields[fields.length - 1];
144         }
145       }
146       s.close();
147       return id;
148     }
149 
150     void unregister() throws IOException {
151       Socket s = new Socket(hostname, portno);
152       try {
153         s.setSoTimeout(60000);
154       } catch (SocketException e) {
155         log.warn("Error while settin soTimeout to 60000");
156         e.printStackTrace();
157       }
158       PrintWriter bw = new PrintWriter(new OutputStreamWriter(s
159           .getOutputStream()));
160       bw.println("SHUTDOWN " + id);
161       bw.flush();
162 
163       BufferedReader br = new BufferedReader(new InputStreamReader(s
164           .getInputStream()));
165       String resp = br.readLine();
166       if (resp == null || !resp.startsWith("OK")) {
167         log.error("adaptor unregister error, id: " + id);
168       } else if (resp.startsWith("OK")) {
169         String[] respSplit = resp.split(" ");
170         String newOffset = respSplit[respSplit.length - 1];
171         try {
172           offset = Long.parseLong(newOffset);
173         } catch (NumberFormatException nfe) {
174           log.error("adaptor didn't shutdown gracefully.\n" + nfe);
175         }
176       }
177 
178       s.close();
179     }
180 
181     public String toString() {
182       String[] namePieces = className.split("\\.");
183       String shortName = namePieces[namePieces.length - 1];
184       return id + " " + shortName + " " + appType + " " + params + " " + offset;
185     }
186   }
187 
188   Map<String, ChukwaAgentController.Adaptor> runningAdaptors = new HashMap<String, Adaptor>(); // local view of the agent's adaptors by id; replaced on every syncWithAgent()
189   Map<String, ChukwaAgentController.Adaptor> runningInstanceAdaptors = new HashMap<String, Adaptor>(); // adaptors added through this controller instance (filled by addByName)
190   Map<String, ChukwaAgentController.Adaptor> pausedAdaptors; // adaptors stopped by pauseFile, kept so resumeFile can restart them at their offset
191   String hostname; // agent host that control sockets connect to
192   int portno; // agent control port
193 
/**
 * Creates a controller for the agent at {@code DEFAULT_HOST}:{@code DEFAULT_PORT}
 * and loads the agent's current adaptor list.
 */
public ChukwaAgentController() {
  // same initialisation as the two-argument constructor, with the defaults
  this(DEFAULT_HOST, DEFAULT_PORT);
}
201 
/**
 * Creates a controller for the agent at the given address and loads the
 * agent's current adaptor list. An unreachable agent is tolerated; the local
 * adaptor list is then simply empty.
 *
 * @param hostname host the agent listens on
 * @param portno control port of the agent
 */
public ChukwaAgentController(String hostname, int portno) {
  pausedAdaptors = new HashMap<String, Adaptor>();
  this.portno = portno;
  this.hostname = hostname;

  // pull the agent's adaptor list into runningAdaptors
  syncWithAgent();
}
209 
210   private boolean syncWithAgent() {
211     // set up adaptors by using list here
212     try {
213       runningAdaptors = list();
214       return true;
215     } catch (IOException e) {
216       System.err.println("Error initializing ChukwaClient with list of "
217               + "currently registered adaptors, clearing our local list of adaptors");
218       // e.printStackTrace();
219       // if we can't connect to the LocalAgent, reset/clear our local view of
220       // the Adaptors.
221       runningAdaptors = new HashMap<String, ChukwaAgentController.Adaptor>();
222       return false;
223     }
224   }
225 
226   /**
227    * Registers a new adaptor. Makes no guarantee about success. On failure, we
228    * print a message to stderr and ignore silently so that an application
229    * doesn't crash if it's attempt to register an adaptor fails. This call does
230    * not retry a conection. for that use the overloaded version of this which
231    * accepts a time interval and number of retries
232    * 
233    * @return the id number of the adaptor, generated by the agent
234    */
235   public String add(String adaptorName, String type, String params, long offset) {
236     return addByName(null, adaptorName, type, params, offset, 20, 15 * 1000);// retry for
237                                                                  // five
238                                                                  // minutes,
239                                                                  // every
240                                                                  // fifteen
241                                                                  // seconds
242   }
243 
244   /**
245    * Registers a new adaptor. Makes no guarantee about success. On failure, to
246    * connect to server, will retry <code>numRetries</code> times, every
247    * <code>retryInterval</code> milliseconds.
248    * 
249    * @return the id number of the adaptor, generated by the agent
250    */
251   public String addByName(String adaptorID, String adaptorName, String type, String params, long offset,
252       long numRetries, long retryInterval) {
253     ChukwaAgentController.Adaptor adaptor = new ChukwaAgentController.Adaptor(
254         adaptorName, type, params, offset);
255     adaptor.id = adaptorID;
256     if (numRetries >= 0) {
257       try {
258         adaptorID = adaptor.register();
259 
260         if (adaptorID != null) {
261           runningAdaptors.put(adaptorID, adaptor);
262           runningInstanceAdaptors.put(adaptorID, adaptor);
263         } else {
264           System.err.println("Failed to successfully add the adaptor in AgentClient, adaptorID returned by add() was negative.");
265         }
266       } catch (IOException ioe) {
267         log.warn("AgentClient failed to contact the agent ("
268             + hostname + ":" + portno + ")");
269         
270         log.warn("Scheduling a agent connection retry for adaptor add() in another "
271                 + retryInterval
272                 + " milliseconds, "
273                 + numRetries
274                 + " retries remaining");
275 
276         Timer addFileTimer = new Timer();
277         addFileTimer.schedule(new AddAdaptorTask(adaptorName, type, params,
278             offset, numRetries - 1, retryInterval), retryInterval);
279       }
280     } else {
281       System.err.println("Giving up on connecting to the local agent");
282     }
283     return adaptorID;
284   }
285 
286   public synchronized ChukwaAgentController.Adaptor remove(String adaptorID)
287       throws IOException {
288     syncWithAgent();
289     ChukwaAgentController.Adaptor a = runningAdaptors.remove(adaptorID);
290     if ( a != null ) {
291       a.unregister();
292     }
293     return a;
294 
295   }
296 
297   public void remove(String className, String appType, String filename)
298       throws IOException {
299     syncWithAgent();
300     // search for FileTail adaptor with string of this file name
301     // get its id, tell it to unregister itself with the agent,
302     // then remove it from the list of adaptors
303     for (Adaptor a : runningAdaptors.values()) {
304       if (a.className.equals(className) && a.params.equals(filename)
305           && a.appType.equals(appType)) {
306         remove(a.id);
307       }
308     }
309   }
310 
311   public void removeAll() {
312     syncWithAgent();
313     ArrayList<String> keyset = new ArrayList<String>();
314     keyset.addAll( runningAdaptors.keySet());
315 
316     for (String id : keyset) {
317       try {
318         remove(id);
319       } catch (IOException ioe) {
320         System.err.println("Error removing an adaptor in removeAll()");
321         ioe.printStackTrace();
322       }
323       log.info("Successfully removed adaptor " + id);
324     }
325   }
326 
327   public void removeInstanceAdaptors() {
328     // Remove adaptors created by this instance of chukwa agent controller.
329     // Instead of removing using id, this is removed by using the stream name
330     // and record type.  This prevents the system to shutdown the wrong
331     // adaptor after agent crashes.
332     for (Adaptor a : runningInstanceAdaptors.values()) {
333       try {
334         remove(a.className, a.appType, a.params);
335       } catch (IOException ioe) {
336         log.warn("Error removing an adaptor in removeInstanceAdaptors()");
337         ioe.printStackTrace();
338       }
339     }
340   }
341 
342   Map<String, ChukwaAgentController.Adaptor> list() throws IOException {
343     Socket s = new Socket(hostname, portno);
344     try {
345       s.setSoTimeout(60000);
346     } catch (SocketException e) {
347       log.warn("Error while settin soTimeout to 60000");
348       e.printStackTrace();
349     }
350     PrintWriter bw = new PrintWriter(
351         new OutputStreamWriter(s.getOutputStream()));
352 
353     bw.println("LIST");
354     bw.flush();
355     BufferedReader br = new BufferedReader(new InputStreamReader(s
356         .getInputStream()));
357     String ln;
358     Map<String, Adaptor> listResult = new HashMap<String, Adaptor>();
359     while ((ln = br.readLine()) != null) {
360       if (ln.equals("")) {
361         break;
362       } else {
363         String[] parts = ln.split("\\s+");
364         if (parts.length >= 4) { // should have id, className appType, params,
365                                  // offset
366           String id = parts[0].substring(0, parts[0].length() - 1); // chop
367                                                                         // off
368                                                                         // the
369                                                                         // right
370                                                                         // -
371                                                                         // paren
372           long offset = Long.parseLong(parts[parts.length - 1]);
373           String tmpParams = parts[3];
374           for (int i = 4; i < parts.length - 1; i++) {
375             tmpParams += " " + parts[i];
376           }
377           listResult.put(id, new Adaptor(id, parts[1], parts[2], tmpParams,
378               offset));
379         }
380       }
381     }
382     s.close();
383     return listResult;
384   }
385 
386   // ************************************************************************
387   // The following functions are convenience functions, defining an easy
388   // to use API for application developers to integrate chukwa into their app
389   // ************************************************************************
390 
391   /**
392    * Registers a new "LineFileTailUTF8" adaptor and starts it at offset 0.
393    * Checks to see if the file is being watched already, if so, won't register
394    * another adaptor with the agent. If you have run the tail adaptor on this
395    * file before and rotated or emptied the file you should use
396    * {@link ChukwaAgentController#pauseFile(String, String)} and
397    * {@link ChukwaAgentController#resumeFile(String, String)} which will store
398    * the adaptors metadata and re-use them to pick up where it left off.
399    * 
400    * @param type the datatype associated with the file to pass through
401    * @param filename of the file for the tail adaptor to start monitoring
402    * @return the id number of the adaptor, generated by the agent
403    */
404   public String addFile(String appType, String filename, long numRetries,
405       long retryInterval) {
406     filename = new File(filename).getAbsolutePath();
407     // TODO: Mabye we want to check to see if the file exists here?
408     // Probably not because they might be talking to an agent on a different
409     // machine?
410 
411     // check to see if this file is being watched already, if yes don't set up
412     // another adaptor for it
413     boolean isDuplicate = false;
414     for (Adaptor a : runningAdaptors.values()) {
415       if (a.className.equals(DEFAULT_FILE_TAILER) && a.appType.equals(appType)
416           && a.params.endsWith(filename)) {
417         isDuplicate = true;
418       }
419     }
420     if (!isDuplicate) {
421       return addByName(null, DEFAULT_FILE_TAILER, appType, 0L + " " + filename, 0L,
422           numRetries, retryInterval);
423     } else {
424       log.info("An adaptor for filename \"" + filename
425           + "\", type \"" + appType
426           + "\", exists already, addFile() command aborted");
427       return null;
428     }
429   }
430 
431   public String addFile(String appType, String filename) {
432     return addFile(appType, filename, 0, 0);
433   }
434 
435   /**
436    * Pause all active adaptors of the default file tailing type who are tailing
437    * this file This means we actually stop the adaptor and it goes away forever,
438    * but we store it state so that we can re-launch a new adaptor with the same
439    * state later.
440    * 
441    * @param appType
442    * @param filename
443    * @return array of adaptorID numbers which have been created and assigned the
444    *         state of the formerly paused adaptors
445    * @throws IOException
446    */
447   public Collection<String> pauseFile(String appType, String filename)
448       throws IOException {
449     syncWithAgent();
450     // store the unique streamid of the file we are pausing.
451     // search the list of adaptors for this filename
452     // store the current offset for it
453     List<String> results = new ArrayList<String>();
454     for (Adaptor a : runningAdaptors.values()) {
455       if (a.className.equals(DEFAULT_FILE_TAILER) && a.params.endsWith(filename)
456           && a.appType.equals(appType)) {
457         pausedAdaptors.put(a.id, a); // add it to our list of paused adaptors
458         remove(a.id); // tell the agent to remove/unregister it
459         results.add(a.id);
460       }
461     }
462     return results;
463   }
464 
465   public boolean isFilePaused(String appType, String filename) {
466     for (Adaptor a : pausedAdaptors.values()) {
467       if (a.className.equals(DEFAULT_FILE_TAILER) && a.params.endsWith(filename)
468           && a.appType.equals(appType)) {
469         return true;
470       }
471     }
472     return false;
473   }
474 
475   /**
476    * Resume all adaptors for this filename that have been paused
477    * 
478    * @param appType the appType
479    * @param filename filename by which to lookup adaptors which are paused (and
480    *        tailing this file)
481    * @return an array of the new adaptor ID numbers which have resumed where the
482    *         old adaptors left off
483    * @throws IOException
484    */
485   public Collection<String> resumeFile(String appType, String filename)
486       throws IOException {
487     syncWithAgent();
488     // search for a record of this paused file
489     List<String> results = new ArrayList<String>();
490     for (Adaptor a : pausedAdaptors.values()) {
491       if (a.className.equals(DEFAULT_FILE_TAILER) && a.params.endsWith(filename)
492           && a.appType.equals(appType)) {
493         String newID = add(DEFAULT_FILE_TAILER, a.appType, a.offset + " "
494             + filename, a.offset);
495         pausedAdaptors.remove(a.id);
496         a.id = newID;
497         results.add(a.id);
498       }
499     }
500     return results;
501   }
502 
503   public void removeFile(String appType, String filename) throws IOException {
504     syncWithAgent();
505     // search for FileTail adaptor with string of this file name
506     // get its id, tell it to unregister itself with the agent,
507     // then remove it from the list of adaptors
508     for (Adaptor a : runningAdaptors.values()) {
509       if (a.className.equals(DEFAULT_FILE_TAILER) && a.params.endsWith(filename)
510           && a.appType.equals(appType)) {
511         remove(a.id);
512       }
513     }
514   }
515 
516   // ************************************************************************
517   // command line utilities
518   // ************************************************************************
519 
520   public static void main(String[] args) {
521     ChukwaAgentController c = getClient(args);
522     if (numArgs >= 3 && args[0].toLowerCase().equals("addfile")) {
523       doAddFile(c, args[1], args[2]);
524     } else if (numArgs >= 3 && args[0].toLowerCase().equals("removefile")) {
525       doRemoveFile(c, args[1], args[2]);
526     } else if (numArgs >= 1 && args[0].toLowerCase().equals("list")) {
527       doList(c);
528     } else if (numArgs >= 1 && args[0].equalsIgnoreCase("removeall")) {
529       doRemoveAll(c);
530     } else {
531       System.err.println("usage: ChukwaClient addfile <apptype> <filename> [-h hostname] [-p portnumber]");
532       System.err.println("       ChukwaClient removefile adaptorID [-h hostname] [-p portnumber]");
533       System.err.println("       ChukwaClient removefile <apptype> <filename> [-h hostname] [-p portnumber]");
534       System.err.println("       ChukwaClient list [IP] [port]");
535       System.err.println("       ChukwaClient removeAll [IP] [port]");
536     }
537   }
538 
539   private static ChukwaAgentController getClient(String[] args) {
540     int portno = 9093;
541     String hostname = "localhost";
542 
543     numArgs = args.length;
544 
545     for (int i = 0; i < args.length; i++) {
546       if (args[i].equals("-h") && args.length > i + 1) {
547         hostname = args[i + 1];
548         log.debug("Setting hostname to: " + hostname);
549         numArgs -= 2; // subtract for the flag and value
550       } else if (args[i].equals("-p") && args.length > i + 1) {
551         portno = Integer.parseInt(args[i + 1]);
552         log.debug("Setting portno to: " + portno);
553         numArgs -= 2; // subtract for the flat, i.e. -p, and value
554       }
555     }
556     return new ChukwaAgentController(hostname, portno);
557   }
558 
559   private static String doAddFile(ChukwaAgentController c, String appType,
560       String params) {
561     log.info("Adding adaptor with filename: " + params);
562     String adaptorID = c.addFile(appType, params);
563     if (adaptorID != null) {
564       log.info("Successfully added adaptor, id is:" + adaptorID);
565     } else {
566       System.err.println("Agent reported failure to add adaptor, adaptor id returned was:"
567               + adaptorID);
568     }
569     return adaptorID;
570   }
571 
572   private static void doRemoveFile(ChukwaAgentController c, String appType,
573       String params) {
574     try {
575       log.debug("Removing adaptor with filename: " + params);
576       c.removeFile(appType, params);
577     } catch (IOException e) {
578       e.printStackTrace();
579     }
580   }
581 
582   private static void doList(ChukwaAgentController c) {
583     try {
584       Iterator<Adaptor> adptrs = c.list().values().iterator();
585       while (adptrs.hasNext()) {
586         log.debug(adptrs.next().toString());
587       }
588     } catch (Exception e) {
589       e.printStackTrace();
590     }
591   }
592 
593   private static void doRemoveAll(ChukwaAgentController c) {
594     log.info("Removing all adaptors");
595     c.removeAll();
596   }
597 }