This project has retired. For details please refer to its
Attic page.
ChukwaHttpSender xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.chukwa.datacollection.sender;
20
21
22 import java.io.BufferedReader;
23 import java.io.ByteArrayInputStream;
24 import java.io.DataOutputStream;
25 import java.io.IOException;
26 import java.io.InputStream;
27 import java.io.InputStreamReader;
28 import java.io.OutputStream;
29 import java.nio.charset.Charset;
30 import java.util.ArrayList;
31 import java.util.Iterator;
32 import java.util.List;
33
34 import org.apache.commons.httpclient.HttpClient;
35 import org.apache.commons.httpclient.HttpException;
36 import org.apache.commons.httpclient.HttpMethod;
37 import org.apache.commons.httpclient.HttpMethodBase;
38 import org.apache.commons.httpclient.HttpMethodRetryHandler;
39 import org.apache.commons.httpclient.HttpStatus;
40 import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager;
41 import org.apache.commons.httpclient.methods.PostMethod;
42 import org.apache.commons.httpclient.methods.RequestEntity;
43 import org.apache.commons.httpclient.params.HttpMethodParams;
44 import org.apache.hadoop.chukwa.Chunk;
45 import org.apache.hadoop.chukwa.datacollection.adaptor.Adaptor;
46 import org.apache.hadoop.chukwa.datacollection.sender.metrics.HttpSenderMetrics;
47 import org.apache.hadoop.conf.Configuration;
48 import org.apache.hadoop.io.DataOutputBuffer;
49 import org.apache.hadoop.io.compress.CompressionCodec;
50 import org.apache.hadoop.io.compress.CompressionOutputStream;
51 import org.apache.hadoop.util.ReflectionUtils;
52 import org.apache.log4j.Logger;
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72 public class ChukwaHttpSender implements ChukwaSender {
73 final int MAX_RETRIES_PER_COLLECTOR;
74 final int SENDER_RETRIES;
75 final int WAIT_FOR_COLLECTOR_REBOOT;
76 final int COLLECTOR_TIMEOUT;
77
78 public static final String COLLECTOR_TIMEOUT_OPT = "chukwaAgent.sender.collectorTimeout";
79
80
81 static final HttpSenderMetrics metrics = new HttpSenderMetrics("chukwaAgent", "httpSender");
82
83 static Logger log = Logger.getLogger(ChukwaHttpSender.class);
84 static HttpClient client = null;
85 static MultiThreadedHttpConnectionManager connectionManager = null;
86 String currCollector = null;
87 int postID = 0;
88
89 protected Iterator<String> collectors;
90
91 boolean COMPRESS;
92 String CODEC_NAME;
93 CompressionCodec codec;
94
95 static {
96 connectionManager = new MultiThreadedHttpConnectionManager();
97 client = new HttpClient(connectionManager);
98 connectionManager.closeIdleConnections(1000);
99 }
100
101 public static class CommitListEntry {
102 public Adaptor adaptor;
103 public long uuid;
104 public long start;
105 public CommitListEntry(Adaptor a, long uuid, long start) {
106 adaptor = a;
107 this.uuid = uuid;
108 this.start = start;
109 }
110 }
111
112
113 static class BuffersRequestEntity implements RequestEntity {
114 List<DataOutputBuffer> buffers;
115 boolean compress;
116 CompressionCodec codec;
117
118 public BuffersRequestEntity(List<DataOutputBuffer> buf, boolean compress, CompressionCodec codec) {
119 buffers = buf;
120 this.compress = compress;
121 this.codec = codec;
122 }
123
124 private long getUncompressedContentLenght(){
125 long len = 4;
126 for (DataOutputBuffer b : buffers)
127 len += b.getLength();
128 return len;
129 }
130
131 public long getContentLength() {
132 if(compress) {
133 return -1;
134 }
135 else {
136 return getUncompressedContentLenght();
137 }
138 }
139
140 public String getContentType() {
141 return "application/octet-stream";
142 }
143
144 public boolean isRepeatable() {
145 return true;
146 }
147
148 private void doWriteRequest( DataOutputStream out ) throws IOException {
149 out.writeInt(buffers.size());
150 for (DataOutputBuffer b : buffers)
151 out.write(b.getData(), 0, b.getLength());
152 }
153
154 public void writeRequest(OutputStream out) throws IOException {
155 if(compress) {
156 CompressionOutputStream cos = codec.createOutputStream(out);
157 DataOutputStream dos = new DataOutputStream( cos);
158 doWriteRequest( dos);
159 cos.finish();
160 }
161 else {
162 DataOutputStream dos = new DataOutputStream( out);
163 doWriteRequest( dos);
164 }
165 }
166 }
167
168 public ChukwaHttpSender(Configuration c) {
169
170 ArrayList<String> tmp = new ArrayList<String>();
171 this.collectors = tmp.iterator();
172
173 MAX_RETRIES_PER_COLLECTOR = c.getInt("chukwaAgent.sender.fastRetries", 4);
174 SENDER_RETRIES = c.getInt("chukwaAgent.sender.retries", 144000);
175 WAIT_FOR_COLLECTOR_REBOOT = c.getInt("chukwaAgent.sender.retryInterval",
176 20 * 1000);
177 COLLECTOR_TIMEOUT = c.getInt(COLLECTOR_TIMEOUT_OPT, 30*1000);
178 COMPRESS = c.getBoolean("chukwaAgent.output.compress", false);
179 if( COMPRESS) {
180 CODEC_NAME = c.get( "chukwaAgent.output.compression.type", "org.apache.hadoop.io.compress.DefaultCodec");
181 Class<?> codecClass = null;
182 try {
183 codecClass = Class.forName( CODEC_NAME);
184 codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, c);
185 log.info("codec " + CODEC_NAME + " loaded for network compression");
186 } catch (ClassNotFoundException e) {
187 log.warn("failed to create codec " + CODEC_NAME + ". Network compression won't be enabled.", e);
188 COMPRESS = false;
189 }
190 }
191 }
192
193
194
195
196
197
198 public void setCollectors(Iterator<String> collectors) {
199 this.collectors = collectors;
200
201
202 if (currCollector == null) {
203 if (collectors.hasNext()) {
204 currCollector = collectors.next();
205 } else
206 log.error("No collectors to try in send(), won't even try to do doPost()");
207 }
208 }
209
210
211
212
213
214
215
216 @Override
217 public List<CommitListEntry> send(List<Chunk> toSend)
218 throws InterruptedException, IOException {
219 List<DataOutputBuffer> serializedEvents = new ArrayList<DataOutputBuffer>();
220 List<CommitListEntry> commitResults = new ArrayList<CommitListEntry>();
221
222 int thisPost = postID++;
223 int toSendSize = toSend.size();
224 log.info("collected " + toSendSize + " chunks for post_"+thisPost);
225
226
227
228 for (Chunk c : toSend) {
229 DataOutputBuffer b = new DataOutputBuffer(c.getSerializedSizeEstimate());
230 try {
231 c.write(b);
232 } catch (IOException err) {
233 log.error("serialization threw IOException", err);
234 }
235 serializedEvents.add(b);
236
237
238
239 log.info("chunk seqID:"+c.getSeqID());
240 commitResults.add(new CommitListEntry(c.getInitiator(), c.getSeqID(),
241 c.getSeqID() - c.getData().length));
242 }
243 toSend.clear();
244
245
246 RequestEntity postData = new BuffersRequestEntity(serializedEvents, COMPRESS, codec);
247
248 PostMethod method = new PostMethod();
249 method.setRequestEntity(postData);
250 StringBuilder sb = new StringBuilder( ">>>>>> HTTP post_");
251 sb.append( thisPost).append( " to ").append( currCollector).append( " length = ");
252 if( COMPRESS) {
253 sb.append( ((BuffersRequestEntity)postData).getUncompressedContentLenght())
254 .append( " of uncompressed data");
255 }
256 else {
257 sb.append( postData.getContentLength());
258 }
259 log.info( sb);
260
261 List<CommitListEntry> results = postAndParseResponse(method, commitResults);
262 log.info("post_" + thisPost + " sent " + toSendSize + " chunks, got back " + results.size() + " acks");
263 return results;
264 }
265
266
267
268
269
270
271
272
273
274 public List<CommitListEntry> postAndParseResponse(PostMethod method,
275 List<CommitListEntry> expectedCommitResults)
276 throws IOException, InterruptedException{
277 reliablySend(method, "chukwa");
278 return expectedCommitResults;
279 }
280
281
282
283
284
285
286
287
288 protected List<String> reliablySend(HttpMethodBase method, String pathSuffix) throws InterruptedException, IOException {
289 int retries = SENDER_RETRIES;
290 while (currCollector != null) {
291
292 try {
293
294
295 List<String> responses = doRequest(method, currCollector+ pathSuffix);
296
297 retries = SENDER_RETRIES;
298
299 return responses;
300 } catch (Throwable e) {
301 log.error("Http post exception on "+ currCollector +": "+ e.toString());
302 log.debug("Http post exception on "+ currCollector, e);
303 ChukwaHttpSender.metrics.httpThrowable.inc();
304 if (collectors.hasNext()) {
305 ChukwaHttpSender.metrics.collectorRollover.inc();
306 boolean repeatPost = failedCollector(currCollector);
307 currCollector = collectors.next();
308 if(repeatPost)
309 log.info("Found a new collector to roll over to, retrying HTTP Post to collector "
310 + currCollector);
311 else {
312 log.info("Using " + currCollector + " in the future, but not retrying this post");
313 break;
314 }
315 } else {
316 if (retries > 0) {
317 log.warn("No more collectors to try rolling over to; waiting "
318 + WAIT_FOR_COLLECTOR_REBOOT + " ms (" + retries
319 + " retries left)");
320 Thread.sleep(WAIT_FOR_COLLECTOR_REBOOT);
321 retries--;
322 } else {
323 log.error("No more collectors to try rolling over to; aborting post");
324 throw new IOException("no collectors");
325 }
326 }
327 } finally {
328
329 method.releaseConnection();
330 }
331 }
332 return new ArrayList<String>();
333 }
334
335
336
337
338
339
340 protected boolean failedCollector(String downCollector) {
341 log.debug("declaring "+ downCollector + " down");
342 return true;
343 }
344
345
346
347
348
349
350 protected List<String> doRequest(HttpMethodBase method, String dest)
351 throws IOException, HttpException {
352
353 HttpMethodParams pars = method.getParams();
354 pars.setParameter(HttpMethodParams.RETRY_HANDLER,
355 (Object) new HttpMethodRetryHandler() {
356 public boolean retryMethod(HttpMethod m, IOException e, int exec) {
357 return !(e instanceof java.net.ConnectException)
358 && (exec < MAX_RETRIES_PER_COLLECTOR);
359 }
360 });
361
362 pars.setParameter(HttpMethodParams.SO_TIMEOUT, Integer.valueOf(COLLECTOR_TIMEOUT));
363
364 method.setParams(pars);
365 method.setPath(dest);
366
367
368 ChukwaHttpSender.metrics.httpPost.inc();
369
370 int statusCode = client.executeMethod(method);
371
372 if (statusCode != HttpStatus.SC_OK) {
373 ChukwaHttpSender.metrics.httpException.inc();
374
375 if (statusCode == HttpStatus.SC_REQUEST_TIMEOUT ) {
376 ChukwaHttpSender.metrics.httpTimeOutException.inc();
377 }
378
379 log.error(">>>>>> HTTP response from " + dest + " statusLine: " + method.getStatusLine());
380
381 throw new HttpException("got back a failure from server");
382 }
383
384 log.info(">>>>>> HTTP Got success back from "+ dest + "; response length "
385 + method.getResponseContentLength());
386
387
388 InputStream rstream = null;
389
390
391 byte[] resp_buf = method.getResponseBody();
392 rstream = new ByteArrayInputStream(resp_buf);
393 BufferedReader br = new BufferedReader(new InputStreamReader(rstream, Charset.forName("UTF-8")));
394 String line;
395 List<String> resp = new ArrayList<String>();
396 while ((line = br.readLine()) != null) {
397 if (log.isDebugEnabled()) {
398 log.debug("response: " + line);
399 }
400 resp.add(line);
401 }
402 return resp;
403 }
404
405 @Override
406 public void stop() {
407 }
408 }