This project has retired. For details please refer to its Attic page.
AdaptorResetThread xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.chukwa.datacollection.agent;
19  
20  import java.util.*;
21  import org.apache.hadoop.conf.*;
22  import org.apache.hadoop.chukwa.datacollection.adaptor.Adaptor;
23  import org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorShutdownPolicy;
24  import org.apache.hadoop.chukwa.datacollection.collector.servlet.CommitCheckServlet;
25  import org.apache.hadoop.chukwa.datacollection.sender.AsyncAckSender;
26  import org.apache.hadoop.chukwa.datacollection.writer.SeqFileWriter;
27  import org.apache.log4j.Logger;
28  
29  public class AdaptorResetThread extends Thread {
30    
31    static Logger log = Logger.getLogger(AdaptorResetThread.class);
32    public static final String TIMEOUT_OPT = "connector.commitpoll.timeout";
33    
34    
35    int resetCount = 0;
36    private static class AdaptorStat {
37      long lastCommitTime = 0;
38      long maxByteSent = 0 ;
39      public AdaptorStat(long lastCommit, long maxByte) {
40        maxByteSent = maxByte;
41        lastCommitTime = lastCommit;
42      }
43    }
44  
45  
46    int timeout = 15*60 * 1000; //default to wait fifteen minutes for an ack
47                 //note that this is overridden using the poll and rotate periods.
48    
49    Map<Adaptor, AdaptorStat> status;
50    ChukwaAgent agent;
51    private volatile boolean running = true;
52    
53    public AdaptorResetThread(Configuration conf, ChukwaAgent a) {
54          //
55      timeout =  conf.getInt(SeqFileWriter.ROTATE_INTERVAL_OPT, timeout/3) 
56          + conf.getInt(AsyncAckSender.POLLPERIOD_OPT, timeout/3) 
57          + conf.getInt(CommitCheckServlet.SCANPERIOD_OPT, timeout/3);
58      
59      timeout = conf.getInt(TIMEOUT_OPT, timeout); //unless overridden
60       
61      status = new LinkedHashMap<Adaptor, AdaptorStat>();
62      this.agent = a;
63      this.setDaemon(true);
64    }
65    
66    /**
67     * Resets all adaptors with outstanding data more than timeSinceLastCommit old.
68     * @param timeSinceLastCommit
69     * @return the number of reset adaptors
70     */
71    public int resetTimedOutAdaptors(int timeSinceLastCommit) {
72      int resetThisTime = 0;
73      long timeoutThresh = System.currentTimeMillis() - timeSinceLastCommit;
74      List<Adaptor> toResetList = new ArrayList<Adaptor>(); //also contains stopped 
75      //adaptors
76      synchronized(this) {
77        for(Map.Entry<Adaptor, AdaptorStat> ent: status.entrySet()) {
78          AdaptorStat stat = ent.getValue();
79          ChukwaAgent.Offset off = agent.offset(ent.getKey());
80          if(off == null) {
81            toResetList.add(ent.getKey());
82          } else if(stat.maxByteSent > off.offset  //some data outstanding
83              && stat.lastCommitTime < timeoutThresh) { //but no progress made
84            toResetList.add(ent.getKey());
85            log.warn("restarting " + off.id + " at " + off.offset + " due to timeout; "+
86                "last commit was ");
87          }
88        }
89      }
90      
91      for(Adaptor a: toResetList) {
92        status.remove(a); //it'll get added again when adaptor resumes, if it does
93        ChukwaAgent.Offset off = agent.offset(a);
94        if(off != null) {
95          agent.stopAdaptor(off.id, AdaptorShutdownPolicy.RESTARTING);
96          
97          String a_status = a.getCurrentStatus();
98          agent.processAddCommand("add " + off.id + "= " + a.getClass().getCanonicalName()
99               + " "+ a_status + " " + off.offset);
100         resetThisTime ++;
101         //will be implicitly added to table once adaptor starts sending
102       } 
103        //implicitly do nothing if adaptor was stopped. We already removed
104       //its entry from the status table.
105     }
106     resetCount += resetThisTime;
107     return resetThisTime;
108   }
109   
110   public synchronized void reportPending(List<AsyncAckSender.CommitListEntry> delayedCommits) {
111     long now = System.currentTimeMillis();
112     for(AsyncAckSender.CommitListEntry dc: delayedCommits) {
113       AdaptorStat a = status.get(dc.adaptor);
114       if(a == null)
115         status.put(dc.adaptor, new AdaptorStat(now, dc.uuid));
116       else if(a.maxByteSent < dc.uuid)
117           a.maxByteSent = dc.uuid;
118     }
119   }
120   
121   public synchronized void reportCommits(Set<Adaptor> commits) {
122     long now = System.currentTimeMillis();
123     for(Adaptor a: commits) {
124       if(status.containsKey(a)) {
125         status.get(a).lastCommitTime = now;
126       } else
127         log.warn("saw commit for adaptor " + a + " before seeing sends"); 
128     }
129   }
130   
131   public void reportStop(Adaptor a) {
132     status.remove(a);
133   }
134   
135   public void run() {
136     try {
137       while(running) {
138         Thread.sleep(timeout/2);
139         resetTimedOutAdaptors(timeout);
140       }
141     } catch(InterruptedException e) {}
142   } 
143   
144   public int getResetCount() {
145     return resetCount;
146   }
147 }