This project has retired. For details please refer to its Attic page.
ChukwaAgent xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.chukwa.datacollection.agent;
20  
21  
22  import java.io.BufferedReader;
23  import java.io.BufferedWriter;
24  import java.io.File;
25  import java.io.FileInputStream;
26  import java.io.FileNotFoundException;
27  import java.io.FileOutputStream;
28  import java.io.FilenameFilter;
29  import java.io.IOException;
30  import java.io.InputStreamReader;
31  import java.io.OutputStreamWriter;
32  import java.io.PrintWriter;
33  import java.security.NoSuchAlgorithmException;
34  import java.util.*;
35  import java.util.concurrent.ConcurrentHashMap;
36  import java.util.regex.Matcher;
37  import java.util.regex.Pattern;
38  
39  import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
40  import org.apache.hadoop.chukwa.datacollection.DataFactory;
41  import org.apache.hadoop.chukwa.datacollection.adaptor.Adaptor;
42  import org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorException;
43  import org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorShutdownPolicy;
44  import org.apache.hadoop.chukwa.datacollection.adaptor.NotifyOnCommitAdaptor;
45  import org.apache.hadoop.chukwa.datacollection.OffsetStatsManager;
46  import org.apache.hadoop.chukwa.datacollection.agent.metrics.AgentMetrics;
47  import org.apache.hadoop.chukwa.datacollection.connector.Connector;
48  import org.apache.hadoop.chukwa.datacollection.connector.http.HttpConnector;
49  import org.apache.hadoop.chukwa.datacollection.connector.PipelineConnector;
50  import org.apache.hadoop.chukwa.datacollection.test.ConsoleOutConnector;
51  import org.apache.hadoop.chukwa.util.AdaptorNamingUtils;
52  import org.apache.hadoop.chukwa.util.ChukwaUtil;
53  import org.apache.hadoop.chukwa.util.DaemonWatcher;
54  import org.apache.hadoop.chukwa.util.ExceptionUtil;
55  import org.apache.hadoop.conf.Configuration;
56  import org.apache.hadoop.fs.Path;
57  import org.apache.log4j.Logger;
58  import org.mortbay.jetty.Server;
59  import org.mortbay.jetty.servlet.Context;
60  import org.mortbay.jetty.servlet.ServletHolder;
61  import org.mortbay.jetty.nio.SelectChannelConnector;
62  import org.mortbay.thread.BoundedThreadPool;
63  
64  import com.sun.jersey.spi.container.servlet.ServletContainer;
65  
66  import edu.berkeley.confspell.*;
67  
68  /**
69   * The local agent daemon that runs on each machine. This class is designed to
70   * be embeddable, for use in testing.
71   * <P>
72   * The agent will start an HTTP REST interface listening on port. Configs for
73   * the agent are:
74   * <ul>
75   * <li><code>chukwaAgent.http.port</code> Port to listen on (default=9090).</li>
76   * <li><code>chukwaAgent.http.rest.controller.packages</code> Java packages to
77   * inspect for JAX-RS annotated classes to be added as servlets to the REST
78   * server.</li>
79   * </ul>
80   * 
81   */
82  public class ChukwaAgent implements AdaptorManager {
83    // boolean WRITE_CHECKPOINTS = true;
84    static AgentMetrics agentMetrics = new AgentMetrics("ChukwaAgent", "metrics");
85  
86    private static Logger log = Logger.getLogger(ChukwaAgent.class);
87    private static final int HTTP_SERVER_THREADS = 120;
88    private static Server jettyServer = null;
89    private OffsetStatsManager adaptorStatsManager = null;
90    private Timer statsCollector = null;
91    private static volatile Configuration conf = null;
92    private static volatile ChukwaAgent agent = null;
93    public Connector connector = null;
94    
/**
 * Creates an agent configured from a new {@link ChukwaConfiguration}.
 *
 * @throws AlreadyRunningException if the control socket cannot be bound
 */
protected ChukwaAgent() throws AlreadyRunningException {
  this(new ChukwaConfiguration());
}

/**
 * Creates and starts an agent. In order, this reads the checkpoint and
 * statistics settings from <code>conf</code>, restores adaptors from the
 * checkpoint directory (when enabled) and from the
 * <code>chukwaAgent.initial_adaptors</code> file, binds the control socket,
 * starts the HTTP server together with the statistics timer, and finally
 * schedules periodic checkpoints.
 *
 * @param conf configuration to read agent settings from; it is also stored
 *        in the static configuration field
 * @throws AlreadyRunningException if binding the control socket fails with
 *         an IOException; that IOException is attached as the cause
 * @throws RuntimeException if the HTTP server cannot be started
 */
public ChukwaAgent(Configuration conf) throws AlreadyRunningException {
  // NOTE(review): 'this' is published through the static field before
  // construction has finished; presumably code reached while adaptors start
  // looks the agent up via getAgent() -- confirm before moving this.
  ChukwaAgent.agent = this;
  // conf is a static field, so assign it through the class, not 'this'.
  ChukwaAgent.conf = conf;

  // almost always just reading this; so use a ConcurrentHM.
  // since we wrapped the offset, it's not a structural mod.
  adaptorPositions = new ConcurrentHashMap<Adaptor, Offset>();
  adaptorsByName = new HashMap<String, Adaptor>();
  checkpointNumber = 0;

  boolean doCheckpointRestore = conf.getBoolean(
      "chukwaAgent.checkpoint.enabled", true);
  CHECKPOINT_BASE_NAME = conf.get("chukwaAgent.checkpoint.name",
      "chukwa_checkpoint_");
  final int checkpointIntervalMs = conf.getInt(
      "chukwaAgent.checkpoint.interval", 5000);
  final int statsIntervalMs = conf.getInt(
      "chukwaAgent.stats.collection.interval", 10000);
  final int statsDataTtlMs = conf.getInt(
      "chukwaAgent.stats.data.ttl", 1200000);

  // Without a checkpoint directory there is nothing to restore from.
  final String checkpointDirName = conf.get("chukwaAgent.checkpoint.dir");
  if (checkpointDirName != null) {
    checkpointDir = new File(checkpointDirName);
  } else {
    doCheckpointRestore = false;
  }

  if (checkpointDir != null && !checkpointDir.exists()) {
    if (!checkpointDir.mkdirs()) {
      log.warn("could not create checkpoint directory " + checkpointDir);
    }
  }
  // NOTE(review): the two defaults below differ ("unknown" vs
  // "unknown_cluster"); kept as found -- confirm which one is intended.
  tags = conf.get("chukwaAgent.tags", "cluster=\"unknown\"");
  DataFactory.getInstance().addDefaultTag(conf.get("chukwaAgent.tags", "cluster=\"unknown_cluster\""));

  log.info("Config - CHECKPOINT_BASE_NAME: [" + CHECKPOINT_BASE_NAME + "]");
  log.info("Config - checkpointDir: [" + checkpointDir + "]");
  log.info("Config - CHECKPOINT_INTERVAL_MS: [" + checkpointIntervalMs
      + "]");
  log.info("Config - DO_CHECKPOINT_RESTORE: [" + doCheckpointRestore + "]");
  log.info("Config - STATS_INTERVAL_MS: [" + statsIntervalMs + "]");
  log.info("Config - tags: [" + tags + "]");

  if (doCheckpointRestore) {
    log.info("checkpoints are enabled, period is " + checkpointIntervalMs);
  }

  File initialAdaptors = null;
  if (conf.get("chukwaAgent.initial_adaptors") != null) {
    initialAdaptors = new File(conf.get("chukwaAgent.initial_adaptors"));
  }

  try {
    if (doCheckpointRestore) {
      restoreFromCheckpoint();
    }
  } catch (IOException e) {
    log.warn("failed to restart from checkpoint: ", e);
  }

  try {
    if (initialAdaptors != null && initialAdaptors.exists()) {
      readAdaptorsFile(initialAdaptors);
    }
  } catch (IOException e) {
    // Keep the exception so the reason for the failure reaches the log.
    log.warn("couldn't read user-specified file "
        + initialAdaptors.getAbsolutePath(), e);
  }

  controlSock = new AgentControlSocketListener(this);
  try {
    controlSock.tryToBind(); // do this synchronously; if it fails, we know
    // another agent is running.
    controlSock.start(); // this sets us up as a daemon
    log.info("control socket started on port " + controlSock.portno);

    // start the HTTP server with stats collection
    try {
      this.adaptorStatsManager = new OffsetStatsManager(statsDataTtlMs);
      this.statsCollector = new Timer("ChukwaAgent Stats Collector");

      startHttpServer(conf);

      statsCollector.scheduleAtFixedRate(new StatsCollectorTask(),
          statsIntervalMs, statsIntervalMs);
    } catch (Exception e) {
      log.error("Couldn't start HTTP server", e);
      // The Timer owns a thread from the moment it is constructed; stop it
      // so a failed start-up does not leave that thread behind.
      if (statsCollector != null) {
        statsCollector.cancel();
      }
      throw new RuntimeException(e);
    }

    // shouldn't start checkpointing until we're finishing launching
    // adaptors on boot
    if (checkpointIntervalMs > 0 && checkpointDir != null) {
      checkpointer = new Timer();
      checkpointer.schedule(new CheckpointTask(), 0, checkpointIntervalMs);
    }
  } catch (IOException e) {
    log.info("failed to bind to socket; aborting agent launch", e);
    AlreadyRunningException alreadyRunning = new AlreadyRunningException();
    alreadyRunning.initCause(e); // preserve the underlying bind failure
    throw alreadyRunning;
  }

}
196 
197   public static ChukwaAgent getAgent() {
198     if(agent == null) {
199       try {
200         agent = new ChukwaAgent();
201       } catch(AlreadyRunningException e) {
202         log.error("Chukwa Agent is already running", e);
203         agent = null;
204       }
205     } 
206     return agent;
207   }
208 
209   // doesn't need an equals(), comparator, etc
210   public static class Offset {
211     public Offset(long l, String id) {
212       offset = l;
213       this.id = id;
214     }
215 
216     final String id;
217     volatile long offset;
218     public long offset() {
219       return this.offset;
220     }
221     
222     public String adaptorID() {
223       return id;
224     }
225   }
226 
227   public static class AlreadyRunningException extends Exception {
228 
229     private static final long serialVersionUID = 1L;
230 
231     public AlreadyRunningException() {
232       super("Agent already running; aborting");
233     }
234   }
235 
236   private final Map<Adaptor, Offset> adaptorPositions;
237 
238   // basically only used by the control socket thread.
239   //must be locked before access
240   private final Map<String, Adaptor> adaptorsByName;
241 
242   private File checkpointDir; // lock this object to indicate checkpoint in
243   // progress
244   private String CHECKPOINT_BASE_NAME; // base filename for checkpoint files
245   // checkpoints
246   private static String tags = "";
247 
248   private Timer checkpointer;
249   private volatile boolean needNewCheckpoint = false; // set to true if any
250   // event has happened
251   // that should cause a new checkpoint to be written
252   private int checkpointNumber; // id number of next checkpoint.
253   // should be protected by grabbing lock on checkpointDir
254 
255   private final AgentControlSocketListener controlSock;
256 
257   public int getControllerPort() {
258     return controlSock.getPort();
259   }
260 
261   public OffsetStatsManager getAdaptorStatsManager() {
262     return adaptorStatsManager;
263   }
264 
265   /**
266    * @param args
267    * @throws AdaptorException
268    */
269   public static void main(String[] args) throws AdaptorException {
270 
271     DaemonWatcher.createInstance("Agent");
272 
273     try {
274       if (args.length > 0 && args[0].equals("-help")) {
275         System.out.println("usage:  LocalAgent [-noCheckPoint]"
276             + "[default collector URL]");
277         System.exit(0);
278       }
279 
280       conf = ChukwaUtil.readConfiguration();
281       agent = new ChukwaAgent(conf);
282       
283       if (agent.anotherAgentIsRunning()) {
284         System.out
285             .println("another agent is running (or port has been usurped). "
286                 + "Bailing out now");
287         throw new AlreadyRunningException();
288       }
289 
290       int uriArgNumber = 0;
291       if (args.length > 0) {
292         if (args[uriArgNumber].equals("local")) {
293           agent.connector = new ConsoleOutConnector(agent);
294         } else {
295           if (!args[uriArgNumber].contains("://")) {
296             args[uriArgNumber] = "http://" + args[uriArgNumber];
297           }
298           agent.connector = new HttpConnector(agent, args[uriArgNumber]);
299         }
300       } else {
301         String connectorType = conf.get("chukwa.agent.connector", 
302             "org.apache.hadoop.chukwa.datacollection.connector.PipelineConnector");
303         agent.connector = (Connector) Class.forName(connectorType).newInstance();
304       }
305       agent.connector.start();
306 
307       log.info("local agent started on port " + agent.getControlSock().portno);
308       //System.out.close();
309       //System.err.close();
310     } catch (AlreadyRunningException e) {
311       log.error("agent started already on this machine with same portno;"
312           + " bailing out");
313       System.out
314           .println("agent started already on this machine with same portno;"
315               + " bailing out");
316       DaemonWatcher.bailout(-1);
317       System.exit(0); // better safe than sorry
318     } catch (Exception e) {
319       e.printStackTrace();
320     }
321   }
322 
323   private boolean anotherAgentIsRunning() {
324     return !controlSock.isBound();
325   }
326 
327   /**
328    * @return the number of running adaptors inside this local agent
329    */
330   @Override
331   public int adaptorCount() {
332     synchronized(adaptorsByName) {
333       return adaptorsByName.size();
334     }
335   }
336 
337   private void startHttpServer(Configuration conf) throws Exception {
338     int portNum = conf.getInt("chukwaAgent.http.port", 9090);
339     String jaxRsAddlPackages = conf.get("chukwaAgent.http.rest.controller.packages");
340     StringBuilder jaxRsPackages = new StringBuilder(
341             "org.apache.hadoop.chukwa.datacollection.agent.rest");
342 
343     // Allow the ability to add additional servlets to the server
344     if (jaxRsAddlPackages != null)
345       jaxRsPackages.append(';').append(jaxRsAddlPackages);
346 
347     // Set up jetty connector
348     SelectChannelConnector jettyConnector = new SelectChannelConnector();
349     jettyConnector.setLowResourcesConnections(HTTP_SERVER_THREADS - 10);
350     jettyConnector.setLowResourceMaxIdleTime(1500);
351     jettyConnector.setPort(portNum);
352     jettyConnector.setReuseAddress(true);
353 
354     // Set up jetty server, using connector
355     jettyServer = new Server(portNum);
356     jettyServer.setConnectors(new org.mortbay.jetty.Connector[] { jettyConnector });
357     BoundedThreadPool pool = new BoundedThreadPool();
358     pool.setMaxThreads(HTTP_SERVER_THREADS);
359     jettyServer.setThreadPool(pool);
360 
361     // Create the controller servlets
362     ServletHolder servletHolder = new ServletHolder(ServletContainer.class);
363     servletHolder.setInitParameter("com.sun.jersey.config.property.resourceConfigClass",
364             "com.sun.jersey.api.core.PackagesResourceConfig");
365     servletHolder.setInitParameter("com.sun.jersey.config.property.packages",
366             jaxRsPackages.toString());
367 
368     // Create the server context and add the servlet
369     Context root = new Context(jettyServer, "/rest/v2", Context.SESSIONS);
370     root.setAttribute("ChukwaAgent", this);
371     root.addServlet(servletHolder, "/*");
372     root.setAllowNullPathInfo(false);
373 
374     // And finally, fire up the server
375     jettyServer.start();
376     jettyServer.setStopAtShutdown(true);
377 
378     log.info("started Chukwa http agent interface on port " + portNum);
379   }
380 
381   /**
382    * Take snapshots of offset data so we can report flow rate stats.
383    */
384   private class StatsCollectorTask extends TimerTask {
385 
386     public void run() {
387       long now = System.currentTimeMillis();
388 
389       for(String adaptorId : getAdaptorList().keySet()) {
390         Adaptor adaptor = getAdaptor(adaptorId);
391         if(adaptor == null) continue;
392 
393         Offset offset = adaptorPositions.get(adaptor);
394         if(offset == null) continue;
395 
396         adaptorStatsManager.addOffsetDataPoint(adaptor, offset.offset, now);
397       }
398     }
399   }
400 
401   // words should contain (space delimited):
402   // 0) command ("add")
403   // 1) Optional adaptor name, followed by =
404   // 2) AdaptorClassname
405   // 3) dataType (e.g. "hadoop_log")
406   // 4) params <optional>
407   // (e.g. for files, this is filename,
408   // but can be arbitrarily many space
409   // delimited agent specific params )
410   // 5) offset
411   private Pattern addCmdPattern = Pattern.compile("[aA][dD][dD]\\s+" // command "add",
412                                                              // any case, plus
413                                                              // at least one
414                                                              // space
415       + "(?:"   //noncapturing group
416       +	"([^\\s=]+)" //containing a string (captured) 
417       + "\\s*=\\s*" //then an equals sign, potentially set off with whitespace
418       + ")?" //end optional noncapturing group 
419       + "([^\\s=]+)\\s+" // the adaptor classname, plus at least one space. No '=' in name
420       + "(\\S+)\\s+" // datatype, plus at least one space
421       + "(?:" // start a non-capturing group, for the parameters
422       + "(.*?)\\s+" // capture the actual parameters reluctantly, followed by
423                     // whitespace
424       + ")?" // end non-matching group for params; group is optional
425       + "(\\d+)\\s*"); // finally, an offset and some trailing whitespace
426 
427   /**
428    * Most of the Chukwa wire protocol is implemented in @link{AgentControlSocketListener}
429    * 
430    * Unlike the rest of the chukwa wire protocol, add commands can appear in
431    * initial_adaptors and checkpoint files. So it makes sense to handle them here.
432    * 
433    */
434   public String processAddCommand(String cmd) {
435     try {
436       return processAddCommandE(cmd);
437     } catch(AdaptorException e) {
438       return null;
439     }
440   }
441   
442 
443   public String processAddCommandE(String cmd) throws AdaptorException {
444     Matcher m = addCmdPattern.matcher(cmd);
445     if (m.matches()) {
446       long offset; // check for obvious errors first
447       try {
448         offset = Long.parseLong(m.group(5));
449       } catch (NumberFormatException e) {
450         log.warn("malformed line " + cmd);
451         throw new AdaptorException("bad input syntax");
452       }
453 
454       String adaptorID = m.group(1);
455       String adaptorClassName = m.group(2);
456       String dataType = m.group(3);
457       String params = m.group(4);
458       if (params == null)
459         params = "";
460       
461       Adaptor adaptor = AdaptorFactory.createAdaptor(adaptorClassName);
462       if (adaptor == null) {
463         log.warn("Error creating adaptor of class " + adaptorClassName);
464         throw new AdaptorException("Can't load class " + adaptorClassName);
465       }
466       String coreParams = adaptor.parseArgs(dataType,params,this);
467       if(coreParams == null) {
468         log.warn("invalid params for adaptor: " + params);       
469         throw new AdaptorException("invalid params for adaptor: " + params);
470       }
471       
472       if(adaptorID == null) { //user didn't specify, so synthesize
473         try {
474          adaptorID = AdaptorNamingUtils.synthesizeAdaptorID(adaptorClassName, dataType, coreParams);
475         } catch(NoSuchAlgorithmException e) {
476           log.fatal("MD5 apparently doesn't work on your machine; bailing", e);
477           shutdown(true);
478         }
479       } else if(!adaptorID.startsWith("adaptor_"))
480         adaptorID = "adaptor_"+adaptorID;
481       
482       synchronized (adaptorsByName) {
483         
484         if(adaptorsByName.containsKey(adaptorID))
485           return adaptorID;
486         adaptorsByName.put(adaptorID, adaptor);
487         adaptorPositions.put(adaptor, new Offset(offset, adaptorID));
488         needNewCheckpoint = true;
489         try {
490           adaptor.start(adaptorID, dataType, offset, DataFactory
491               .getInstance().getEventQueue());
492           log.info("started a new adaptor, id = " + adaptorID + " function=["+adaptor.toString()+"]");
493           ChukwaAgent.agentMetrics.adaptorCount.set(adaptorsByName.size());
494           ChukwaAgent.agentMetrics.addedAdaptor.inc();
495           return adaptorID;
496 
497         } catch (Exception e) {
498           Adaptor failed = adaptorsByName.remove(adaptorID);
499           adaptorPositions.remove(failed);
500           adaptorStatsManager.remove(failed);
501           log.warn("failed to start adaptor", e);
502           if(e instanceof AdaptorException)
503             throw (AdaptorException)e;
504         }
505       }
506     } else if (cmd.length() > 0)
507       log.warn("only 'add' command supported in config files; cmd was: " + cmd);
508     // no warning for blank line
509 
510     return null;
511   }
512 
513 
514 
515   /**
516    * Tries to restore from a checkpoint file in checkpointDir. There should
517    * usually only be one checkpoint present -- two checkpoints present implies a
518    * crash during writing the higher-numbered one. As a result, this method
519    * chooses the lowest-numbered file present.
520    * 
521    * Lines in the checkpoint file are processed one at a time with
522    * processCommand();
523    * 
524    * @return true if the restore succeeded
525    * @throws IOException
526    */
527   private boolean restoreFromCheckpoint() throws IOException {
528     synchronized (checkpointDir) {
529       String[] checkpointNames = checkpointDir.list(new FilenameFilter() {
530         public boolean accept(File dir, String name) {
531           return name.startsWith(CHECKPOINT_BASE_NAME);
532         }
533       });
534 
535       if (checkpointNames == null) {
536         log.error("Unable to list files in checkpoint dir");
537         return false;
538       }
539       if (checkpointNames.length == 0) {
540         log.info("No checkpoints found in " + checkpointDir);
541         return false;
542       }
543 
544       if (checkpointNames.length > 2)
545         log.warn("expected at most two checkpoint files in " + checkpointDir
546             + "; saw " + checkpointNames.length);
547       else if (checkpointNames.length == 0)
548         return false;
549 
550       String lowestName = null;
551       int lowestIndex = Integer.MAX_VALUE;
552       for (String n : checkpointNames) {
553         int index = Integer
554             .parseInt(n.substring(CHECKPOINT_BASE_NAME.length()));
555         if (index < lowestIndex) {
556           lowestName = n;
557           lowestIndex = index;
558         }
559       }
560 
561       checkpointNumber = lowestIndex + 1;
562       File checkpoint = new File(checkpointDir, lowestName);
563       readAdaptorsFile(checkpoint);
564     }
565     return true;
566   }
567 
568   private void readAdaptorsFile(File checkpoint) throws FileNotFoundException,
569       IOException {
570     log.info("starting adaptors listed in " + checkpoint.getAbsolutePath());
571     BufferedReader br = new BufferedReader(new InputStreamReader(
572         new FileInputStream(checkpoint)));
573     String cmd = null;
574     while ((cmd = br.readLine()) != null)
575       processAddCommand(cmd);
576     br.close();
577   }
578 
579   /**
580    * Called periodically to write checkpoints
581    * 
582    * @throws IOException
583    */
584   private void writeCheckpoint() throws IOException {
585     needNewCheckpoint = false;
586     synchronized (checkpointDir) {
587       log.info("writing checkpoint " + checkpointNumber);
588 
589       FileOutputStream fos = new FileOutputStream(new File(checkpointDir,
590           CHECKPOINT_BASE_NAME + checkpointNumber));
591       PrintWriter out = new PrintWriter(new BufferedWriter(
592           new OutputStreamWriter(fos)));
593 
594       for (Map.Entry<String, String> stat : getAdaptorList().entrySet()) {
595         out.println("ADD "+ stat.getKey()+ " = " + stat.getValue());
596       }
597 
598       out.close();
599       File lastCheckpoint = new File(checkpointDir, CHECKPOINT_BASE_NAME
600           + (checkpointNumber - 1));
601       log.debug("hopefully removing old checkpoint file "
602           + lastCheckpoint.getAbsolutePath());
603       lastCheckpoint.delete();
604       checkpointNumber++;
605     }
606   }
607 
608   public String reportCommit(Adaptor src, long uuid) {
609     needNewCheckpoint = true;
610     Offset o = adaptorPositions.get(src);
611     if (o != null) {
612       synchronized (o) { // order writes to offset, in case commits are
613                          // processed out of order
614         if (uuid > o.offset)
615           o.offset = uuid;
616       }
617       log.debug("got commit up to " + uuid + " on " + src + " = " + o.id);
618       if(src instanceof NotifyOnCommitAdaptor) {
619         ((NotifyOnCommitAdaptor) src).committed(uuid);
620       }
621       return o.id;
622     } else {
623       log.warn("got commit up to " + uuid + "  for adaptor " + src
624           + " that doesn't appear to be running: " + adaptorCount()
625           + " total");
626       return null;
627     }
628   }
629 
630   private class CheckpointTask extends TimerTask {
631     public void run() {
632       try {
633         if (needNewCheckpoint) {
634           writeCheckpoint();
635         }
636       } catch (IOException e) {
637         log.warn("failed to write checkpoint", e);
638       }
639     }
640   }
641 
642   
643   private String formatAdaptorStatus(Adaptor a) {
644     return a.getClass().getCanonicalName() + " " + a.getCurrentStatus() + 
645    " " + adaptorPositions.get(a).offset;
646   }
647   
648 /**
649  * Expose the adaptor list.  Keys are adaptor ID numbers, values are the 
650  * adaptor status strings.
651  * @return
652  */
653   public Map<String, String> getAdaptorList() {
654     Map<String, String> adaptors = new HashMap<String, String>(adaptorsByName.size());
655     synchronized (adaptorsByName) {
656       for (Map.Entry<String, Adaptor> a : adaptorsByName.entrySet()) {
657         adaptors.put(a.getKey(), formatAdaptorStatus(a.getValue()));
658       }
659     }
660     return adaptors;
661   }
662   
663 
664   public long stopAdaptor(String name, boolean gracefully) {
665     if (gracefully) 
666       return stopAdaptor(name, AdaptorShutdownPolicy.GRACEFULLY);
667     else
668       return stopAdaptor(name, AdaptorShutdownPolicy.HARD_STOP);
669   }
670 
671   /**
672    * Stop the adaptor with given ID number. Takes a parameter to indicate
673    * whether the adaptor should force out all remaining data, or just exit
674    * abruptly.
675    * 
676    * If the adaptor is written correctly, its offset won't change after
677    * returning from shutdown.
678    * 
679    * @param name the adaptor to stop
680    * @param shutdownMode if true, shutdown, if false, hardStop
681    * @return the number of bytes synched at stop. -1 on error
682    */
683   public long stopAdaptor(String name, AdaptorShutdownPolicy shutdownMode) {
684     Adaptor toStop;
685     long offset = -1;
686 
687     // at most one thread can get past this critical section with toStop != null
688     // so if multiple callers try to stop the same adaptor, all but one will
689     // fail
690     synchronized (adaptorsByName) {
691       toStop = adaptorsByName.remove(name);
692     }
693     if (toStop == null) {
694       log.warn("trying to stop " + name + " that isn't running");
695       return offset;
696     } else {
697       adaptorPositions.remove(toStop);
698       adaptorStatsManager.remove(toStop);
699     }
700     ChukwaAgent.agentMetrics.adaptorCount.set(adaptorsByName.size());
701     ChukwaAgent.agentMetrics.removedAdaptor.inc();
702     
703     try {
704       offset = toStop.shutdown(shutdownMode);
705       log.info("shutdown ["+ shutdownMode + "] on " + name + ", "
706           + toStop.getCurrentStatus());
707     } catch (AdaptorException e) {
708       log.error("adaptor failed to stop cleanly", e);
709     } finally {
710       needNewCheckpoint = true;
711     }
712     return offset;
713   }
714 
715   @Override
716   public Configuration getConfiguration() {
717     return conf;
718   }
719   
720   public static Configuration getStaticConfiguration() {
721     return conf;
722   }
723   
724   @Override
725   public Adaptor getAdaptor(String name) {
726     synchronized(adaptorsByName) {
727       return adaptorsByName.get(name);
728     }
729   }
730 
731   public Offset offset(Adaptor a) {
732     Offset o = adaptorPositions.get(a);
733     return o;
734   }
735   
736   public Connector getConnector() {
737     return connector;
738   }
739 
740   public void shutdown() {
741     shutdown(false);
742   }
743 
744   /**
745    * Triggers agent shutdown. For now, this method doesn't shut down adaptors
746    * explicitly. It probably should.
747    */
748   public void shutdown(boolean exit) {
749     controlSock.shutdown(); // make sure we don't get new requests
750 
751     if (statsCollector != null) {
752       statsCollector.cancel();
753     }
754 
755     try {
756       jettyServer.stop();
757     } catch (Exception e) {
758       log.error("Couldn't stop jetty server.", e);
759     }
760 
761     // adaptors
762 
763     synchronized (adaptorsByName) {
764       // shut down each adaptor
765       for (Adaptor a : adaptorsByName.values()) {
766         try {
767           a.shutdown(AdaptorShutdownPolicy.HARD_STOP);
768         } catch (AdaptorException e) {
769           log.warn("failed to cleanly stop " + a, e);
770         }
771       }
772     }
773     if (checkpointer != null) {
774       checkpointer.cancel();
775       try {
776         if (needNewCheckpoint)
777           writeCheckpoint(); // write a last checkpoint here, before stopping
778       } catch (IOException e) {
779         log.debug(ExceptionUtil.getStackTrace(e));
780       }
781     }
782     adaptorsByName.clear();
783     adaptorPositions.clear();
784     adaptorStatsManager.clear();
785     if (exit)
786       System.exit(0);
787   }
788 
789   /**
790    * Returns the control socket for this agent.
791    */
792   private AgentControlSocketListener getControlSock() {
793     return controlSock;
794   }
795 
796   public String getAdaptorName(Adaptor initiator) {
797     Offset o = adaptorPositions.get(initiator);
798     if(o != null)
799       return o.id;
800     else return null;
801   }
802 }