This project has retired. For details please refer to its
Attic page.
PipelineConnector xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.chukwa.datacollection.connector;
20
21
22
23
24
25
26
27
28
29
30 import java.util.ArrayList;
31 import java.util.List;
32 import java.util.Timer;
33 import java.util.TimerTask;
34
35 import org.apache.hadoop.chukwa.Chunk;
36 import org.apache.hadoop.chukwa.datacollection.ChunkQueue;
37 import org.apache.hadoop.chukwa.datacollection.DataFactory;
38 import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
39 import org.apache.hadoop.chukwa.datacollection.connector.Connector;
40 import org.apache.hadoop.chukwa.datacollection.writer.ChukwaWriter;
41 import org.apache.hadoop.chukwa.datacollection.writer.ChukwaWriter.CommitStatus;
42 import org.apache.hadoop.chukwa.datacollection.writer.PipelineStageWriter;
43 import org.apache.hadoop.chukwa.datacollection.writer.WriterException;
44 import org.apache.hadoop.chukwa.util.DaemonWatcher;
45 import org.apache.hadoop.conf.Configuration;
46 import org.apache.log4j.Logger;
47
48 public class PipelineConnector implements Connector, Runnable {
49
50 static Logger log = Logger.getLogger(PipelineConnector.class);
51
52 Timer statTimer = null;
53 volatile int chunkCount = 0;
54
55 int MAX_SIZE_PER_POST = 2 * 1024 * 1024;
56 int MIN_POST_INTERVAL = 5 * 1000;
57 public static final String MIN_POST_INTERVAL_OPT = "pipelineConnector.minPostInterval";
58 public static final String MAX_SIZE_PER_POST_OPT = "pipelineConnector.maxPostSize";
59 public static final String ASYNC_ACKS_OPT = "pipelineConnector.asyncAcks";
60
61 ChunkQueue chunkQueue;
62
63 private static volatile ChukwaAgent agent = null;
64
65 private volatile boolean stopMe = false;
66 protected ChukwaWriter writers = null;
67
68 public PipelineConnector() {
69
70 statTimer = new Timer();
71 statTimer.schedule(new TimerTask() {
72 public void run() {
73 int count = chunkCount;
74 chunkCount = 0;
75 log.info("# Data chunks sent since last report: " + count);
76 }
77 }, 100, 60 * 1000);
78 }
79
80 public void start() {
81 chunkQueue = DataFactory.getInstance().getEventQueue();
82 agent = ChukwaAgent.getAgent();
83 Configuration conf = agent.getConfiguration();
84 MAX_SIZE_PER_POST = conf.getInt(MAX_SIZE_PER_POST_OPT, MAX_SIZE_PER_POST);
85 MIN_POST_INTERVAL = conf.getInt(MIN_POST_INTERVAL_OPT, MIN_POST_INTERVAL);
86 try {
87 writers = new PipelineStageWriter(conf);
88 (new Thread(this, "Pipeline connector thread")).start();
89 } catch(Exception e) {
90 log.error("Pipeline initialization error: ", e);
91 }
92 }
93
94 public void shutdown() {
95 stopMe = true;
96 try {
97 writers.close();
98 } catch (WriterException e) {
99 log.warn("Shutdown error: ",e);
100 }
101 }
102
103 public void run() {
104 log.info("PipelineConnector started at time:" + System.currentTimeMillis());
105
106 try {
107 long lastPost = System.currentTimeMillis();
108 while (!stopMe) {
109 List<Chunk> newQueue = new ArrayList<Chunk>();
110 try {
111
112 chunkQueue.collect(newQueue, MAX_SIZE_PER_POST);
113 } catch (InterruptedException e) {
114 log.warn("thread interrupted during addChunks(ChunkQueue)");
115 Thread.currentThread().interrupt();
116 break;
117 }
118 CommitStatus result = writers.add(newQueue);
119 if(result.equals(ChukwaWriter.COMMIT_OK)) {
120 chunkCount = newQueue.size();
121 for (Chunk c : newQueue) {
122 agent.reportCommit(c.getInitiator(), c.getSeqID());
123 }
124 }
125 long now = System.currentTimeMillis();
126 long delta = MIN_POST_INTERVAL - now + lastPost;
127 if(delta > 0) {
128 Thread.sleep(delta);
129 }
130 lastPost = now;
131 }
132 log.info("received stop() command so exiting run() loop to shutdown connector");
133 } catch (WriterException e) {
134 log.warn("PipelineStageWriter Exception: ", e);
135 } catch (OutOfMemoryError e) {
136 log.warn("Bailing out", e);
137 DaemonWatcher.bailout(-1);
138 } catch (InterruptedException e) {
139
140 log.warn("Bailing out", e);
141 DaemonWatcher.bailout(-1);
142 } catch (Throwable e) {
143 log.error("connector failed; shutting down agent: ", e);
144 throw new RuntimeException("Shutdown pipeline connector.");
145 }
146 }
147
148 @Override
149 public void reloadConfiguration() {
150 }
151
152 }