This project has retired. For details please refer to its
Attic page.
ChukwaAgentController xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.chukwa.datacollection.controller;
20
21
22 import java.io.BufferedReader;
23 import java.io.File;
24 import java.io.IOException;
25 import java.io.InputStreamReader;
26 import java.io.OutputStreamWriter;
27 import java.io.PrintWriter;
28 import java.net.Socket;
29 import java.net.SocketException;
30 import java.util.ArrayList;
31 import java.util.Collection;
32 import java.util.HashMap;
33 import java.util.Iterator;
34 import java.util.List;
35 import java.util.Map;
36 import java.util.Timer;
37 import java.util.TimerTask;
38 import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
39 import org.apache.log4j.Logger;
40
41
42
43
44
45
46
47 public class ChukwaAgentController {
48 static Logger log = Logger.getLogger(ChukwaAgentController.class);
49
50 public class AddAdaptorTask extends TimerTask {
51
52 String adaptorName;
53 String type;
54 String params;
55 private long offset;
56 long numRetries;
57 long retryInterval;
58
59 AddAdaptorTask(String adaptorName, String type, String params, long offset,
60 long numRetries, long retryInterval) {
61 this.adaptorName = adaptorName;
62 this.type = type;
63 this.params = params;
64 this.offset = offset;
65 this.numRetries = numRetries;
66 this.retryInterval = retryInterval;
67 }
68
69 @Override
70 public void run() {
71 try {
72 log.info("Trying to resend the add command [" + adaptorName + "]["
73 + offset + "][" + params + "] [" + numRetries + "]");
74 addByName(null, adaptorName, type, params, offset, numRetries, retryInterval);
75 } catch (Exception e) {
76 log.warn("Exception in AddAdaptorTask.run", e);
77 e.printStackTrace();
78 }
79 }
80 }
81
82
83 public static final String CharFileTailUTF8 = "org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.CharFileTailingAdaptorUTF8";
84 public static final String CharFileTailUTF8NewLineEscaped = "org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.CharFileTailingAdaptorUTF8NewLineEscaped";
85
86 static String DEFAULT_FILE_TAILER = CharFileTailUTF8NewLineEscaped;
87 static int DEFAULT_PORT = 9093;
88 static String DEFAULT_HOST = "localhost";
89 static int numArgs = 0;
90
91 class Adaptor {
92 public String id;
93 final public String className;
94 final public String params;
95 final public String appType;
96 public long offset;
97
98 Adaptor(String className, String appType, String params, long offset) {
99 this.className = className;
100 this.appType = appType;
101 this.params = params;
102 this.offset = offset;
103 }
104
105 Adaptor(String id, String className, String appType, String params,
106 long offset) {
107 this.id = id;
108 this.className = className;
109 this.appType = appType;
110 this.params = params;
111 this.offset = offset;
112 }
113
114
115
116
117
118
119
120
121
122 String register() throws IOException {
123 Socket s = new Socket(hostname, portno);
124 try {
125 s.setSoTimeout(60000);
126 } catch (SocketException e) {
127 log.warn("Error while settin soTimeout to 60000");
128 e.printStackTrace();
129 }
130 PrintWriter bw = new PrintWriter(new OutputStreamWriter(s
131 .getOutputStream()));
132 if(id != null)
133 bw.println("ADD " + id + " = " + className + " " + appType + " " + params + " " + offset);
134 else
135 bw.println("ADD " + className + " " + appType + " " + params + " " + offset);
136 bw.flush();
137 BufferedReader br = new BufferedReader(new InputStreamReader(s
138 .getInputStream()));
139 String resp = br.readLine();
140 if (resp != null) {
141 String[] fields = resp.split(" ");
142 if (fields[0].equals("OK")) {
143 id = fields[fields.length - 1];
144 }
145 }
146 s.close();
147 return id;
148 }
149
150 void unregister() throws IOException {
151 Socket s = new Socket(hostname, portno);
152 try {
153 s.setSoTimeout(60000);
154 } catch (SocketException e) {
155 log.warn("Error while settin soTimeout to 60000");
156 e.printStackTrace();
157 }
158 PrintWriter bw = new PrintWriter(new OutputStreamWriter(s
159 .getOutputStream()));
160 bw.println("SHUTDOWN " + id);
161 bw.flush();
162
163 BufferedReader br = new BufferedReader(new InputStreamReader(s
164 .getInputStream()));
165 String resp = br.readLine();
166 if (resp == null || !resp.startsWith("OK")) {
167 log.error("adaptor unregister error, id: " + id);
168 } else if (resp.startsWith("OK")) {
169 String[] respSplit = resp.split(" ");
170 String newOffset = respSplit[respSplit.length - 1];
171 try {
172 offset = Long.parseLong(newOffset);
173 } catch (NumberFormatException nfe) {
174 log.error("adaptor didn't shutdown gracefully.\n" + nfe);
175 }
176 }
177
178 s.close();
179 }
180
181 public String toString() {
182 String[] namePieces = className.split("\\.");
183 String shortName = namePieces[namePieces.length - 1];
184 return id + " " + shortName + " " + appType + " " + params + " " + offset;
185 }
186 }
187
188 Map<String, ChukwaAgentController.Adaptor> runningAdaptors = new HashMap<String, Adaptor>();
189 Map<String, ChukwaAgentController.Adaptor> runningInstanceAdaptors = new HashMap<String, Adaptor>();
190 Map<String, ChukwaAgentController.Adaptor> pausedAdaptors;
191 String hostname;
192 int portno;
193
194 public ChukwaAgentController() {
195 portno = DEFAULT_PORT;
196 hostname = DEFAULT_HOST;
197 pausedAdaptors = new HashMap<String, Adaptor>();
198
199 syncWithAgent();
200 }
201
202 public ChukwaAgentController(String hostname, int portno) {
203 this.hostname = hostname;
204 this.portno = portno;
205 pausedAdaptors = new HashMap<String, Adaptor>();
206
207 syncWithAgent();
208 }
209
210 private boolean syncWithAgent() {
211
212 try {
213 runningAdaptors = list();
214 return true;
215 } catch (IOException e) {
216 System.err.println("Error initializing ChukwaClient with list of "
217 + "currently registered adaptors, clearing our local list of adaptors");
218
219
220
221 runningAdaptors = new HashMap<String, ChukwaAgentController.Adaptor>();
222 return false;
223 }
224 }
225
226
227
228
229
230
231
232
233
234
235 public String add(String adaptorName, String type, String params, long offset) {
236 return addByName(null, adaptorName, type, params, offset, 20, 15 * 1000);
237
238
239
240
241
242 }
243
244
245
246
247
248
249
250
251 public String addByName(String adaptorID, String adaptorName, String type, String params, long offset,
252 long numRetries, long retryInterval) {
253 ChukwaAgentController.Adaptor adaptor = new ChukwaAgentController.Adaptor(
254 adaptorName, type, params, offset);
255 adaptor.id = adaptorID;
256 if (numRetries >= 0) {
257 try {
258 adaptorID = adaptor.register();
259
260 if (adaptorID != null) {
261 runningAdaptors.put(adaptorID, adaptor);
262 runningInstanceAdaptors.put(adaptorID, adaptor);
263 } else {
264 System.err.println("Failed to successfully add the adaptor in AgentClient, adaptorID returned by add() was negative.");
265 }
266 } catch (IOException ioe) {
267 log.warn("AgentClient failed to contact the agent ("
268 + hostname + ":" + portno + ")");
269
270 log.warn("Scheduling a agent connection retry for adaptor add() in another "
271 + retryInterval
272 + " milliseconds, "
273 + numRetries
274 + " retries remaining");
275
276 Timer addFileTimer = new Timer();
277 addFileTimer.schedule(new AddAdaptorTask(adaptorName, type, params,
278 offset, numRetries - 1, retryInterval), retryInterval);
279 }
280 } else {
281 System.err.println("Giving up on connecting to the local agent");
282 }
283 return adaptorID;
284 }
285
286 public synchronized ChukwaAgentController.Adaptor remove(String adaptorID)
287 throws IOException {
288 syncWithAgent();
289 ChukwaAgentController.Adaptor a = runningAdaptors.remove(adaptorID);
290 if ( a != null ) {
291 a.unregister();
292 }
293 return a;
294
295 }
296
297 public void remove(String className, String appType, String filename)
298 throws IOException {
299 syncWithAgent();
300
301
302
303 for (Adaptor a : runningAdaptors.values()) {
304 if (a.className.equals(className) && a.params.equals(filename)
305 && a.appType.equals(appType)) {
306 remove(a.id);
307 }
308 }
309 }
310
311 public void removeAll() {
312 syncWithAgent();
313 ArrayList<String> keyset = new ArrayList<String>();
314 keyset.addAll( runningAdaptors.keySet());
315
316 for (String id : keyset) {
317 try {
318 remove(id);
319 } catch (IOException ioe) {
320 System.err.println("Error removing an adaptor in removeAll()");
321 ioe.printStackTrace();
322 }
323 log.info("Successfully removed adaptor " + id);
324 }
325 }
326
327 public void removeInstanceAdaptors() {
328
329
330
331
332 for (Adaptor a : runningInstanceAdaptors.values()) {
333 try {
334 remove(a.className, a.appType, a.params);
335 } catch (IOException ioe) {
336 log.warn("Error removing an adaptor in removeInstanceAdaptors()");
337 ioe.printStackTrace();
338 }
339 }
340 }
341
342 Map<String, ChukwaAgentController.Adaptor> list() throws IOException {
343 Socket s = new Socket(hostname, portno);
344 try {
345 s.setSoTimeout(60000);
346 } catch (SocketException e) {
347 log.warn("Error while settin soTimeout to 60000");
348 e.printStackTrace();
349 }
350 PrintWriter bw = new PrintWriter(
351 new OutputStreamWriter(s.getOutputStream()));
352
353 bw.println("LIST");
354 bw.flush();
355 BufferedReader br = new BufferedReader(new InputStreamReader(s
356 .getInputStream()));
357 String ln;
358 Map<String, Adaptor> listResult = new HashMap<String, Adaptor>();
359 while ((ln = br.readLine()) != null) {
360 if (ln.equals("")) {
361 break;
362 } else {
363 String[] parts = ln.split("\\s+");
364 if (parts.length >= 4) {
365
366 String id = parts[0].substring(0, parts[0].length() - 1);
367
368
369
370
371
372 long offset = Long.parseLong(parts[parts.length - 1]);
373 String tmpParams = parts[3];
374 for (int i = 4; i < parts.length - 1; i++) {
375 tmpParams += " " + parts[i];
376 }
377 listResult.put(id, new Adaptor(id, parts[1], parts[2], tmpParams,
378 offset));
379 }
380 }
381 }
382 s.close();
383 return listResult;
384 }
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404 public String addFile(String appType, String filename, long numRetries,
405 long retryInterval) {
406 filename = new File(filename).getAbsolutePath();
407
408
409
410
411
412
413 boolean isDuplicate = false;
414 for (Adaptor a : runningAdaptors.values()) {
415 if (a.className.equals(DEFAULT_FILE_TAILER) && a.appType.equals(appType)
416 && a.params.endsWith(filename)) {
417 isDuplicate = true;
418 }
419 }
420 if (!isDuplicate) {
421 return addByName(null, DEFAULT_FILE_TAILER, appType, 0L + " " + filename, 0L,
422 numRetries, retryInterval);
423 } else {
424 log.info("An adaptor for filename \"" + filename
425 + "\", type \"" + appType
426 + "\", exists already, addFile() command aborted");
427 return null;
428 }
429 }
430
431 public String addFile(String appType, String filename) {
432 return addFile(appType, filename, 0, 0);
433 }
434
435
436
437
438
439
440
441
442
443
444
445
446
447 public Collection<String> pauseFile(String appType, String filename)
448 throws IOException {
449 syncWithAgent();
450
451
452
453 List<String> results = new ArrayList<String>();
454 for (Adaptor a : runningAdaptors.values()) {
455 if (a.className.equals(DEFAULT_FILE_TAILER) && a.params.endsWith(filename)
456 && a.appType.equals(appType)) {
457 pausedAdaptors.put(a.id, a);
458 remove(a.id);
459 results.add(a.id);
460 }
461 }
462 return results;
463 }
464
465 public boolean isFilePaused(String appType, String filename) {
466 for (Adaptor a : pausedAdaptors.values()) {
467 if (a.className.equals(DEFAULT_FILE_TAILER) && a.params.endsWith(filename)
468 && a.appType.equals(appType)) {
469 return true;
470 }
471 }
472 return false;
473 }
474
475
476
477
478
479
480
481
482
483
484
485 public Collection<String> resumeFile(String appType, String filename)
486 throws IOException {
487 syncWithAgent();
488
489 List<String> results = new ArrayList<String>();
490 for (Adaptor a : pausedAdaptors.values()) {
491 if (a.className.equals(DEFAULT_FILE_TAILER) && a.params.endsWith(filename)
492 && a.appType.equals(appType)) {
493 String newID = add(DEFAULT_FILE_TAILER, a.appType, a.offset + " "
494 + filename, a.offset);
495 pausedAdaptors.remove(a.id);
496 a.id = newID;
497 results.add(a.id);
498 }
499 }
500 return results;
501 }
502
503 public void removeFile(String appType, String filename) throws IOException {
504 syncWithAgent();
505
506
507
508 for (Adaptor a : runningAdaptors.values()) {
509 if (a.className.equals(DEFAULT_FILE_TAILER) && a.params.endsWith(filename)
510 && a.appType.equals(appType)) {
511 remove(a.id);
512 }
513 }
514 }
515
516
517
518
519
520 public static void main(String[] args) {
521 ChukwaAgentController c = getClient(args);
522 if (numArgs >= 3 && args[0].toLowerCase().equals("addfile")) {
523 doAddFile(c, args[1], args[2]);
524 } else if (numArgs >= 3 && args[0].toLowerCase().equals("removefile")) {
525 doRemoveFile(c, args[1], args[2]);
526 } else if (numArgs >= 1 && args[0].toLowerCase().equals("list")) {
527 doList(c);
528 } else if (numArgs >= 1 && args[0].equalsIgnoreCase("removeall")) {
529 doRemoveAll(c);
530 } else {
531 System.err.println("usage: ChukwaClient addfile <apptype> <filename> [-h hostname] [-p portnumber]");
532 System.err.println(" ChukwaClient removefile adaptorID [-h hostname] [-p portnumber]");
533 System.err.println(" ChukwaClient removefile <apptype> <filename> [-h hostname] [-p portnumber]");
534 System.err.println(" ChukwaClient list [IP] [port]");
535 System.err.println(" ChukwaClient removeAll [IP] [port]");
536 }
537 }
538
539 private static ChukwaAgentController getClient(String[] args) {
540 int portno = 9093;
541 String hostname = "localhost";
542
543 numArgs = args.length;
544
545 for (int i = 0; i < args.length; i++) {
546 if (args[i].equals("-h") && args.length > i + 1) {
547 hostname = args[i + 1];
548 log.debug("Setting hostname to: " + hostname);
549 numArgs -= 2;
550 } else if (args[i].equals("-p") && args.length > i + 1) {
551 portno = Integer.parseInt(args[i + 1]);
552 log.debug("Setting portno to: " + portno);
553 numArgs -= 2;
554 }
555 }
556 return new ChukwaAgentController(hostname, portno);
557 }
558
559 private static String doAddFile(ChukwaAgentController c, String appType,
560 String params) {
561 log.info("Adding adaptor with filename: " + params);
562 String adaptorID = c.addFile(appType, params);
563 if (adaptorID != null) {
564 log.info("Successfully added adaptor, id is:" + adaptorID);
565 } else {
566 System.err.println("Agent reported failure to add adaptor, adaptor id returned was:"
567 + adaptorID);
568 }
569 return adaptorID;
570 }
571
572 private static void doRemoveFile(ChukwaAgentController c, String appType,
573 String params) {
574 try {
575 log.debug("Removing adaptor with filename: " + params);
576 c.removeFile(appType, params);
577 } catch (IOException e) {
578 e.printStackTrace();
579 }
580 }
581
582 private static void doList(ChukwaAgentController c) {
583 try {
584 Iterator<Adaptor> adptrs = c.list().values().iterator();
585 while (adptrs.hasNext()) {
586 log.debug(adptrs.next().toString());
587 }
588 } catch (Exception e) {
589 e.printStackTrace();
590 }
591 }
592
593 private static void doRemoveAll(ChukwaAgentController c) {
594 log.info("Removing all adaptors");
595 c.removeAll();
596 }
597 }