This project has been retired. For details, please refer to its
Attic page.
ChukwaAgent xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.chukwa.datacollection.agent;
20
21
22 import java.io.BufferedReader;
23 import java.io.BufferedWriter;
24 import java.io.File;
25 import java.io.FileInputStream;
26 import java.io.FileNotFoundException;
27 import java.io.FileOutputStream;
28 import java.io.FilenameFilter;
29 import java.io.IOException;
30 import java.io.InputStreamReader;
31 import java.io.OutputStreamWriter;
32 import java.io.PrintWriter;
33 import java.nio.charset.Charset;
34 import java.security.NoSuchAlgorithmException;
35 import java.util.*;
36 import java.util.concurrent.ConcurrentHashMap;
37 import java.util.regex.Matcher;
38 import java.util.regex.Pattern;
39
40 import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
41 import org.apache.hadoop.chukwa.datacollection.DataFactory;
42 import org.apache.hadoop.chukwa.datacollection.adaptor.Adaptor;
43 import org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorException;
44 import org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorShutdownPolicy;
45 import org.apache.hadoop.chukwa.datacollection.adaptor.NotifyOnCommitAdaptor;
46 import org.apache.hadoop.chukwa.datacollection.OffsetStatsManager;
47 import org.apache.hadoop.chukwa.datacollection.agent.metrics.AgentMetrics;
48 import org.apache.hadoop.chukwa.datacollection.connector.Connector;
49 import org.apache.hadoop.chukwa.datacollection.connector.http.HttpConnector;
50 import org.apache.hadoop.chukwa.datacollection.test.ConsoleOutConnector;
51 import org.apache.hadoop.chukwa.util.AdaptorNamingUtils;
52 import org.apache.hadoop.chukwa.util.ChukwaUtil;
53 import org.apache.hadoop.chukwa.util.ExceptionUtil;
54 import org.apache.hadoop.conf.Configuration;
55 import org.apache.log4j.Logger;
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71 public class ChukwaAgent implements AdaptorManager {
72
73 static AgentMetrics agentMetrics = new AgentMetrics("ChukwaAgent", "metrics");
74
75 private final static Logger log = Logger.getLogger(ChukwaAgent.class);
76 private OffsetStatsManager<Adaptor> adaptorStatsManager = null;
77 private Timer statsCollector = null;
78 private static Configuration conf = null;
79 private volatile static ChukwaAgent agent = null;
80 public Connector connector = null;
81 private boolean stopped = false;
82
/**
 * Creates an agent that uses a freshly constructed {@link ChukwaConfiguration}.
 *
 * Delegates to {@link #ChukwaAgent(Configuration)} so that exactly one agent
 * object is built and it is this object that ends up registered as the
 * singleton, instead of constructing a second agent from inside the
 * constructor.
 */
private ChukwaAgent() {
  this(new ChukwaConfiguration());
}
86
/**
 * Creates an agent for the given configuration and registers it as the
 * current singleton.
 *
 * Note that the adaptor maps and the configuration are static, so every
 * construction replaces them for the whole JVM.
 *
 * @param conf configuration the agent will read its settings from
 */
private ChukwaAgent(Configuration conf) {
  agent = this;
  ChukwaAgent.conf = conf;

  // Fresh, empty bookkeeping for the new agent.
  adaptorsByName = new HashMap<String, Adaptor>();
  adaptorPositions = new ConcurrentHashMap<Adaptor, Offset>();
  this.stopped = false;
  this.checkpointNumber = 0;
}
97
98 public static ChukwaAgent getAgent() {
99 if(agent == null || agent.isStopped()) {
100 agent = new ChukwaAgent();
101 }
102 return agent;
103 }
104
105 public static ChukwaAgent getAgent(Configuration conf) {
106 if(agent == null || agent.isStopped()) {
107 agent = new ChukwaAgent(conf);
108 }
109 return agent;
110 }
111
112 public void start() throws AlreadyRunningException {
113 boolean checkPointRestore = conf.getBoolean(
114 "chukwaAgent.checkpoint.enabled", true);
115 checkPointBaseName = conf.get("chukwaAgent.checkpoint.name",
116 "chukwa_checkpoint_");
117 final int checkPointIntervalMs = conf.getInt(
118 "chukwaAgent.checkpoint.interval", 5000);
119 final int statsIntervalMs = conf.getInt(
120 "chukwaAgent.stats.collection.interval", 10000);
121 int statsDataTTLMs = conf.getInt(
122 "chukwaAgent.stats.data.ttl", 1200000);
123
124 if (conf.get("chukwaAgent.checkpoint.dir") != null)
125 checkpointDir = new File(conf.get("chukwaAgent.checkpoint.dir", null));
126 else
127 checkPointRestore = false;
128
129 if (checkpointDir != null && !checkpointDir.exists()) {
130 boolean result = checkpointDir.mkdirs();
131 if(!result) {
132 log.error("Failed to create check point directory.");
133 }
134 }
135 String tags = conf.get("chukwaAgent.tags", "cluster=\"unknown\"");
136 DataFactory.getInstance().addDefaultTag(conf.get("chukwaAgent.tags", "cluster=\"unknown_cluster\""));
137
138 log.info("Config - CHECKPOINT_BASE_NAME: [" + checkPointBaseName + "]");
139 log.info("Config - checkpointDir: [" + checkpointDir + "]");
140 log.info("Config - CHECKPOINT_INTERVAL_MS: [" + checkPointIntervalMs
141 + "]");
142 log.info("Config - DO_CHECKPOINT_RESTORE: [" + checkPointRestore + "]");
143 log.info("Config - STATS_INTERVAL_MS: [" + statsIntervalMs + "]");
144 log.info("Config - tags: [" + tags + "]");
145
146 if (checkPointRestore) {
147 log.info("checkpoints are enabled, period is " + checkPointIntervalMs);
148 }
149
150 File initialAdaptors = null;
151 if (conf.get("chukwaAgent.initial_adaptors") != null)
152 initialAdaptors = new File(conf.get("chukwaAgent.initial_adaptors"));
153
154 try {
155 if (checkPointRestore) {
156 restoreFromCheckpoint();
157 }
158 } catch (IOException e) {
159 log.warn("failed to restart from checkpoint: ", e);
160 }
161
162 try {
163 if (initialAdaptors != null && initialAdaptors.exists())
164 readAdaptorsFile(initialAdaptors);
165 } catch (IOException e) {
166 log.warn("couldn't read user-specified file "
167 + initialAdaptors.getAbsolutePath());
168 }
169
170 controlSock = new AgentControlSocketListener(this);
171 try {
172 controlSock.tryToBind();
173
174 controlSock.start();
175 log.info("control socket started on port " + controlSock.portno);
176 } catch (IOException e) {
177 log.info("failed to bind to socket; aborting agent launch", e);
178 throw new AlreadyRunningException();
179 }
180
181
182 try {
183 adaptorStatsManager = new OffsetStatsManager<Adaptor>(statsDataTTLMs);
184 statsCollector = new Timer("ChukwaAgent Stats Collector");
185
186 startHttpServer(conf);
187
188 statsCollector.scheduleAtFixedRate(new StatsCollectorTask(),
189 statsIntervalMs, statsIntervalMs);
190 } catch (Exception e) {
191 log.error("Couldn't start HTTP server", e);
192 throw new RuntimeException(e);
193 }
194
195
196
197 if (checkPointIntervalMs > 0 && checkpointDir != null) {
198 checkpointer = new Timer();
199 checkpointer.schedule(new CheckpointTask(), 0, checkPointIntervalMs);
200 }
201 }
202
203
204 public static class Offset {
205 public Offset(long l, String id) {
206 offset = l;
207 this.id = id;
208 }
209
210 final String id;
211 volatile long offset;
212 public long offset() {
213 return this.offset;
214 }
215
216 public String adaptorID() {
217 return id;
218 }
219 }
220
221 public static class AlreadyRunningException extends Exception {
222
223 private static final long serialVersionUID = 1L;
224
225 public AlreadyRunningException() {
226 super("Agent already running; aborting");
227 }
228 }
229
230 private static Map<Adaptor, Offset> adaptorPositions;
231
232
233
234 private static Map<String, Adaptor> adaptorsByName;
235
236 private File checkpointDir;
237
238 private String checkPointBaseName;
239
240
241 private Timer checkpointer;
242 private volatile boolean needNewCheckpoint = false;
243
244
245 private int checkpointNumber;
246
247
248 private AgentControlSocketListener controlSock;
249
250 public int getControllerPort() {
251 return controlSock.getPort();
252 }
253
254 public OffsetStatsManager<Adaptor> getAdaptorStatsManager() {
255 return adaptorStatsManager;
256 }
257
258
259
260
261
262 public static void main(String[] args) throws AdaptorException {
263
264 try {
265 if (args.length > 0 && args[0].equals("-help")) {
266 System.out.println("usage: LocalAgent [-noCheckPoint]"
267 + "[default collector URL]");
268 return;
269 }
270
271 Configuration conf = ChukwaUtil.readConfiguration();
272 agent = ChukwaAgent.getAgent(conf);
273 if (agent.anotherAgentIsRunning()) {
274 log.error("another agent is running (or port has been usurped). "
275 + "Bailing out now");
276 throw new AlreadyRunningException();
277 }
278
279 int uriArgNumber = 0;
280 if (args.length > 0) {
281 if (args[uriArgNumber].equals("local")) {
282 agent.connector = new ConsoleOutConnector(agent);
283 } else {
284 if (!args[uriArgNumber].contains("://")) {
285 args[uriArgNumber] = "http://" + args[uriArgNumber];
286 }
287 agent.connector = new HttpConnector(agent, args[uriArgNumber]);
288 }
289 } else {
290 String connectorType = conf.get("chukwa.agent.connector",
291 "org.apache.hadoop.chukwa.datacollection.connector.PipelineConnector");
292 agent.connector = (Connector) Class.forName(connectorType).newInstance();
293 }
294 agent.start();
295 agent.connector.start();
296
297 log.info("local agent started on port " + agent.getControlSock().portno);
298 System.out.close();
299 System.err.close();
300 } catch (AlreadyRunningException e) {
301 log.error("agent started already on this machine with same portno;"
302 + " bailing out");
303 System.out
304 .println("agent started already on this machine with same portno;"
305 + " bailing out");
306 return;
307 } catch (Exception e) {
308 e.printStackTrace();
309 }
310 }
311
312 private boolean anotherAgentIsRunning() {
313 boolean result = false;
314 if(controlSock!=null) {
315 result = !controlSock.isBound();
316 }
317 return result;
318 }
319
320
321
322
323 @Override
324 public int adaptorCount() {
325 synchronized(adaptorsByName) {
326 return adaptorsByName.size();
327 }
328 }
329
330 private void startHttpServer(Configuration conf) throws Exception {
331 ChukwaRestServer.startInstance(conf);
332 }
333
334 private void stopHttpServer() throws Exception {
335 ChukwaRestServer.stopInstance();
336 }
337
338
339
340
341 private class StatsCollectorTask extends TimerTask {
342
343 public void run() {
344 long now = System.currentTimeMillis();
345
346 for(String adaptorId : getAdaptorList().keySet()) {
347 Adaptor adaptor = getAdaptor(adaptorId);
348 if(adaptor == null) continue;
349
350 Offset offset = adaptorPositions.get(adaptor);
351 if(offset == null) continue;
352
353 adaptorStatsManager.addOffsetDataPoint(adaptor, offset.offset, now);
354 }
355 }
356 }
357
358
359
360
361
362
363
364
365
366
367
368 private Pattern addCmdPattern = Pattern.compile("[aA][dD][dD]\\s+"
369
370
371
372 + "(?:"
373 + "([^\\s=]+)"
374 + "\\s*=\\s*"
375 + ")?"
376 + "([^\\s=]+)\\s+"
377 + "(\\S+)\\s+"
378 + "(?:"
379 + "(.*?)\\s+"
380
381 + ")?"
382 + "(\\d+)\\s*");
383
384
385
386
387
388
389
390
391 public String processAddCommand(String cmd) {
392 try {
393 return processAddCommandE(cmd);
394 } catch(AdaptorException e) {
395 return null;
396 }
397 }
398
399
400 public String processAddCommandE(String cmd) throws AdaptorException {
401 Matcher m = addCmdPattern.matcher(cmd);
402 if (m.matches()) {
403 long offset;
404 try {
405 offset = Long.parseLong(m.group(5));
406 } catch (NumberFormatException e) {
407 log.warn("malformed line " + cmd);
408 throw new AdaptorException("bad input syntax");
409 }
410
411 String adaptorID = m.group(1);
412 String adaptorClassName = m.group(2);
413 String dataType = m.group(3);
414 String params = m.group(4);
415 if (params == null)
416 params = "";
417
418 Adaptor adaptor = AdaptorFactory.createAdaptor(adaptorClassName);
419 if (adaptor == null) {
420 log.warn("Error creating adaptor of class " + adaptorClassName);
421 throw new AdaptorException("Can't load class " + adaptorClassName);
422 }
423 String coreParams = adaptor.parseArgs(dataType,params,this);
424 if(coreParams == null) {
425 log.warn("invalid params for adaptor: " + params);
426 throw new AdaptorException("invalid params for adaptor: " + params);
427 }
428
429 if(adaptorID == null) {
430 try {
431 adaptorID = AdaptorNamingUtils.synthesizeAdaptorID(adaptorClassName, dataType, coreParams);
432 } catch(NoSuchAlgorithmException e) {
433 log.fatal("MD5 apparently doesn't work on your machine; bailing", e);
434 shutdown(true);
435 }
436 } else if(!adaptorID.startsWith("adaptor_"))
437 adaptorID = "adaptor_"+adaptorID;
438
439 synchronized (adaptorsByName) {
440
441 if(adaptorsByName.containsKey(adaptorID))
442 return adaptorID;
443 adaptorsByName.put(adaptorID, adaptor);
444 adaptorPositions.put(adaptor, new Offset(offset, adaptorID));
445 needNewCheckpoint = true;
446 try {
447 adaptor.start(adaptorID, dataType, offset, DataFactory
448 .getInstance().getEventQueue());
449 log.info("started a new adaptor, id = " + adaptorID + " function=["+adaptor.toString()+"]");
450 ChukwaAgent.agentMetrics.adaptorCount.set(adaptorsByName.size());
451 ChukwaAgent.agentMetrics.addedAdaptor.inc();
452 return adaptorID;
453
454 } catch (Exception e) {
455 Adaptor failed = adaptorsByName.remove(adaptorID);
456 adaptorPositions.remove(failed);
457 adaptorStatsManager.remove(failed);
458 log.warn("failed to start adaptor", e);
459 if(e instanceof AdaptorException)
460 throw (AdaptorException)e;
461 }
462 }
463 } else if (cmd.length() > 0)
464 log.warn("only 'add' command supported in config files; cmd was: " + cmd);
465
466
467 return null;
468 }
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484 private boolean restoreFromCheckpoint() throws IOException {
485 synchronized (checkpointDir) {
486 String[] checkpointNames = checkpointDir.list(new FilenameFilter() {
487 public boolean accept(File dir, String name) {
488 return name.startsWith(checkPointBaseName);
489 }
490 });
491
492 if (checkpointNames == null) {
493 log.error("Unable to list files in checkpoint dir");
494 return false;
495 } else if (checkpointNames.length == 0) {
496 log.info("No checkpoints found in " + checkpointDir);
497 return false;
498 } else if (checkpointNames.length > 2) {
499 log.warn("expected at most two checkpoint files in " + checkpointDir
500 + "; saw " + checkpointNames.length);
501 }
502
503 String lowestName = null;
504 int lowestIndex = Integer.MAX_VALUE;
505 for (String n : checkpointNames) {
506 int index = Integer
507 .parseInt(n.substring(checkPointBaseName.length()));
508 if (index < lowestIndex) {
509 lowestName = n;
510 lowestIndex = index;
511 }
512 }
513
514 checkpointNumber = lowestIndex + 1;
515 File checkpoint = new File(checkpointDir, lowestName);
516 readAdaptorsFile(checkpoint);
517 }
518 return true;
519 }
520
521 private void readAdaptorsFile(File checkpoint) throws FileNotFoundException,
522 IOException {
523 log.info("starting adaptors listed in " + checkpoint.getAbsolutePath());
524 BufferedReader br = new BufferedReader(new InputStreamReader(
525 new FileInputStream(checkpoint), Charset.forName("UTF-8")));
526 String cmd = null;
527 while ((cmd = br.readLine()) != null)
528 processAddCommand(cmd);
529 br.close();
530 }
531
532
533
534
535
536
537 private void writeCheckpoint() throws IOException {
538 needNewCheckpoint = false;
539 synchronized (checkpointDir) {
540 log.info("writing checkpoint " + checkpointNumber);
541
542 FileOutputStream fos = new FileOutputStream(new File(checkpointDir,
543 checkPointBaseName + checkpointNumber));
544 PrintWriter out = new PrintWriter(new BufferedWriter(
545 new OutputStreamWriter(fos, Charset.forName("UTF-8"))));
546
547 for (Map.Entry<String, String> stat : getAdaptorList().entrySet()) {
548 out.println("ADD "+ stat.getKey()+ " = " + stat.getValue());
549 }
550
551 out.close();
552 File lastCheckpoint = new File(checkpointDir, checkPointBaseName
553 + (checkpointNumber - 1));
554 log.debug("hopefully removing old checkpoint file "
555 + lastCheckpoint.getAbsolutePath());
556 boolean result = lastCheckpoint.delete();
557 if(!result) {
558 log.warn("Unable to delete lastCheckpoint file: "+lastCheckpoint.getAbsolutePath());
559 }
560 checkpointNumber++;
561 }
562 }
563
564 public String reportCommit(Adaptor src, long uuid) {
565 needNewCheckpoint = true;
566 Offset o = adaptorPositions.get(src);
567 if (o != null) {
568 synchronized (o) {
569
570 if (uuid > o.offset)
571 o.offset = uuid;
572 }
573 log.debug("got commit up to " + uuid + " on " + src + " = " + o.id);
574 if(src instanceof NotifyOnCommitAdaptor) {
575 ((NotifyOnCommitAdaptor) src).committed(uuid);
576 }
577 return o.id;
578 } else {
579 log.warn("got commit up to " + uuid + " for adaptor " + src
580 + " that doesn't appear to be running: " + adaptorCount()
581 + " total");
582 return null;
583 }
584 }
585
586 private class CheckpointTask extends TimerTask {
587 public void run() {
588 try {
589 if (needNewCheckpoint) {
590 writeCheckpoint();
591 }
592 } catch (IOException e) {
593 log.warn("failed to write checkpoint", e);
594 }
595 }
596 }
597
598
599 private String formatAdaptorStatus(Adaptor a) {
600 return a.getClass().getCanonicalName() + " " + a.getCurrentStatus() +
601 " " + adaptorPositions.get(a).offset;
602 }
603
604
605
606
607
608
609 public Map<String, String> getAdaptorList() {
610 Map<String, String> adaptors = new HashMap<String, String>(adaptorsByName.size());
611 synchronized (adaptorsByName) {
612 for (Map.Entry<String, Adaptor> a : adaptorsByName.entrySet()) {
613 adaptors.put(a.getKey(), formatAdaptorStatus(a.getValue()));
614 }
615 }
616 return adaptors;
617 }
618
619
620 public long stopAdaptor(String name, boolean gracefully) {
621 if (gracefully)
622 return stopAdaptor(name, AdaptorShutdownPolicy.GRACEFULLY);
623 else
624 return stopAdaptor(name, AdaptorShutdownPolicy.HARD_STOP);
625 }
626
627
628
629
630
631
632
633
634
635
636
637
638
639 public long stopAdaptor(String name, AdaptorShutdownPolicy shutdownMode) {
640 Adaptor toStop;
641 long offset = -1;
642
643
644
645
646 synchronized (adaptorsByName) {
647 toStop = adaptorsByName.remove(name);
648 }
649 if (toStop == null) {
650 log.warn("trying to stop " + name + " that isn't running");
651 return offset;
652 } else {
653 adaptorPositions.remove(toStop);
654 adaptorStatsManager.remove(toStop);
655 }
656 ChukwaAgent.agentMetrics.adaptorCount.set(adaptorsByName.size());
657 ChukwaAgent.agentMetrics.removedAdaptor.inc();
658
659 try {
660 offset = toStop.shutdown(shutdownMode);
661 log.info("shutdown ["+ shutdownMode + "] on " + name + ", "
662 + toStop.getCurrentStatus());
663 } catch (AdaptorException e) {
664 log.error("adaptor failed to stop cleanly", e);
665 } finally {
666 needNewCheckpoint = true;
667 }
668 return offset;
669 }
670
671 @Override
672 public Configuration getConfiguration() {
673 return conf;
674 }
675
676 public static Configuration getStaticConfiguration() {
677 return conf;
678 }
679
680 @Override
681 public Adaptor getAdaptor(String name) {
682 synchronized(adaptorsByName) {
683 return adaptorsByName.get(name);
684 }
685 }
686
687 public Offset offset(Adaptor a) {
688 Offset o = adaptorPositions.get(a);
689 return o;
690 }
691
692 public Connector getConnector() {
693 return connector;
694 }
695
696 public void shutdown() {
697 shutdown(false);
698 }
699
700
701
702
703
704
705 public void shutdown(boolean exit) {
706 controlSock.shutdown();
707
708 if (statsCollector != null) {
709 statsCollector.cancel();
710 }
711
712 try {
713 stopHttpServer();
714 } catch (Exception e) {
715 log.error("Couldn't stop jetty server.", e);
716 }
717
718
719
720 synchronized (adaptorsByName) {
721
722 for (Adaptor a : adaptorsByName.values()) {
723 try {
724 a.shutdown(AdaptorShutdownPolicy.HARD_STOP);
725 } catch (AdaptorException e) {
726 log.warn("failed to cleanly stop " + a, e);
727 }
728 }
729 }
730 if (checkpointer != null) {
731 checkpointer.cancel();
732 try {
733 if (needNewCheckpoint)
734 writeCheckpoint();
735 } catch (IOException e) {
736 log.debug(ExceptionUtil.getStackTrace(e));
737 }
738 }
739 adaptorsByName.clear();
740 adaptorPositions.clear();
741 adaptorStatsManager.clear();
742 agent.stop();
743 if (exit)
744 return;
745 }
746
747
748
749
750 private void stop() {
751 stopped = true;
752 }
753
754
755
756
757
758 private boolean isStopped() {
759 return stopped;
760 }
761
762
763
764
765 private AgentControlSocketListener getControlSock() {
766 return controlSock;
767 }
768
769 public String getAdaptorName(Adaptor initiator) {
770 Offset o = adaptorPositions.get(initiator);
771 if(o != null)
772 return o.id;
773 else return null;
774 }
775 }