This project has retired. For details please refer to its Attic page.
AsyncAckSender xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.chukwa.datacollection.sender;
19  
20  import java.io.IOException;
21  import org.apache.hadoop.chukwa.datacollection.DataFactory;
22  import org.apache.hadoop.chukwa.datacollection.agent.*;
23  import org.apache.hadoop.chukwa.datacollection.adaptor.Adaptor;
24  import org.apache.hadoop.chukwa.datacollection.collector.servlet.CommitCheckServlet;
25  import org.apache.hadoop.chukwa.datacollection.collector.servlet.ServletCollector;
26  import org.apache.hadoop.chukwa.datacollection.sender.ChukwaHttpSender.CommitListEntry;
27  import java.util.*;
28  import java.util.regex.Matcher;
29  import java.util.regex.Pattern;
30  import org.apache.hadoop.conf.*;
31  import org.apache.commons.httpclient.*;
32  import org.apache.commons.httpclient.methods.GetMethod;
33  import org.apache.commons.httpclient.methods.PostMethod;
34  //import com.google.common.collect.SortedSetMultimap;
35  //import com.google.common.collect.TreeMultimap;
36  
37  import org.apache.log4j.Logger;
38  
39  /**
40   * An enhancement to ChukwaHttpSender that handles asynchronous acknowledgment.
41   * 
42   * This class will periodically poll the collectors to find out how much data
43   * has been committed to HDFS, and will then pass those acks on to the Agent.
44   */
45  public class AsyncAckSender extends ChukwaHttpSender{
46    
47    protected static Logger log = Logger.getLogger(AsyncAckSender.class);
48    /*
49     * Represents the state required for an asynchronous ack.
50     * 
51     * Supplements CommitListEntry with a filename and offset;
52     * the data commits when that file reaches that length.
53     */
54    public static class DelayedCommit extends CommitListEntry implements Comparable<DelayedCommit> {
55      final String fname;
56      long fOffset;
57      final String aName;
58      public DelayedCommit(Adaptor a, long uuid, long len, String fname, 
59          long offset, String aName) {
60        super(a, uuid, len);
61        this.fname = fname;
62        this.fOffset = offset;
63        this.aName = aName;
64      }
65  
66      @Override
67      public int hashCode() {
68        return super.hashCode() ^ fname.hashCode() ^ (int)(fOffset) ^ (int) (fOffset >> 32);
69      }
70      
71      //sort by adaptor name first, then by start offset
72      //note that returning 1 means this is "greater" than RHS
73      public int compareTo(DelayedCommit o) {
74        int c = o.aName.compareTo(this.aName);
75        if(c != 0)
76          return c;
77        c = fname.compareTo(this.fname);
78        if(c != 0)
79          return c;
80        if(o.start < start)
81          return 1;
82        else if(o.start > start)
83          return -1;
84        else return 0;
85      }
86      
87      public String toString() {
88        return adaptor +" commits from" + start + " to " + uuid + " when " + fname + " hits " + fOffset;
89      }
90    }
91    
92    public static final String POLLPERIOD_OPT = "connector.commitpoll.period";
93    public static final String POLLHOSTS_OPT = "connector.commitpoll.hostfile";
94    final ChukwaAgent agent;
95    
96    /*
97     * The list of commits that we're expecting.
98     * This is the structure used to pass the list to the CommitPollThread.  
99     * Adjacent commits to the same file will be coalesced.
100    * 
101    */
102   final List<DelayedCommit> mergedList;
103   
104   /**
105    * Periodically scans a subset of the collectors, looking for committed files.
106    * This way, not every collector is pestering the namenode with periodic lses.
107    */
108   final class CommitPollThread extends Thread {
109     private ChukwaHttpSender scanPath;
110     private int pollPeriod = 1000 * 30;
111 
112 
113     private final Map<String, PriorityQueue<DelayedCommit>> pendingCommits;
114 
115     CommitPollThread(Configuration conf, Iterator<String> tryList) {
116       pollPeriod = conf.getInt(POLLPERIOD_OPT, pollPeriod);
117       scanPath = new ChukwaHttpSender(conf);
118       scanPath.setCollectors(tryList);
119       pendingCommits = new HashMap<String, PriorityQueue<DelayedCommit>>();
120     }
121 
122     private volatile boolean running = true;
123     public void shutdown() {
124       running = false;
125       this.interrupt();
126     }
127     
128     public void run() {
129       try {
130         while(running) {
131           Thread.sleep(pollPeriod);
132           //update table using list of pending delayed commits, in this thread
133           checkForCommits();
134           mergePendingTable();
135         }
136       } catch(InterruptedException e) {}
137       catch(IOException e) {
138         log.error(e);
139       }
140     } 
141     
142     /*
143      * Note that this method is NOT threadsafe, and should only be called
144      * from the same thread that will later check for commits
145      */
146     private void mergePendingTable() {
147       synchronized(mergedList) {
148         for(DelayedCommit dc:mergedList) {
149           
150           PriorityQueue<DelayedCommit> pendList = pendingCommits.get(dc.fname);
151           if(pendList == null) {
152             pendList = new PriorityQueue<DelayedCommit>();
153             pendingCommits.put(dc.fname, pendList);
154           }
155           pendList.add(dc);
156         }
157         mergedList.clear();
158       } //end synchronized
159     }
160     
161     Pattern respLine = Pattern.compile("<li>(.*) ([0-9]+)</li>");
162     private void checkForCommits() throws IOException, InterruptedException {
163       
164       log.info("checking for commited chunks");
165       GetMethod method = new GetMethod();
166       List<String> parsedFStatuses = scanPath.reliablySend(method, CommitCheckServlet.DEFAULT_PATH); 
167 
168       //do an http get
169       for(String stat: parsedFStatuses) {
170         Matcher m = respLine.matcher(stat);
171         if(!m.matches())
172           continue;
173         String path = m.group(1);
174         Long committedOffset = Long.parseLong(m.group(2));
175 
176         PriorityQueue<DelayedCommit> delayedOnFile = pendingCommits.get(path);
177         if(delayedOnFile == null)
178           continue;
179        
180         HashSet<Adaptor> committed = new HashSet<Adaptor>();
181         while(!delayedOnFile.isEmpty()) {
182           DelayedCommit fired = delayedOnFile.element();
183           if(fired.fOffset > committedOffset)
184             break;
185           else {
186             ChukwaAgent.Offset o = agent.offset(fired.adaptor);
187             if(o != null && fired.start > o.offset()) {
188               log.error("can't commit "+ o.adaptorID() +  "  without ordering assumption");
189               break; //don't commit
190             }
191             delayedOnFile.remove();
192             String s = agent.reportCommit(fired.adaptor, fired.uuid);
193             committed.add(fired.adaptor);
194             //TODO: if s == null, then the adaptor has been stopped.
195             //should we stop sending acks?
196             log.info("COMMIT to "+ committedOffset+ " on "+ path+ ", updating " +s);
197           }
198         }
199         adaptorReset.reportCommits(committed);
200       }
201     }
202 
203   } 
204   
205   CommitPollThread pollThread;
206   
207   //the constructor starts this thread; this class also calls its report/reset methods directly.
208   public AdaptorResetThread adaptorReset;
209   Configuration conf;
210   
211   public AsyncAckSender(Configuration conf, ChukwaAgent a) throws IOException {
212     super(conf);
213     log.info("delayed-commit processing enabled");
214     agent = a;
215     
216     mergedList = new ArrayList<DelayedCommit>();
217     this.conf = conf;
218     adaptorReset = new AdaptorResetThread(conf, a);
219     adaptorReset.start();
220     //initialize the commitpoll later, once we have the list of collectors
221   }
222   
223   
224   @Override
225   public void setCollectors(Iterator<String> collectors) {
226    Iterator<String> tryList = null;
227    String scanHostsFilename = conf.get(POLLHOSTS_OPT, "collectors");
228    try {
229      tryList = DataFactory.getInstance().getCollectorURLs(conf, scanHostsFilename);
230    } catch(IOException e) {
231      log.warn("couldn't read " + scanHostsFilename+ " falling back on collectors list");
232    }
233 
234    if(collectors instanceof RetryListOfCollectors) {
235      super.setCollectors(collectors);
236      if(tryList == null)
237        tryList = ((RetryListOfCollectors) collectors).clone();
238    } 
239    else {
240      ArrayList<String> l = new ArrayList<String>();
241      while(collectors.hasNext())
242        l.add(collectors.next());
243      super.setCollectors(l.iterator());
244      if(tryList == null)
245        tryList = l.iterator();
246    }
247 
248    pollThread = new CommitPollThread(conf, tryList);
249    pollThread.setDaemon(true);
250    pollThread.start();
251   }
252   
253   /*
254    * This method is the interface from AsyncAckSender to the CommitPollThread --
255    * it gets a lock on the merge table, and then updates it with a batch of pending acks
256    *
257    *  This method is called from the thread doing a post; the merge table is
258    *  read by the CommitPollThread when it figures out what commits are expected.
259    */
260   private void delayCommits(List<DelayedCommit> delayed) {
261     Collections.sort(delayed);
262     
263     synchronized(mergedList) {
264       DelayedCommit region =null;
265       for(DelayedCommit cur: delayed) {
266         if(region == null)
267           region = cur;
268         else if((cur.adaptor == region.adaptor) &&
269             cur.fname.equals(region.fname) && (cur.start <= region.uuid)) {
270           //since the list is sorted, region.start < cur.start
271           region.uuid = Math.max(region.uuid, cur.uuid); //merge
272           region.fOffset = Math.max(region.fOffset, cur.fOffset);
273         } else {
274           mergedList.add(region);
275           region= cur;
276         }
277       }
278       mergedList.add(region);
279     }
280   }
281   
282   
283   Pattern partialCommitPat = Pattern.compile("(.*) ([0-9]+)");
284   @Override
285   public List<CommitListEntry> postAndParseResponse(PostMethod method, 
286       List<CommitListEntry> expectedCommitResults)
287   throws IOException, InterruptedException {
288     adaptorReset.reportPending(expectedCommitResults);
289     List<String> resp = reliablySend(method, ServletCollector.PATH);
290       //expect most of 'em to be delayed
291     List<DelayedCommit> toDelay = new ArrayList<DelayedCommit>(resp.size());
292     ArrayList<CommitListEntry> result =  new ArrayList<CommitListEntry>();
293     
294     for(int i = 0; i < resp.size(); ++i)  {
295       if(resp.get(i).startsWith(ServletCollector.ACK_PREFIX))
296         result.add(expectedCommitResults.get(i));
297       else {
298         CommitListEntry cle = expectedCommitResults.get(i);
299         Matcher m = partialCommitPat.matcher(resp.get(i));
300         if(!m.matches())
301           log.warn("unexpected response: "+ resp.get(i));
302         else
303           log.info("waiting for " + m.group(1) + " to hit " + m.group(2) + 
304               " before committing "+ agent.getAdaptorName(cle.adaptor));
305         
306         String name = agent.getAdaptorName(cle.adaptor);
307         if(name != null)//null name implies adaptor no longer running
308           toDelay.add(new DelayedCommit(cle.adaptor, cle.uuid, cle.start, m.group(1), 
309                 Long.parseLong(m.group(2)), name));
310       }
311     }
312     delayCommits(toDelay);
313     return result;
314   }
315   
316   @Override
317   protected boolean failedCollector(String downed) {
318     log.info("collector "+ downed + " down; resetting adaptors");
319     adaptorReset.resetTimedOutAdaptors(0); //reset all adaptors with outstanding data.
320     return false;
321   }
322   
323   @Override
324   public void stop() {
325     pollThread.shutdown();
326   }
327 
328 }