This project has retired. For details please refer to its Attic page.
AsyncAckSender xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.chukwa.datacollection.sender;
19  
20  import java.io.IOException;
21  import org.apache.hadoop.chukwa.datacollection.DataFactory;
22  import org.apache.hadoop.chukwa.datacollection.agent.*;
23  import org.apache.hadoop.chukwa.datacollection.adaptor.Adaptor;
24  import org.apache.hadoop.chukwa.datacollection.collector.servlet.CommitCheckServlet;
25  import org.apache.hadoop.chukwa.datacollection.collector.servlet.ServletCollector;
26  import java.util.*;
27  import java.util.regex.Matcher;
28  import java.util.regex.Pattern;
29  import org.apache.hadoop.conf.*;
30  import org.apache.commons.httpclient.methods.GetMethod;
31  import org.apache.commons.httpclient.methods.PostMethod;
32  //import com.google.common.collect.SortedSetMultimap;
33  //import com.google.common.collect.TreeMultimap;
34  
35  import org.apache.log4j.Logger;
36  
37  /**
38   * An enhancement to ChukwaHttpSender that handles asynchronous acknowledgment.
39   * 
40   * This class will periodically poll the collectors to find out how much data
41   * has been committed to HDFS, and will then pass those acks on to the Agent.
42   */
43  public class AsyncAckSender extends ChukwaHttpSender{
44    
45    protected final static Logger log = Logger.getLogger(AsyncAckSender.class);
46    /*
47     * Represents the state required for an asynchronous ack.
48     * 
49     * Supplements CommitListEntry with a filename and offset;
50     * the data commits when that file reaches that length.
51     */
52    public static class DelayedCommit extends CommitListEntry implements Comparable<DelayedCommit> {
53      final String fname;
54      long fOffset;
55      final String aName;
56      public DelayedCommit(Adaptor a, long uuid, long len, String fname, 
57          long offset, String aName) {
58        super(a, uuid, len);
59        this.fname = fname;
60        this.fOffset = offset;
61        this.aName = aName;
62      }
63  
64      @Override
65      public int hashCode() {
66        return super.hashCode() ^ fname.hashCode() ^ (int)(fOffset) ^ (int) (fOffset >> 32);
67      }
68      
69      //sort by adaptor name first, then by start offset
70      //note that returning 1 means this is "greater" than RHS
71      public int compareTo(DelayedCommit o) {
72        int c = o.aName.compareTo(this.aName);
73        if(c != 0)
74          return c;
75        c = o.fname.compareTo(this.fname);
76        if(c != 0)
77          return c;
78        if(o.start < start)
79          return 1;
80        else if(o.start > start)
81          return -1;
82        else return 0;
83      }
84  
85      @Override
86      public boolean equals(Object o) {
87        if(!(o instanceof DelayedCommit)) {
88          return false;
89        }
90        DelayedCommit dc = (DelayedCommit) o;
91        if(this.aName.equals(dc.aName)) {
92          return true;
93        }
94        return false;
95      }
96  
97      public String toString() {
98        return adaptor +" commits from" + start + " to " + uuid + " when " + fname + " hits " + fOffset;
99      }
100   }
101   
102   public static final String POLLPERIOD_OPT = "connector.commitpoll.period";
103   public static final String POLLHOSTS_OPT = "connector.commitpoll.hostfile";
104   final ChukwaAgent agent;
105   
106   /*
107    * The list of commits that we're expecting.
108    * This is the structure used to pass the list to the CommitPollThread.  
109    * Adjacent commits to the same file will be coalesced.
110    * 
111    */
112   final List<DelayedCommit> mergedList;
113   
114   /**
115    * Periodically scans a subset of the collectors, looking for committed files.
116    * This way, not every collector is pestering the namenode with periodic lses.
117    */
118   final class CommitPollThread extends Thread {
119     private ChukwaHttpSender scanPath;
120     private int pollPeriod = 1000 * 30;
121 
122 
123     private final Map<String, PriorityQueue<DelayedCommit>> pendingCommits;
124 
125     CommitPollThread(Configuration conf, Iterator<String> tryList) {
126       pollPeriod = conf.getInt(POLLPERIOD_OPT, pollPeriod);
127       scanPath = new ChukwaHttpSender(conf);
128       scanPath.setCollectors(tryList);
129       pendingCommits = new HashMap<String, PriorityQueue<DelayedCommit>>();
130     }
131 
132     private volatile boolean running = true;
133     public void shutdown() {
134       running = false;
135       this.interrupt();
136     }
137     
138     public void run() {
139       try {
140         while(running) {
141           Thread.sleep(pollPeriod);
142           //update table using list of pending delayed commits, in this thread
143           checkForCommits();
144           mergePendingTable();
145         }
146       } catch(InterruptedException e) {}
147       catch(IOException e) {
148         log.error(e);
149       }
150     } 
151     
152     /*
153      * Note that this method is NOT threadsafe, and should only be called
154      * from the same thread that will later check for commits
155      */
156     private void mergePendingTable() {
157       synchronized(mergedList) {
158         for(DelayedCommit dc:mergedList) {
159           
160           PriorityQueue<DelayedCommit> pendList = pendingCommits.get(dc.fname);
161           if(pendList == null) {
162             pendList = new PriorityQueue<DelayedCommit>();
163             pendingCommits.put(dc.fname, pendList);
164           }
165           pendList.add(dc);
166         }
167         mergedList.clear();
168       } //end synchronized
169     }
170     
171     Pattern respLine = Pattern.compile("<li>(.*) ([0-9]+)</li>");
172     private void checkForCommits() throws IOException, InterruptedException {
173       
174       log.info("checking for commited chunks");
175       GetMethod method = new GetMethod();
176       List<String> parsedFStatuses = scanPath.reliablySend(method, CommitCheckServlet.DEFAULT_PATH); 
177 
178       //do an http get
179       for(String stat: parsedFStatuses) {
180         Matcher m = respLine.matcher(stat);
181         if(!m.matches())
182           continue;
183         String path = m.group(1);
184         Long committedOffset = Long.parseLong(m.group(2));
185 
186         PriorityQueue<DelayedCommit> delayedOnFile = pendingCommits.get(path);
187         if(delayedOnFile == null)
188           continue;
189        
190         HashSet<Adaptor> committed = new HashSet<Adaptor>();
191         while(!delayedOnFile.isEmpty()) {
192           DelayedCommit fired = delayedOnFile.element();
193           if(fired.fOffset > committedOffset)
194             break;
195           else {
196             ChukwaAgent.Offset o = agent.offset(fired.adaptor);
197             if(o != null && fired.start > o.offset()) {
198               log.error("can't commit "+ o.adaptorID() +  "  without ordering assumption");
199               break; //don't commit
200             }
201             delayedOnFile.remove();
202             String s = agent.reportCommit(fired.adaptor, fired.uuid);
203             committed.add(fired.adaptor);
204             //TODO: if s == null, then the adaptor has been stopped.
205             //should we stop sending acks?
206             log.info("COMMIT to "+ committedOffset+ " on "+ path+ ", updating " +s);
207           }
208         }
209         adaptorReset.reportCommits(committed);
210       }
211     }
212 
213   } 
214   
215   CommitPollThread pollThread;
216   
  //adaptorReset is started in the constructor; this class also calls its report/reset methods directly.
218   public AdaptorResetThread adaptorReset;
219   Configuration conf;
220   
221   public AsyncAckSender(Configuration conf, ChukwaAgent a) throws IOException {
222     super(conf);
223     log.info("delayed-commit processing enabled");
224     agent = a;
225     
226     mergedList = new ArrayList<DelayedCommit>();
227     this.conf = conf;
228     adaptorReset = new AdaptorResetThread(conf, a);
229     adaptorReset.start();
230     //initialize the commitpoll later, once we have the list of collectors
231   }
232   
233   
234   @Override
235   public void setCollectors(Iterator<String> collectors) {
236    Iterator<String> tryList = null;
237    String scanHostsFilename = conf.get(POLLHOSTS_OPT, "collectors");
238    try {
239      tryList = DataFactory.getInstance().getCollectorURLs(conf, scanHostsFilename);
240    } catch(IOException e) {
241      log.warn("couldn't read " + scanHostsFilename+ " falling back on collectors list");
242    }
243 
244    if(collectors instanceof RetryListOfCollectors) {
245      super.setCollectors(collectors);
246      if(tryList == null)
247        tryList = ((RetryListOfCollectors) collectors).clone();
248    } 
249    else {
250      ArrayList<String> l = new ArrayList<String>();
251      while(collectors.hasNext())
252        l.add(collectors.next());
253      super.setCollectors(l.iterator());
254      if(tryList == null)
255        tryList = l.iterator();
256    }
257 
258    pollThread = new CommitPollThread(conf, tryList);
259    pollThread.setDaemon(true);
260    pollThread.start();
261   }
262   
263   /*
264    * This method is the interface from AsyncAckSender to the CommitPollThread --
265    * it gets a lock on the merge table, and then updates it with a batch of pending acks
266    *
267    *  This method is called from the thread doing a post; the merge table is
268    *  read by the CommitPollThread when it figures out what commits are expected.
269    */
270   private void delayCommits(List<DelayedCommit> delayed) {
271     Collections.sort(delayed);
272     
273     synchronized(mergedList) {
274       DelayedCommit region =null;
275       for(DelayedCommit cur: delayed) {
276         if(region == null)
277           region = cur;
278         else if((cur.adaptor == region.adaptor) &&
279             cur.fname.equals(region.fname) && (cur.start <= region.uuid)) {
280           //since the list is sorted, region.start < cur.start
281           region.uuid = Math.max(region.uuid, cur.uuid); //merge
282           region.fOffset = Math.max(region.fOffset, cur.fOffset);
283         } else {
284           mergedList.add(region);
285           region= cur;
286         }
287       }
288       mergedList.add(region);
289     }
290   }
291   
292   
293   Pattern partialCommitPat = Pattern.compile("(.*) ([0-9]+)");
294   @Override
295   public List<CommitListEntry> postAndParseResponse(PostMethod method, 
296       List<CommitListEntry> expectedCommitResults)
297   throws IOException, InterruptedException {
298     adaptorReset.reportPending(expectedCommitResults);
299     List<String> resp = reliablySend(method, ServletCollector.PATH);
300       //expect most of 'em to be delayed
301     List<DelayedCommit> toDelay = new ArrayList<DelayedCommit>(resp.size());
302     ArrayList<CommitListEntry> result =  new ArrayList<CommitListEntry>();
303     
304     for(int i = 0; i < resp.size(); ++i)  {
305       if(resp.get(i).startsWith(ServletCollector.ACK_PREFIX))
306         result.add(expectedCommitResults.get(i));
307       else {
308         CommitListEntry cle = expectedCommitResults.get(i);
309         Matcher m = partialCommitPat.matcher(resp.get(i));
310         if(!m.matches())
311           log.warn("unexpected response: "+ resp.get(i));
312         else
313           log.info("waiting for " + m.group(1) + " to hit " + m.group(2) + 
314               " before committing "+ agent.getAdaptorName(cle.adaptor));
315         
316         String name = agent.getAdaptorName(cle.adaptor);
317         if(name != null)//null name implies adaptor no longer running
318           toDelay.add(new DelayedCommit(cle.adaptor, cle.uuid, cle.start, m.group(1), 
319                 Long.parseLong(m.group(2)), name));
320       }
321     }
322     delayCommits(toDelay);
323     return result;
324   }
325   
326   @Override
327   protected boolean failedCollector(String downed) {
328     log.info("collector "+ downed + " down; resetting adaptors");
329     adaptorReset.resetTimedOutAdaptors(0); //reset all adaptors with outstanding data.
330     return false;
331   }
332   
333   @Override
334   public void stop() {
335     pollThread.shutdown();
336   }
337 
338 }