This project has retired. For details please refer to its Attic page.
ChukwaHttpSender xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.chukwa.datacollection.sender;
20  
21  
22  import java.io.BufferedReader;
23  import java.io.ByteArrayInputStream;
24  import java.io.DataOutputStream;
25  import java.io.IOException;
26  import java.io.InputStream;
27  import java.io.InputStreamReader;
28  import java.io.OutputStream;
29  import java.nio.charset.Charset;
30  import java.util.ArrayList;
31  import java.util.Iterator;
32  import java.util.List;
33  
34  import org.apache.commons.httpclient.HttpClient;
35  import org.apache.commons.httpclient.HttpException;
36  import org.apache.commons.httpclient.HttpMethod;
37  import org.apache.commons.httpclient.HttpMethodBase;
38  import org.apache.commons.httpclient.HttpMethodRetryHandler;
39  import org.apache.commons.httpclient.HttpStatus;
40  import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager;
41  import org.apache.commons.httpclient.methods.PostMethod;
42  import org.apache.commons.httpclient.methods.RequestEntity;
43  import org.apache.commons.httpclient.params.HttpMethodParams;
44  import org.apache.hadoop.chukwa.Chunk;
45  import org.apache.hadoop.chukwa.datacollection.adaptor.Adaptor;
46  import org.apache.hadoop.chukwa.datacollection.sender.metrics.HttpSenderMetrics;
47  import org.apache.hadoop.conf.Configuration;
48  import org.apache.hadoop.io.DataOutputBuffer;
49  import org.apache.hadoop.io.compress.CompressionCodec;
50  import org.apache.hadoop.io.compress.CompressionOutputStream;
51  import org.apache.hadoop.util.ReflectionUtils;
52  import org.apache.log4j.Logger;
53  
54  /**
55   * Encapsulates all of the http setup and connection details needed for chunks
56   * to be delivered to a collector.
57   * 
58   * This class should encapsulate the details of the low level data formatting.
59   * The Connector is responsible for picking what to send and to whom;
60   * retry policy is encoded in the collectors iterator.
61   * 
62   * This class is not thread safe. Synchronization is the caller's responsibility.
63   * 
64   * <p>
65   * On error, tries the list of available collectors, pauses for a minute, and
66   * then repeats.
67   * </p>
68   * <p>
69   * Will wait forever for collectors to come up.
70   * </p>
71   */
72  public class ChukwaHttpSender implements ChukwaSender {
73    final int MAX_RETRIES_PER_COLLECTOR; // fast retries, in http client
74    final int SENDER_RETRIES;
75    final int WAIT_FOR_COLLECTOR_REBOOT;
76    final int COLLECTOR_TIMEOUT;
77    
78    public static final String COLLECTOR_TIMEOUT_OPT = "chukwaAgent.sender.collectorTimeout";
79    // FIXME: this should really correspond to the timer in RetryListOfCollectors
80  
81    static final HttpSenderMetrics metrics = new HttpSenderMetrics("chukwaAgent", "httpSender");
82    
83    static Logger log = Logger.getLogger(ChukwaHttpSender.class);
84    static HttpClient client = null;
85    static MultiThreadedHttpConnectionManager connectionManager = null;
86    String currCollector = null;
87    int postID = 0;
88  
89    protected Iterator<String> collectors;
90    
91    boolean COMPRESS;
92    String CODEC_NAME;
93    CompressionCodec codec;
94  
95    static {
96      connectionManager = new MultiThreadedHttpConnectionManager();
97      client = new HttpClient(connectionManager);
98      connectionManager.closeIdleConnections(1000);
99    }
100 
101   public static class CommitListEntry {
102     public Adaptor adaptor;
103     public long uuid;
104     public long start; //how many bytes of stream
105     public CommitListEntry(Adaptor a, long uuid, long start) {
106       adaptor = a;
107       this.uuid = uuid;
108       this.start = start;
109     }
110   }
111 
112   // FIXME: probably we're better off with an EventListRequestEntity
113   static class BuffersRequestEntity implements RequestEntity {
114     List<DataOutputBuffer> buffers;
115     boolean compress;
116     CompressionCodec codec;
117 
118     public BuffersRequestEntity(List<DataOutputBuffer> buf, boolean compress, CompressionCodec codec) {
119       buffers = buf;
120       this.compress = compress;
121       this.codec = codec;
122     }
123     
124     private long getUncompressedContentLenght(){
125         long len = 4;// first we send post length, then buffers
126         for (DataOutputBuffer b : buffers)
127           len += b.getLength();
128         return len;
129     }
130 
131     public long getContentLength() {
132     	if(compress) {
133     		return -1;
134     	}
135     	else {
136     		return getUncompressedContentLenght();
137     	}
138     }
139 
140     public String getContentType() {
141       return "application/octet-stream";
142     }
143 
144     public boolean isRepeatable() {
145       return true;
146     }
147 
148     private void doWriteRequest( DataOutputStream out ) throws IOException {
149         out.writeInt(buffers.size());
150         for (DataOutputBuffer b : buffers)
151       	 out.write(b.getData(), 0, b.getLength());
152     }
153     
154     public void writeRequest(OutputStream out) throws IOException {
155       if(compress) {
156           CompressionOutputStream cos = codec.createOutputStream(out);
157           DataOutputStream dos = new DataOutputStream( cos);
158           doWriteRequest( dos);
159           cos.finish();
160       }
161       else {
162           DataOutputStream dos = new DataOutputStream( out);
163           doWriteRequest( dos);
164       }
165     }
166   }
167 
168   public ChukwaHttpSender(Configuration c) {
169     // setup default collector
170     ArrayList<String> tmp = new ArrayList<String>();
171     this.collectors = tmp.iterator();
172 
173     MAX_RETRIES_PER_COLLECTOR = c.getInt("chukwaAgent.sender.fastRetries", 4);
174     SENDER_RETRIES = c.getInt("chukwaAgent.sender.retries", 144000);
175     WAIT_FOR_COLLECTOR_REBOOT = c.getInt("chukwaAgent.sender.retryInterval",
176         20 * 1000);
177     COLLECTOR_TIMEOUT = c.getInt(COLLECTOR_TIMEOUT_OPT, 30*1000);
178     COMPRESS = c.getBoolean("chukwaAgent.output.compress", false);
179     if( COMPRESS) {
180 	    CODEC_NAME = c.get( "chukwaAgent.output.compression.type", "org.apache.hadoop.io.compress.DefaultCodec");
181 	    Class<?> codecClass = null;
182 	    try {
183 			codecClass = Class.forName( CODEC_NAME);
184 			codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, c);
185 			log.info("codec " + CODEC_NAME + " loaded for network compression");
186 		} catch (ClassNotFoundException e) {
187 			log.warn("failed to create codec " + CODEC_NAME + ". Network compression won't be enabled.", e);
188 			COMPRESS = false;
189 		}
190     }
191   }
192 
193   /**
194    * Set up a list of connectors for this client to send {@link Chunk}s to
195    * 
196    * @param collectors is a list of collectors
197    */
198   public void setCollectors(Iterator<String> collectors) {
199     this.collectors = collectors;
200     // setup a new destination from our list of collectors if one hasn't been
201     // set up
202     if (currCollector == null) {
203       if (collectors.hasNext()) {
204         currCollector = collectors.next();
205       } else
206         log.error("No collectors to try in send(), won't even try to do doPost()");
207     }
208   }
209 
210   /**
211    * grab all of the chunks currently in the chunkQueue, stores a copy of them
212    * locally, calculates their size, sets them up
213    * 
214    * @return array of chunk id's which were ACKed by collector
215    */
216   @Override
217   public List<CommitListEntry> send(List<Chunk> toSend)
218       throws InterruptedException, IOException {
219     List<DataOutputBuffer> serializedEvents = new ArrayList<DataOutputBuffer>();
220     List<CommitListEntry> commitResults = new ArrayList<CommitListEntry>();
221 
222     int thisPost = postID++;
223     int toSendSize = toSend.size();
224     log.info("collected " + toSendSize + " chunks for post_"+thisPost);
225 
226     // Serialize each chunk in turn into it's own DataOutputBuffer and add that
227     // buffer to serializedEvents
228     for (Chunk c : toSend) {
229       DataOutputBuffer b = new DataOutputBuffer(c.getSerializedSizeEstimate());
230       try {
231         c.write(b);
232       } catch (IOException err) {
233         log.error("serialization threw IOException", err);
234       }
235       serializedEvents.add(b);
236       // store a CLE for this chunk which we will use to ack this chunk to the
237       // caller of send()
238       // (e.g. the agent will use the list of CLE's for checkpointing)
239       log.info("chunk seqID:"+c.getSeqID());
240       commitResults.add(new CommitListEntry(c.getInitiator(), c.getSeqID(), 
241          c.getSeqID() - c.getData().length));
242     }
243     toSend.clear();
244 
245     // collect all serialized chunks into a single buffer to send
246     RequestEntity postData = new BuffersRequestEntity(serializedEvents, COMPRESS, codec);
247 
248     PostMethod method = new PostMethod();
249     method.setRequestEntity(postData);
250     StringBuilder sb = new StringBuilder( ">>>>>> HTTP post_");
251     sb.append( thisPost).append( " to ").append( currCollector).append( " length = ");
252 	if( COMPRESS) {
253 		sb.append( ((BuffersRequestEntity)postData).getUncompressedContentLenght())
254 			.append( " of uncompressed data");
255 	}
256 	else {
257 		sb.append( postData.getContentLength());
258 	}
259 	log.info( sb);
260 
261     List<CommitListEntry> results =  postAndParseResponse(method, commitResults);
262     log.info("post_" + thisPost + " sent " + toSendSize + " chunks, got back " + results.size() + " acks");
263     return results;
264   }
265   
266   /**
267    * 
268    * @param method the data to push
269    * @param expectedCommitResults the list
270    * @return the list of committed chunks
271    * @throws IOException if error writing
272    * @throws InterruptedException if shutdown has been initiated
273    */
274   public List<CommitListEntry> postAndParseResponse(PostMethod method, 
275         List<CommitListEntry> expectedCommitResults)
276   throws IOException, InterruptedException{
277     reliablySend(method, "chukwa"); //FIXME: shouldn't need to hardcode this here
278     return expectedCommitResults;
279   }
280 
281   /**
282    *  Responsible for executing the supplied method on at least one collector
283    * @param method is HTTP method
284    * @return the list of commited status
285    * @throws InterruptedException if shutdown has been initiated
286    * @throws IOException if no collector responds with an OK
287    */
288   protected List<String> reliablySend(HttpMethodBase method, String pathSuffix) throws InterruptedException, IOException {
289     int retries = SENDER_RETRIES;
290     while (currCollector != null) {
291       // need to pick a destination here
292       try {
293 
294         // send it across the network    
295         List<String> responses = doRequest(method, currCollector+ pathSuffix);
296 
297         retries = SENDER_RETRIES; // reset count on success
298 
299         return responses;
300       } catch (Throwable e) {
301         log.error("Http post exception on "+ currCollector +": "+ e.toString());
302         log.debug("Http post exception on "+ currCollector, e);
303         ChukwaHttpSender.metrics.httpThrowable.inc();
304         if (collectors.hasNext()) {
305           ChukwaHttpSender.metrics.collectorRollover.inc();
306           boolean repeatPost = failedCollector(currCollector);
307           currCollector = collectors.next();
308           if(repeatPost)
309             log.info("Found a new collector to roll over to, retrying HTTP Post to collector "
310                 + currCollector);
311           else {
312             log.info("Using " + currCollector + " in the future, but not retrying this post");
313             break;
314           }
315         } else {
316           if (retries > 0) {
317             log.warn("No more collectors to try rolling over to; waiting "
318                 + WAIT_FOR_COLLECTOR_REBOOT + " ms (" + retries
319                 + " retries left)");
320             Thread.sleep(WAIT_FOR_COLLECTOR_REBOOT);
321             retries--;
322           } else {
323             log.error("No more collectors to try rolling over to; aborting post");
324             throw new IOException("no collectors");
325           }
326         }
327       } finally {
328         // be sure the connection is released back to the connection manager
329         method.releaseConnection();
330       }
331     } // end retry loop
332     return new ArrayList<String>();
333   }
334 
335   /**
336    * A hook for taking action when a collector is declared failed.
337    * Returns whether to retry current post, or junk it
338    * @param downCollector
339    */
340   protected boolean failedCollector(String downCollector) {
341     log.debug("declaring "+ downCollector + " down");
342     return true;
343   }
344 
345   /**
346    * Responsible for performing a single operation to a specified collector URL.
347    * 
348    * @param dest the URL being requested. (Including hostname)
349    */
350   protected List<String> doRequest(HttpMethodBase method, String dest)
351       throws IOException, HttpException {
352 
353     HttpMethodParams pars = method.getParams();
354     pars.setParameter(HttpMethodParams.RETRY_HANDLER,
355         (Object) new HttpMethodRetryHandler() {
356           public boolean retryMethod(HttpMethod m, IOException e, int exec) {
357             return !(e instanceof java.net.ConnectException)
358                 && (exec < MAX_RETRIES_PER_COLLECTOR);
359           }
360         });
361 
362     pars.setParameter(HttpMethodParams.SO_TIMEOUT, Integer.valueOf(COLLECTOR_TIMEOUT));
363 
364     method.setParams(pars);
365     method.setPath(dest);
366 
367     // Send POST request
368     ChukwaHttpSender.metrics.httpPost.inc();
369     
370     int statusCode = client.executeMethod(method);
371 
372     if (statusCode != HttpStatus.SC_OK) {
373       ChukwaHttpSender.metrics.httpException.inc();
374       
375       if (statusCode == HttpStatus.SC_REQUEST_TIMEOUT ) {
376         ChukwaHttpSender.metrics.httpTimeOutException.inc();
377       }
378       
379       log.error(">>>>>> HTTP response from " + dest + " statusLine: " + method.getStatusLine());
380       // do something aggressive here
381       throw new HttpException("got back a failure from server");
382     }
383     // implicitly "else"
384     log.info(">>>>>> HTTP Got success back from "+ dest + "; response length "
385             + method.getResponseContentLength());
386 
387     // FIXME: should parse acks here
388     InputStream rstream = null;
389 
390     // Get the response body
391     byte[] resp_buf = method.getResponseBody();
392     rstream = new ByteArrayInputStream(resp_buf);
393     BufferedReader br = new BufferedReader(new InputStreamReader(rstream, Charset.forName("UTF-8")));
394     String line;
395     List<String> resp = new ArrayList<String>();
396     while ((line = br.readLine()) != null) {
397       if (log.isDebugEnabled()) {
398         log.debug("response: " + line);
399       }
400       resp.add(line);
401     }
402     return resp;
403   }
404 
405   @Override
406   public void stop() {
407   }
408 }