1/*2 * Licensed to the Apache Software Foundation (ASF) under one3 * or more contributor license agreements. See the NOTICE file4 * distributed with this work for additional information5 * regarding copyright ownership. The ASF licenses this file6 * to you under the Apache License, Version 2.0 (the7 * "License"); you may not use this file except in compliance8 * with the License. You may obtain a copy of the License at9 *10 * http://www.apache.org/licenses/LICENSE-2.011 *12 * Unless required by applicable law or agreed to in writing, software13 * distributed under the License is distributed on an "AS IS" BASIS,14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.15 * See the License for the specific language governing permissions and16 * limitations under the License.17 */18package org.apache.hadoop.chukwa.datacollection.sender;
1920import java.io.IOException;
21import org.apache.hadoop.chukwa.datacollection.DataFactory;
22import org.apache.hadoop.chukwa.datacollection.agent.*;
23import org.apache.hadoop.chukwa.datacollection.adaptor.Adaptor;
24import org.apache.hadoop.chukwa.datacollection.collector.servlet.CommitCheckServlet;
25import org.apache.hadoop.chukwa.datacollection.collector.servlet.ServletCollector;
26import java.util.*;
27import java.util.regex.Matcher;
28import java.util.regex.Pattern;
29import org.apache.hadoop.conf.*;
30import org.apache.commons.httpclient.methods.GetMethod;
31import org.apache.commons.httpclient.methods.PostMethod;
32//import com.google.common.collect.SortedSetMultimap;33//import com.google.common.collect.TreeMultimap;3435import org.apache.log4j.Logger;
3637/**38 * An enhancement to ChukwaHttpSender that handles asynchronous acknowledgment.39 * 40 * This class will periodically poll the collectors to find out how much data41 * has been committed to HDFS, and will then pass those acks on to the Agent.42 */43publicclassAsyncAckSenderextendsChukwaHttpSender{
4445protectedfinalstatic Logger log = Logger.getLogger(AsyncAckSender.class);
46/*47 * Represents the state required for an asynchronous ack.48 * 49 * Supplements CommitListEntry with a filename and offset;50 * the data commits when that file reaches that length.51 */52publicstaticclassDelayedCommitextendsCommitListEntryimplements Comparable<DelayedCommit> {
53final String fname;
54long fOffset;
55final String aName;
56publicDelayedCommit(Adaptor a, long uuid, long len, String fname,
57long offset, String aName) {
58super(a, uuid, len);
59this.fname = fname;
60this.fOffset = offset;
61this.aName = aName;
62 }
6364 @Override
65publicint hashCode() {
66returnsuper.hashCode() ^ fname.hashCode() ^ (int)(fOffset) ^ (int) (fOffset >> 32);
67 }
6869//sort by adaptor name first, then by start offset70//note that returning 1 means this is "greater" than RHS71publicint compareTo(DelayedCommit o) {
72int c = o.aName.compareTo(this.aName);
73if(c != 0)
74return c;
75 c = o.fname.compareTo(this.fname);
76if(c != 0)
77return c;
78if(o.start < start)
79return 1;
80elseif(o.start > start)
81return -1;
82elsereturn 0;
83 }
8485 @Override
86publicboolean equals(Object o) {
87if(!(o instanceof DelayedCommit)) {
88return false;
89 }
90DelayedCommit dc = (DelayedCommit) o;
91if(this.aName.equals(dc.aName)) {
92returntrue;
93 }
94return false;
95 }
9697public String toString() {
98return adaptor +" commits from" + start + " to " + uuid + " when " + fname + " hits " + fOffset;
99 }
100 }
101102publicstaticfinal String POLLPERIOD_OPT = "connector.commitpoll.period";
103publicstaticfinal String POLLHOSTS_OPT = "connector.commitpoll.hostfile";
104finalChukwaAgent agent;
105106/*107 * The list of commits that we're expecting.108 * This is the structure used to pass the list to the CommitPollThread. 109 * Adjacent commits to the same file will be coalesced.110 * 111 */112final List<DelayedCommit> mergedList;
113114/**115 * Periodically scans a subset of the collectors, looking for committed files.116 * This way, not every collector is pestering the namenode with periodic lses.117 */118finalclassCommitPollThreadextends Thread {
119privateChukwaHttpSender scanPath;
120privateint pollPeriod = 1000 * 30;
121122123privatefinal Map<String, PriorityQueue<DelayedCommit>> pendingCommits;
124125CommitPollThread(Configuration conf, Iterator<String> tryList) {
126 pollPeriod = conf.getInt(POLLPERIOD_OPT, pollPeriod);
127 scanPath = newChukwaHttpSender(conf);
128 scanPath.setCollectors(tryList);
129 pendingCommits = new HashMap<String, PriorityQueue<DelayedCommit>>();
130 }
131132privatevolatileboolean running = true;
133publicvoid shutdown() {
134 running = false;
135this.interrupt();
136 }
137138publicvoid run() {
139try {
140while(running) {
141 Thread.sleep(pollPeriod);
142//update table using list of pending delayed commits, in this thread143 checkForCommits();
144 mergePendingTable();
145 }
146 } catch(InterruptedException e) {}
147catch(IOException e) {
148 log.error(e);
149 }
150 }
151152/*153 * Note that this method is NOT threadsafe, and should only be called154 * from the same thread that will later check for commits155 */156privatevoid mergePendingTable() {
157synchronized(mergedList) {
158for(DelayedCommit dc:mergedList) {
159160 PriorityQueue<DelayedCommit> pendList = pendingCommits.get(dc.fname);
161if(pendList == null) {
162 pendList = new PriorityQueue<DelayedCommit>();
163 pendingCommits.put(dc.fname, pendList);
164 }
165 pendList.add(dc);
166 }
167 mergedList.clear();
168 } //end synchronized169 }
170171 Pattern respLine = Pattern.compile("<li>(.*) ([0-9]+)</li>");
172privatevoid checkForCommits() throws IOException, InterruptedException {
173174 log.info("checking for commited chunks");
175 GetMethod method = new GetMethod();
176 List<String> parsedFStatuses = scanPath.reliablySend(method, CommitCheckServlet.DEFAULT_PATH);
177178//do an http get179for(String stat: parsedFStatuses) {
180 Matcher m = respLine.matcher(stat);
181if(!m.matches())
182continue;
183 String path = m.group(1);
184 Long committedOffset = Long.parseLong(m.group(2));
185186 PriorityQueue<DelayedCommit> delayedOnFile = pendingCommits.get(path);
187if(delayedOnFile == null)
188continue;
189190 HashSet<Adaptor> committed = new HashSet<Adaptor>();
191while(!delayedOnFile.isEmpty()) {
192DelayedCommit fired = delayedOnFile.element();
193if(fired.fOffset > committedOffset)
194break;
195else {
196 ChukwaAgent.Offset o = agent.offset(fired.adaptor);
197if(o != null && fired.start > o.offset()) {
198 log.error("can't commit "+ o.adaptorID() + " without ordering assumption");
199break; //don't commit200 }
201 delayedOnFile.remove();
202 String s = agent.reportCommit(fired.adaptor, fired.uuid);
203 committed.add(fired.adaptor);
204//TODO: if s == null, then the adaptor has been stopped.205//should we stop sending acks?206 log.info("COMMIT to "+ committedOffset+ " on "+ path+ ", updating " +s);
207 }
208 }
209 adaptorReset.reportCommits(committed);
210 }
211 }
212213 }
214215CommitPollThread pollThread;
216217//note that at present we don't actually run this thread; we just use its methods.218publicAdaptorResetThread adaptorReset;
219 Configuration conf;
/**
 * Creates a sender with delayed-commit processing and starts the adaptor
 * reset thread.
 *
 * @param conf configuration passed to the superclass and to the reset thread
 * @param a    agent that will receive commit reports
 * @throws IOException declared for callers; presumably raised by the
 *                     superclass or reset-thread construction
 */
public AsyncAckSender(Configuration conf, ChukwaAgent a) throws IOException {
  super(conf);
  this.conf = conf;
  this.agent = a;
  this.mergedList = new ArrayList<DelayedCommit>();
  log.info("delayed-commit processing enabled");

  this.adaptorReset = new AdaptorResetThread(conf, a);
  this.adaptorReset.start();
  // The commit poller is initialized later, once we have the list of collectors.
}
232233234 @Override
235publicvoid setCollectors(Iterator<String> collectors) {
236 Iterator<String> tryList = null;
237 String scanHostsFilename = conf.get(POLLHOSTS_OPT, "collectors");
238try {
239 tryList = DataFactory.getInstance().getCollectorURLs(conf, scanHostsFilename);
240 } catch(IOException e) {
241 log.warn("couldn't read " + scanHostsFilename+ " falling back on collectors list");
242 }
243244if(collectors instanceof RetryListOfCollectors) {
245super.setCollectors(collectors);
246if(tryList == null)
247 tryList = ((RetryListOfCollectors) collectors).clone();
248 }
249else {
250 ArrayList<String> l = new ArrayList<String>();
251while(collectors.hasNext())
252 l.add(collectors.next());
253super.setCollectors(l.iterator());
254if(tryList == null)
255 tryList = l.iterator();
256 }
257258 pollThread = newCommitPollThread(conf, tryList);
259 pollThread.setDaemon(true);
260 pollThread.start();
261 }
262263/*264 * This method is the interface from AsyncAckSender to the CommitPollThread --265 * it gets a lock on the merge table, and then updates it with a batch of pending acks266 *267 * This method is called from the thread doing a post; the merge table is268 * read by the CommitPollThread when it figures out what commits are expected.269 */270privatevoid delayCommits(List<DelayedCommit> delayed) {
271 Collections.sort(delayed);
272273synchronized(mergedList) {
274DelayedCommit region =null;
275for(DelayedCommit cur: delayed) {
276if(region == null)
277 region = cur;
278elseif((cur.adaptor == region.adaptor) &&
279 cur.fname.equals(region.fname) && (cur.start <= region.uuid)) {
280//since the list is sorted, region.start < cur.start281 region.uuid = Math.max(region.uuid, cur.uuid); //merge282 region.fOffset = Math.max(region.fOffset, cur.fOffset);
283 } else {
284 mergedList.add(region);
285 region= cur;
286 }
287 }
288 mergedList.add(region);
289 }
290 }
291292293 Pattern partialCommitPat = Pattern.compile("(.*) ([0-9]+)");
294 @Override
295public List<CommitListEntry> postAndParseResponse(PostMethod method,
296 List<CommitListEntry> expectedCommitResults)
297throws IOException, InterruptedException {
298 adaptorReset.reportPending(expectedCommitResults);
299 List<String> resp = reliablySend(method, ServletCollector.PATH);
300//expect most of 'em to be delayed301 List<DelayedCommit> toDelay = new ArrayList<DelayedCommit>(resp.size());
302 ArrayList<CommitListEntry> result = new ArrayList<CommitListEntry>();
303304for(int i = 0; i < resp.size(); ++i) {
305if(resp.get(i).startsWith(ServletCollector.ACK_PREFIX))
306 result.add(expectedCommitResults.get(i));
307else {
308CommitListEntry cle = expectedCommitResults.get(i);
309 Matcher m = partialCommitPat.matcher(resp.get(i));
310if(!m.matches())
311 log.warn("unexpected response: "+ resp.get(i));
312else313 log.info("waiting for " + m.group(1) + " to hit " + m.group(2) +
314" before committing "+ agent.getAdaptorName(cle.adaptor));
315316 String name = agent.getAdaptorName(cle.adaptor);
317if(name != null)//null name implies adaptor no longer running318 toDelay.add(newDelayedCommit(cle.adaptor, cle.uuid, cle.start, m.group(1),
319 Long.parseLong(m.group(2)), name));
320 }
321 }
322 delayCommits(toDelay);
323return result;
324 }
325326 @Override
327protectedboolean failedCollector(String downed) {
328 log.info("collector "+ downed + " down; resetting adaptors");
329 adaptorReset.resetTimedOutAdaptors(0); //reset all adaptors with outstanding data.330return false;
331 }
332333 @Override
334publicvoid stop() {
335 pollThread.shutdown();
336 }
337338 }