This project has retired. For details please refer to its Attic page.
ChukwaAgentController xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.chukwa.datacollection.controller;
20  
21  
22  import java.io.BufferedReader;
23  import java.io.File;
24  import java.io.IOException;
25  import java.io.InputStreamReader;
26  import java.io.OutputStreamWriter;
27  import java.io.PrintWriter;
28  import java.net.Socket;
29  import java.net.SocketException;
30  import java.nio.charset.Charset;
31  import java.util.ArrayList;
32  import java.util.Collection;
33  import java.util.HashMap;
34  import java.util.Iterator;
35  import java.util.List;
36  import java.util.Map;
37  import java.util.Timer;
38  import java.util.TimerTask;
39  
40  import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
41  import org.apache.log4j.Logger;
42  
43  /**
44   * A convenience library for applications to communicate to the
45   * {@link ChukwaAgent}. Can be used to register and unregister new
46   * {@link Adaptor}s. Also contains functions for applications to use for
47   * handling log rotations.
48   */
49  public class ChukwaAgentController {
50    static Logger log = Logger.getLogger(ChukwaAgentController.class);
51     
52    public class AddAdaptorTask extends TimerTask {
53  
54      String adaptorName;
55      String type;
56      String params;
57      private long offset;
58      long numRetries;
59      long retryInterval;
60  
61      AddAdaptorTask(String adaptorName, String type, String params, long offset,
62                     long numRetries, long retryInterval) {
63        this.adaptorName = adaptorName;
64        this.type = type;
65        this.params = params;
66        this.offset = offset;
67        this.numRetries = numRetries;
68        this.retryInterval = retryInterval;
69      }
70  
71      @Override
72      public void run() {
73        try {
74          log.info("Trying to resend the add command [" + adaptorName + "]["
75              + offset + "][" + params + "] [" + numRetries + "]");
76          addByName(null, adaptorName, type, params, offset, numRetries, retryInterval);
77        } catch (Exception e) {
78          log.warn("Exception in AddAdaptorTask.run", e);
79          e.printStackTrace();
80        }
81      }
82    }
83  
84    // our default adaptors, provided here for convenience
85    public static final String CharFileTailUTF8 = "org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.CharFileTailingAdaptorUTF8";
86    public static final String CharFileTailUTF8NewLineEscaped = "org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.CharFileTailingAdaptorUTF8NewLineEscaped";
87  
88    static String DEFAULT_FILE_TAILER = CharFileTailUTF8NewLineEscaped;
89    static int DEFAULT_PORT = 9093;
90    static String DEFAULT_HOST = "localhost";
91    static int numArgs = 0;
92  
93    class Adaptor {
94      public String id;
95      final public String className;
96      final public String params;
97      final public String appType;
98      public long offset;
99  
100     Adaptor(String className, String appType, String params, long offset) {
101       this.className = className;
102       this.appType = appType;
103       this.params = params;
104       this.offset = offset;
105     }
106 
107     Adaptor(String id, String className, String appType, String params,
108             long offset) {
109       this.id = id;
110       this.className = className;
111       this.appType = appType;
112       this.params = params;
113       this.offset = offset;
114     }
115 
116     /**
117      * Registers this {@link Adaptor} with the agent running at the specified
118      * hostname and portno
119      * 
120      * @return The id of the this {@link Adaptor}, assigned by the agent
121      *         upon successful registration
122      * @throws IOException if problem bind to agent controller port
123      */
124     String register() throws IOException {
125       Socket s = new Socket(hostname, portno);
126       try {
127         s.setSoTimeout(60000);
128       } catch (SocketException e) {
129         log.warn("Error while settin soTimeout to 60000");
130         e.printStackTrace();
131       }
132       PrintWriter bw = new PrintWriter(new OutputStreamWriter(s
133           .getOutputStream(), Charset.forName("UTF-8")));
134       if(id != null)
135         bw.println("ADD " + id + " = " + className + " " + appType + " " + params + " " + offset);
136       else
137         bw.println("ADD " + className + " " + appType + " " + params + " " + offset);
138       bw.flush();
139       BufferedReader br = new BufferedReader(new InputStreamReader(s
140           .getInputStream(), Charset.forName("UTF-8")));
141       String resp = br.readLine();
142       if (resp != null) {
143         String[] fields = resp.split(" ");
144         if (fields[0].equals("OK")) {
145             id = fields[fields.length - 1];
146         }
147       }
148       s.close();
149       return id;
150     }
151 
152     void unregister() throws IOException {
153       Socket s = new Socket(hostname, portno);
154       try {
155         s.setSoTimeout(60000);
156       } catch (SocketException e) {
157         log.warn("Error while settin soTimeout to 60000");
158       }
159       PrintWriter bw = new PrintWriter(new OutputStreamWriter(s
160           .getOutputStream(), Charset.forName("UTF-8")));
161       bw.println("SHUTDOWN " + id);
162       bw.flush();
163 
164       BufferedReader br = new BufferedReader(new InputStreamReader(s
165           .getInputStream(), Charset.forName("UTF-8")));
166       String resp = br.readLine();
167       if (resp == null || !resp.startsWith("OK")) {
168         log.error("adaptor unregister error, id: " + id);
169       } else if (resp.startsWith("OK")) {
170         String[] respSplit = resp.split(" ");
171         String newOffset = respSplit[respSplit.length - 1];
172         try {
173           offset = Long.parseLong(newOffset);
174         } catch (NumberFormatException nfe) {
175           log.error("adaptor didn't shutdown gracefully.\n" + nfe);
176         }
177       }
178 
179       s.close();
180     }
181 
182     public String toString() {
183       String[] namePieces = className.split("\\.");
184       String shortName = namePieces[namePieces.length - 1];
185       return id + " " + shortName + " " + appType + " " + params + " " + offset;
186     }
187   }
188 
189   Map<String, ChukwaAgentController.Adaptor> runningAdaptors = new HashMap<String, Adaptor>();
190   Map<String, ChukwaAgentController.Adaptor> runningInstanceAdaptors = new HashMap<String, Adaptor>();
191   Map<String, ChukwaAgentController.Adaptor> pausedAdaptors;
192   String hostname;
193   int portno;
194 
195   public ChukwaAgentController() {
196     portno = DEFAULT_PORT;
197     hostname = DEFAULT_HOST;
198     pausedAdaptors = new HashMap<String, Adaptor>();
199 
200     syncWithAgent();
201   }
202 
203   public ChukwaAgentController(String hostname, int portno) {
204     this.hostname = hostname;
205     this.portno = portno;
206     pausedAdaptors = new HashMap<String, Adaptor>();
207 
208     syncWithAgent();
209   }
210 
211   private boolean syncWithAgent() {
212     // set up adaptors by using list here
213     try {
214       runningAdaptors = list();
215       return true;
216     } catch (IOException e) {
217       System.err.println("Error initializing ChukwaClient with list of "
218               + "currently registered adaptors, clearing our local list of adaptors");
219       // e.printStackTrace();
220       // if we can't connect to the LocalAgent, reset/clear our local view of
221       // the Adaptors.
222       runningAdaptors = new HashMap<String, ChukwaAgentController.Adaptor>();
223       return false;
224     }
225   }
226 
227   /**
228    * Registers a new adaptor. Makes no guarantee about success. On failure, we
229    * print a message to stderr and ignore silently so that an application
230    * doesn't crash if it's attempt to register an adaptor fails. This call does
231    * not retry a conection. for that use the overloaded version of this which
232    * accepts a time interval and number of retries
233    * @param adaptorName is adaptor class name
234    * @param type is data type
235    * @param params is adaptor specific parameters
236    * @param offset is starting sequence id
237    * 
238    * @return the id number of the adaptor, generated by the agent
239    */
240   public String add(String adaptorName, String type, String params, long offset) {
241     return addByName(null, adaptorName, type, params, offset, 20, 15 * 1000);// retry for
242                                                                  // five
243                                                                  // minutes,
244                                                                  // every
245                                                                  // fifteen
246                                                                  // seconds
247   }
248 
249   /**
250    * Registers a new adaptor. Makes no guarantee about success. On failure, to
251    * connect to server, will retry <code>numRetries</code> times, every
252    * <code>retryInterval</code> milliseconds.
253    * @param adaptorID is unique adaptor identifier
254    * @param adaptorName is adaptor class name
255    * @param type is user defined data type name
256    * @param params is adaptor specific configuration
257    * @param offset is starting sequence id
258    * @param numRetries is number of retries
259    * @param retryInterval is time between retries
260    * 
261    * @return the id number of the adaptor, generated by the agent
262    */
263   public String addByName(String adaptorID, String adaptorName, String type, String params, long offset,
264       long numRetries, long retryInterval) {
265     ChukwaAgentController.Adaptor adaptor = new ChukwaAgentController.Adaptor(
266         adaptorName, type, params, offset);
267     adaptor.id = adaptorID;
268     if (numRetries >= 0) {
269       try {
270         adaptorID = adaptor.register();
271 
272         if (adaptorID != null) {
273           runningAdaptors.put(adaptorID, adaptor);
274           runningInstanceAdaptors.put(adaptorID, adaptor);
275         } else {
276           System.err.println("Failed to successfully add the adaptor in AgentClient, adaptorID returned by add() was negative.");
277         }
278       } catch (IOException ioe) {
279         log.warn("AgentClient failed to contact the agent ("
280             + hostname + ":" + portno + ")");
281         
282         log.warn("Scheduling a agent connection retry for adaptor add() in another "
283                 + retryInterval
284                 + " milliseconds, "
285                 + numRetries
286                 + " retries remaining");
287 
288         Timer addFileTimer = new Timer();
289         addFileTimer.schedule(new AddAdaptorTask(adaptorName, type, params,
290             offset, numRetries - 1, retryInterval), retryInterval);
291       }
292     } else {
293       System.err.println("Giving up on connecting to the local agent");
294     }
295     return adaptorID;
296   }
297 
298   public synchronized ChukwaAgentController.Adaptor remove(String adaptorID)
299       throws IOException {
300     syncWithAgent();
301     ChukwaAgentController.Adaptor a = runningAdaptors.remove(adaptorID);
302     if ( a != null ) {
303       a.unregister();
304     }
305     return a;
306 
307   }
308 
309   public void remove(String className, String appType, String filename)
310       throws IOException {
311     syncWithAgent();
312     // search for FileTail adaptor with string of this file name
313     // get its id, tell it to unregister itself with the agent,
314     // then remove it from the list of adaptors
315     for (Adaptor a : runningAdaptors.values()) {
316       if (a.className.equals(className) && a.params.equals(filename)
317           && a.appType.equals(appType)) {
318         remove(a.id);
319       }
320     }
321   }
322 
323   public void removeAll() {
324     syncWithAgent();
325     ArrayList<String> keyset = new ArrayList<String>();
326     keyset.addAll( runningAdaptors.keySet());
327 
328     for (String id : keyset) {
329       try {
330         remove(id);
331       } catch (IOException ioe) {
332         System.err.println("Error removing an adaptor in removeAll()");
333         ioe.printStackTrace();
334       }
335       log.info("Successfully removed adaptor " + id);
336     }
337   }
338 
339   public void removeInstanceAdaptors() {
340     // Remove adaptors created by this instance of chukwa agent controller.
341     // Instead of removing using id, this is removed by using the stream name
342     // and record type.  This prevents the system to shutdown the wrong
343     // adaptor after agent crashes.
344     for (Adaptor a : runningInstanceAdaptors.values()) {
345       try {
346         remove(a.className, a.appType, a.params);
347       } catch (IOException ioe) {
348         log.warn("Error removing an adaptor in removeInstanceAdaptors()");
349         ioe.printStackTrace();
350       }
351     }
352   }
353 
354   Map<String, ChukwaAgentController.Adaptor> list() throws IOException {
355     Socket s = new Socket(hostname, portno);
356     try {
357       s.setSoTimeout(60000);
358     } catch (SocketException e) {
359       log.warn("Error while settin soTimeout to 60000");
360       e.printStackTrace();
361     }
362     PrintWriter bw = new PrintWriter(
363         new OutputStreamWriter(s.getOutputStream(), Charset.forName("UTF-8")));
364 
365     bw.println("LIST");
366     bw.flush();
367     BufferedReader br = new BufferedReader(new InputStreamReader(s
368         .getInputStream(), Charset.forName("UTF-8")));
369     String ln;
370     Map<String, Adaptor> listResult = new HashMap<String, Adaptor>();
371     while ((ln = br.readLine()) != null) {
372       if (ln.equals("")) {
373         break;
374       } else {
375         String[] parts = ln.split("\\s+");
376         if (parts.length >= 4) { // should have id, className appType, params,
377                                  // offset
378           String id = parts[0].substring(0, parts[0].length() - 1); // chop
379                                                                         // off
380                                                                         // the
381                                                                         // right
382                                                                         // -
383                                                                         // paren
384           long offset = Long.parseLong(parts[parts.length - 1]);
385           StringBuilder tmpParams = new StringBuilder();
386           tmpParams.append(parts[3]);
387           for (int i = 4; i < parts.length - 1; i++) {
388             tmpParams.append(" ");
389             tmpParams.append(parts[i]);
390           }
391           listResult.put(id, new Adaptor(id, parts[1], parts[2], tmpParams.toString(),
392               offset));
393         }
394       }
395     }
396     s.close();
397     return listResult;
398   }
399 
400   // ************************************************************************
401   // The following functions are convenience functions, defining an easy
402   // to use API for application developers to integrate chukwa into their app
403   // ************************************************************************
404 
405   /**
406    * Registers a new "LineFileTailUTF8" adaptor and starts it at offset 0.
407    * Checks to see if the file is being watched already, if so, won't register
408    * another adaptor with the agent. If you have run the tail adaptor on this
409    * file before and rotated or emptied the file you should use
410    * {@link ChukwaAgentController#pauseFile(String, String)} and
411    * {@link ChukwaAgentController#resumeFile(String, String)} which will store
412    * the adaptors metadata and re-use them to pick up where it left off.
413    * @param appType is user defined name for the data stream
414    * 
415    * @param filename of the file for the tail adaptor to start monitoring
416    * @param numRetries is number of retries
417    * @param retryInterval is time between retries
418    * @return the id number of the adaptor, generated by the agent
419    */
420   public String addFile(String appType, String filename, long numRetries,
421       long retryInterval) {
422     filename = new File(filename).getAbsolutePath();
423     // TODO: Mabye we want to check to see if the file exists here?
424     // Probably not because they might be talking to an agent on a different
425     // machine?
426 
427     // check to see if this file is being watched already, if yes don't set up
428     // another adaptor for it
429     boolean isDuplicate = false;
430     for (Adaptor a : runningAdaptors.values()) {
431       if (a.className.equals(DEFAULT_FILE_TAILER) && a.appType.equals(appType)
432           && a.params.endsWith(filename)) {
433         isDuplicate = true;
434       }
435     }
436     if (!isDuplicate) {
437       return addByName(null, DEFAULT_FILE_TAILER, appType, 0L + " " + filename, 0L,
438           numRetries, retryInterval);
439     } else {
440       log.info("An adaptor for filename \"" + filename
441           + "\", type \"" + appType
442           + "\", exists already, addFile() command aborted");
443       return null;
444     }
445   }
446 
447   public String addFile(String appType, String filename) {
448     return addFile(appType, filename, 0, 0);
449   }
450 
451   /**
452    * Pause all active adaptors of the default file tailing type who are tailing
453    * this file This means we actually stop the adaptor and it goes away forever,
454    * but we store it state so that we can re-launch a new adaptor with the same
455    * state later.
456    * 
457    * @param appType is application type
458    * @param filename is file name suffix pattern
459    * @return array of adaptorID numbers which have been created and assigned the
460    *         state of the formerly paused adaptors
461    * @throws IOException if error pausing adaptors
462    */
463   public Collection<String> pauseFile(String appType, String filename)
464       throws IOException {
465     syncWithAgent();
466     // store the unique streamid of the file we are pausing.
467     // search the list of adaptors for this filename
468     // store the current offset for it
469     List<String> results = new ArrayList<String>();
470     for (Adaptor a : runningAdaptors.values()) {
471       if (a.className.equals(DEFAULT_FILE_TAILER) && a.params.endsWith(filename)
472           && a.appType.equals(appType)) {
473         pausedAdaptors.put(a.id, a); // add it to our list of paused adaptors
474         remove(a.id); // tell the agent to remove/unregister it
475         results.add(a.id);
476       }
477     }
478     return results;
479   }
480 
481   public boolean isFilePaused(String appType, String filename) {
482     for (Adaptor a : pausedAdaptors.values()) {
483       if (a.className.equals(DEFAULT_FILE_TAILER) && a.params.endsWith(filename)
484           && a.appType.equals(appType)) {
485         return true;
486       }
487     }
488     return false;
489   }
490 
491   /**
492    * Resume all adaptors for this filename that have been paused
493    * 
494    * @param appType is application type
495    * @param filename filename by which to lookup adaptors which are paused (and
496    *        tailing this file)
497    * @return an array of the new adaptor ID numbers which have resumed where the
498    *         old adaptors left off
499    * @throws IOException if unable to resume all adaptors
500    */
501   public Collection<String> resumeFile(String appType, String filename)
502       throws IOException {
503     syncWithAgent();
504     // search for a record of this paused file
505     List<String> results = new ArrayList<String>();
506     for (Adaptor a : pausedAdaptors.values()) {
507       if (a.className.equals(DEFAULT_FILE_TAILER) && a.params.endsWith(filename)
508           && a.appType.equals(appType)) {
509         String newID = add(DEFAULT_FILE_TAILER, a.appType, a.offset + " "
510             + filename, a.offset);
511         pausedAdaptors.remove(a.id);
512         a.id = newID;
513         results.add(a.id);
514       }
515     }
516     return results;
517   }
518 
519   public void removeFile(String appType, String filename) throws IOException {
520     syncWithAgent();
521     // search for FileTail adaptor with string of this file name
522     // get its id, tell it to unregister itself with the agent,
523     // then remove it from the list of adaptors
524     for (Adaptor a : runningAdaptors.values()) {
525       if (a.className.equals(DEFAULT_FILE_TAILER) && a.params.endsWith(filename)
526           && a.appType.equals(appType)) {
527         remove(a.id);
528       }
529     }
530   }
531 
532   // ************************************************************************
533   // command line utilities
534   // ************************************************************************
535 
536   public static void main(String[] args) {
537     ChukwaAgentController c = getClient(args);
538     if (numArgs >= 3 && args[0].toLowerCase().equals("addfile")) {
539       doAddFile(c, args[1], args[2]);
540     } else if (numArgs >= 3 && args[0].toLowerCase().equals("removefile")) {
541       doRemoveFile(c, args[1], args[2]);
542     } else if (numArgs >= 1 && args[0].toLowerCase().equals("list")) {
543       doList(c);
544     } else if (numArgs >= 1 && args[0].equalsIgnoreCase("removeall")) {
545       doRemoveAll(c);
546     } else {
547       System.err.println("usage: ChukwaClient addfile <apptype> <filename> [-h hostname] [-p portnumber]");
548       System.err.println("       ChukwaClient removefile adaptorID [-h hostname] [-p portnumber]");
549       System.err.println("       ChukwaClient removefile <apptype> <filename> [-h hostname] [-p portnumber]");
550       System.err.println("       ChukwaClient list [IP] [port]");
551       System.err.println("       ChukwaClient removeAll [IP] [port]");
552     }
553   }
554 
555   private static ChukwaAgentController getClient(String[] args) {
556     int portno = 9093;
557     String hostname = "localhost";
558 
559     numArgs = args.length;
560 
561     for (int i = 0; i < args.length; i++) {
562       if (args[i].equals("-h") && args.length > i + 1) {
563         hostname = args[i + 1];
564         log.debug("Setting hostname to: " + hostname);
565         numArgs -= 2; // subtract for the flag and value
566       } else if (args[i].equals("-p") && args.length > i + 1) {
567         portno = Integer.parseInt(args[i + 1]);
568         log.debug("Setting portno to: " + portno);
569         numArgs -= 2; // subtract for the flat, i.e. -p, and value
570       }
571     }
572     return new ChukwaAgentController(hostname, portno);
573   }
574 
575   private static String doAddFile(ChukwaAgentController c, String appType,
576       String params) {
577     log.info("Adding adaptor with filename: " + params);
578     String adaptorID = c.addFile(appType, params);
579     if (adaptorID != null) {
580       log.info("Successfully added adaptor, id is:" + adaptorID);
581     } else {
582       log.error("Agent reported failure to add adaptor.");
583     }
584     return adaptorID;
585   }
586 
587   private static void doRemoveFile(ChukwaAgentController c, String appType,
588       String params) {
589     try {
590       log.debug("Removing adaptor with filename: " + params);
591       c.removeFile(appType, params);
592     } catch (IOException e) {
593       e.printStackTrace();
594     }
595   }
596 
597   private static void doList(ChukwaAgentController c) {
598     try {
599       Iterator<Adaptor> adptrs = c.list().values().iterator();
600       while (adptrs.hasNext()) {
601         log.debug(adptrs.next().toString());
602       }
603     } catch (Exception e) {
604       e.printStackTrace();
605     }
606   }
607 
608   private static void doRemoveAll(ChukwaAgentController c) {
609     log.info("Removing all adaptors");
610     c.removeAll();
611   }
612 }