This project has retired. For details please refer to its
        
        Attic page.
      
1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  
16  
17  
18  
19  package org.apache.hadoop.chukwa.datacollection.sender;
20  
21  
22  import java.io.BufferedReader;
23  import java.io.ByteArrayInputStream;
24  import java.io.DataOutputStream;
25  import java.io.IOException;
26  import java.io.InputStream;
27  import java.io.InputStreamReader;
28  import java.io.OutputStream;
29  import java.util.ArrayList;
30  import java.util.Iterator;
31  import java.util.List;
32  
33  import org.apache.commons.httpclient.HttpClient;
34  import org.apache.commons.httpclient.HttpException;
35  import org.apache.commons.httpclient.HttpMethod;
36  import org.apache.commons.httpclient.HttpMethodBase;
37  import org.apache.commons.httpclient.HttpMethodRetryHandler;
38  import org.apache.commons.httpclient.HttpStatus;
39  import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager;
40  import org.apache.commons.httpclient.methods.PostMethod;
41  import org.apache.commons.httpclient.methods.RequestEntity;
42  import org.apache.commons.httpclient.params.HttpMethodParams;
43  import org.apache.hadoop.chukwa.Chunk;
44  import org.apache.hadoop.chukwa.datacollection.adaptor.Adaptor;
45  import org.apache.hadoop.chukwa.datacollection.sender.metrics.HttpSenderMetrics;
46  import org.apache.hadoop.conf.Configuration;
47  import org.apache.hadoop.io.DataOutputBuffer;
48  import org.apache.hadoop.io.compress.CompressionCodec;
49  import org.apache.hadoop.io.compress.CompressionOutputStream;
50  import org.apache.hadoop.util.ReflectionUtils;
51  import org.apache.log4j.Logger;
52  
53  
54  
55  
56  
57  
58  
59  
60  
61  
62  
63  
64  
65  
66  
67  
68  
69  
70  
71  public class ChukwaHttpSender implements ChukwaSender {
72    final int MAX_RETRIES_PER_COLLECTOR; 
73    final int SENDER_RETRIES;
74    final int WAIT_FOR_COLLECTOR_REBOOT;
75    final int COLLECTOR_TIMEOUT;
76    
77    public static final String COLLECTOR_TIMEOUT_OPT = "chukwaAgent.sender.collectorTimeout";
78    
79  
80    static final HttpSenderMetrics metrics = new HttpSenderMetrics("chukwaAgent", "httpSender");
81    
82    static Logger log = Logger.getLogger(ChukwaHttpSender.class);
83    static HttpClient client = null;
84    static MultiThreadedHttpConnectionManager connectionManager = null;
85    String currCollector = null;
86    int postID = 0;
87  
88    protected Iterator<String> collectors;
89    
90    static boolean COMPRESS;
91    static String CODEC_NAME;
92    static CompressionCodec codec;
93  
94    static {
95      connectionManager = new MultiThreadedHttpConnectionManager();
96      client = new HttpClient(connectionManager);
97      connectionManager.closeIdleConnections(1000);
98    }
99  
100   public static class CommitListEntry {
101     public Adaptor adaptor;
102     public long uuid;
103     public long start; 
104     public CommitListEntry(Adaptor a, long uuid, long start) {
105       adaptor = a;
106       this.uuid = uuid;
107       this.start = start;
108     }
109   }
110 
111   
112   static class BuffersRequestEntity implements RequestEntity {
113     List<DataOutputBuffer> buffers;
114 
115     public BuffersRequestEntity(List<DataOutputBuffer> buf) {
116       buffers = buf;
117     }
118     
119     private long getUncompressedContentLenght(){
120         long len = 4;
121         for (DataOutputBuffer b : buffers)
122           len += b.getLength();
123         return len;
124     }
125 
126     public long getContentLength() {
127     	if( COMPRESS) {
128     		return -1;
129     	}
130     	else {
131     		return getUncompressedContentLenght();
132     	}
133     }
134 
135     public String getContentType() {
136       return "application/octet-stream";
137     }
138 
139     public boolean isRepeatable() {
140       return true;
141     }
142 
143     private void doWriteRequest( DataOutputStream out ) throws IOException {
144         out.writeInt(buffers.size());
145         for (DataOutputBuffer b : buffers)
146       	 out.write(b.getData(), 0, b.getLength());
147     }
148     
149     public void writeRequest(OutputStream out) throws IOException {
150       if( COMPRESS) {
151           CompressionOutputStream cos = codec.createOutputStream(out);
152           DataOutputStream dos = new DataOutputStream( cos);
153           doWriteRequest( dos);
154           cos.finish();
155       }
156       else {
157           DataOutputStream dos = new DataOutputStream( out);
158           doWriteRequest( dos);
159       }
160     }
161   }
162 
163   public ChukwaHttpSender(Configuration c) {
164     
165     ArrayList<String> tmp = new ArrayList<String>();
166     this.collectors = tmp.iterator();
167 
168     MAX_RETRIES_PER_COLLECTOR = c.getInt("chukwaAgent.sender.fastRetries", 4);
169     SENDER_RETRIES = c.getInt("chukwaAgent.sender.retries", 144000);
170     WAIT_FOR_COLLECTOR_REBOOT = c.getInt("chukwaAgent.sender.retryInterval",
171         20 * 1000);
172     COLLECTOR_TIMEOUT = c.getInt(COLLECTOR_TIMEOUT_OPT, 30*1000);
173     COMPRESS = c.getBoolean("chukwaAgent.output.compress", false);
174     if( COMPRESS) {
175 	    CODEC_NAME = c.get( "chukwaAgent.output.compression.type", "org.apache.hadoop.io.compress.DefaultCodec");
176 	    Class<?> codecClass = null;
177 	    try {
178 			codecClass = Class.forName( CODEC_NAME);
179 			codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, c);
180 			log.info("codec " + CODEC_NAME + " loaded for network compression");
181 		} catch (ClassNotFoundException e) {
182 			log.warn("failed to create codec " + CODEC_NAME + ". Network compression won't be enabled.", e);
183 			COMPRESS = false;
184 		}
185     }
186   }
187 
188   
189 
190 
191 
192 
193   public void setCollectors(Iterator<String> collectors) {
194     this.collectors = collectors;
195     
196     
197     if (currCollector == null) {
198       if (collectors.hasNext()) {
199         currCollector = collectors.next();
200       } else
201         log.error("No collectors to try in send(), won't even try to do doPost()");
202     }
203   }
204 
205   
206 
207 
208 
209 
210 
211   @Override
212   public List<CommitListEntry> send(List<Chunk> toSend)
213       throws InterruptedException, IOException {
214     List<DataOutputBuffer> serializedEvents = new ArrayList<DataOutputBuffer>();
215     List<CommitListEntry> commitResults = new ArrayList<CommitListEntry>();
216 
217     int thisPost = postID++;
218     int toSendSize = toSend.size();
219     log.info("collected " + toSendSize + " chunks for post_"+thisPost);
220 
221     
222     
223     for (Chunk c : toSend) {
224       DataOutputBuffer b = new DataOutputBuffer(c.getSerializedSizeEstimate());
225       try {
226         c.write(b);
227       } catch (IOException err) {
228         log.error("serialization threw IOException", err);
229       }
230       serializedEvents.add(b);
231       
232       
233       
234       log.info("chunk seqID:"+c.getSeqID());
235       commitResults.add(new CommitListEntry(c.getInitiator(), c.getSeqID(), 
236          c.getSeqID() - c.getData().length));
237     }
238     toSend.clear();
239 
240     
241     RequestEntity postData = new BuffersRequestEntity(serializedEvents);
242 
243     PostMethod method = new PostMethod();
244     method.setRequestEntity(postData);
245     StringBuilder sb = new StringBuilder( ">>>>>> HTTP post_");
246     sb.append( thisPost).append( " to ").append( currCollector).append( " length = ");
247 	if( COMPRESS) {
248 		sb.append( ((BuffersRequestEntity)postData).getUncompressedContentLenght())
249 			.append( " of uncompressed data");
250 	}
251 	else {
252 		sb.append( postData.getContentLength());
253 	}
254 	log.info( sb);
255 
256     List<CommitListEntry> results =  postAndParseResponse(method, commitResults);
257     log.info("post_" + thisPost + " sent " + toSendSize + " chunks, got back " + results.size() + " acks");
258     return results;
259   }
260   
261   
262 
263 
264 
265 
266 
267 
268 
269   public List<CommitListEntry> postAndParseResponse(PostMethod method, 
270         List<CommitListEntry> expectedCommitResults)
271   throws IOException, InterruptedException{
272     reliablySend(method, "chukwa"); 
273     return expectedCommitResults;
274   }
275 
276   
277 
278 
279 
280 
281 
282 
283   protected List<String> reliablySend(HttpMethodBase method, String pathSuffix) throws InterruptedException, IOException {
284     int retries = SENDER_RETRIES;
285     while (currCollector != null) {
286       
287       try {
288 
289         
290         List<String> responses = doRequest(method, currCollector+ pathSuffix);
291 
292         retries = SENDER_RETRIES; 
293 
294         return responses;
295       } catch (Throwable e) {
296         log.error("Http post exception on "+ currCollector +": "+ e.toString());
297         log.debug("Http post exception on "+ currCollector, e);
298         ChukwaHttpSender.metrics.httpThrowable.inc();
299         if (collectors.hasNext()) {
300           ChukwaHttpSender.metrics.collectorRollover.inc();
301           boolean repeatPost = failedCollector(currCollector);
302           currCollector = collectors.next();
303           if(repeatPost)
304             log.info("Found a new collector to roll over to, retrying HTTP Post to collector "
305                 + currCollector);
306           else {
307             log.info("Using " + currCollector + " in the future, but not retrying this post");
308             break;
309           }
310         } else {
311           if (retries > 0) {
312             log.warn("No more collectors to try rolling over to; waiting "
313                 + WAIT_FOR_COLLECTOR_REBOOT + " ms (" + retries
314                 + " retries left)");
315             Thread.sleep(WAIT_FOR_COLLECTOR_REBOOT);
316             retries--;
317           } else {
318             log.error("No more collectors to try rolling over to; aborting post");
319             throw new IOException("no collectors");
320           }
321         }
322       } finally {
323         
324         method.releaseConnection();
325       }
326     } 
327     return new ArrayList<String>();
328   }
329 
330   
331 
332 
333 
334 
335   protected boolean failedCollector(String downCollector) {
336     log.debug("declaring "+ downCollector + " down");
337     return true;
338   }
339 
340   
341 
342 
343 
344 
345   protected List<String> doRequest(HttpMethodBase method, String dest)
346       throws IOException, HttpException {
347 
348     HttpMethodParams pars = method.getParams();
349     pars.setParameter(HttpMethodParams.RETRY_HANDLER,
350         (Object) new HttpMethodRetryHandler() {
351           public boolean retryMethod(HttpMethod m, IOException e, int exec) {
352             return !(e instanceof java.net.ConnectException)
353                 && (exec < MAX_RETRIES_PER_COLLECTOR);
354           }
355         });
356 
357     pars.setParameter(HttpMethodParams.SO_TIMEOUT, new Integer(COLLECTOR_TIMEOUT));
358 
359     method.setParams(pars);
360     method.setPath(dest);
361 
362     
363     ChukwaHttpSender.metrics.httpPost.inc();
364     
365     int statusCode = client.executeMethod(method);
366 
367     if (statusCode != HttpStatus.SC_OK) {
368       ChukwaHttpSender.metrics.httpException.inc();
369       
370       if (statusCode == HttpStatus.SC_REQUEST_TIMEOUT ) {
371         ChukwaHttpSender.metrics.httpTimeOutException.inc();
372       }
373       
374       log.error(">>>>>> HTTP response from " + dest + " statusLine: " + method.getStatusLine());
375       
376       throw new HttpException("got back a failure from server");
377     }
378     
379     log.info(">>>>>> HTTP Got success back from "+ dest + "; response length "
380             + method.getResponseContentLength());
381 
382     
383     InputStream rstream = null;
384 
385     
386     byte[] resp_buf = method.getResponseBody();
387     rstream = new ByteArrayInputStream(resp_buf);
388     BufferedReader br = new BufferedReader(new InputStreamReader(rstream));
389     String line;
390     List<String> resp = new ArrayList<String>();
391     while ((line = br.readLine()) != null) {
392       if (log.isDebugEnabled()) {
393         log.debug("response: " + line);
394       }
395       resp.add(line);
396     }
397     return resp;
398   }
399 
400   @Override
401   public void stop() {
402   }
403 }