1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.chukwa.datacollection.sender;
19
20 import java.io.IOException;
21 import org.apache.hadoop.chukwa.datacollection.DataFactory;
22 import org.apache.hadoop.chukwa.datacollection.agent.*;
23 import org.apache.hadoop.chukwa.datacollection.adaptor.Adaptor;
24 import org.apache.hadoop.chukwa.datacollection.collector.servlet.CommitCheckServlet;
25 import org.apache.hadoop.chukwa.datacollection.collector.servlet.ServletCollector;
26 import java.util.*;
27 import java.util.regex.Matcher;
28 import java.util.regex.Pattern;
29 import org.apache.hadoop.conf.*;
30 import org.apache.commons.httpclient.methods.GetMethod;
31 import org.apache.commons.httpclient.methods.PostMethod;
32
33
34
35 import org.apache.log4j.Logger;
36
37
38
39
40
41
42
43 public class AsyncAckSender extends ChukwaHttpSender{
44
45 protected final static Logger log = Logger.getLogger(AsyncAckSender.class);
46
47
48
49
50
51
52 public static class DelayedCommit extends CommitListEntry implements Comparable<DelayedCommit> {
53 final String fname;
54 long fOffset;
55 final String aName;
56 public DelayedCommit(Adaptor a, long uuid, long len, String fname,
57 long offset, String aName) {
58 super(a, uuid, len);
59 this.fname = fname;
60 this.fOffset = offset;
61 this.aName = aName;
62 }
63
64 @Override
65 public int hashCode() {
66 return super.hashCode() ^ fname.hashCode() ^ (int)(fOffset) ^ (int) (fOffset >> 32);
67 }
68
69
70
71 public int compareTo(DelayedCommit o) {
72 int c = o.aName.compareTo(this.aName);
73 if(c != 0)
74 return c;
75 c = o.fname.compareTo(this.fname);
76 if(c != 0)
77 return c;
78 if(o.start < start)
79 return 1;
80 else if(o.start > start)
81 return -1;
82 else return 0;
83 }
84
85 @Override
86 public boolean equals(Object o) {
87 if(!(o instanceof DelayedCommit)) {
88 return false;
89 }
90 DelayedCommit dc = (DelayedCommit) o;
91 if(this.aName.equals(dc.aName)) {
92 return true;
93 }
94 return false;
95 }
96
97 public String toString() {
98 return adaptor +" commits from" + start + " to " + uuid + " when " + fname + " hits " + fOffset;
99 }
100 }
101
102 public static final String POLLPERIOD_OPT = "connector.commitpoll.period";
103 public static final String POLLHOSTS_OPT = "connector.commitpoll.hostfile";
104 final ChukwaAgent agent;
105
106
107
108
109
110
111
112 final List<DelayedCommit> mergedList;
113
114
115
116
117
118 final class CommitPollThread extends Thread {
119 private ChukwaHttpSender scanPath;
120 private int pollPeriod = 1000 * 30;
121
122
123 private final Map<String, PriorityQueue<DelayedCommit>> pendingCommits;
124
125 CommitPollThread(Configuration conf, Iterator<String> tryList) {
126 pollPeriod = conf.getInt(POLLPERIOD_OPT, pollPeriod);
127 scanPath = new ChukwaHttpSender(conf);
128 scanPath.setCollectors(tryList);
129 pendingCommits = new HashMap<String, PriorityQueue<DelayedCommit>>();
130 }
131
132 private volatile boolean running = true;
133 public void shutdown() {
134 running = false;
135 this.interrupt();
136 }
137
138 public void run() {
139 try {
140 while(running) {
141 Thread.sleep(pollPeriod);
142
143 checkForCommits();
144 mergePendingTable();
145 }
146 } catch(InterruptedException e) {}
147 catch(IOException e) {
148 log.error(e);
149 }
150 }
151
152
153
154
155
156 private void mergePendingTable() {
157 synchronized(mergedList) {
158 for(DelayedCommit dc:mergedList) {
159
160 PriorityQueue<DelayedCommit> pendList = pendingCommits.get(dc.fname);
161 if(pendList == null) {
162 pendList = new PriorityQueue<DelayedCommit>();
163 pendingCommits.put(dc.fname, pendList);
164 }
165 pendList.add(dc);
166 }
167 mergedList.clear();
168 }
169 }
170
171 Pattern respLine = Pattern.compile("<li>(.*) ([0-9]+)</li>");
172 private void checkForCommits() throws IOException, InterruptedException {
173
174 log.info("checking for commited chunks");
175 GetMethod method = new GetMethod();
176 List<String> parsedFStatuses = scanPath.reliablySend(method, CommitCheckServlet.DEFAULT_PATH);
177
178
179 for(String stat: parsedFStatuses) {
180 Matcher m = respLine.matcher(stat);
181 if(!m.matches())
182 continue;
183 String path = m.group(1);
184 Long committedOffset = Long.parseLong(m.group(2));
185
186 PriorityQueue<DelayedCommit> delayedOnFile = pendingCommits.get(path);
187 if(delayedOnFile == null)
188 continue;
189
190 HashSet<Adaptor> committed = new HashSet<Adaptor>();
191 while(!delayedOnFile.isEmpty()) {
192 DelayedCommit fired = delayedOnFile.element();
193 if(fired.fOffset > committedOffset)
194 break;
195 else {
196 ChukwaAgent.Offset o = agent.offset(fired.adaptor);
197 if(o != null && fired.start > o.offset()) {
198 log.error("can't commit "+ o.adaptorID() + " without ordering assumption");
199 break;
200 }
201 delayedOnFile.remove();
202 String s = agent.reportCommit(fired.adaptor, fired.uuid);
203 committed.add(fired.adaptor);
204
205
206 log.info("COMMIT to "+ committedOffset+ " on "+ path+ ", updating " +s);
207 }
208 }
209 adaptorReset.reportCommits(committed);
210 }
211 }
212
213 }
214
215 CommitPollThread pollThread;
216
217
218 public AdaptorResetThread adaptorReset;
219 Configuration conf;
220
/**
 * Creates the sender and starts its AdaptorResetThread. The commit poll
 * thread is not started here; setCollectors() does that.
 *
 * @param conf configuration passed to the superclass and kept for later use
 * @param a    agent that commits are reported to
 * @throws IOException declared by this constructor; nothing in its visible
 *                     body throws it directly
 */
public AsyncAckSender(Configuration conf, ChukwaAgent a) throws IOException {
  super(conf);
  log.info("delayed-commit processing enabled");

  this.agent = a;
  this.conf = conf;
  this.mergedList = new ArrayList<DelayedCommit>();

  this.adaptorReset = new AdaptorResetThread(conf, a);
  this.adaptorReset.start();
}
232
233
234 @Override
235 public void setCollectors(Iterator<String> collectors) {
236 Iterator<String> tryList = null;
237 String scanHostsFilename = conf.get(POLLHOSTS_OPT, "collectors");
238 try {
239 tryList = DataFactory.getInstance().getCollectorURLs(conf, scanHostsFilename);
240 } catch(IOException e) {
241 log.warn("couldn't read " + scanHostsFilename+ " falling back on collectors list");
242 }
243
244 if(collectors instanceof RetryListOfCollectors) {
245 super.setCollectors(collectors);
246 if(tryList == null)
247 tryList = ((RetryListOfCollectors) collectors).clone();
248 }
249 else {
250 ArrayList<String> l = new ArrayList<String>();
251 while(collectors.hasNext())
252 l.add(collectors.next());
253 super.setCollectors(l.iterator());
254 if(tryList == null)
255 tryList = l.iterator();
256 }
257
258 pollThread = new CommitPollThread(conf, tryList);
259 pollThread.setDaemon(true);
260 pollThread.start();
261 }
262
263
264
265
266
267
268
269
270 private void delayCommits(List<DelayedCommit> delayed) {
271 Collections.sort(delayed);
272
273 synchronized(mergedList) {
274 DelayedCommit region =null;
275 for(DelayedCommit cur: delayed) {
276 if(region == null)
277 region = cur;
278 else if((cur.adaptor == region.adaptor) &&
279 cur.fname.equals(region.fname) && (cur.start <= region.uuid)) {
280
281 region.uuid = Math.max(region.uuid, cur.uuid);
282 region.fOffset = Math.max(region.fOffset, cur.fOffset);
283 } else {
284 mergedList.add(region);
285 region= cur;
286 }
287 }
288 mergedList.add(region);
289 }
290 }
291
292
293 Pattern partialCommitPat = Pattern.compile("(.*) ([0-9]+)");
294 @Override
295 public List<CommitListEntry> postAndParseResponse(PostMethod method,
296 List<CommitListEntry> expectedCommitResults)
297 throws IOException, InterruptedException {
298 adaptorReset.reportPending(expectedCommitResults);
299 List<String> resp = reliablySend(method, ServletCollector.PATH);
300
301 List<DelayedCommit> toDelay = new ArrayList<DelayedCommit>(resp.size());
302 ArrayList<CommitListEntry> result = new ArrayList<CommitListEntry>();
303
304 for(int i = 0; i < resp.size(); ++i) {
305 if(resp.get(i).startsWith(ServletCollector.ACK_PREFIX))
306 result.add(expectedCommitResults.get(i));
307 else {
308 CommitListEntry cle = expectedCommitResults.get(i);
309 Matcher m = partialCommitPat.matcher(resp.get(i));
310 if(!m.matches())
311 log.warn("unexpected response: "+ resp.get(i));
312 else
313 log.info("waiting for " + m.group(1) + " to hit " + m.group(2) +
314 " before committing "+ agent.getAdaptorName(cle.adaptor));
315
316 String name = agent.getAdaptorName(cle.adaptor);
317 if(name != null)
318 toDelay.add(new DelayedCommit(cle.adaptor, cle.uuid, cle.start, m.group(1),
319 Long.parseLong(m.group(2)), name));
320 }
321 }
322 delayCommits(toDelay);
323 return result;
324 }
325
326 @Override
327 protected boolean failedCollector(String downed) {
328 log.info("collector "+ downed + " down; resetting adaptors");
329 adaptorReset.resetTimedOutAdaptors(0);
330 return false;
331 }
332
333 @Override
334 public void stop() {
335 pollThread.shutdown();
336 }
337
338 }