This project has retired. For details please refer to its
Attic page.
ChukwaHttpSender xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.chukwa.datacollection.sender;
20
21
22 import java.io.BufferedReader;
23 import java.io.ByteArrayInputStream;
24 import java.io.DataOutputStream;
25 import java.io.IOException;
26 import java.io.InputStream;
27 import java.io.InputStreamReader;
28 import java.io.OutputStream;
29 import java.util.ArrayList;
30 import java.util.Iterator;
31 import java.util.List;
32
33 import org.apache.commons.httpclient.HttpClient;
34 import org.apache.commons.httpclient.HttpException;
35 import org.apache.commons.httpclient.HttpMethod;
36 import org.apache.commons.httpclient.HttpMethodBase;
37 import org.apache.commons.httpclient.HttpMethodRetryHandler;
38 import org.apache.commons.httpclient.HttpStatus;
39 import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager;
40 import org.apache.commons.httpclient.methods.PostMethod;
41 import org.apache.commons.httpclient.methods.RequestEntity;
42 import org.apache.commons.httpclient.params.HttpMethodParams;
43 import org.apache.hadoop.chukwa.Chunk;
44 import org.apache.hadoop.chukwa.datacollection.adaptor.Adaptor;
45 import org.apache.hadoop.chukwa.datacollection.sender.metrics.HttpSenderMetrics;
46 import org.apache.hadoop.conf.Configuration;
47 import org.apache.hadoop.io.DataOutputBuffer;
48 import org.apache.hadoop.io.compress.CompressionCodec;
49 import org.apache.hadoop.io.compress.CompressionOutputStream;
50 import org.apache.hadoop.util.ReflectionUtils;
51 import org.apache.log4j.Logger;
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71 public class ChukwaHttpSender implements ChukwaSender {
72 final int MAX_RETRIES_PER_COLLECTOR;
73 final int SENDER_RETRIES;
74 final int WAIT_FOR_COLLECTOR_REBOOT;
75 final int COLLECTOR_TIMEOUT;
76
77 public static final String COLLECTOR_TIMEOUT_OPT = "chukwaAgent.sender.collectorTimeout";
78
79
80 static final HttpSenderMetrics metrics = new HttpSenderMetrics("chukwaAgent", "httpSender");
81
82 static Logger log = Logger.getLogger(ChukwaHttpSender.class);
83 static HttpClient client = null;
84 static MultiThreadedHttpConnectionManager connectionManager = null;
85 String currCollector = null;
86 int postID = 0;
87
88 protected Iterator<String> collectors;
89
90 static boolean COMPRESS;
91 static String CODEC_NAME;
92 static CompressionCodec codec;
93
94 static {
95 connectionManager = new MultiThreadedHttpConnectionManager();
96 client = new HttpClient(connectionManager);
97 connectionManager.closeIdleConnections(1000);
98 }
99
100 public static class CommitListEntry {
101 public Adaptor adaptor;
102 public long uuid;
103 public long start;
104 public CommitListEntry(Adaptor a, long uuid, long start) {
105 adaptor = a;
106 this.uuid = uuid;
107 this.start = start;
108 }
109 }
110
111
112 static class BuffersRequestEntity implements RequestEntity {
113 List<DataOutputBuffer> buffers;
114
115 public BuffersRequestEntity(List<DataOutputBuffer> buf) {
116 buffers = buf;
117 }
118
119 private long getUncompressedContentLenght(){
120 long len = 4;
121 for (DataOutputBuffer b : buffers)
122 len += b.getLength();
123 return len;
124 }
125
126 public long getContentLength() {
127 if( COMPRESS) {
128 return -1;
129 }
130 else {
131 return getUncompressedContentLenght();
132 }
133 }
134
135 public String getContentType() {
136 return "application/octet-stream";
137 }
138
139 public boolean isRepeatable() {
140 return true;
141 }
142
143 private void doWriteRequest( DataOutputStream out ) throws IOException {
144 out.writeInt(buffers.size());
145 for (DataOutputBuffer b : buffers)
146 out.write(b.getData(), 0, b.getLength());
147 }
148
149 public void writeRequest(OutputStream out) throws IOException {
150 if( COMPRESS) {
151 CompressionOutputStream cos = codec.createOutputStream(out);
152 DataOutputStream dos = new DataOutputStream( cos);
153 doWriteRequest( dos);
154 cos.finish();
155 }
156 else {
157 DataOutputStream dos = new DataOutputStream( out);
158 doWriteRequest( dos);
159 }
160 }
161 }
162
163 public ChukwaHttpSender(Configuration c) {
164
165 ArrayList<String> tmp = new ArrayList<String>();
166 this.collectors = tmp.iterator();
167
168 MAX_RETRIES_PER_COLLECTOR = c.getInt("chukwaAgent.sender.fastRetries", 4);
169 SENDER_RETRIES = c.getInt("chukwaAgent.sender.retries", 144000);
170 WAIT_FOR_COLLECTOR_REBOOT = c.getInt("chukwaAgent.sender.retryInterval",
171 20 * 1000);
172 COLLECTOR_TIMEOUT = c.getInt(COLLECTOR_TIMEOUT_OPT, 30*1000);
173 COMPRESS = c.getBoolean("chukwaAgent.output.compress", false);
174 if( COMPRESS) {
175 CODEC_NAME = c.get( "chukwaAgent.output.compression.type", "org.apache.hadoop.io.compress.DefaultCodec");
176 Class<?> codecClass = null;
177 try {
178 codecClass = Class.forName( CODEC_NAME);
179 codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, c);
180 log.info("codec " + CODEC_NAME + " loaded for network compression");
181 } catch (ClassNotFoundException e) {
182 log.warn("failed to create codec " + CODEC_NAME + ". Network compression won't be enabled.", e);
183 COMPRESS = false;
184 }
185 }
186 }
187
188
189
190
191
192
193 public void setCollectors(Iterator<String> collectors) {
194 this.collectors = collectors;
195
196
197 if (currCollector == null) {
198 if (collectors.hasNext()) {
199 currCollector = collectors.next();
200 } else
201 log.error("No collectors to try in send(), won't even try to do doPost()");
202 }
203 }
204
205
206
207
208
209
210
211 @Override
212 public List<CommitListEntry> send(List<Chunk> toSend)
213 throws InterruptedException, IOException {
214 List<DataOutputBuffer> serializedEvents = new ArrayList<DataOutputBuffer>();
215 List<CommitListEntry> commitResults = new ArrayList<CommitListEntry>();
216
217 int thisPost = postID++;
218 int toSendSize = toSend.size();
219 log.info("collected " + toSendSize + " chunks for post_"+thisPost);
220
221
222
223 for (Chunk c : toSend) {
224 DataOutputBuffer b = new DataOutputBuffer(c.getSerializedSizeEstimate());
225 try {
226 c.write(b);
227 } catch (IOException err) {
228 log.error("serialization threw IOException", err);
229 }
230 serializedEvents.add(b);
231
232
233
234 log.info("chunk seqID:"+c.getSeqID());
235 commitResults.add(new CommitListEntry(c.getInitiator(), c.getSeqID(),
236 c.getSeqID() - c.getData().length));
237 }
238 toSend.clear();
239
240
241 RequestEntity postData = new BuffersRequestEntity(serializedEvents);
242
243 PostMethod method = new PostMethod();
244 method.setRequestEntity(postData);
245 StringBuilder sb = new StringBuilder( ">>>>>> HTTP post_");
246 sb.append( thisPost).append( " to ").append( currCollector).append( " length = ");
247 if( COMPRESS) {
248 sb.append( ((BuffersRequestEntity)postData).getUncompressedContentLenght())
249 .append( " of uncompressed data");
250 }
251 else {
252 sb.append( postData.getContentLength());
253 }
254 log.info( sb);
255
256 List<CommitListEntry> results = postAndParseResponse(method, commitResults);
257 log.info("post_" + thisPost + " sent " + toSendSize + " chunks, got back " + results.size() + " acks");
258 return results;
259 }
260
261
262
263
264
265
266
267
268
269 public List<CommitListEntry> postAndParseResponse(PostMethod method,
270 List<CommitListEntry> expectedCommitResults)
271 throws IOException, InterruptedException{
272 reliablySend(method, "chukwa");
273 return expectedCommitResults;
274 }
275
276
277
278
279
280
281
282
283 protected List<String> reliablySend(HttpMethodBase method, String pathSuffix) throws InterruptedException, IOException {
284 int retries = SENDER_RETRIES;
285 while (currCollector != null) {
286
287 try {
288
289
290 List<String> responses = doRequest(method, currCollector+ pathSuffix);
291
292 retries = SENDER_RETRIES;
293
294 return responses;
295 } catch (Throwable e) {
296 log.error("Http post exception on "+ currCollector +": "+ e.toString());
297 log.debug("Http post exception on "+ currCollector, e);
298 ChukwaHttpSender.metrics.httpThrowable.inc();
299 if (collectors.hasNext()) {
300 ChukwaHttpSender.metrics.collectorRollover.inc();
301 boolean repeatPost = failedCollector(currCollector);
302 currCollector = collectors.next();
303 if(repeatPost)
304 log.info("Found a new collector to roll over to, retrying HTTP Post to collector "
305 + currCollector);
306 else {
307 log.info("Using " + currCollector + " in the future, but not retrying this post");
308 break;
309 }
310 } else {
311 if (retries > 0) {
312 log.warn("No more collectors to try rolling over to; waiting "
313 + WAIT_FOR_COLLECTOR_REBOOT + " ms (" + retries
314 + " retries left)");
315 Thread.sleep(WAIT_FOR_COLLECTOR_REBOOT);
316 retries--;
317 } else {
318 log.error("No more collectors to try rolling over to; aborting post");
319 throw new IOException("no collectors");
320 }
321 }
322 } finally {
323
324 method.releaseConnection();
325 }
326 }
327 return new ArrayList<String>();
328 }
329
330
331
332
333
334
335 protected boolean failedCollector(String downCollector) {
336 log.debug("declaring "+ downCollector + " down");
337 return true;
338 }
339
340
341
342
343
344
345 protected List<String> doRequest(HttpMethodBase method, String dest)
346 throws IOException, HttpException {
347
348 HttpMethodParams pars = method.getParams();
349 pars.setParameter(HttpMethodParams.RETRY_HANDLER,
350 (Object) new HttpMethodRetryHandler() {
351 public boolean retryMethod(HttpMethod m, IOException e, int exec) {
352 return !(e instanceof java.net.ConnectException)
353 && (exec < MAX_RETRIES_PER_COLLECTOR);
354 }
355 });
356
357 pars.setParameter(HttpMethodParams.SO_TIMEOUT, new Integer(COLLECTOR_TIMEOUT));
358
359 method.setParams(pars);
360 method.setPath(dest);
361
362
363 ChukwaHttpSender.metrics.httpPost.inc();
364
365 int statusCode = client.executeMethod(method);
366
367 if (statusCode != HttpStatus.SC_OK) {
368 ChukwaHttpSender.metrics.httpException.inc();
369
370 if (statusCode == HttpStatus.SC_REQUEST_TIMEOUT ) {
371 ChukwaHttpSender.metrics.httpTimeOutException.inc();
372 }
373
374 log.error(">>>>>> HTTP response from " + dest + " statusLine: " + method.getStatusLine());
375
376 throw new HttpException("got back a failure from server");
377 }
378
379 log.info(">>>>>> HTTP Got success back from "+ dest + "; response length "
380 + method.getResponseContentLength());
381
382
383 InputStream rstream = null;
384
385
386 byte[] resp_buf = method.getResponseBody();
387 rstream = new ByteArrayInputStream(resp_buf);
388 BufferedReader br = new BufferedReader(new InputStreamReader(rstream));
389 String line;
390 List<String> resp = new ArrayList<String>();
391 while ((line = br.readLine()) != null) {
392 if (log.isDebugEnabled()) {
393 log.debug("response: " + line);
394 }
395 resp.add(line);
396 }
397 return resp;
398 }
399
400 @Override
401 public void stop() {
402 }
403 }