This project has retired. For details please refer to its
Attic page.
AdaptorResetThread xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.chukwa.datacollection.agent;
19
20 import java.util.*;
21 import org.apache.hadoop.conf.*;
22 import org.apache.hadoop.chukwa.datacollection.adaptor.Adaptor;
23 import org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorShutdownPolicy;
24 import org.apache.hadoop.chukwa.datacollection.collector.servlet.CommitCheckServlet;
25 import org.apache.hadoop.chukwa.datacollection.sender.AsyncAckSender;
26 import org.apache.hadoop.chukwa.datacollection.writer.SeqFileWriter;
27 import org.apache.log4j.Logger;
28
29 public class AdaptorResetThread extends Thread {
30
31 static Logger log = Logger.getLogger(AdaptorResetThread.class);
32 public static final String TIMEOUT_OPT = "connector.commitpoll.timeout";
33
34
35 int resetCount = 0;
36 private static class AdaptorStat {
37 long lastCommitTime = 0;
38 long maxByteSent = 0 ;
39 public AdaptorStat(long lastCommit, long maxByte) {
40 maxByteSent = maxByte;
41 lastCommitTime = lastCommit;
42 }
43 }
44
45
46 int timeout = 15*60 * 1000;
47
48
49 Map<Adaptor, AdaptorStat> status;
50 ChukwaAgent agent;
51 private volatile boolean running = true;
52
53 public AdaptorResetThread(Configuration conf, ChukwaAgent a) {
54
55 timeout = conf.getInt(SeqFileWriter.ROTATE_INTERVAL_OPT, timeout/3)
56 + conf.getInt(AsyncAckSender.POLLPERIOD_OPT, timeout/3)
57 + conf.getInt(CommitCheckServlet.SCANPERIOD_OPT, timeout/3);
58
59 timeout = conf.getInt(TIMEOUT_OPT, timeout);
60
61 status = new LinkedHashMap<Adaptor, AdaptorStat>();
62 this.agent = a;
63 this.setDaemon(true);
64 }
65
66
67
68
69
70
71 public int resetTimedOutAdaptors(int timeSinceLastCommit) {
72 int resetThisTime = 0;
73 long timeoutThresh = System.currentTimeMillis() - timeSinceLastCommit;
74 List<Adaptor> toResetList = new ArrayList<Adaptor>();
75
76 synchronized(this) {
77 for(Map.Entry<Adaptor, AdaptorStat> ent: status.entrySet()) {
78 AdaptorStat stat = ent.getValue();
79 ChukwaAgent.Offset off = agent.offset(ent.getKey());
80 if(off == null) {
81 toResetList.add(ent.getKey());
82 } else if(stat.maxByteSent > off.offset
83 && stat.lastCommitTime < timeoutThresh) {
84 toResetList.add(ent.getKey());
85 log.warn("restarting " + off.id + " at " + off.offset + " due to timeout; "+
86 "last commit was ");
87 }
88 }
89 }
90
91 for(Adaptor a: toResetList) {
92 status.remove(a);
93 ChukwaAgent.Offset off = agent.offset(a);
94 if(off != null) {
95 agent.stopAdaptor(off.id, AdaptorShutdownPolicy.RESTARTING);
96
97 String a_status = a.getCurrentStatus();
98 agent.processAddCommand("add " + off.id + "= " + a.getClass().getCanonicalName()
99 + " "+ a_status + " " + off.offset);
100 resetThisTime ++;
101
102 }
103
104
105 }
106 resetCount += resetThisTime;
107 return resetThisTime;
108 }
109
110 public synchronized void reportPending(List<AsyncAckSender.CommitListEntry> delayedCommits) {
111 long now = System.currentTimeMillis();
112 for(AsyncAckSender.CommitListEntry dc: delayedCommits) {
113 AdaptorStat a = status.get(dc.adaptor);
114 if(a == null)
115 status.put(dc.adaptor, new AdaptorStat(now, dc.uuid));
116 else if(a.maxByteSent < dc.uuid)
117 a.maxByteSent = dc.uuid;
118 }
119 }
120
121 public synchronized void reportCommits(Set<Adaptor> commits) {
122 long now = System.currentTimeMillis();
123 for(Adaptor a: commits) {
124 if(status.containsKey(a)) {
125 status.get(a).lastCommitTime = now;
126 } else
127 log.warn("saw commit for adaptor " + a + " before seeing sends");
128 }
129 }
130
131 public void reportStop(Adaptor a) {
132 status.remove(a);
133 }
134
135 public void run() {
136 try {
137 while(running) {
138 Thread.sleep(timeout/2);
139 resetTimedOutAdaptors(timeout);
140 }
141 } catch(InterruptedException e) {}
142 }
143
144 public int getResetCount() {
145 return resetCount;
146 }
147 }