1/*2 * Licensed to the Apache Software Foundation (ASF) under one3 * or more contributor license agreements. See the NOTICE file4 * distributed with this work for additional information5 * regarding copyright ownership. The ASF licenses this file6 * to you under the Apache License, Version 2.0 (the7 * "License"); you may not use this file except in compliance8 * with the License. You may obtain a copy of the License at9 *10 * http://www.apache.org/licenses/LICENSE-2.011 *12 * Unless required by applicable law or agreed to in writing, software13 * distributed under the License is distributed on an "AS IS" BASIS,14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.15 * See the License for the specific language governing permissions and16 * limitations under the License.17 */1819package org.apache.hadoop.chukwa.datacollection.sender;
202122import java.io.BufferedReader;
23import java.io.ByteArrayInputStream;
24import java.io.DataOutputStream;
25import java.io.IOException;
26import java.io.InputStream;
27import java.io.InputStreamReader;
28import java.io.OutputStream;
29import java.nio.charset.Charset;
30import java.util.ArrayList;
31import java.util.Iterator;
32import java.util.List;
3334import org.apache.commons.httpclient.HttpClient;
35import org.apache.commons.httpclient.HttpException;
36import org.apache.commons.httpclient.HttpMethod;
37import org.apache.commons.httpclient.HttpMethodBase;
38import org.apache.commons.httpclient.HttpMethodRetryHandler;
39import org.apache.commons.httpclient.HttpStatus;
40import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager;
41import org.apache.commons.httpclient.methods.PostMethod;
42import org.apache.commons.httpclient.methods.RequestEntity;
43import org.apache.commons.httpclient.params.HttpMethodParams;
44import org.apache.hadoop.chukwa.Chunk;
45import org.apache.hadoop.chukwa.datacollection.adaptor.Adaptor;
46import org.apache.hadoop.chukwa.datacollection.sender.metrics.HttpSenderMetrics;
47import org.apache.hadoop.conf.Configuration;
48import org.apache.hadoop.io.DataOutputBuffer;
49import org.apache.hadoop.io.compress.CompressionCodec;
50import org.apache.hadoop.io.compress.CompressionOutputStream;
51import org.apache.hadoop.util.ReflectionUtils;
52import org.apache.log4j.Logger;
/**
 * Encapsulates all of the http setup and connection details needed for chunks
 * to be delivered to a collector.
 * 
 * This class should encapsulate the details of the low level data formatting.
 * The Connector is responsible for picking what to send and to whom;
 * retry policy is encoded in the collectors iterator.
 * 
 * This class is not thread safe. Synchronization is the caller's responsibility.
 * 
 * <p>
 * On error, tries the list of available collectors, pauses for the configured
 * retry interval (<code>chukwaAgent.sender.retryInterval</code>, 20 seconds by
 * default), and then repeats.
 * </p>
 * <p>
 * Keeps waiting for collectors to come up until the configured number of
 * retries (<code>chukwaAgent.sender.retries</code>, 144000 by default) is
 * used up.
 * </p>
 */
public class ChukwaHttpSender implements ChukwaSender {
73finalint MAX_RETRIES_PER_COLLECTOR; // fast retries, in http client74finalint SENDER_RETRIES;
75finalint WAIT_FOR_COLLECTOR_REBOOT;
76finalint COLLECTOR_TIMEOUT;
7778publicstaticfinal String COLLECTOR_TIMEOUT_OPT = "chukwaAgent.sender.collectorTimeout";
79// FIXME: this should really correspond to the timer in RetryListOfCollectors8081staticfinalHttpSenderMetrics metrics = newHttpSenderMetrics("chukwaAgent", "httpSender");
8283static Logger log = Logger.getLogger(ChukwaHttpSender.class);
84static HttpClient client = null;
85static MultiThreadedHttpConnectionManager connectionManager = null;
86 String currCollector = null;
87int postID = 0;
8889protected Iterator<String> collectors;
9091boolean COMPRESS;
92 String CODEC_NAME;
93 CompressionCodec codec;
// Creates the connection manager and the HttpClient that are shared by every
// sender instance (both fields are static).
static {
  MultiThreadedHttpConnectionManager manager = new MultiThreadedHttpConnectionManager();
  connectionManager = manager;
  client = new HttpClient(manager);
  // ask the manager to close connections that have been idle for 1000 ms
  manager.closeIdleConnections(1000);
}
100101publicstaticclassCommitListEntry {
102publicAdaptor adaptor;
103publiclong uuid;
104publiclong start; //how many bytes of stream105publicCommitListEntry(Adaptor a, long uuid, long start) {
106 adaptor = a;
107this.uuid = uuid;
108this.start = start;
109 }
110 }
111112// FIXME: probably we're better off with an EventListRequestEntity113staticclassBuffersRequestEntityimplements RequestEntity {
114 List<DataOutputBuffer> buffers;
115boolean compress;
116 CompressionCodec codec;
117118publicBuffersRequestEntity(List<DataOutputBuffer> buf, boolean compress, CompressionCodec codec) {
119 buffers = buf;
120this.compress = compress;
121this.codec = codec;
122 }
123124privatelong getUncompressedContentLenght(){
125long len = 4;// first we send post length, then buffers126for (DataOutputBuffer b : buffers)
127 len += b.getLength();
128return len;
129 }
130131publiclong getContentLength() {
132if(compress) {
133return -1;
134 }
135else {
136return getUncompressedContentLenght();
137 }
138 }
139140public String getContentType() {
141return"application/octet-stream";
142 }
143144publicboolean isRepeatable() {
145returntrue;
146 }
147148privatevoid doWriteRequest( DataOutputStream out ) throws IOException {
149 out.writeInt(buffers.size());
150for (DataOutputBuffer b : buffers)
151 out.write(b.getData(), 0, b.getLength());
152 }
153154publicvoid writeRequest(OutputStream out) throws IOException {
155if(compress) {
156 CompressionOutputStream cos = codec.createOutputStream(out);
157 DataOutputStream dos = new DataOutputStream( cos);
158 doWriteRequest( dos);
159 cos.finish();
160 }
161else {
162 DataOutputStream dos = new DataOutputStream( out);
163 doWriteRequest( dos);
164 }
165 }
166 }
/**
 * Creates a sender configured from {@code c}.
 *
 * Reads the retry counts, retry interval and collector timeout, and, when
 * <code>chukwaAgent.output.compress</code> is true, loads the codec named by
 * <code>chukwaAgent.output.compression.type</code>. If the codec cannot be
 * loaded or instantiated, or is not a {@link CompressionCodec}, a warning is
 * logged and compression is switched off instead of failing construction.
 *
 * @param c configuration to read the sender options from
 */
public ChukwaHttpSender(Configuration c) {
  // start with an empty collector list; setCollectors() supplies the real one
  this.collectors = new ArrayList<String>().iterator();

  MAX_RETRIES_PER_COLLECTOR = c.getInt("chukwaAgent.sender.fastRetries", 4);
  SENDER_RETRIES = c.getInt("chukwaAgent.sender.retries", 144000);
  WAIT_FOR_COLLECTOR_REBOOT = c.getInt("chukwaAgent.sender.retryInterval",
      20 * 1000);
  COLLECTOR_TIMEOUT = c.getInt(COLLECTOR_TIMEOUT_OPT, 30 * 1000);
  COMPRESS = c.getBoolean("chukwaAgent.output.compress", false);
  if (COMPRESS) {
    CODEC_NAME = c.get("chukwaAgent.output.compression.type",
        "org.apache.hadoop.io.compress.DefaultCodec");
    try {
      Class<?> codecClass = Class.forName(CODEC_NAME);
      codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, c);
      log.info("codec " + CODEC_NAME + " loaded for network compression");
    } catch (ClassNotFoundException e) {
      log.warn("failed to create codec " + CODEC_NAME + ". Network compression won't be enabled.", e);
      COMPRESS = false;
    } catch (RuntimeException e) {
      // the class exists but is not a CompressionCodec (ClassCastException) or
      // could not be instantiated; fall back to uncompressed posts as well
      log.warn("failed to create codec " + CODEC_NAME + ". Network compression won't be enabled.", e);
      COMPRESS = false;
    }
  }
}
192193/**194 * Set up a list of connectors for this client to send {@link Chunk}s to195 * 196 * @param collectors is a list of collectors197 */198publicvoid setCollectors(Iterator<String> collectors) {
199this.collectors = collectors;
200// setup a new destination from our list of collectors if one hasn't been201// set up202if (currCollector == null) {
203if (collectors.hasNext()) {
204 currCollector = collectors.next();
205 } else206 log.error("No collectors to try in send(), won't even try to do doPost()");
207 }
208 }
209210/**211 * grab all of the chunks currently in the chunkQueue, stores a copy of them212 * locally, calculates their size, sets them up213 * 214 * @return array of chunk id's which were ACKed by collector215 */216 @Override
217public List<CommitListEntry> send(List<Chunk> toSend)
218throws InterruptedException, IOException {
219 List<DataOutputBuffer> serializedEvents = new ArrayList<DataOutputBuffer>();
220 List<CommitListEntry> commitResults = new ArrayList<CommitListEntry>();
221222int thisPost = postID++;
223int toSendSize = toSend.size();
224 log.info("collected " + toSendSize + " chunks for post_"+thisPost);
225226// Serialize each chunk in turn into it's own DataOutputBuffer and add that227// buffer to serializedEvents228for (Chunk c : toSend) {
229 DataOutputBuffer b = new DataOutputBuffer(c.getSerializedSizeEstimate());
230try {
231 c.write(b);
232 } catch (IOException err) {
233 log.error("serialization threw IOException", err);
234 }
235 serializedEvents.add(b);
236// store a CLE for this chunk which we will use to ack this chunk to the237// caller of send()238// (e.g. the agent will use the list of CLE's for checkpointing)239 log.info("chunk seqID:"+c.getSeqID());
240 commitResults.add(newCommitListEntry(c.getInitiator(), c.getSeqID(),
241 c.getSeqID() - c.getData().length));
242 }
243 toSend.clear();
244245// collect all serialized chunks into a single buffer to send246 RequestEntity postData = newBuffersRequestEntity(serializedEvents, COMPRESS, codec);
247248 PostMethod method = new PostMethod();
249 method.setRequestEntity(postData);
250 StringBuilder sb = new StringBuilder( ">>>>>> HTTP post_");
251 sb.append( thisPost).append( " to ").append( currCollector).append( " length = ");
252if( COMPRESS) {
253 sb.append( ((BuffersRequestEntity)postData).getUncompressedContentLenght())
254 .append( " of uncompressed data");
255 }
256else {
257 sb.append( postData.getContentLength());
258 }
259 log.info( sb);
260261 List<CommitListEntry> results = postAndParseResponse(method, commitResults);
262 log.info("post_" + thisPost + " sent " + toSendSize + " chunks, got back " + results.size() + " acks");
263return results;
264 }
265266/**267 * 268 * @param method the data to push269 * @param expectedCommitResults the list270 * @return the list of committed chunks271 * @throws IOException if error writing272 * @throws InterruptedException if shutdown has been initiated273 */274public List<CommitListEntry> postAndParseResponse(PostMethod method,
275 List<CommitListEntry> expectedCommitResults)
276throws IOException, InterruptedException{
277 reliablySend(method, "chukwa"); //FIXME: shouldn't need to hardcode this here278return expectedCommitResults;
279 }
280281/**282 * Responsible for executing the supplied method on at least one collector283 * @param method is HTTP method284 * @return the list of commited status285 * @throws InterruptedException if shutdown has been initiated286 * @throws IOException if no collector responds with an OK287 */288protected List<String> reliablySend(HttpMethodBase method, String pathSuffix) throws InterruptedException, IOException {
289int retries = SENDER_RETRIES;
290while (currCollector != null) {
291// need to pick a destination here292try {
293294// send it across the network 295 List<String> responses = doRequest(method, currCollector+ pathSuffix);
296297 retries = SENDER_RETRIES; // reset count on success298299return responses;
300 } catch (Throwable e) {
301 log.error("Http post exception on "+ currCollector +": "+ e.toString());
302 log.debug("Http post exception on "+ currCollector, e);
303 ChukwaHttpSender.metrics.httpThrowable.inc();
304if (collectors.hasNext()) {
305 ChukwaHttpSender.metrics.collectorRollover.inc();
306boolean repeatPost = failedCollector(currCollector);
307 currCollector = collectors.next();
308if(repeatPost)
309 log.info("Found a new collector to roll over to, retrying HTTP Post to collector "310 + currCollector);
311else {
312 log.info("Using " + currCollector + " in the future, but not retrying this post");
313break;
314 }
315 } else {
316if (retries > 0) {
317 log.warn("No more collectors to try rolling over to; waiting "318 + WAIT_FOR_COLLECTOR_REBOOT + " ms (" + retries
319 + " retries left)");
320 Thread.sleep(WAIT_FOR_COLLECTOR_REBOOT);
321 retries--;
322 } else {
323 log.error("No more collectors to try rolling over to; aborting post");
324thrownew IOException("no collectors");
325 }
326 }
327 } finally {
328// be sure the connection is released back to the connection manager329 method.releaseConnection();
330 }
331 } // end retry loop332returnnew ArrayList<String>();
333 }
334335/**336 * A hook for taking action when a collector is declared failed.337 * Returns whether to retry current post, or junk it338 * @param downCollector339 */340protectedboolean failedCollector(String downCollector) {
341 log.debug("declaring "+ downCollector + " down");
342returntrue;
343 }
344345/**346 * Responsible for performing a single operation to a specified collector URL.347 * 348 * @param dest the URL being requested. (Including hostname)349 */350protected List<String> doRequest(HttpMethodBase method, String dest)
351throws IOException, HttpException {
352353 HttpMethodParams pars = method.getParams();
354 pars.setParameter(HttpMethodParams.RETRY_HANDLER,
355 (Object) new HttpMethodRetryHandler() {
356publicboolean retryMethod(HttpMethod m, IOException e, int exec) {
357return !(e instanceof java.net.ConnectException)
358 && (exec < MAX_RETRIES_PER_COLLECTOR);
359 }
360 });
361362 pars.setParameter(HttpMethodParams.SO_TIMEOUT, Integer.valueOf(COLLECTOR_TIMEOUT));
363364 method.setParams(pars);
365 method.setPath(dest);
366367// Send POST request368 ChukwaHttpSender.metrics.httpPost.inc();
369370int statusCode = client.executeMethod(method);
371372if (statusCode != HttpStatus.SC_OK) {
373 ChukwaHttpSender.metrics.httpException.inc();
374375if (statusCode == HttpStatus.SC_REQUEST_TIMEOUT ) {
376 ChukwaHttpSender.metrics.httpTimeOutException.inc();
377 }
378379 log.error(">>>>>> HTTP response from " + dest + " statusLine: " + method.getStatusLine());
380// do something aggressive here381thrownew HttpException("got back a failure from server");
382 }
383// implicitly "else"384 log.info(">>>>>> HTTP Got success back from "+ dest + "; response length "385 + method.getResponseContentLength());
386387// FIXME: should parse acks here388 InputStream rstream = null;
389390// Get the response body391 byte[] resp_buf = method.getResponseBody();
392 rstream = new ByteArrayInputStream(resp_buf);
393 BufferedReader br = new BufferedReader(new InputStreamReader(rstream, Charset.forName("UTF-8")));
394 String line;
395 List<String> resp = new ArrayList<String>();
396while ((line = br.readLine()) != null) {
397if (log.isDebugEnabled()) {
398 log.debug("response: " + line);
399 }
400 resp.add(line);
401 }
402return resp;
403 }
404405 @Override
406publicvoid stop() {
407 }
408 }