This project has retired. For details please refer to its
Attic page.
HttpConnector xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.chukwa.datacollection.connector.http;
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38 import java.io.IOException;
39 import java.util.ArrayList;
40 import java.util.Iterator;
41 import java.util.List;
42 import java.util.Timer;
43 import java.util.TimerTask;
44 import org.apache.hadoop.chukwa.Chunk;
45 import org.apache.hadoop.chukwa.datacollection.ChunkQueue;
46 import org.apache.hadoop.chukwa.datacollection.DataFactory;
47 import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
48 import org.apache.hadoop.chukwa.datacollection.connector.Connector;
49 import org.apache.hadoop.chukwa.datacollection.sender.*;
50 import org.apache.hadoop.chukwa.util.DaemonWatcher;
51 import org.apache.hadoop.conf.Configuration;
52 import org.apache.log4j.Logger;
53
54 public class HttpConnector implements Connector, Runnable {
55
56 static Logger log = Logger.getLogger(HttpConnector.class);
57
58 Timer statTimer = null;
59 volatile int chunkCount = 0;
60
61 int MAX_SIZE_PER_POST = 2 * 1024 * 1024;
62 int MIN_POST_INTERVAL = 5 * 1000;
63 public static final String MIN_POST_INTERVAL_OPT = "httpConnector.minPostInterval";
64 public static final String MAX_SIZE_PER_POST_OPT = "httpConnector.maxPostSize";
65 public static final String ASYNC_ACKS_OPT = "httpConnector.asyncAcks";
66
67 boolean ASYNC_ACKS = false;
68
69 ChunkQueue chunkQueue;
70
71 ChukwaAgent agent;
72 String argDestination = null;
73
74 private volatile boolean stopMe = false;
75 private Iterator<String> collectors = null;
76 protected ChukwaSender connectorClient = null;
77
78 {
79 statTimer = new Timer();
80 statTimer.schedule(new TimerTask() {
81 public void run() {
82 int count = chunkCount;
83 chunkCount = 0;
84 log.info("# http chunks ACK'ed since last report: " + count);
85 }
86 }, 100, 60 * 1000);
87 }
88
89 public HttpConnector(ChukwaAgent agent) {
90 this.agent = agent;
91 }
92
93 public HttpConnector(ChukwaAgent agent, String destination) {
94 this.agent = agent;
95 this.argDestination = destination;
96
97 log.info("Setting HTTP Connector URL manually using arg passed to Agent: "
98 + destination);
99 }
100
101 public void start() {
102
103 chunkQueue = DataFactory.getInstance().getEventQueue();
104 Configuration conf = agent.getConfiguration();
105 MAX_SIZE_PER_POST = conf.getInt(MAX_SIZE_PER_POST_OPT, MAX_SIZE_PER_POST);
106 MIN_POST_INTERVAL = conf.getInt(MIN_POST_INTERVAL_OPT, MIN_POST_INTERVAL);
107 ASYNC_ACKS = conf.getBoolean(ASYNC_ACKS_OPT, ASYNC_ACKS);
108 (new Thread(this, "HTTP post thread")).start();
109 }
110
111 public void shutdown() {
112 stopMe = true;
113 connectorClient.stop();
114 }
115
116 public void run() {
117 log.info("HttpConnector started at time:" + System.currentTimeMillis());
118
119
120 try {
121 if(collectors == null)
122 collectors = DataFactory.getInstance().getCollectorURLs(agent.getConfiguration());
123 } catch (IOException e) {
124 log.error("Failed to retrieve list of collectors from "
125 + "conf/collectors file", e);
126 }
127
128 if(ASYNC_ACKS) {
129 try {
130 connectorClient = new AsyncAckSender(agent.getConfiguration(), agent);
131 } catch(IOException e) {
132 log.fatal("can't read AsycAck hostlist file, exiting");
133 agent.shutdown(true);
134 }
135 } else
136 connectorClient = new ChukwaHttpSender(agent.getConfiguration());
137
138 if (argDestination != null) {
139 ArrayList<String> tmp = new ArrayList<String>();
140 tmp.add(argDestination);
141 collectors = tmp.iterator();
142 log.info("using collector specified at agent runtime: " + argDestination);
143 } else
144 log.info("using collectors from collectors file");
145
146 if (collectors == null || !collectors.hasNext()) {
147 log.error("No collectors specified, exiting (and taking agent with us).");
148 agent.shutdown(true);
149 return;
150 }
151
152 connectorClient.setCollectors(collectors);
153
154
155 try {
156 long lastPost = System.currentTimeMillis();
157 while (!stopMe) {
158 List<Chunk> newQueue = new ArrayList<Chunk>();
159 try {
160
161 chunkQueue.collect(newQueue, MAX_SIZE_PER_POST);
162
163
164 } catch (InterruptedException e) {
165 System.out.println("thread interrupted during addChunks(ChunkQueue)");
166 Thread.currentThread().interrupt();
167 break;
168 }
169 List<ChukwaHttpSender.CommitListEntry> results = connectorClient
170 .send(newQueue);
171
172 for (ChukwaHttpSender.CommitListEntry cle : results) {
173 agent.reportCommit(cle.adaptor, cle.uuid);
174 chunkCount++;
175 }
176
177 long now = System.currentTimeMillis();
178 long delta = MIN_POST_INTERVAL - now + lastPost;
179 if(delta > 0) {
180 Thread.sleep(delta);
181 }
182 lastPost = now;
183 }
184 log.info("received stop() command so exiting run() loop to shutdown connector");
185 } catch (OutOfMemoryError e) {
186 log.warn("Bailing out", e);
187 DaemonWatcher.bailout(-1);
188 } catch (InterruptedException e) {
189
190 log.warn("Bailing out", e);
191 DaemonWatcher.bailout(-1);
192 } catch (java.io.IOException e) {
193 log.error("connector failed; shutting down agent");
194 agent.shutdown(true);
195 }
196 }
197
198 @Override
199 public void reloadConfiguration() {
200 Iterator<String> destinations = null;
201
202
203 try {
204 destinations = DataFactory.getInstance().getCollectorURLs(agent.getConfiguration());
205 } catch (IOException e) {
206 log.error("Failed to retreive list of collectors from conf/collectors file", e);
207 }
208 if (destinations != null && destinations.hasNext()) {
209 collectors = destinations;
210 connectorClient.setCollectors(collectors);
211 log.info("Resetting collectors");
212 }
213 }
214
215 public ChukwaSender getSender() {
216 return connectorClient;
217 }
218
219 public void setCollectors(Iterator<String> list) {
220 collectors = list;
221 }
222 }