This project has retired. For details please refer to its
Attic page.
ChukwaAgent xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.chukwa.datacollection.agent;
20
21
22 import java.io.BufferedReader;
23 import java.io.BufferedWriter;
24 import java.io.File;
25 import java.io.FileInputStream;
26 import java.io.FileNotFoundException;
27 import java.io.FileOutputStream;
28 import java.io.FilenameFilter;
29 import java.io.IOException;
30 import java.io.InputStreamReader;
31 import java.io.OutputStreamWriter;
32 import java.io.PrintWriter;
33 import java.security.NoSuchAlgorithmException;
34 import java.util.*;
35 import java.util.concurrent.ConcurrentHashMap;
36 import java.util.regex.Matcher;
37 import java.util.regex.Pattern;
38
39 import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
40 import org.apache.hadoop.chukwa.datacollection.DataFactory;
41 import org.apache.hadoop.chukwa.datacollection.adaptor.Adaptor;
42 import org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorException;
43 import org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorShutdownPolicy;
44 import org.apache.hadoop.chukwa.datacollection.adaptor.NotifyOnCommitAdaptor;
45 import org.apache.hadoop.chukwa.datacollection.OffsetStatsManager;
46 import org.apache.hadoop.chukwa.datacollection.agent.metrics.AgentMetrics;
47 import org.apache.hadoop.chukwa.datacollection.connector.Connector;
48 import org.apache.hadoop.chukwa.datacollection.connector.http.HttpConnector;
49 import org.apache.hadoop.chukwa.datacollection.connector.PipelineConnector;
50 import org.apache.hadoop.chukwa.datacollection.test.ConsoleOutConnector;
51 import org.apache.hadoop.chukwa.util.AdaptorNamingUtils;
52 import org.apache.hadoop.chukwa.util.ChukwaUtil;
53 import org.apache.hadoop.chukwa.util.DaemonWatcher;
54 import org.apache.hadoop.chukwa.util.ExceptionUtil;
55 import org.apache.hadoop.conf.Configuration;
56 import org.apache.hadoop.fs.Path;
57 import org.apache.log4j.Logger;
58 import org.mortbay.jetty.Server;
59 import org.mortbay.jetty.servlet.Context;
60 import org.mortbay.jetty.servlet.ServletHolder;
61 import org.mortbay.jetty.nio.SelectChannelConnector;
62 import org.mortbay.thread.BoundedThreadPool;
63
64 import com.sun.jersey.spi.container.servlet.ServletContainer;
65
66 import edu.berkeley.confspell.*;
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82 public class ChukwaAgent implements AdaptorManager {
83
84 static AgentMetrics agentMetrics = new AgentMetrics("ChukwaAgent", "metrics");
85
86 private static Logger log = Logger.getLogger(ChukwaAgent.class);
87 private static final int HTTP_SERVER_THREADS = 120;
88 private static Server jettyServer = null;
89 private OffsetStatsManager adaptorStatsManager = null;
90 private Timer statsCollector = null;
91 private static volatile Configuration conf = null;
92 private static volatile ChukwaAgent agent = null;
93 public Connector connector = null;
94
/**
 * Creates an agent from a freshly constructed {@link ChukwaConfiguration}.
 *
 * @throws AlreadyRunningException propagated from {@link #ChukwaAgent(Configuration)}
 */
protected ChukwaAgent() throws AlreadyRunningException {
  this(new ChukwaConfiguration());
}
98
/**
 * Builds and starts an agent from the given configuration.
 *
 * <p>In order, the constructor: publishes this instance and {@code conf} through the static
 * fields, reads the checkpoint and statistics settings, optionally restores adaptors from the
 * checkpoint directory, loads the optional initial-adaptors file, binds and starts the control
 * socket, starts the HTTP server and the statistics timer, and finally schedules periodic
 * checkpointing.
 *
 * @param conf configuration supplying the {@code chukwaAgent.*} settings
 * @throws AlreadyRunningException if binding the control socket fails with an IOException
 * @throws RuntimeException if starting the HTTP server or statistics timer fails
 */
public ChukwaAgent(Configuration conf) throws AlreadyRunningException {
  ChukwaAgent.agent = this;
  // conf is a static field, so assign it through the class rather than through "this".
  ChukwaAgent.conf = conf;

  adaptorPositions = new ConcurrentHashMap<Adaptor, Offset>();
  adaptorsByName = new HashMap<String, Adaptor>();
  checkpointNumber = 0;

  boolean DO_CHECKPOINT_RESTORE = conf.getBoolean(
      "chukwaAgent.checkpoint.enabled", true);
  CHECKPOINT_BASE_NAME = conf.get("chukwaAgent.checkpoint.name",
      "chukwa_checkpoint_");
  final int CHECKPOINT_INTERVAL_MS = conf.getInt(
      "chukwaAgent.checkpoint.interval", 5000);
  final int STATS_INTERVAL_MS = conf.getInt(
      "chukwaAgent.stats.collection.interval", 10000);
  final int STATS_DATA_TTL_MS = conf.getInt(
      "chukwaAgent.stats.data.ttl", 1200000);

  // Without a checkpoint directory there is nothing to restore from.
  if (conf.get("chukwaAgent.checkpoint.dir") != null)
    checkpointDir = new File(conf.get("chukwaAgent.checkpoint.dir", null));
  else
    DO_CHECKPOINT_RESTORE = false;

  if (checkpointDir != null && !checkpointDir.exists()) {
    if (!checkpointDir.mkdirs()) {
      // Previously ignored; later checkpoint writes would fail without any hint why.
      log.warn("could not create checkpoint directory " + checkpointDir);
    }
  }
  tags = conf.get("chukwaAgent.tags", "cluster=\"unknown\"");
  DataFactory.getInstance().addDefaultTag(conf.get("chukwaAgent.tags", "cluster=\"unknown_cluster\""));

  log.info("Config - CHECKPOINT_BASE_NAME: [" + CHECKPOINT_BASE_NAME + "]");
  log.info("Config - checkpointDir: [" + checkpointDir + "]");
  log.info("Config - CHECKPOINT_INTERVAL_MS: [" + CHECKPOINT_INTERVAL_MS
      + "]");
  log.info("Config - DO_CHECKPOINT_RESTORE: [" + DO_CHECKPOINT_RESTORE + "]");
  log.info("Config - STATS_INTERVAL_MS: [" + STATS_INTERVAL_MS + "]");
  log.info("Config - tags: [" + tags + "]");

  if (DO_CHECKPOINT_RESTORE) {
    log.info("checkpoints are enabled, period is " + CHECKPOINT_INTERVAL_MS);
  }

  File initialAdaptors = null;
  if (conf.get("chukwaAgent.initial_adaptors") != null)
    initialAdaptors = new File(conf.get("chukwaAgent.initial_adaptors"));

  try {
    if (DO_CHECKPOINT_RESTORE) {
      restoreFromCheckpoint();
    }
  } catch (IOException e) {
    log.warn("failed to restart from checkpoint: ", e);
  }

  try {
    if (initialAdaptors != null && initialAdaptors.exists())
      readAdaptorsFile(initialAdaptors);
  } catch (IOException e) {
    // Pass the exception to the logger so the cause is not lost.
    log.warn("couldn't read user-specified file "
        + initialAdaptors.getAbsolutePath(), e);
  }

  controlSock = new AgentControlSocketListener(this);
  try {
    controlSock.tryToBind();

    controlSock.start();
    log.info("control socket started on port " + controlSock.portno);

    try {
      this.adaptorStatsManager = new OffsetStatsManager(STATS_DATA_TTL_MS);
      this.statsCollector = new Timer("ChukwaAgent Stats Collector");

      startHttpServer(conf);

      statsCollector.scheduleAtFixedRate(new StatsCollectorTask(),
          STATS_INTERVAL_MS, STATS_INTERVAL_MS);
    } catch (Exception e) {
      log.error("Couldn't start HTTP server", e);
      throw new RuntimeException(e);
    }

    // Periodic checkpointing needs both a positive interval and a directory.
    if (CHECKPOINT_INTERVAL_MS > 0 && checkpointDir != null) {
      checkpointer = new Timer();
      checkpointer.schedule(new CheckpointTask(), 0, CHECKPOINT_INTERVAL_MS);
    }
  } catch (IOException e) {
    log.info("failed to bind to socket; aborting agent launch", e);
    throw new AlreadyRunningException();
  }

}
196
197 public static ChukwaAgent getAgent() {
198 if(agent == null) {
199 try {
200 agent = new ChukwaAgent();
201 } catch(AlreadyRunningException e) {
202 log.error("Chukwa Agent is already running", e);
203 agent = null;
204 }
205 }
206 return agent;
207 }
208
209
210 public static class Offset {
211 public Offset(long l, String id) {
212 offset = l;
213 this.id = id;
214 }
215
216 final String id;
217 volatile long offset;
218 public long offset() {
219 return this.offset;
220 }
221
222 public String adaptorID() {
223 return id;
224 }
225 }
226
227 public static class AlreadyRunningException extends Exception {
228
229 private static final long serialVersionUID = 1L;
230
231 public AlreadyRunningException() {
232 super("Agent already running; aborting");
233 }
234 }
235
236 private final Map<Adaptor, Offset> adaptorPositions;
237
238
239
240 private final Map<String, Adaptor> adaptorsByName;
241
242 private File checkpointDir;
243
244 private String CHECKPOINT_BASE_NAME;
245
246 private static String tags = "";
247
248 private Timer checkpointer;
249 private volatile boolean needNewCheckpoint = false;
250
251
252 private int checkpointNumber;
253
254
255 private final AgentControlSocketListener controlSock;
256
257 public int getControllerPort() {
258 return controlSock.getPort();
259 }
260
261 public OffsetStatsManager getAdaptorStatsManager() {
262 return adaptorStatsManager;
263 }
264
265
266
267
268
269 public static void main(String[] args) throws AdaptorException {
270
271 DaemonWatcher.createInstance("Agent");
272
273 try {
274 if (args.length > 0 && args[0].equals("-help")) {
275 System.out.println("usage: LocalAgent [-noCheckPoint]"
276 + "[default collector URL]");
277 System.exit(0);
278 }
279
280 conf = ChukwaUtil.readConfiguration();
281 agent = new ChukwaAgent(conf);
282
283 if (agent.anotherAgentIsRunning()) {
284 System.out
285 .println("another agent is running (or port has been usurped). "
286 + "Bailing out now");
287 throw new AlreadyRunningException();
288 }
289
290 int uriArgNumber = 0;
291 if (args.length > 0) {
292 if (args[uriArgNumber].equals("local")) {
293 agent.connector = new ConsoleOutConnector(agent);
294 } else {
295 if (!args[uriArgNumber].contains("://")) {
296 args[uriArgNumber] = "http://" + args[uriArgNumber];
297 }
298 agent.connector = new HttpConnector(agent, args[uriArgNumber]);
299 }
300 } else {
301 String connectorType = conf.get("chukwa.agent.connector",
302 "org.apache.hadoop.chukwa.datacollection.connector.PipelineConnector");
303 agent.connector = (Connector) Class.forName(connectorType).newInstance();
304 }
305 agent.connector.start();
306
307 log.info("local agent started on port " + agent.getControlSock().portno);
308
309
310 } catch (AlreadyRunningException e) {
311 log.error("agent started already on this machine with same portno;"
312 + " bailing out");
313 System.out
314 .println("agent started already on this machine with same portno;"
315 + " bailing out");
316 DaemonWatcher.bailout(-1);
317 System.exit(0);
318 } catch (Exception e) {
319 e.printStackTrace();
320 }
321 }
322
323 private boolean anotherAgentIsRunning() {
324 return !controlSock.isBound();
325 }
326
327
328
329
330 @Override
331 public int adaptorCount() {
332 synchronized(adaptorsByName) {
333 return adaptorsByName.size();
334 }
335 }
336
337 private void startHttpServer(Configuration conf) throws Exception {
338 int portNum = conf.getInt("chukwaAgent.http.port", 9090);
339 String jaxRsAddlPackages = conf.get("chukwaAgent.http.rest.controller.packages");
340 StringBuilder jaxRsPackages = new StringBuilder(
341 "org.apache.hadoop.chukwa.datacollection.agent.rest");
342
343
344 if (jaxRsAddlPackages != null)
345 jaxRsPackages.append(';').append(jaxRsAddlPackages);
346
347
348 SelectChannelConnector jettyConnector = new SelectChannelConnector();
349 jettyConnector.setLowResourcesConnections(HTTP_SERVER_THREADS - 10);
350 jettyConnector.setLowResourceMaxIdleTime(1500);
351 jettyConnector.setPort(portNum);
352 jettyConnector.setReuseAddress(true);
353
354
355 jettyServer = new Server(portNum);
356 jettyServer.setConnectors(new org.mortbay.jetty.Connector[] { jettyConnector });
357 BoundedThreadPool pool = new BoundedThreadPool();
358 pool.setMaxThreads(HTTP_SERVER_THREADS);
359 jettyServer.setThreadPool(pool);
360
361
362 ServletHolder servletHolder = new ServletHolder(ServletContainer.class);
363 servletHolder.setInitParameter("com.sun.jersey.config.property.resourceConfigClass",
364 "com.sun.jersey.api.core.PackagesResourceConfig");
365 servletHolder.setInitParameter("com.sun.jersey.config.property.packages",
366 jaxRsPackages.toString());
367
368
369 Context root = new Context(jettyServer, "/rest/v2", Context.SESSIONS);
370 root.setAttribute("ChukwaAgent", this);
371 root.addServlet(servletHolder, "/*");
372 root.setAllowNullPathInfo(false);
373
374
375 jettyServer.start();
376 jettyServer.setStopAtShutdown(true);
377
378 log.info("started Chukwa http agent interface on port " + portNum);
379 }
380
381
382
383
384 private class StatsCollectorTask extends TimerTask {
385
386 public void run() {
387 long now = System.currentTimeMillis();
388
389 for(String adaptorId : getAdaptorList().keySet()) {
390 Adaptor adaptor = getAdaptor(adaptorId);
391 if(adaptor == null) continue;
392
393 Offset offset = adaptorPositions.get(adaptor);
394 if(offset == null) continue;
395
396 adaptorStatsManager.addOffsetDataPoint(adaptor, offset.offset, now);
397 }
398 }
399 }
400
401
402
403
404
405
406
407
408
409
410
411 private Pattern addCmdPattern = Pattern.compile("[aA][dD][dD]\\s+"
412
413
414
415 + "(?:"
416 + "([^\\s=]+)"
417 + "\\s*=\\s*"
418 + ")?"
419 + "([^\\s=]+)\\s+"
420 + "(\\S+)\\s+"
421 + "(?:"
422 + "(.*?)\\s+"
423
424 + ")?"
425 + "(\\d+)\\s*");
426
427
428
429
430
431
432
433
434 public String processAddCommand(String cmd) {
435 try {
436 return processAddCommandE(cmd);
437 } catch(AdaptorException e) {
438 return null;
439 }
440 }
441
442
443 public String processAddCommandE(String cmd) throws AdaptorException {
444 Matcher m = addCmdPattern.matcher(cmd);
445 if (m.matches()) {
446 long offset;
447 try {
448 offset = Long.parseLong(m.group(5));
449 } catch (NumberFormatException e) {
450 log.warn("malformed line " + cmd);
451 throw new AdaptorException("bad input syntax");
452 }
453
454 String adaptorID = m.group(1);
455 String adaptorClassName = m.group(2);
456 String dataType = m.group(3);
457 String params = m.group(4);
458 if (params == null)
459 params = "";
460
461 Adaptor adaptor = AdaptorFactory.createAdaptor(adaptorClassName);
462 if (adaptor == null) {
463 log.warn("Error creating adaptor of class " + adaptorClassName);
464 throw new AdaptorException("Can't load class " + adaptorClassName);
465 }
466 String coreParams = adaptor.parseArgs(dataType,params,this);
467 if(coreParams == null) {
468 log.warn("invalid params for adaptor: " + params);
469 throw new AdaptorException("invalid params for adaptor: " + params);
470 }
471
472 if(adaptorID == null) {
473 try {
474 adaptorID = AdaptorNamingUtils.synthesizeAdaptorID(adaptorClassName, dataType, coreParams);
475 } catch(NoSuchAlgorithmException e) {
476 log.fatal("MD5 apparently doesn't work on your machine; bailing", e);
477 shutdown(true);
478 }
479 } else if(!adaptorID.startsWith("adaptor_"))
480 adaptorID = "adaptor_"+adaptorID;
481
482 synchronized (adaptorsByName) {
483
484 if(adaptorsByName.containsKey(adaptorID))
485 return adaptorID;
486 adaptorsByName.put(adaptorID, adaptor);
487 adaptorPositions.put(adaptor, new Offset(offset, adaptorID));
488 needNewCheckpoint = true;
489 try {
490 adaptor.start(adaptorID, dataType, offset, DataFactory
491 .getInstance().getEventQueue());
492 log.info("started a new adaptor, id = " + adaptorID + " function=["+adaptor.toString()+"]");
493 ChukwaAgent.agentMetrics.adaptorCount.set(adaptorsByName.size());
494 ChukwaAgent.agentMetrics.addedAdaptor.inc();
495 return adaptorID;
496
497 } catch (Exception e) {
498 Adaptor failed = adaptorsByName.remove(adaptorID);
499 adaptorPositions.remove(failed);
500 adaptorStatsManager.remove(failed);
501 log.warn("failed to start adaptor", e);
502 if(e instanceof AdaptorException)
503 throw (AdaptorException)e;
504 }
505 }
506 } else if (cmd.length() > 0)
507 log.warn("only 'add' command supported in config files; cmd was: " + cmd);
508
509
510 return null;
511 }
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527 private boolean restoreFromCheckpoint() throws IOException {
528 synchronized (checkpointDir) {
529 String[] checkpointNames = checkpointDir.list(new FilenameFilter() {
530 public boolean accept(File dir, String name) {
531 return name.startsWith(CHECKPOINT_BASE_NAME);
532 }
533 });
534
535 if (checkpointNames == null) {
536 log.error("Unable to list files in checkpoint dir");
537 return false;
538 }
539 if (checkpointNames.length == 0) {
540 log.info("No checkpoints found in " + checkpointDir);
541 return false;
542 }
543
544 if (checkpointNames.length > 2)
545 log.warn("expected at most two checkpoint files in " + checkpointDir
546 + "; saw " + checkpointNames.length);
547 else if (checkpointNames.length == 0)
548 return false;
549
550 String lowestName = null;
551 int lowestIndex = Integer.MAX_VALUE;
552 for (String n : checkpointNames) {
553 int index = Integer
554 .parseInt(n.substring(CHECKPOINT_BASE_NAME.length()));
555 if (index < lowestIndex) {
556 lowestName = n;
557 lowestIndex = index;
558 }
559 }
560
561 checkpointNumber = lowestIndex + 1;
562 File checkpoint = new File(checkpointDir, lowestName);
563 readAdaptorsFile(checkpoint);
564 }
565 return true;
566 }
567
568 private void readAdaptorsFile(File checkpoint) throws FileNotFoundException,
569 IOException {
570 log.info("starting adaptors listed in " + checkpoint.getAbsolutePath());
571 BufferedReader br = new BufferedReader(new InputStreamReader(
572 new FileInputStream(checkpoint)));
573 String cmd = null;
574 while ((cmd = br.readLine()) != null)
575 processAddCommand(cmd);
576 br.close();
577 }
578
579
580
581
582
583
584 private void writeCheckpoint() throws IOException {
585 needNewCheckpoint = false;
586 synchronized (checkpointDir) {
587 log.info("writing checkpoint " + checkpointNumber);
588
589 FileOutputStream fos = new FileOutputStream(new File(checkpointDir,
590 CHECKPOINT_BASE_NAME + checkpointNumber));
591 PrintWriter out = new PrintWriter(new BufferedWriter(
592 new OutputStreamWriter(fos)));
593
594 for (Map.Entry<String, String> stat : getAdaptorList().entrySet()) {
595 out.println("ADD "+ stat.getKey()+ " = " + stat.getValue());
596 }
597
598 out.close();
599 File lastCheckpoint = new File(checkpointDir, CHECKPOINT_BASE_NAME
600 + (checkpointNumber - 1));
601 log.debug("hopefully removing old checkpoint file "
602 + lastCheckpoint.getAbsolutePath());
603 lastCheckpoint.delete();
604 checkpointNumber++;
605 }
606 }
607
608 public String reportCommit(Adaptor src, long uuid) {
609 needNewCheckpoint = true;
610 Offset o = adaptorPositions.get(src);
611 if (o != null) {
612 synchronized (o) {
613
614 if (uuid > o.offset)
615 o.offset = uuid;
616 }
617 log.debug("got commit up to " + uuid + " on " + src + " = " + o.id);
618 if(src instanceof NotifyOnCommitAdaptor) {
619 ((NotifyOnCommitAdaptor) src).committed(uuid);
620 }
621 return o.id;
622 } else {
623 log.warn("got commit up to " + uuid + " for adaptor " + src
624 + " that doesn't appear to be running: " + adaptorCount()
625 + " total");
626 return null;
627 }
628 }
629
630 private class CheckpointTask extends TimerTask {
631 public void run() {
632 try {
633 if (needNewCheckpoint) {
634 writeCheckpoint();
635 }
636 } catch (IOException e) {
637 log.warn("failed to write checkpoint", e);
638 }
639 }
640 }
641
642
643 private String formatAdaptorStatus(Adaptor a) {
644 return a.getClass().getCanonicalName() + " " + a.getCurrentStatus() +
645 " " + adaptorPositions.get(a).offset;
646 }
647
648
649
650
651
652
653 public Map<String, String> getAdaptorList() {
654 Map<String, String> adaptors = new HashMap<String, String>(adaptorsByName.size());
655 synchronized (adaptorsByName) {
656 for (Map.Entry<String, Adaptor> a : adaptorsByName.entrySet()) {
657 adaptors.put(a.getKey(), formatAdaptorStatus(a.getValue()));
658 }
659 }
660 return adaptors;
661 }
662
663
664 public long stopAdaptor(String name, boolean gracefully) {
665 if (gracefully)
666 return stopAdaptor(name, AdaptorShutdownPolicy.GRACEFULLY);
667 else
668 return stopAdaptor(name, AdaptorShutdownPolicy.HARD_STOP);
669 }
670
671
672
673
674
675
676
677
678
679
680
681
682
683 public long stopAdaptor(String name, AdaptorShutdownPolicy shutdownMode) {
684 Adaptor toStop;
685 long offset = -1;
686
687
688
689
690 synchronized (adaptorsByName) {
691 toStop = adaptorsByName.remove(name);
692 }
693 if (toStop == null) {
694 log.warn("trying to stop " + name + " that isn't running");
695 return offset;
696 } else {
697 adaptorPositions.remove(toStop);
698 adaptorStatsManager.remove(toStop);
699 }
700 ChukwaAgent.agentMetrics.adaptorCount.set(adaptorsByName.size());
701 ChukwaAgent.agentMetrics.removedAdaptor.inc();
702
703 try {
704 offset = toStop.shutdown(shutdownMode);
705 log.info("shutdown ["+ shutdownMode + "] on " + name + ", "
706 + toStop.getCurrentStatus());
707 } catch (AdaptorException e) {
708 log.error("adaptor failed to stop cleanly", e);
709 } finally {
710 needNewCheckpoint = true;
711 }
712 return offset;
713 }
714
715 @Override
716 public Configuration getConfiguration() {
717 return conf;
718 }
719
720 public static Configuration getStaticConfiguration() {
721 return conf;
722 }
723
724 @Override
725 public Adaptor getAdaptor(String name) {
726 synchronized(adaptorsByName) {
727 return adaptorsByName.get(name);
728 }
729 }
730
731 public Offset offset(Adaptor a) {
732 Offset o = adaptorPositions.get(a);
733 return o;
734 }
735
736 public Connector getConnector() {
737 return connector;
738 }
739
740 public void shutdown() {
741 shutdown(false);
742 }
743
744
745
746
747
748 public void shutdown(boolean exit) {
749 controlSock.shutdown();
750
751 if (statsCollector != null) {
752 statsCollector.cancel();
753 }
754
755 try {
756 jettyServer.stop();
757 } catch (Exception e) {
758 log.error("Couldn't stop jetty server.", e);
759 }
760
761
762
763 synchronized (adaptorsByName) {
764
765 for (Adaptor a : adaptorsByName.values()) {
766 try {
767 a.shutdown(AdaptorShutdownPolicy.HARD_STOP);
768 } catch (AdaptorException e) {
769 log.warn("failed to cleanly stop " + a, e);
770 }
771 }
772 }
773 if (checkpointer != null) {
774 checkpointer.cancel();
775 try {
776 if (needNewCheckpoint)
777 writeCheckpoint();
778 } catch (IOException e) {
779 log.debug(ExceptionUtil.getStackTrace(e));
780 }
781 }
782 adaptorsByName.clear();
783 adaptorPositions.clear();
784 adaptorStatsManager.clear();
785 if (exit)
786 System.exit(0);
787 }
788
789
790
791
792 private AgentControlSocketListener getControlSock() {
793 return controlSock;
794 }
795
796 public String getAdaptorName(Adaptor initiator) {
797 Offset o = adaptorPositions.get(initiator);
798 if(o != null)
799 return o.id;
800 else return null;
801 }
802 }