This project has retired. For details please refer to its Attic page.
HttpConnector xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.chukwa.datacollection.connector.http;
20  
21  
22  /**
23   * This class is responsible for setting up a {@link ChukwaSender} with a list of collectors
24   * and then repeatedly calling its send function, which encapsulates the work of setting up the
25   * connection with the appropriate collector and then collecting and sending the {@link Chunk}s
26   * from the global {@link ChunkQueue} that were added by adaptors. We want to separate
27   * the long-lived (i.e. looping) behavior from the sender because we also want to be able
28   * to use the sender for its add and send API in arbitrary applications that want to send
29   * chunks without a running {@link ChukwaAgent} daemon.
30   *
31   * <p>
32   * On error, tries the list of available collectors, pauses for a minute, and then repeats.
33   * </p>
34   * <p> Will wait forever for collectors to come up. </p>
35   *
36   */
37  
38  import java.io.IOException;
39  import java.util.ArrayList;
40  import java.util.Iterator;
41  import java.util.List;
42  import java.util.Timer;
43  import java.util.TimerTask;
44  import java.util.concurrent.atomic.AtomicInteger;
45  
46  import org.apache.hadoop.chukwa.Chunk;
47  import org.apache.hadoop.chukwa.datacollection.ChunkQueue;
48  import org.apache.hadoop.chukwa.datacollection.DataFactory;
49  import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
50  import org.apache.hadoop.chukwa.datacollection.connector.Connector;
51  import org.apache.hadoop.chukwa.datacollection.sender.*;
52  import org.apache.hadoop.conf.Configuration;
53  import org.apache.log4j.Logger;
54  
55  public class HttpConnector implements Connector, Runnable {
56  
57    static Logger log = Logger.getLogger(HttpConnector.class);
58  
59    Timer statTimer = null;
60    AtomicInteger chunkCount = new AtomicInteger();
61    
62    int MAX_SIZE_PER_POST = 2 * 1024 * 1024;
63    int MIN_POST_INTERVAL = 5 * 1000;
64    public static final String MIN_POST_INTERVAL_OPT = "httpConnector.minPostInterval";
65    public static final String MAX_SIZE_PER_POST_OPT = "httpConnector.maxPostSize";
66    public static final String ASYNC_ACKS_OPT = "httpConnector.asyncAcks";
67  
68    boolean ASYNC_ACKS = false;
69    
70    ChunkQueue chunkQueue;
71  
72    ChukwaAgent agent;
73    String argDestination = null;
74  
75    private volatile boolean stopMe = false;
76    private Iterator<String> collectors = null;
77    protected ChukwaSender connectorClient = null;
78  
79    { //instance initializer block
80      statTimer = new Timer();
81      statTimer.schedule(new TimerTask() {
82        public void run() {
83          int count = chunkCount.get();
84          chunkCount.set(0);
85          log.info("# http chunks ACK'ed since last report: " + count);
86        }
87      }, 100, 60 * 1000);
88    }
89  
90    public HttpConnector(ChukwaAgent agent) {
91      this.agent = agent;
92    }
93  
94    public HttpConnector(ChukwaAgent agent, String destination) {
95      this.agent = agent;
96      this.argDestination = destination;
97  
98      log.info("Setting HTTP Connector URL manually using arg passed to Agent: "
99          + destination);
100   }
101 
102   public void start() {
103 
104     chunkQueue = DataFactory.getInstance().getEventQueue();
105     Configuration conf = agent.getConfiguration();
106     MAX_SIZE_PER_POST = conf.getInt(MAX_SIZE_PER_POST_OPT, MAX_SIZE_PER_POST);
107     MIN_POST_INTERVAL = conf.getInt(MIN_POST_INTERVAL_OPT, MIN_POST_INTERVAL);
108     ASYNC_ACKS = conf.getBoolean(ASYNC_ACKS_OPT, ASYNC_ACKS);
109     (new Thread(this, "HTTP post thread")).start();
110   }
111 
112   public void shutdown() {
113     stopMe = true;
114     connectorClient.stop();
115   }
116 
117   public void run() {
118     log.info("HttpConnector started at time:" + System.currentTimeMillis());
119 
120     // build a list of our destinations from collectors
121     try {
122       if(collectors == null)
123         collectors = DataFactory.getInstance().getCollectorURLs(agent.getConfiguration());
124     } catch (IOException e) {
125       log.error("Failed to retrieve list of collectors from "
126           + "conf/collectors file", e);
127     }
128     
129     if(ASYNC_ACKS) {
130       try {
131         connectorClient = new AsyncAckSender(agent.getConfiguration(), agent);
132       } catch(IOException e) {
133         log.fatal("can't read AsycAck hostlist file, exiting");
134         agent.shutdown(true);
135       }
136     } else
137       connectorClient = new ChukwaHttpSender(agent.getConfiguration());
138 
139     if (argDestination != null) {
140       ArrayList<String> tmp = new ArrayList<String>();
141       tmp.add(argDestination);
142       collectors = tmp.iterator();
143       log.info("using collector specified at agent runtime: " + argDestination);
144     } else
145       log.info("using collectors from collectors file");
146 
147     if (collectors == null || !collectors.hasNext()) {
148       log.error("No collectors specified, exiting (and taking agent with us).");
149       agent.shutdown(true);// error is unrecoverable, so stop hard.
150       return;
151     }
152 
153     connectorClient.setCollectors(collectors);
154 
155 
156     try {
157       long lastPost = System.currentTimeMillis();
158       while (!stopMe) {
159         List<Chunk> newQueue = new ArrayList<Chunk>();
160         try {
161           // get all ready chunks from the chunkQueue to be sent
162           chunkQueue.collect(newQueue, MAX_SIZE_PER_POST); // FIXME: should
163                                                            // really do this by size
164 
165         } catch (InterruptedException e) {
166           System.out.println("thread interrupted during addChunks(ChunkQueue)");
167           Thread.currentThread().interrupt();
168           break;
169         }
170         List<ChukwaHttpSender.CommitListEntry> results = connectorClient
171             .send(newQueue);
172         // checkpoint the chunks which were committed
173         for (ChukwaHttpSender.CommitListEntry cle : results) {
174           agent.reportCommit(cle.adaptor, cle.uuid);
175           chunkCount.set(chunkCount.get()+1);;
176         }
177 
178         long now = System.currentTimeMillis();
179         long delta = MIN_POST_INTERVAL - now + lastPost;
180         if(delta > 0) {
181           Thread.sleep(delta); // wait for stuff to accumulate
182         }
183         lastPost = now;
184       } // end of try forever loop
185       log.info("received stop() command so exiting run() loop to shutdown connector");
186     } catch (OutOfMemoryError e) {
187       log.warn("Bailing out", e);
188     } catch (InterruptedException e) {
189       // do nothing, let thread die.
190       log.warn("Bailing out", e);
191     } catch (java.io.IOException e) {
192       log.error("connector failed; shutting down agent");
193       agent.shutdown(true);
194     }
195   }
196 
197   @Override
198   public void reloadConfiguration() {
199     Iterator<String> destinations = null;
200 
201     // build a list of our destinations from collectors
202     try {
203       destinations = DataFactory.getInstance().getCollectorURLs(agent.getConfiguration());
204     } catch (IOException e) {
205       log.error("Failed to retreive list of collectors from conf/collectors file", e);
206     }
207     if (destinations != null && destinations.hasNext()) {
208       collectors = destinations;
209       connectorClient.setCollectors(collectors);
210       log.info("Resetting collectors");
211     }
212   }
213   
214   public ChukwaSender getSender() {
215     return connectorClient;
216   }
217   
218   public void setCollectors(Iterator<String> list) {
219     collectors = list;
220   }
221 }