This project has retired. For details please refer to its Attic page.
ChukwaAgent xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.chukwa.datacollection.agent;
20  
21  
22  import java.io.BufferedReader;
23  import java.io.BufferedWriter;
24  import java.io.File;
25  import java.io.FileInputStream;
26  import java.io.FileNotFoundException;
27  import java.io.FileOutputStream;
28  import java.io.FilenameFilter;
29  import java.io.IOException;
30  import java.io.InputStreamReader;
31  import java.io.OutputStreamWriter;
32  import java.io.PrintWriter;
33  import java.nio.charset.Charset;
34  import java.security.NoSuchAlgorithmException;
35  import java.util.*;
36  import java.util.concurrent.ConcurrentHashMap;
37  import java.util.regex.Matcher;
38  import java.util.regex.Pattern;
39  
40  import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
41  import org.apache.hadoop.chukwa.datacollection.DataFactory;
42  import org.apache.hadoop.chukwa.datacollection.adaptor.Adaptor;
43  import org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorException;
44  import org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorShutdownPolicy;
45  import org.apache.hadoop.chukwa.datacollection.adaptor.NotifyOnCommitAdaptor;
46  import org.apache.hadoop.chukwa.datacollection.OffsetStatsManager;
47  import org.apache.hadoop.chukwa.datacollection.agent.metrics.AgentMetrics;
48  import org.apache.hadoop.chukwa.datacollection.connector.Connector;
49  import org.apache.hadoop.chukwa.datacollection.connector.http.HttpConnector;
50  import org.apache.hadoop.chukwa.datacollection.test.ConsoleOutConnector;
51  import org.apache.hadoop.chukwa.util.AdaptorNamingUtils;
52  import org.apache.hadoop.chukwa.util.ChukwaUtil;
53  import org.apache.hadoop.chukwa.util.ExceptionUtil;
54  import org.apache.hadoop.conf.Configuration;
55  import org.apache.log4j.Logger;
56  
57  /**
58   * The local agent daemon that runs on each machine. This class is designed to
59   * be embeddable, for use in testing.
60   * <P>
61   * The agent will start an HTTP REST interface listening on port. Configs for
62   * the agent are:
63   * <ul>
64   * <li><code>chukwaAgent.http.port</code> Port to listen on (default=9090).</li>
65   * <li><code>chukwaAgent.http.rest.controller.packages</code> Java packages to
66   * inspect for JAX-RS annotated classes to be added as servlets to the REST
67   * server.</li>
68   * </ul>
69   * 
70   */
71  public class ChukwaAgent implements AdaptorManager {
72    // boolean WRITE_CHECKPOINTS = true;
73    static AgentMetrics agentMetrics = new AgentMetrics("ChukwaAgent", "metrics");
74  
75    private final static Logger log = Logger.getLogger(ChukwaAgent.class);  
76    private OffsetStatsManager<Adaptor> adaptorStatsManager = null;
77    private Timer statsCollector = null;
78    private static Configuration conf = null;
79    private volatile static ChukwaAgent agent = null;
80    public Connector connector = null;
81    private boolean stopped = false;
82    
83    private ChukwaAgent() {
84      agent = new ChukwaAgent(new ChukwaConfiguration());
85    }
86  
87    private ChukwaAgent(Configuration conf) {
88      agent = this;
89      ChukwaAgent.conf = conf;
90      // almost always just reading this; so use a ConcurrentHM.
91      // since we wrapped the offset, it's not a structural mod.
92      adaptorPositions = new ConcurrentHashMap<Adaptor, Offset>();
93      adaptorsByName = new HashMap<String, Adaptor>();
94      checkpointNumber = 0;
95      stopped = false;
96    }
97  
98    public static ChukwaAgent getAgent() {
99      if(agent == null || agent.isStopped()) {
100         agent = new ChukwaAgent();
101     } 
102     return agent;
103   }
104 
105   public static ChukwaAgent getAgent(Configuration conf) {
106     if(agent == null || agent.isStopped()) {
107       agent = new ChukwaAgent(conf);
108     }
109     return agent;
110   }
111 
112   public void start() throws AlreadyRunningException {
113     boolean checkPointRestore = conf.getBoolean(
114         "chukwaAgent.checkpoint.enabled", true);
115     checkPointBaseName = conf.get("chukwaAgent.checkpoint.name",
116         "chukwa_checkpoint_");
117     final int checkPointIntervalMs = conf.getInt(
118         "chukwaAgent.checkpoint.interval", 5000);
119     final int statsIntervalMs = conf.getInt(
120         "chukwaAgent.stats.collection.interval", 10000);
121     int statsDataTTLMs = conf.getInt(
122         "chukwaAgent.stats.data.ttl", 1200000);
123 
124     if (conf.get("chukwaAgent.checkpoint.dir") != null)
125       checkpointDir = new File(conf.get("chukwaAgent.checkpoint.dir", null));
126     else
127       checkPointRestore = false;
128 
129     if (checkpointDir != null && !checkpointDir.exists()) {
130       boolean result = checkpointDir.mkdirs();
131       if(!result) {
132         log.error("Failed to create check point directory.");
133       }
134     }
135     String tags = conf.get("chukwaAgent.tags", "cluster=\"unknown\"");
136     DataFactory.getInstance().addDefaultTag(conf.get("chukwaAgent.tags", "cluster=\"unknown_cluster\""));
137 
138     log.info("Config - CHECKPOINT_BASE_NAME: [" + checkPointBaseName + "]");
139     log.info("Config - checkpointDir: [" + checkpointDir + "]");
140     log.info("Config - CHECKPOINT_INTERVAL_MS: [" + checkPointIntervalMs
141         + "]");
142     log.info("Config - DO_CHECKPOINT_RESTORE: [" + checkPointRestore + "]");
143     log.info("Config - STATS_INTERVAL_MS: [" + statsIntervalMs + "]");
144     log.info("Config - tags: [" + tags + "]");
145 
146     if (checkPointRestore) {
147       log.info("checkpoints are enabled, period is " + checkPointIntervalMs);
148     }
149 
150     File initialAdaptors = null;
151     if (conf.get("chukwaAgent.initial_adaptors") != null)
152       initialAdaptors = new File(conf.get("chukwaAgent.initial_adaptors"));
153 
154     try {
155       if (checkPointRestore) {
156         restoreFromCheckpoint();
157       }
158     } catch (IOException e) {
159       log.warn("failed to restart from checkpoint: ", e);
160     }
161 
162     try {
163       if (initialAdaptors != null && initialAdaptors.exists())
164         readAdaptorsFile(initialAdaptors); 
165     } catch (IOException e) {
166       log.warn("couldn't read user-specified file "
167           + initialAdaptors.getAbsolutePath());
168     }
169 
170     controlSock = new AgentControlSocketListener(this);
171     try {
172       controlSock.tryToBind(); // do this synchronously; if it fails, we know
173       // another agent is running.
174       controlSock.start(); // this sets us up as a daemon
175       log.info("control socket started on port " + controlSock.portno);
176     } catch (IOException e) {
177       log.info("failed to bind to socket; aborting agent launch", e);
178       throw new AlreadyRunningException();
179     }
180 
181     // start the HTTP server with stats collection
182     try {
183       adaptorStatsManager = new OffsetStatsManager<Adaptor>(statsDataTTLMs);
184       statsCollector = new Timer("ChukwaAgent Stats Collector");
185 
186       startHttpServer(conf);
187 
188       statsCollector.scheduleAtFixedRate(new StatsCollectorTask(),
189           statsIntervalMs, statsIntervalMs);
190     } catch (Exception e) {
191       log.error("Couldn't start HTTP server", e);
192       throw new RuntimeException(e);
193     }
194 
195     // shouldn't start check pointing until we're finishing launching
196     // adaptors on boot
197     if (checkPointIntervalMs > 0 && checkpointDir != null) {
198       checkpointer = new Timer();
199       checkpointer.schedule(new CheckpointTask(), 0, checkPointIntervalMs);
200     }
201   }
202 
203   // doesn't need an equals(), comparator, etc
204   public static class Offset {
205     public Offset(long l, String id) {
206       offset = l;
207       this.id = id;
208     }
209 
210     final String id;
211     volatile long offset;
212     public long offset() {
213       return this.offset;
214     }
215     
216     public String adaptorID() {
217       return id;
218     }
219   }
220 
221   public static class AlreadyRunningException extends Exception {
222 
223     private static final long serialVersionUID = 1L;
224 
225     public AlreadyRunningException() {
226       super("Agent already running; aborting");
227     }
228   }
229 
230   private static Map<Adaptor, Offset> adaptorPositions;
231 
232   // basically only used by the control socket thread.
233   //must be locked before access
234   private static Map<String, Adaptor> adaptorsByName;
235 
236   private File checkpointDir; // lock this object to indicate checkpoint in
237   // progress
238   private String checkPointBaseName; // base filename for checkpoint files
239   // checkpoints
240 
241   private Timer checkpointer;
242   private volatile boolean needNewCheckpoint = false; // set to true if any
243   // event has happened
244   // that should cause a new checkpoint to be written
245   private int checkpointNumber; // id number of next checkpoint.
246   // should be protected by grabbing lock on checkpointDir
247 
248   private AgentControlSocketListener controlSock;
249 
250   public int getControllerPort() {
251     return controlSock.getPort();
252   }
253 
254   public OffsetStatsManager<Adaptor> getAdaptorStatsManager() {
255     return adaptorStatsManager;
256   }
257 
258   /**
259    * @param args is command line arguements
260    * @throws AdaptorException if error registering adaptors
261    */
262   public static void main(String[] args) throws AdaptorException {
263 
264     try {
265       if (args.length > 0 && args[0].equals("-help")) {
266         System.out.println("usage:  LocalAgent [-noCheckPoint]"
267             + "[default collector URL]");
268         return;
269       }
270 
271       Configuration conf = ChukwaUtil.readConfiguration();
272       agent = ChukwaAgent.getAgent(conf);
273       if (agent.anotherAgentIsRunning()) {
274         log.error("another agent is running (or port has been usurped). "
275                 + "Bailing out now");
276         throw new AlreadyRunningException();
277       }
278 
279       int uriArgNumber = 0;
280       if (args.length > 0) {
281         if (args[uriArgNumber].equals("local")) {
282           agent.connector = new ConsoleOutConnector(agent);
283         } else {
284           if (!args[uriArgNumber].contains("://")) {
285             args[uriArgNumber] = "http://" + args[uriArgNumber];
286           }
287           agent.connector = new HttpConnector(agent, args[uriArgNumber]);
288         }
289       } else {
290         String connectorType = conf.get("chukwa.agent.connector", 
291             "org.apache.hadoop.chukwa.datacollection.connector.PipelineConnector");
292         agent.connector = (Connector) Class.forName(connectorType).newInstance();
293       }
294       agent.start();
295       agent.connector.start();
296 
297       log.info("local agent started on port " + agent.getControlSock().portno);
298       System.out.close();
299       System.err.close();
300     } catch (AlreadyRunningException e) {
301       log.error("agent started already on this machine with same portno;"
302           + " bailing out");
303       System.out
304           .println("agent started already on this machine with same portno;"
305               + " bailing out");
306       return;
307     } catch (Exception e) {
308       e.printStackTrace();
309     }
310   }
311 
312   private boolean anotherAgentIsRunning() {
313     boolean result = false;
314     if(controlSock!=null) {
315       result = !controlSock.isBound();
316     }
317     return result;
318   }
319 
320   /**
321    * @return the number of running adaptors inside this local agent
322    */
323   @Override
324   public int adaptorCount() {
325     synchronized(adaptorsByName) {
326       return adaptorsByName.size();
327     }
328   }
329 
330   private void startHttpServer(Configuration conf) throws Exception {
331     ChukwaRestServer.startInstance(conf);
332   }
333   
334   private void stopHttpServer() throws Exception {
335     ChukwaRestServer.stopInstance();
336   }
337   
338   /**
339    * Take snapshots of offset data so we can report flow rate stats.
340    */
341   private class StatsCollectorTask extends TimerTask {
342 
343     public void run() {
344       long now = System.currentTimeMillis();
345 
346       for(String adaptorId : getAdaptorList().keySet()) {
347         Adaptor adaptor = getAdaptor(adaptorId);
348         if(adaptor == null) continue;
349 
350         Offset offset = adaptorPositions.get(adaptor);
351         if(offset == null) continue;
352 
353         adaptorStatsManager.addOffsetDataPoint(adaptor, offset.offset, now);
354       }
355     }
356   }
357 
358   // words should contain (space delimited):
359   // 0) command ("add")
360   // 1) Optional adaptor name, followed by =
361   // 2) AdaptorClassname
362   // 3) dataType (e.g. "hadoop_log")
363   // 4) params <optional>
364   // (e.g. for files, this is filename,
365   // but can be arbitrarily many space
366   // delimited agent specific params )
367   // 5) offset
368   private Pattern addCmdPattern = Pattern.compile("[aA][dD][dD]\\s+" // command "add",
369                                                              // any case, plus
370                                                              // at least one
371                                                              // space
372       + "(?:"   //noncapturing group
373       +	"([^\\s=]+)" //containing a string (captured) 
374       + "\\s*=\\s*" //then an equals sign, potentially set off with whitespace
375       + ")?" //end optional noncapturing group 
376       + "([^\\s=]+)\\s+" // the adaptor classname, plus at least one space. No '=' in name
377       + "(\\S+)\\s+" // datatype, plus at least one space
378       + "(?:" // start a non-capturing group, for the parameters
379       + "(.*?)\\s+" // capture the actual parameters reluctantly, followed by
380                     // whitespace
381       + ")?" // end non-matching group for params; group is optional
382       + "(\\d+)\\s*"); // finally, an offset and some trailing whitespace
383 
384   /**
385    * Most of the Chukwa wire protocol is implemented in @link{AgentControlSocketListener}
386    * 
387    * Unlike the rest of the chukwa wire protocol, add commands can appear in
388    * initial_adaptors and checkpoint files. So it makes sense to handle them here.
389    * 
390    */
391   public String processAddCommand(String cmd) {
392     try {
393       return processAddCommandE(cmd);
394     } catch(AdaptorException e) {
395       return null;
396     }
397   }
398   
399 
400   public String processAddCommandE(String cmd) throws AdaptorException {
401     Matcher m = addCmdPattern.matcher(cmd);
402     if (m.matches()) {
403       long offset; // check for obvious errors first
404       try {
405         offset = Long.parseLong(m.group(5));
406       } catch (NumberFormatException e) {
407         log.warn("malformed line " + cmd);
408         throw new AdaptorException("bad input syntax");
409       }
410 
411       String adaptorID = m.group(1);
412       String adaptorClassName = m.group(2);
413       String dataType = m.group(3);
414       String params = m.group(4);
415       if (params == null)
416         params = "";
417       
418       Adaptor adaptor = AdaptorFactory.createAdaptor(adaptorClassName);
419       if (adaptor == null) {
420         log.warn("Error creating adaptor of class " + adaptorClassName);
421         throw new AdaptorException("Can't load class " + adaptorClassName);
422       }
423       String coreParams = adaptor.parseArgs(dataType,params,this);
424       if(coreParams == null) {
425         log.warn("invalid params for adaptor: " + params);       
426         throw new AdaptorException("invalid params for adaptor: " + params);
427       }
428       
429       if(adaptorID == null) { //user didn't specify, so synthesize
430         try {
431          adaptorID = AdaptorNamingUtils.synthesizeAdaptorID(adaptorClassName, dataType, coreParams);
432         } catch(NoSuchAlgorithmException e) {
433           log.fatal("MD5 apparently doesn't work on your machine; bailing", e);
434           shutdown(true);
435         }
436       } else if(!adaptorID.startsWith("adaptor_"))
437         adaptorID = "adaptor_"+adaptorID;
438       
439       synchronized (adaptorsByName) {
440         
441         if(adaptorsByName.containsKey(adaptorID))
442           return adaptorID;
443         adaptorsByName.put(adaptorID, adaptor);
444         adaptorPositions.put(adaptor, new Offset(offset, adaptorID));
445         needNewCheckpoint = true;
446         try {
447           adaptor.start(adaptorID, dataType, offset, DataFactory
448               .getInstance().getEventQueue());
449           log.info("started a new adaptor, id = " + adaptorID + " function=["+adaptor.toString()+"]");
450           ChukwaAgent.agentMetrics.adaptorCount.set(adaptorsByName.size());
451           ChukwaAgent.agentMetrics.addedAdaptor.inc();
452           return adaptorID;
453 
454         } catch (Exception e) {
455           Adaptor failed = adaptorsByName.remove(adaptorID);
456           adaptorPositions.remove(failed);
457           adaptorStatsManager.remove(failed);
458           log.warn("failed to start adaptor", e);
459           if(e instanceof AdaptorException)
460             throw (AdaptorException)e;
461         }
462       }
463     } else if (cmd.length() > 0)
464       log.warn("only 'add' command supported in config files; cmd was: " + cmd);
465     // no warning for blank line
466 
467     return null;
468   }
469 
470 
471 
472   /**
473    * Tries to restore from a checkpoint file in checkpointDir. There should
474    * usually only be one checkpoint present -- two checkpoints present implies a
475    * crash during writing the higher-numbered one. As a result, this method
476    * chooses the lowest-numbered file present.
477    * 
478    * Lines in the checkpoint file are processed one at a time with
479    * processCommand();
480    * 
481    * @return true if the restore succeeded
482    * @throws IOException
483    */
484   private boolean restoreFromCheckpoint() throws IOException {
485     synchronized (checkpointDir) {
486       String[] checkpointNames = checkpointDir.list(new FilenameFilter() {
487         public boolean accept(File dir, String name) {
488           return name.startsWith(checkPointBaseName);
489         }
490       });
491 
492       if (checkpointNames == null) {
493         log.error("Unable to list files in checkpoint dir");
494         return false;
495       } else if (checkpointNames.length == 0) {
496         log.info("No checkpoints found in " + checkpointDir);
497         return false;
498       } else if (checkpointNames.length > 2) {
499         log.warn("expected at most two checkpoint files in " + checkpointDir
500             + "; saw " + checkpointNames.length);
501       }
502 
503       String lowestName = null;
504       int lowestIndex = Integer.MAX_VALUE;
505       for (String n : checkpointNames) {
506         int index = Integer
507             .parseInt(n.substring(checkPointBaseName.length()));
508         if (index < lowestIndex) {
509           lowestName = n;
510           lowestIndex = index;
511         }
512       }
513 
514       checkpointNumber = lowestIndex + 1;
515       File checkpoint = new File(checkpointDir, lowestName);
516       readAdaptorsFile(checkpoint);
517     }
518     return true;
519   }
520 
521   private void readAdaptorsFile(File checkpoint) throws FileNotFoundException,
522       IOException {
523     log.info("starting adaptors listed in " + checkpoint.getAbsolutePath());
524     BufferedReader br = new BufferedReader(new InputStreamReader(
525         new FileInputStream(checkpoint), Charset.forName("UTF-8")));
526     String cmd = null;
527     while ((cmd = br.readLine()) != null)
528       processAddCommand(cmd);
529     br.close();
530   }
531 
532   /**
533    * Called periodically to write checkpoints
534    * 
535    * @throws IOException
536    */
537   private void writeCheckpoint() throws IOException {
538     needNewCheckpoint = false;
539     synchronized (checkpointDir) {
540       log.info("writing checkpoint " + checkpointNumber);
541 
542       FileOutputStream fos = new FileOutputStream(new File(checkpointDir,
543           checkPointBaseName + checkpointNumber));
544       PrintWriter out = new PrintWriter(new BufferedWriter(
545           new OutputStreamWriter(fos, Charset.forName("UTF-8"))));
546 
547       for (Map.Entry<String, String> stat : getAdaptorList().entrySet()) {
548         out.println("ADD "+ stat.getKey()+ " = " + stat.getValue());
549       }
550 
551       out.close();
552       File lastCheckpoint = new File(checkpointDir, checkPointBaseName
553           + (checkpointNumber - 1));
554       log.debug("hopefully removing old checkpoint file "
555           + lastCheckpoint.getAbsolutePath());
556       boolean result = lastCheckpoint.delete();
557       if(!result) {
558         log.warn("Unable to delete lastCheckpoint file: "+lastCheckpoint.getAbsolutePath());
559       }
560       checkpointNumber++;
561     }
562   }
563 
564   public String reportCommit(Adaptor src, long uuid) {
565     needNewCheckpoint = true;
566     Offset o = adaptorPositions.get(src);
567     if (o != null) {
568       synchronized (o) { // order writes to offset, in case commits are
569                          // processed out of order
570         if (uuid > o.offset)
571           o.offset = uuid;
572       }
573       log.debug("got commit up to " + uuid + " on " + src + " = " + o.id);
574       if(src instanceof NotifyOnCommitAdaptor) {
575         ((NotifyOnCommitAdaptor) src).committed(uuid);
576       }
577       return o.id;
578     } else {
579       log.warn("got commit up to " + uuid + "  for adaptor " + src
580           + " that doesn't appear to be running: " + adaptorCount()
581           + " total");
582       return null;
583     }
584   }
585 
586   private class CheckpointTask extends TimerTask {
587     public void run() {
588       try {
589         if (needNewCheckpoint) {
590           writeCheckpoint();
591         }
592       } catch (IOException e) {
593         log.warn("failed to write checkpoint", e);
594       }
595     }
596   }
597 
598   
599   private String formatAdaptorStatus(Adaptor a) {
600     return a.getClass().getCanonicalName() + " " + a.getCurrentStatus() + 
601    " " + adaptorPositions.get(a).offset;
602   }
603   
604 /**
605  * Expose the adaptor list.  Keys are adaptor ID numbers, values are the 
606  * adaptor status strings.
607  * @return adaptor list
608  */
609   public Map<String, String> getAdaptorList() {
610     Map<String, String> adaptors = new HashMap<String, String>(adaptorsByName.size());
611     synchronized (adaptorsByName) {
612       for (Map.Entry<String, Adaptor> a : adaptorsByName.entrySet()) {
613         adaptors.put(a.getKey(), formatAdaptorStatus(a.getValue()));
614       }
615     }
616     return adaptors;
617   }
618   
619 
620   public long stopAdaptor(String name, boolean gracefully) {
621     if (gracefully) 
622       return stopAdaptor(name, AdaptorShutdownPolicy.GRACEFULLY);
623     else
624       return stopAdaptor(name, AdaptorShutdownPolicy.HARD_STOP);
625   }
626 
627   /**
628    * Stop the adaptor with given ID number. Takes a parameter to indicate
629    * whether the adaptor should force out all remaining data, or just exit
630    * abruptly.
631    * 
632    * If the adaptor is written correctly, its offset won't change after
633    * returning from shutdown.
634    * 
635    * @param name the adaptor to stop
636    * @param shutdownMode if true, shutdown, if false, hardStop
637    * @return the number of bytes synched at stop. -1 on error
638    */
639   public long stopAdaptor(String name, AdaptorShutdownPolicy shutdownMode) {
640     Adaptor toStop;
641     long offset = -1;
642 
643     // at most one thread can get past this critical section with toStop != null
644     // so if multiple callers try to stop the same adaptor, all but one will
645     // fail
646     synchronized (adaptorsByName) {
647       toStop = adaptorsByName.remove(name);
648     }
649     if (toStop == null) {
650       log.warn("trying to stop " + name + " that isn't running");
651       return offset;
652     } else {
653       adaptorPositions.remove(toStop);
654       adaptorStatsManager.remove(toStop);
655     }
656     ChukwaAgent.agentMetrics.adaptorCount.set(adaptorsByName.size());
657     ChukwaAgent.agentMetrics.removedAdaptor.inc();
658     
659     try {
660       offset = toStop.shutdown(shutdownMode);
661       log.info("shutdown ["+ shutdownMode + "] on " + name + ", "
662           + toStop.getCurrentStatus());
663     } catch (AdaptorException e) {
664       log.error("adaptor failed to stop cleanly", e);
665     } finally {
666       needNewCheckpoint = true;
667     }
668     return offset;
669   }
670 
671   @Override
672   public Configuration getConfiguration() {
673     return conf;
674   }
675   
676   public static Configuration getStaticConfiguration() {
677     return conf;
678   }
679   
680   @Override
681   public Adaptor getAdaptor(String name) {
682     synchronized(adaptorsByName) {
683       return adaptorsByName.get(name);
684     }
685   }
686 
687   public Offset offset(Adaptor a) {
688     Offset o = adaptorPositions.get(a);
689     return o;
690   }
691   
692   public Connector getConnector() {
693     return connector;
694   }
695 
696   public void shutdown() {
697     shutdown(false);
698   }
699 
700   /**
701    * Triggers agent shutdown. For now, this method doesn't shut down adaptors
702    * explicitly. It probably should.
703    * @param force sets flag to exit forcefully
704    */
705   public void shutdown(boolean force) {
706     controlSock.shutdown(); // make sure we don't get new requests
707 
708     if (statsCollector != null) {
709       statsCollector.cancel();
710     }
711 
712     try {
713       stopHttpServer();
714     } catch (Exception e) {
715       log.error("Couldn't stop jetty server.", e);
716     }
717 
718     // adaptors
719 
720     synchronized (adaptorsByName) {
721       // shut down each adaptor
722       for (Adaptor a : adaptorsByName.values()) {
723         try {
724           a.shutdown(AdaptorShutdownPolicy.HARD_STOP);
725         } catch (AdaptorException e) {
726           log.warn("failed to cleanly stop " + a, e);
727         }
728       }
729     }
730     if (checkpointer != null) {
731       checkpointer.cancel();
732       try {
733         if (needNewCheckpoint)
734           writeCheckpoint(); // write a last checkpoint here, before stopping
735       } catch (IOException e) {
736         log.debug(ExceptionUtil.getStackTrace(e));
737       }
738     }
739     adaptorsByName.clear();
740     adaptorPositions.clear();
741     adaptorStatsManager.clear();
742     agent.stop();
743     if (force)
744       return;
745   }
746 
747   /**
748    * Set agent into stop state.
749    */
750   private void stop() {
751     stopped = true;
752   }
753 
754   /**
755    * Check if agent is in stop state.
756    * @return true if agent is in stop state.
757    */
758   private boolean isStopped() {
759     return stopped;
760   }
761 
762   /**
763    * Returns the control socket for this agent.
764    */
765   private AgentControlSocketListener getControlSock() {
766     return controlSock;
767   }
768 
769   public String getAdaptorName(Adaptor initiator) {
770     Offset o = adaptorPositions.get(initiator);
771     if(o != null)
772       return o.id;
773     else return null;
774   }
775 }