This project has retired. For details please refer to its Attic page.
HttpConnector xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.chukwa.datacollection.connector.http;
20  
21  
/**
 * This class is responsible for setting up a connector client ({@link ChukwaSender}) with a list
 * of collectors and then repeatedly calling its send function. That function encapsulates the
 * work of setting up the connection with the appropriate collector and then collecting and
 * sending the {@link Chunk}s from the global {@link ChunkQueue} that were added by adaptors.
 * We want to separate the long-living (i.e. looping) behavior from the connector client because
 * we also want to be able to use the client for its add and send API in arbitrary applications
 * that want to send chunks without a running {@link ChukwaAgent} daemon.
 *
 * <p>
 * On error, tries the list of available collectors, pauses for a minute, and then repeats.
 * </p>
 * <p> Will wait forever for collectors to come up. </p>
 */
37  
38  import java.io.IOException;
39  import java.util.ArrayList;
40  import java.util.Iterator;
41  import java.util.List;
42  import java.util.Timer;
43  import java.util.TimerTask;
44  import org.apache.hadoop.chukwa.Chunk;
45  import org.apache.hadoop.chukwa.datacollection.ChunkQueue;
46  import org.apache.hadoop.chukwa.datacollection.DataFactory;
47  import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
48  import org.apache.hadoop.chukwa.datacollection.connector.Connector;
49  import org.apache.hadoop.chukwa.datacollection.sender.*;
50  import org.apache.hadoop.chukwa.util.DaemonWatcher;
51  import org.apache.hadoop.conf.Configuration;
52  import org.apache.log4j.Logger;
53  
54  public class HttpConnector implements Connector, Runnable {
55  
56    static Logger log = Logger.getLogger(HttpConnector.class);
57  
58    Timer statTimer = null;
59    volatile int chunkCount = 0;
60    
61    int MAX_SIZE_PER_POST = 2 * 1024 * 1024;
62    int MIN_POST_INTERVAL = 5 * 1000;
63    public static final String MIN_POST_INTERVAL_OPT = "httpConnector.minPostInterval";
64    public static final String MAX_SIZE_PER_POST_OPT = "httpConnector.maxPostSize";
65    public static final String ASYNC_ACKS_OPT = "httpConnector.asyncAcks";
66  
67    boolean ASYNC_ACKS = false;
68    
69    ChunkQueue chunkQueue;
70  
71    ChukwaAgent agent;
72    String argDestination = null;
73  
74    private volatile boolean stopMe = false;
75    private Iterator<String> collectors = null;
76    protected ChukwaSender connectorClient = null;
77  
78    { //instance initializer block
79      statTimer = new Timer();
80      statTimer.schedule(new TimerTask() {
81        public void run() {
82          int count = chunkCount;
83          chunkCount = 0;
84          log.info("# http chunks ACK'ed since last report: " + count);
85        }
86      }, 100, 60 * 1000);
87    }
88  
89    public HttpConnector(ChukwaAgent agent) {
90      this.agent = agent;
91    }
92  
93    public HttpConnector(ChukwaAgent agent, String destination) {
94      this.agent = agent;
95      this.argDestination = destination;
96  
97      log.info("Setting HTTP Connector URL manually using arg passed to Agent: "
98          + destination);
99    }
100 
101   public void start() {
102 
103     chunkQueue = DataFactory.getInstance().getEventQueue();
104     Configuration conf = agent.getConfiguration();
105     MAX_SIZE_PER_POST = conf.getInt(MAX_SIZE_PER_POST_OPT, MAX_SIZE_PER_POST);
106     MIN_POST_INTERVAL = conf.getInt(MIN_POST_INTERVAL_OPT, MIN_POST_INTERVAL);
107     ASYNC_ACKS = conf.getBoolean(ASYNC_ACKS_OPT, ASYNC_ACKS);
108     (new Thread(this, "HTTP post thread")).start();
109   }
110 
111   public void shutdown() {
112     stopMe = true;
113     connectorClient.stop();
114   }
115 
116   public void run() {
117     log.info("HttpConnector started at time:" + System.currentTimeMillis());
118 
119     // build a list of our destinations from collectors
120     try {
121       if(collectors == null)
122         collectors = DataFactory.getInstance().getCollectorURLs(agent.getConfiguration());
123     } catch (IOException e) {
124       log.error("Failed to retrieve list of collectors from "
125           + "conf/collectors file", e);
126     }
127     
128     if(ASYNC_ACKS) {
129       try {
130         connectorClient = new AsyncAckSender(agent.getConfiguration(), agent);
131       } catch(IOException e) {
132         log.fatal("can't read AsycAck hostlist file, exiting");
133         agent.shutdown(true);
134       }
135     } else
136       connectorClient = new ChukwaHttpSender(agent.getConfiguration());
137 
138     if (argDestination != null) {
139       ArrayList<String> tmp = new ArrayList<String>();
140       tmp.add(argDestination);
141       collectors = tmp.iterator();
142       log.info("using collector specified at agent runtime: " + argDestination);
143     } else
144       log.info("using collectors from collectors file");
145 
146     if (collectors == null || !collectors.hasNext()) {
147       log.error("No collectors specified, exiting (and taking agent with us).");
148       agent.shutdown(true);// error is unrecoverable, so stop hard.
149       return;
150     }
151 
152     connectorClient.setCollectors(collectors);
153 
154 
155     try {
156       long lastPost = System.currentTimeMillis();
157       while (!stopMe) {
158         List<Chunk> newQueue = new ArrayList<Chunk>();
159         try {
160           // get all ready chunks from the chunkQueue to be sent
161           chunkQueue.collect(newQueue, MAX_SIZE_PER_POST); // FIXME: should
162                                                            // really do this by size
163 
164         } catch (InterruptedException e) {
165           System.out.println("thread interrupted during addChunks(ChunkQueue)");
166           Thread.currentThread().interrupt();
167           break;
168         }
169         List<ChukwaHttpSender.CommitListEntry> results = connectorClient
170             .send(newQueue);
171         // checkpoint the chunks which were committed
172         for (ChukwaHttpSender.CommitListEntry cle : results) {
173           agent.reportCommit(cle.adaptor, cle.uuid);
174           chunkCount++;
175         }
176 
177         long now = System.currentTimeMillis();
178         long delta = MIN_POST_INTERVAL - now + lastPost;
179         if(delta > 0) {
180           Thread.sleep(delta); // wait for stuff to accumulate
181         }
182         lastPost = now;
183       } // end of try forever loop
184       log.info("received stop() command so exiting run() loop to shutdown connector");
185     } catch (OutOfMemoryError e) {
186       log.warn("Bailing out", e);
187       DaemonWatcher.bailout(-1);
188     } catch (InterruptedException e) {
189       // do nothing, let thread die.
190       log.warn("Bailing out", e);
191       DaemonWatcher.bailout(-1);
192     } catch (java.io.IOException e) {
193       log.error("connector failed; shutting down agent");
194       agent.shutdown(true);
195     }
196   }
197 
198   @Override
199   public void reloadConfiguration() {
200     Iterator<String> destinations = null;
201 
202     // build a list of our destinations from collectors
203     try {
204       destinations = DataFactory.getInstance().getCollectorURLs(agent.getConfiguration());
205     } catch (IOException e) {
206       log.error("Failed to retreive list of collectors from conf/collectors file", e);
207     }
208     if (destinations != null && destinations.hasNext()) {
209       collectors = destinations;
210       connectorClient.setCollectors(collectors);
211       log.info("Resetting collectors");
212     }
213   }
214   
215   public ChukwaSender getSender() {
216     return connectorClient;
217   }
218   
219   public void setCollectors(Iterator<String> list) {
220     collectors = list;
221   }
222 }