This project has retired. For details please refer to its
Attic page.
AdaptorResetThread xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.chukwa.datacollection.agent;
19
20 import java.util.*;
21 import org.apache.hadoop.conf.*;
22 import org.apache.hadoop.chukwa.datacollection.adaptor.Adaptor;
23 import org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorShutdownPolicy;
24 import org.apache.hadoop.chukwa.datacollection.sender.AsyncAckSender;
25 import org.apache.hadoop.chukwa.datacollection.writer.SeqFileWriter;
26 import org.apache.log4j.Logger;
27
28 public class AdaptorResetThread extends Thread {
29
30 static Logger log = Logger.getLogger(AdaptorResetThread.class);
31 public static final String TIMEOUT_OPT = "connector.commitpoll.timeout";
32
33
34 int resetCount = 0;
35 private static class AdaptorStat {
36 long lastCommitTime = 0;
37 long maxByteSent = 0 ;
38 public AdaptorStat(long lastCommit, long maxByte) {
39 maxByteSent = maxByte;
40 lastCommitTime = lastCommit;
41 }
42 }
43
44
45 int timeout = 15*60 * 1000;
46
47
48 Map<Adaptor, AdaptorStat> status;
49 ChukwaAgent agent;
50 private volatile boolean running = true;
51
52 public AdaptorResetThread(Configuration conf, ChukwaAgent a) {
53
54 timeout = conf.getInt(SeqFileWriter.ROTATE_INTERVAL_OPT, timeout/2)
55 + conf.getInt(AsyncAckSender.POLLPERIOD_OPT, timeout/2);
56
57 timeout = conf.getInt(TIMEOUT_OPT, timeout);
58
59 status = new LinkedHashMap<Adaptor, AdaptorStat>();
60 this.agent = a;
61 this.setDaemon(true);
62 }
63
64
65
66
67
68
69 public int resetTimedOutAdaptors(int timeSinceLastCommit) {
70 int resetThisTime = 0;
71 long timeoutThresh = System.currentTimeMillis() - timeSinceLastCommit;
72 List<Adaptor> toResetList = new ArrayList<Adaptor>();
73
74 synchronized(this) {
75 for(Map.Entry<Adaptor, AdaptorStat> ent: status.entrySet()) {
76 AdaptorStat stat = ent.getValue();
77 ChukwaAgent.Offset off = agent.offset(ent.getKey());
78 if(off == null) {
79 toResetList.add(ent.getKey());
80 } else if(stat.maxByteSent > off.offset
81 && stat.lastCommitTime < timeoutThresh) {
82 toResetList.add(ent.getKey());
83 log.warn("restarting " + off.id + " at " + off.offset + " due to timeout; "+
84 "last commit was ");
85 }
86 }
87 }
88
89 for(Adaptor a: toResetList) {
90 status.remove(a);
91 ChukwaAgent.Offset off = agent.offset(a);
92 if(off != null) {
93 agent.stopAdaptor(off.id, AdaptorShutdownPolicy.RESTARTING);
94
95 String a_status = a.getCurrentStatus();
96 agent.processAddCommand("add " + off.id + "= " + a.getClass().getCanonicalName()
97 + " "+ a_status + " " + off.offset);
98 resetThisTime ++;
99
100 }
101
102
103 }
104 resetCount += resetThisTime;
105 return resetThisTime;
106 }
107
108 public synchronized void reportPending(List<AsyncAckSender.CommitListEntry> delayedCommits) {
109 long now = System.currentTimeMillis();
110 for(AsyncAckSender.CommitListEntry dc: delayedCommits) {
111 AdaptorStat a = status.get(dc.adaptor);
112 if(a == null)
113 status.put(dc.adaptor, new AdaptorStat(now, dc.uuid));
114 else if(a.maxByteSent < dc.uuid)
115 a.maxByteSent = dc.uuid;
116 }
117 }
118
119 public synchronized void reportCommits(Set<Adaptor> commits) {
120 long now = System.currentTimeMillis();
121 for(Adaptor a: commits) {
122 if(status.containsKey(a)) {
123 status.get(a).lastCommitTime = now;
124 } else
125 log.warn("saw commit for adaptor " + a + " before seeing sends");
126 }
127 }
128
129 public void reportStop(Adaptor a) {
130 status.remove(a);
131 }
132
133 public void run() {
134 try {
135 while(running) {
136 Thread.sleep(timeout/2);
137 resetTimedOutAdaptors(timeout);
138 }
139 } catch(InterruptedException e) {}
140 }
141
142 public int getResetCount() {
143 return resetCount;
144 }
145 }