/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.chukwa.datacollection.connector.http;

/**
 * This class is responsible for setting up a {@link HttpConnectorClient} with a collector
 * and then repeatedly calling its send function, which encapsulates the work of setting up the
 * connection with the appropriate collector and then collecting and sending the {@link Chunk}s
 * from the global {@link ChunkQueue} which were added by {@link Adaptors}. We want to separate
 * the long-living (i.e. looping) behavior from the ConnectorClient because we also want to be able
 * to use the HttpConnectorClient for its add and send API in arbitrary applications that want to send
 * chunks without a {@link LocalAgent} daemon.
 *
 * <p>
 * On error, tries the list of available collectors, pauses for a minute, and then repeats.
 * </p>
 * <p> Will wait forever for collectors to come up. </p>
 */

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.hadoop.chukwa.Chunk;
import org.apache.hadoop.chukwa.datacollection.ChunkQueue;
import org.apache.hadoop.chukwa.datacollection.DataFactory;
import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
import org.apache.hadoop.chukwa.datacollection.connector.Connector;
import org.apache.hadoop.chukwa.datacollection.sender.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.log4j.Logger;

public class HttpConnector implements Connector, Runnable {

  static Logger log = Logger.getLogger(HttpConnector.class);

  Timer statTimer = null;
  AtomicInteger chunkCount = new AtomicInteger();
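
  // Post size/frequency defaults; start() may override these from the
  // agent configuration using the *_OPT keys declared below.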
  int MAX_SIZE_PER_POST = 2 * 1024 * 1024;
  int MIN_POST_INTERVAL = 5 * 1000;
  public static final String MIN_POST_INTERVAL_OPT = "httpConnector.minPostInterval";
  public static final String MAX_SIZE_PER_POST_OPT = "httpConnector.maxPostSize";
  public static final String ASYNC_ACKS_OPT = "httpConnector.asyncAcks";

  boolean ASYNC_ACKS = false;

  ChunkQueue chunkQueue;

  ChukwaAgent agent;
  String argDestination = null;

  private volatile boolean stopMe = false;
  private Iterator<String> collectors = null;
  protected ChukwaSender connectorClient = null;

  { // instance initializer block
    statTimer = new Timer();
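    // Report the number of chunks ACK'ed since the last report once a minute.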
    statTimer.schedule(new TimerTask() {
      public void run() {
        // read and reset atomically so ACKs arriving between the read and
        // the reset are not lost
        int count = chunkCount.getAndSet(0);
        log.info("# http chunks ACK'ed since last report: " + count);
      }
    }, 100, 60 * 1000);
  }
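
  /**
   * Builds a connector whose collector list will be read from the agent's
   * configuration (the conf/collectors file) when {@link #run()} starts.
   */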
  public HttpConnector(ChukwaAgent agent) {
    this.agent = agent;
  }
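
  /**
   * Builds a connector that posts to the single collector URL supplied on the
   * agent command line, bypassing the collectors file.
   */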
  public HttpConnector(ChukwaAgent agent, String destination) {
    this.agent = agent;
    this.argDestination = destination;

    log.info("Setting HTTP Connector URL manually using arg passed to Agent: "
        + destination);
  }
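
  /**
   * Reads tuning parameters from the agent configuration, then starts the
   * HTTP post thread that executes {@link #run()}.
   */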
  public void start() {

    chunkQueue = DataFactory.getInstance().getEventQueue();
    Configuration conf = agent.getConfiguration();
    MAX_SIZE_PER_POST = conf.getInt(MAX_SIZE_PER_POST_OPT, MAX_SIZE_PER_POST);
    MIN_POST_INTERVAL = conf.getInt(MIN_POST_INTERVAL_OPT, MIN_POST_INTERVAL);
    ASYNC_ACKS = conf.getBoolean(ASYNC_ACKS_OPT, ASYNC_ACKS);
    (new Thread(this, "HTTP post thread")).start();
  }
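
  /**
   * Signals the post loop to exit and stops the underlying sender.
   */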
  public void shutdown() {
    stopMe = true;
    connectorClient.stop();
  }
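
  /**
   * Main post loop: builds the collector list, creates the sender, then
   * repeatedly drains the chunk queue and posts to a collector until
   * {@link #shutdown()} is called.
   */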
  public void run() {
    log.info("HttpConnector started at time:" + System.currentTimeMillis());

    // build a list of our destinations from collectors
    try {
      if (collectors == null)
        collectors = DataFactory.getInstance().getCollectorURLs(agent.getConfiguration());
    } catch (IOException e) {
      log.error("Failed to retrieve list of collectors from "
          + "conf/collectors file", e);
    }

    if (ASYNC_ACKS) {
      try {
        connectorClient = new AsyncAckSender(agent.getConfiguration(), agent);
      } catch (IOException e) {
        log.fatal("can't read AsyncAck hostlist file, exiting");
        agent.shutdown(true);
      }
    } else
      connectorClient = new ChukwaHttpSender(agent.getConfiguration());

    if (argDestination != null) {
      ArrayList<String> tmp = new ArrayList<String>();
      tmp.add(argDestination);
      collectors = tmp.iterator();
      log.info("using collector specified at agent runtime: " + argDestination);
    } else
      log.info("using collectors from collectors file");

    if (collectors == null || !collectors.hasNext()) {
      log.error("No collectors specified, exiting (and taking agent with us).");
      agent.shutdown(true); // error is unrecoverable, so stop hard.
      return;
    }

    connectorClient.setCollectors(collectors);
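
    // Drain-and-post loop: collect ready chunks, send them, report commits
    // back to the agent, then wait out the remainder of MIN_POST_INTERVAL.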
    try {
      long lastPost = System.currentTimeMillis();
      while (!stopMe) {
        List<Chunk> newQueue = new ArrayList<Chunk>();
        try {
          // get all ready chunks from the chunkQueue to be sent
          chunkQueue.collect(newQueue, MAX_SIZE_PER_POST); // FIXME: should
                                                           // really do this by size

        } catch (InterruptedException e) {
          log.warn("thread interrupted during addChunks(ChunkQueue)");
          Thread.currentThread().interrupt();
          break;
        }
        List<ChukwaHttpSender.CommitListEntry> results = connectorClient
            .send(newQueue);
        // checkpoint the chunks which were committed
        for (ChukwaHttpSender.CommitListEntry cle : results) {
          agent.reportCommit(cle.adaptor, cle.uuid);
          chunkCount.incrementAndGet();
        }

        long now = System.currentTimeMillis();
        long delta = MIN_POST_INTERVAL - now + lastPost;
        if (delta > 0) {
          Thread.sleep(delta); // wait for stuff to accumulate
        }
        lastPost = now;
      } // end of try forever loop
      log.info("received stop() command so exiting run() loop to shutdown connector");
    } catch (OutOfMemoryError e) {
      log.warn("Bailing out", e);
    } catch (InterruptedException e) {
      // do nothing, let thread die.
      log.warn("Bailing out", e);
    } catch (java.io.IOException e) {
      log.error("connector failed; shutting down agent");
      agent.shutdown(true);
    }
  }
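
  /**
   * Rereads the collector list from conf/collectors and, if it is non-empty,
   * hands it to the sender; an unreadable or empty list leaves the current
   * collectors in place.
   */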
  @Override
  public void reloadConfiguration() {
    Iterator<String> destinations = null;

    // build a list of our destinations from collectors
    try {
      destinations = DataFactory.getInstance().getCollectorURLs(agent.getConfiguration());
    } catch (IOException e) {
      log.error("Failed to retrieve list of collectors from conf/collectors file", e);
    }
    if (destinations != null && destinations.hasNext()) {
      collectors = destinations;
      connectorClient.setCollectors(collectors);
      log.info("Resetting collectors");
    }
  }
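
  /** Returns the sender this connector posts through. */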
  public ChukwaSender getSender() {
    return connectorClient;
  }
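
  /**
   * Overrides the collector list; if set before {@link #run()} starts,
   * the connector skips reading conf/collectors.
   */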
  public void setCollectors(Iterator<String> list) {
    collectors = list;
  }
}