ChukwaHttpSender xref

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.chukwa.datacollection.sender;


import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.commons.httpclient.HttpClient;
import org.apache.commons.httpclient.HttpException;
import org.apache.commons.httpclient.HttpMethod;
import org.apache.commons.httpclient.HttpMethodBase;
import org.apache.commons.httpclient.HttpMethodRetryHandler;
import org.apache.commons.httpclient.HttpStatus;
import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager;
import org.apache.commons.httpclient.methods.PostMethod;
import org.apache.commons.httpclient.methods.RequestEntity;
import org.apache.commons.httpclient.params.HttpMethodParams;
import org.apache.hadoop.chukwa.Chunk;
import org.apache.hadoop.chukwa.datacollection.adaptor.Adaptor;
import org.apache.hadoop.chukwa.datacollection.sender.metrics.HttpSenderMetrics;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.log4j.Logger;

/**
 * Encapsulates all of the HTTP setup and connection details needed for chunks
 * to be delivered to a collector.
 * 
 * This class should encapsulate the details of the low-level data formatting.
 * The Connector is responsible for picking what to send and to whom;
 * the retry policy is encoded in the collectors iterator.
 * 
 * This class is not thread safe. Synchronization is the caller's responsibility.
 * 
 * <p>
 * On error, tries the list of available collectors, pauses for the configured
 * retry interval (20 seconds by default), and then repeats.
 * </p>
 * <p>
 * Will wait forever for collectors to come up.
 * </p>
 */
public class ChukwaHttpSender implements ChukwaSender {
  final int MAX_RETRIES_PER_COLLECTOR; // fast retries, in http client
  final int SENDER_RETRIES;
  final int WAIT_FOR_COLLECTOR_REBOOT;
  final int COLLECTOR_TIMEOUT;

  public static final String COLLECTOR_TIMEOUT_OPT = "chukwaAgent.sender.collectorTimeout";
  // FIXME: this should really correspond to the timer in RetryListOfCollectors

  static final HttpSenderMetrics metrics = new HttpSenderMetrics("chukwaAgent", "httpSender");

  static Logger log = Logger.getLogger(ChukwaHttpSender.class);
  static HttpClient client = null;
  static MultiThreadedHttpConnectionManager connectionManager = null;
  String currCollector = null;
  int postID = 0;

  protected Iterator<String> collectors;

  static boolean COMPRESS;
  static String CODEC_NAME;
  static CompressionCodec codec;

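  // A single HttpClient, backed by a pooled connection manager, is shared by
  // all instances; closeIdleConnections(1000) closes any pooled connections
  // that have been idle for at least one second.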
  static {
    connectionManager = new MultiThreadedHttpConnectionManager();
    client = new HttpClient(connectionManager);
    connectionManager.closeIdleConnections(1000);
  }

  public static class CommitListEntry {
    public Adaptor adaptor;
    public long uuid;
    public long start; // offset in the stream at which this chunk begins
    public CommitListEntry(Adaptor a, long uuid, long start) {
      adaptor = a;
      this.uuid = uuid;
      this.start = start;
    }
  }

  // FIXME: probably we're better off with an EventListRequestEntity
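  // Wire format produced by this entity: a 4-byte count of chunks followed by
  // each chunk's serialized bytes, optionally wrapped in a compression stream
  // when COMPRESS is enabled.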
  static class BuffersRequestEntity implements RequestEntity {
    List<DataOutputBuffer> buffers;

    public BuffersRequestEntity(List<DataOutputBuffer> buf) {
      buffers = buf;
    }

    private long getUncompressedContentLength() {
      long len = 4; // 4-byte chunk count precedes the serialized chunks
      for (DataOutputBuffer b : buffers)
        len += b.getLength();
      return len;
    }

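    // A negative content length tells HttpClient the size is unknown; when
    // compressing, the final payload size can't be computed up front.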
    public long getContentLength() {
      if (COMPRESS) {
        return -1;
      } else {
        return getUncompressedContentLength();
      }
    }

    public String getContentType() {
      return "application/octet-stream";
    }

    public boolean isRepeatable() {
      return true;
    }

    private void doWriteRequest(DataOutputStream out) throws IOException {
      out.writeInt(buffers.size());
      for (DataOutputBuffer b : buffers)
        out.write(b.getData(), 0, b.getLength());
    }

    public void writeRequest(OutputStream out) throws IOException {
      if (COMPRESS) {
        CompressionOutputStream cos = codec.createOutputStream(out);
        DataOutputStream dos = new DataOutputStream(cos);
        doWriteRequest(dos);
        cos.finish();
      } else {
        DataOutputStream dos = new DataOutputStream(out);
        doWriteRequest(dos);
      }
    }
  }

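  // Configuration keys read by the constructor, with their defaults:
  //   chukwaAgent.sender.fastRetries      (4)      in-client retries per collector
  //   chukwaAgent.sender.retries          (144000) outer retry budget
  //   chukwaAgent.sender.retryInterval    (20000)  ms to wait between outer retries
  //   chukwaAgent.sender.collectorTimeout (30000)  socket timeout in ms
  //   chukwaAgent.output.compress         (false)  compress posts on the wire
  //   chukwaAgent.output.compression.type (org.apache.hadoop.io.compress.DefaultCodec)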
  public ChukwaHttpSender(Configuration c) {
    // setup default collector
    ArrayList<String> tmp = new ArrayList<String>();
    this.collectors = tmp.iterator();

    MAX_RETRIES_PER_COLLECTOR = c.getInt("chukwaAgent.sender.fastRetries", 4);
    SENDER_RETRIES = c.getInt("chukwaAgent.sender.retries", 144000);
    WAIT_FOR_COLLECTOR_REBOOT = c.getInt("chukwaAgent.sender.retryInterval",
        20 * 1000);
    COLLECTOR_TIMEOUT = c.getInt(COLLECTOR_TIMEOUT_OPT, 30 * 1000);
    COMPRESS = c.getBoolean("chukwaAgent.output.compress", false);
    if (COMPRESS) {
      CODEC_NAME = c.get("chukwaAgent.output.compression.type",
          "org.apache.hadoop.io.compress.DefaultCodec");
      Class<?> codecClass = null;
      try {
        codecClass = Class.forName(CODEC_NAME);
        codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, c);
        log.info("codec " + CODEC_NAME + " loaded for network compression");
      } catch (ClassNotFoundException e) {
        log.warn("failed to create codec " + CODEC_NAME
            + ". Network compression won't be enabled.", e);
        COMPRESS = false;
      }
    }
  }

  /**
   * Set up a list of collectors for this client to send {@link Chunk}s to
   * 
   * @param collectors an iterator over collector URLs to try, in order
   */
  public void setCollectors(Iterator<String> collectors) {
    this.collectors = collectors;
    // setup a new destination from our list of collectors if one hasn't been
    // set up
    if (currCollector == null) {
      if (collectors.hasNext()) {
        currCollector = collectors.next();
      } else
        log.error("No collectors to try in send(), won't even try to do doPost()");
    }
  }

  /**
   * Grabs all of the chunks currently in the chunk queue, stores a copy of them
   * locally, calculates their size, and sets them up for posting.
   * 
   * @return the list of commit entries for chunks ACKed by the collector
   */
  @Override
  public List<CommitListEntry> send(List<Chunk> toSend)
      throws InterruptedException, IOException {
    List<DataOutputBuffer> serializedEvents = new ArrayList<DataOutputBuffer>();
    List<CommitListEntry> commitResults = new ArrayList<CommitListEntry>();

    int thisPost = postID++;
    int toSendSize = toSend.size();
    log.info("collected " + toSendSize + " chunks for post_" + thisPost);

    // Serialize each chunk in turn into its own DataOutputBuffer and add that
    // buffer to serializedEvents
    for (Chunk c : toSend) {
      DataOutputBuffer b = new DataOutputBuffer(c.getSerializedSizeEstimate());
      try {
        c.write(b);
      } catch (IOException err) {
        log.error("serialization threw IOException", err);
      }
      serializedEvents.add(b);
      // store a CLE for this chunk which we will use to ack this chunk to the
      // caller of send()
      // (e.g. the agent will use the list of CLE's for checkpointing)
      log.info("chunk seqID:" + c.getSeqID());
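      // seqID marks the stream offset just past this chunk, so subtracting the
      // data length recovers the offset at which the chunk began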
      commitResults.add(new CommitListEntry(c.getInitiator(), c.getSeqID(),
          c.getSeqID() - c.getData().length));
    }
    toSend.clear();

    // collect all serialized chunks into a single buffer to send
    RequestEntity postData = new BuffersRequestEntity(serializedEvents);

    PostMethod method = new PostMethod();
    method.setRequestEntity(postData);
    StringBuilder sb = new StringBuilder(">>>>>> HTTP post_");
    sb.append(thisPost).append(" to ").append(currCollector).append(" length = ");
    if (COMPRESS) {
      sb.append(((BuffersRequestEntity) postData).getUncompressedContentLength())
          .append(" of uncompressed data");
    } else {
      sb.append(postData.getContentLength());
    }
    log.info(sb);

    List<CommitListEntry> results = postAndParseResponse(method, commitResults);
    log.info("post_" + thisPost + " sent " + toSendSize + " chunks, got back " + results.size() + " acks");
    return results;
  }

  /**
   * 
   * @param method the POST method carrying the data to push
   * @param expectedCommitResults the list of commit entries we expect the collector to ack
   * @return the list of committed chunks
   * @throws IOException
   * @throws InterruptedException
   */
  public List<CommitListEntry> postAndParseResponse(PostMethod method,
        List<CommitListEntry> expectedCommitResults)
  throws IOException, InterruptedException {
    reliablySend(method, "chukwa"); // FIXME: shouldn't need to hardcode this here
    return expectedCommitResults;
  }

  /**
   * Responsible for executing the supplied method on at least one collector
   * @param method the HTTP method to execute
   * @param pathSuffix appended to the collector URL to form the request path
   * @return the response lines from the collector that accepted the post
   * @throws InterruptedException
   * @throws IOException if no collector responds with an OK
   */
  protected List<String> reliablySend(HttpMethodBase method, String pathSuffix) throws InterruptedException, IOException {
    int retries = SENDER_RETRIES;
    while (currCollector != null) {
      // need to pick a destination here
      try {

        // send it across the network
        List<String> responses = doRequest(method, currCollector + pathSuffix);

        retries = SENDER_RETRIES; // reset count on success

        return responses;
      } catch (Throwable e) {
        log.error("Http post exception on " + currCollector + ": " + e.toString());
        log.debug("Http post exception on " + currCollector, e);
        ChukwaHttpSender.metrics.httpThrowable.inc();
        if (collectors.hasNext()) {
          ChukwaHttpSender.metrics.collectorRollover.inc();
          boolean repeatPost = failedCollector(currCollector);
          currCollector = collectors.next();
          if (repeatPost)
            log.info("Found a new collector to roll over to, retrying HTTP Post to collector "
                + currCollector);
          else {
            log.info("Using " + currCollector + " in the future, but not retrying this post");
            break;
          }
        } else {
          if (retries > 0) {
            log.warn("No more collectors to try rolling over to; waiting "
                + WAIT_FOR_COLLECTOR_REBOOT + " ms (" + retries
                + " retries left)");
            Thread.sleep(WAIT_FOR_COLLECTOR_REBOOT);
            retries--;
          } else {
            log.error("No more collectors to try rolling over to; aborting post");
            throw new IOException("no collectors");
          }
        }
      } finally {
        // be sure the connection is released back to the connection manager
        method.releaseConnection();
      }
    } // end retry loop
    return new ArrayList<String>();
  }

  /**
   * A hook for taking action when a collector is declared failed.
   * Returns whether to retry the current post, or junk it.
   * @param downCollector the collector being declared down
   */
  protected boolean failedCollector(String downCollector) {
    log.debug("declaring " + downCollector + " down");
    return true;
  }

  /**
   * Responsible for performing a single operation to a specified collector URL.
   * 
   * @param method the HTTP method to execute
   * @param dest the URL being requested (including hostname)
   */
  protected List<String> doRequest(HttpMethodBase method, String dest)
      throws IOException, HttpException {

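    // In-client retry policy: transparently retry transient I/O failures up to
    // MAX_RETRIES_PER_COLLECTOR times, but fail fast on a refused connection so
    // the caller can roll over to another collector.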
    HttpMethodParams pars = method.getParams();
    pars.setParameter(HttpMethodParams.RETRY_HANDLER,
        (Object) new HttpMethodRetryHandler() {
          public boolean retryMethod(HttpMethod m, IOException e, int exec) {
            return !(e instanceof java.net.ConnectException)
                && (exec < MAX_RETRIES_PER_COLLECTOR);
          }
        });

    pars.setParameter(HttpMethodParams.SO_TIMEOUT, Integer.valueOf(COLLECTOR_TIMEOUT));

    method.setParams(pars);
    method.setPath(dest);

    // Send POST request
    ChukwaHttpSender.metrics.httpPost.inc();

    int statusCode = client.executeMethod(method);

    if (statusCode != HttpStatus.SC_OK) {
      ChukwaHttpSender.metrics.httpException.inc();

      if (statusCode == HttpStatus.SC_REQUEST_TIMEOUT) {
        ChukwaHttpSender.metrics.httpTimeOutException.inc();
      }

      log.error(">>>>>> HTTP response from " + dest + " statusLine: " + method.getStatusLine());
      // do something aggressive here
      throw new HttpException("got back a failure from server");
    }
    // implicitly "else"
    log.info(">>>>>> HTTP Got success back from " + dest + "; response length "
            + method.getResponseContentLength());

    // FIXME: should parse acks here
    InputStream rstream = null;

    // Get the response body
    byte[] resp_buf = method.getResponseBody();
    rstream = new ByteArrayInputStream(resp_buf);
    BufferedReader br = new BufferedReader(new InputStreamReader(rstream));
    String line;
    List<String> resp = new ArrayList<String>();
    while ((line = br.readLine()) != null) {
      if (log.isDebugEnabled()) {
        log.debug("response: " + line);
      }
      resp.add(line);
    }
    return resp;
  }

  @Override
  public void stop() {
  }
}
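
Below is a minimal usage sketch, not part of the Chukwa source: the collector URL is a placeholder, and a real agent would pass chunks drained from its chunk queue rather than an empty list.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.chukwa.Chunk;
import org.apache.hadoop.chukwa.datacollection.sender.ChukwaHttpSender;
import org.apache.hadoop.conf.Configuration;

public class SenderSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    conf.setBoolean("chukwaAgent.output.compress", false); // no wire compression

    ChukwaHttpSender sender = new ChukwaHttpSender(conf);
    // reliablySend() appends "chukwa" to whatever URL the iterator yields
    sender.setCollectors(Arrays.asList("http://collector-host:8080/").iterator());

    List<Chunk> chunks = new ArrayList<Chunk>(); // stand-in for queued chunks
    List<ChukwaHttpSender.CommitListEntry> acked = sender.send(chunks);
    System.out.println("collector acked " + acked.size() + " chunks");
  }
}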