This project has retired. For details please refer to its Attic page.
AdaptorResetThread xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.chukwa.datacollection.agent;
19  
20  import java.util.*;
21  import org.apache.hadoop.conf.*;
22  import org.apache.hadoop.chukwa.datacollection.adaptor.Adaptor;
23  import org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorShutdownPolicy;
24  import org.apache.hadoop.chukwa.datacollection.sender.AsyncAckSender;
25  import org.apache.hadoop.chukwa.datacollection.writer.SeqFileWriter;
26  import org.apache.log4j.Logger;
27  
28  public class AdaptorResetThread extends Thread {
29    
30    static Logger log = Logger.getLogger(AdaptorResetThread.class);
31    public static final String TIMEOUT_OPT = "connector.commitpoll.timeout";
32    
33    
34    int resetCount = 0;
35    private static class AdaptorStat {
36      long lastCommitTime = 0;
37      long maxByteSent = 0 ;
38      public AdaptorStat(long lastCommit, long maxByte) {
39        maxByteSent = maxByte;
40        lastCommitTime = lastCommit;
41      }
42    }
43  
44  
45    int timeout = 15*60 * 1000; //default to wait fifteen minutes for an ack
46                 //note that this is overridden using the poll and rotate periods.
47    
48    Map<Adaptor, AdaptorStat> status;
49    ChukwaAgent agent;
50    private volatile boolean running = true;
51    
52    public AdaptorResetThread(Configuration conf, ChukwaAgent a) {
53          //
54      timeout =  conf.getInt(SeqFileWriter.ROTATE_INTERVAL_OPT, timeout/2) 
55          + conf.getInt(AsyncAckSender.POLLPERIOD_OPT, timeout/2);
56      
57      timeout = conf.getInt(TIMEOUT_OPT, timeout); //unless overridden
58       
59      status = new LinkedHashMap<Adaptor, AdaptorStat>();
60      this.agent = a;
61      this.setDaemon(true);
62    }
63    
64    /**
65     * Resets all adaptors with outstanding data more than timeSinceLastCommit old.
66     * @param timeSinceLastCommit is millisecond since last check point
67     * @return the number of reset adaptors
68     */
69    public int resetTimedOutAdaptors(int timeSinceLastCommit) {
70      int resetThisTime = 0;
71      long timeoutThresh = System.currentTimeMillis() - timeSinceLastCommit;
72      List<Adaptor> toResetList = new ArrayList<Adaptor>(); //also contains stopped 
73      //adaptors
74      synchronized(this) {
75        for(Map.Entry<Adaptor, AdaptorStat> ent: status.entrySet()) {
76          AdaptorStat stat = ent.getValue();
77          ChukwaAgent.Offset off = agent.offset(ent.getKey());
78          if(off == null) {
79            toResetList.add(ent.getKey());
80          } else if(stat.maxByteSent > off.offset  //some data outstanding
81              && stat.lastCommitTime < timeoutThresh) { //but no progress made
82            toResetList.add(ent.getKey());
83            log.warn("restarting " + off.id + " at " + off.offset + " due to timeout; "+
84                "last commit was ");
85          }
86        }
87      }
88      
89      for(Adaptor a: toResetList) {
90        status.remove(a); //it'll get added again when adaptor resumes, if it does
91        ChukwaAgent.Offset off = agent.offset(a);
92        if(off != null) {
93          agent.stopAdaptor(off.id, AdaptorShutdownPolicy.RESTARTING);
94          
95          String a_status = a.getCurrentStatus();
96          agent.processAddCommand("add " + off.id + "= " + a.getClass().getCanonicalName()
97               + " "+ a_status + " " + off.offset);
98          resetThisTime ++;
99          //will be implicitly added to table once adaptor starts sending
100       } 
101        //implicitly do nothing if adaptor was stopped. We already removed
102       //its entry from the status table.
103     }
104     resetCount += resetThisTime;
105     return resetThisTime;
106   }
107   
108   public synchronized void reportPending(List<AsyncAckSender.CommitListEntry> delayedCommits) {
109     long now = System.currentTimeMillis();
110     for(AsyncAckSender.CommitListEntry dc: delayedCommits) {
111       AdaptorStat a = status.get(dc.adaptor);
112       if(a == null)
113         status.put(dc.adaptor, new AdaptorStat(now, dc.uuid));
114       else if(a.maxByteSent < dc.uuid)
115           a.maxByteSent = dc.uuid;
116     }
117   }
118   
119   public synchronized void reportCommits(Set<Adaptor> commits) {
120     long now = System.currentTimeMillis();
121     for(Adaptor a: commits) {
122       if(status.containsKey(a)) {
123         status.get(a).lastCommitTime = now;
124       } else
125         log.warn("saw commit for adaptor " + a + " before seeing sends"); 
126     }
127   }
128   
129   public void reportStop(Adaptor a) {
130     status.remove(a);
131   }
132   
133   public void run() {
134     try {
135       while(running) {
136         Thread.sleep(timeout/2);
137         resetTimedOutAdaptors(timeout);
138       }
139     } catch(InterruptedException e) {}
140   } 
141   
142   public int getResetCount() {
143     return resetCount;
144   }
145 }