1/*2 * Licensed to the Apache Software Foundation (ASF) under one3 * or more contributor license agreements. See the NOTICE file4 * distributed with this work for additional information5 * regarding copyright ownership. The ASF licenses this file6 * to you under the Apache License, Version 2.0 (the7 * "License"); you may not use this file except in compliance8 * with the License. You may obtain a copy of the License at9 *10 * http://www.apache.org/licenses/LICENSE-2.011 *12 * Unless required by applicable law or agreed to in writing, software13 * distributed under the License is distributed on an "AS IS" BASIS,14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.15 * See the License for the specific language governing permissions and16 * limitations under the License.17 */18package org.apache.hadoop.chukwa.datacollection.sender;
1920import java.io.IOException;
21import org.apache.hadoop.chukwa.datacollection.DataFactory;
22import org.apache.hadoop.chukwa.datacollection.agent.*;
23import org.apache.hadoop.chukwa.datacollection.adaptor.Adaptor;
24import org.apache.hadoop.chukwa.datacollection.collector.servlet.CommitCheckServlet;
25import org.apache.hadoop.chukwa.datacollection.collector.servlet.ServletCollector;
26import org.apache.hadoop.chukwa.datacollection.sender.ChukwaHttpSender.CommitListEntry;
27import java.util.*;
28import java.util.regex.Matcher;
29import java.util.regex.Pattern;
30import org.apache.hadoop.conf.*;
31import org.apache.commons.httpclient.*;
32import org.apache.commons.httpclient.methods.GetMethod;
33import org.apache.commons.httpclient.methods.PostMethod;
34//import com.google.common.collect.SortedSetMultimap;35//import com.google.common.collect.TreeMultimap;3637import org.apache.log4j.Logger;
3839/**40 * An enhancement to ChukwaHttpSender that handles asynchronous acknowledgment.41 * 42 * This class will periodically poll the collectors to find out how much data43 * has been committed to HDFS, and will then pass those acks on to the Agent.44 */45publicclassAsyncAckSenderextendsChukwaHttpSender{
4647protectedstatic Logger log = Logger.getLogger(AsyncAckSender.class);
48/*49 * Represents the state required for an asynchronous ack.50 * 51 * Supplements CommitListEntry with a filename and offset;52 * the data commits when that file reaches that length.53 */54publicstaticclassDelayedCommitextendsCommitListEntryimplements Comparable<DelayedCommit> {
55final String fname;
56long fOffset;
57final String aName;
58publicDelayedCommit(Adaptor a, long uuid, long len, String fname,
59long offset, String aName) {
60super(a, uuid, len);
61this.fname = fname;
62this.fOffset = offset;
63this.aName = aName;
64 }
6566 @Override
67publicint hashCode() {
68returnsuper.hashCode() ^ fname.hashCode() ^ (int)(fOffset) ^ (int) (fOffset >> 32);
69 }
7071//sort by adaptor name first, then by start offset72//note that returning 1 means this is "greater" than RHS73publicint compareTo(DelayedCommit o) {
74int c = o.aName.compareTo(this.aName);
75if(c != 0)
76return c;
77 c = fname.compareTo(this.fname);
78if(c != 0)
79return c;
80if(o.start < start)
81return 1;
82elseif(o.start > start)
83return -1;
84elsereturn 0;
85 }
8687public String toString() {
88return adaptor +" commits from" + start + " to " + uuid + " when " + fname + " hits " + fOffset;
89 }
90 }
9192publicstaticfinal String POLLPERIOD_OPT = "connector.commitpoll.period";
93publicstaticfinal String POLLHOSTS_OPT = "connector.commitpoll.hostfile";
94finalChukwaAgent agent;
9596/*97 * The list of commits that we're expecting.98 * This is the structure used to pass the list to the CommitPollThread. 99 * Adjacent commits to the same file will be coalesced.100 * 101 */102final List<DelayedCommit> mergedList;
103104/**105 * Periodically scans a subset of the collectors, looking for committed files.106 * This way, not every collector is pestering the namenode with periodic lses.107 */108finalclassCommitPollThreadextends Thread {
109privateChukwaHttpSender scanPath;
110privateint pollPeriod = 1000 * 30;
111112113privatefinal Map<String, PriorityQueue<DelayedCommit>> pendingCommits;
114115CommitPollThread(Configuration conf, Iterator<String> tryList) {
116 pollPeriod = conf.getInt(POLLPERIOD_OPT, pollPeriod);
117 scanPath = newChukwaHttpSender(conf);
118 scanPath.setCollectors(tryList);
119 pendingCommits = new HashMap<String, PriorityQueue<DelayedCommit>>();
120 }
121122privatevolatileboolean running = true;
123publicvoid shutdown() {
124 running = false;
125this.interrupt();
126 }
127128publicvoid run() {
129try {
130while(running) {
131 Thread.sleep(pollPeriod);
132//update table using list of pending delayed commits, in this thread133 checkForCommits();
134 mergePendingTable();
135 }
136 } catch(InterruptedException e) {}
137catch(IOException e) {
138 log.error(e);
139 }
140 }
141142/*143 * Note that this method is NOT threadsafe, and should only be called144 * from the same thread that will later check for commits145 */146privatevoid mergePendingTable() {
147synchronized(mergedList) {
148for(DelayedCommit dc:mergedList) {
149150 PriorityQueue<DelayedCommit> pendList = pendingCommits.get(dc.fname);
151if(pendList == null) {
152 pendList = new PriorityQueue<DelayedCommit>();
153 pendingCommits.put(dc.fname, pendList);
154 }
155 pendList.add(dc);
156 }
157 mergedList.clear();
158 } //end synchronized159 }
160161 Pattern respLine = Pattern.compile("<li>(.*) ([0-9]+)</li>");
162privatevoid checkForCommits() throws IOException, InterruptedException {
163164 log.info("checking for commited chunks");
165 GetMethod method = new GetMethod();
166 List<String> parsedFStatuses = scanPath.reliablySend(method, CommitCheckServlet.DEFAULT_PATH);
167168//do an http get169for(String stat: parsedFStatuses) {
170 Matcher m = respLine.matcher(stat);
171if(!m.matches())
172continue;
173 String path = m.group(1);
174 Long committedOffset = Long.parseLong(m.group(2));
175176 PriorityQueue<DelayedCommit> delayedOnFile = pendingCommits.get(path);
177if(delayedOnFile == null)
178continue;
179180 HashSet<Adaptor> committed = new HashSet<Adaptor>();
181while(!delayedOnFile.isEmpty()) {
182DelayedCommit fired = delayedOnFile.element();
183if(fired.fOffset > committedOffset)
184break;
185else {
186 ChukwaAgent.Offset o = agent.offset(fired.adaptor);
187if(o != null && fired.start > o.offset()) {
188 log.error("can't commit "+ o.adaptorID() + " without ordering assumption");
189break; //don't commit190 }
191 delayedOnFile.remove();
192 String s = agent.reportCommit(fired.adaptor, fired.uuid);
193 committed.add(fired.adaptor);
194//TODO: if s == null, then the adaptor has been stopped.195//should we stop sending acks?196 log.info("COMMIT to "+ committedOffset+ " on "+ path+ ", updating " +s);
197 }
198 }
199 adaptorReset.reportCommits(committed);
200 }
201 }
202203 }
204205CommitPollThread pollThread;
206207//note that at present we don't actually run this thread; we just use its methods.208publicAdaptorResetThread adaptorReset;
209 Configuration conf;
210211publicAsyncAckSender(Configuration conf, ChukwaAgent a) throws IOException {
212super(conf);
213 log.info("delayed-commit processing enabled");
214 agent = a;
215216 mergedList = new ArrayList<DelayedCommit>();
217this.conf = conf;
218 adaptorReset = newAdaptorResetThread(conf, a);
219 adaptorReset.start();
220//initialize the commitpoll later, once we have the list of collectors221 }
222223224 @Override
225publicvoid setCollectors(Iterator<String> collectors) {
226 Iterator<String> tryList = null;
227 String scanHostsFilename = conf.get(POLLHOSTS_OPT, "collectors");
228try {
229 tryList = DataFactory.getInstance().getCollectorURLs(conf, scanHostsFilename);
230 } catch(IOException e) {
231 log.warn("couldn't read " + scanHostsFilename+ " falling back on collectors list");
232 }
233234if(collectors instanceof RetryListOfCollectors) {
235super.setCollectors(collectors);
236if(tryList == null)
237 tryList = ((RetryListOfCollectors) collectors).clone();
238 }
239else {
240 ArrayList<String> l = new ArrayList<String>();
241while(collectors.hasNext())
242 l.add(collectors.next());
243super.setCollectors(l.iterator());
244if(tryList == null)
245 tryList = l.iterator();
246 }
247248 pollThread = newCommitPollThread(conf, tryList);
249 pollThread.setDaemon(true);
250 pollThread.start();
251 }
252253/*254 * This method is the interface from AsyncAckSender to the CommitPollThread --255 * it gets a lock on the merge table, and then updates it with a batch of pending acks256 *257 * This method is called from the thread doing a post; the merge table is258 * read by the CommitPollThread when it figures out what commits are expected.259 */260privatevoid delayCommits(List<DelayedCommit> delayed) {
261 Collections.sort(delayed);
262263synchronized(mergedList) {
264DelayedCommit region =null;
265for(DelayedCommit cur: delayed) {
266if(region == null)
267 region = cur;
268elseif((cur.adaptor == region.adaptor) &&
269 cur.fname.equals(region.fname) && (cur.start <= region.uuid)) {
270//since the list is sorted, region.start < cur.start271 region.uuid = Math.max(region.uuid, cur.uuid); //merge272 region.fOffset = Math.max(region.fOffset, cur.fOffset);
273 } else {
274 mergedList.add(region);
275 region= cur;
276 }
277 }
278 mergedList.add(region);
279 }
280 }
281282283 Pattern partialCommitPat = Pattern.compile("(.*) ([0-9]+)");
284 @Override
285public List<CommitListEntry> postAndParseResponse(PostMethod method,
286 List<CommitListEntry> expectedCommitResults)
287throws IOException, InterruptedException {
288 adaptorReset.reportPending(expectedCommitResults);
289 List<String> resp = reliablySend(method, ServletCollector.PATH);
290//expect most of 'em to be delayed291 List<DelayedCommit> toDelay = new ArrayList<DelayedCommit>(resp.size());
292 ArrayList<CommitListEntry> result = new ArrayList<CommitListEntry>();
293294for(int i = 0; i < resp.size(); ++i) {
295if(resp.get(i).startsWith(ServletCollector.ACK_PREFIX))
296 result.add(expectedCommitResults.get(i));
297else {
298CommitListEntry cle = expectedCommitResults.get(i);
299 Matcher m = partialCommitPat.matcher(resp.get(i));
300if(!m.matches())
301 log.warn("unexpected response: "+ resp.get(i));
302else303 log.info("waiting for " + m.group(1) + " to hit " + m.group(2) +
304" before committing "+ agent.getAdaptorName(cle.adaptor));
305306 String name = agent.getAdaptorName(cle.adaptor);
307if(name != null)//null name implies adaptor no longer running308 toDelay.add(newDelayedCommit(cle.adaptor, cle.uuid, cle.start, m.group(1),
309 Long.parseLong(m.group(2)), name));
310 }
311 }
312 delayCommits(toDelay);
313return result;
314 }
315316 @Override
317protectedboolean failedCollector(String downed) {
318 log.info("collector "+ downed + " down; resetting adaptors");
319 adaptorReset.resetTimedOutAdaptors(0); //reset all adaptors with outstanding data.320return false;
321 }
322323 @Override
324publicvoid stop() {
325 pollThread.shutdown();
326 }
327328 }