This project has retired. For details please refer to its
Attic page.
ChukwaAgentController xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.chukwa.datacollection.controller;
20
21
22 import java.io.BufferedReader;
23 import java.io.File;
24 import java.io.IOException;
25 import java.io.InputStreamReader;
26 import java.io.OutputStreamWriter;
27 import java.io.PrintWriter;
28 import java.net.Socket;
29 import java.net.SocketException;
30 import java.nio.charset.Charset;
31 import java.util.ArrayList;
32 import java.util.Collection;
33 import java.util.HashMap;
34 import java.util.Iterator;
35 import java.util.List;
36 import java.util.Map;
37 import java.util.Timer;
38 import java.util.TimerTask;
39
40 import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
41 import org.apache.log4j.Logger;
42
43
44
45
46
47
48
49 public class ChukwaAgentController {
50 static Logger log = Logger.getLogger(ChukwaAgentController.class);
51
52 public class AddAdaptorTask extends TimerTask {
53
54 String adaptorName;
55 String type;
56 String params;
57 private long offset;
58 long numRetries;
59 long retryInterval;
60
61 AddAdaptorTask(String adaptorName, String type, String params, long offset,
62 long numRetries, long retryInterval) {
63 this.adaptorName = adaptorName;
64 this.type = type;
65 this.params = params;
66 this.offset = offset;
67 this.numRetries = numRetries;
68 this.retryInterval = retryInterval;
69 }
70
71 @Override
72 public void run() {
73 try {
74 log.info("Trying to resend the add command [" + adaptorName + "]["
75 + offset + "][" + params + "] [" + numRetries + "]");
76 addByName(null, adaptorName, type, params, offset, numRetries, retryInterval);
77 } catch (Exception e) {
78 log.warn("Exception in AddAdaptorTask.run", e);
79 e.printStackTrace();
80 }
81 }
82 }
83
84
85 public static final String CharFileTailUTF8 = "org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.CharFileTailingAdaptorUTF8";
86 public static final String CharFileTailUTF8NewLineEscaped = "org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.CharFileTailingAdaptorUTF8NewLineEscaped";
87
88 static String DEFAULT_FILE_TAILER = CharFileTailUTF8NewLineEscaped;
89 static int DEFAULT_PORT = 9093;
90 static String DEFAULT_HOST = "localhost";
91 static int numArgs = 0;
92
93 class Adaptor {
94 public String id;
95 final public String className;
96 final public String params;
97 final public String appType;
98 public long offset;
99
100 Adaptor(String className, String appType, String params, long offset) {
101 this.className = className;
102 this.appType = appType;
103 this.params = params;
104 this.offset = offset;
105 }
106
107 Adaptor(String id, String className, String appType, String params,
108 long offset) {
109 this.id = id;
110 this.className = className;
111 this.appType = appType;
112 this.params = params;
113 this.offset = offset;
114 }
115
116
117
118
119
120
121
122
123
124 String register() throws IOException {
125 Socket s = new Socket(hostname, portno);
126 try {
127 s.setSoTimeout(60000);
128 } catch (SocketException e) {
129 log.warn("Error while settin soTimeout to 60000");
130 e.printStackTrace();
131 }
132 PrintWriter bw = new PrintWriter(new OutputStreamWriter(s
133 .getOutputStream(), Charset.forName("UTF-8")));
134 if(id != null)
135 bw.println("ADD " + id + " = " + className + " " + appType + " " + params + " " + offset);
136 else
137 bw.println("ADD " + className + " " + appType + " " + params + " " + offset);
138 bw.flush();
139 BufferedReader br = new BufferedReader(new InputStreamReader(s
140 .getInputStream(), Charset.forName("UTF-8")));
141 String resp = br.readLine();
142 if (resp != null) {
143 String[] fields = resp.split(" ");
144 if (fields[0].equals("OK")) {
145 id = fields[fields.length - 1];
146 }
147 }
148 s.close();
149 return id;
150 }
151
152 void unregister() throws IOException {
153 Socket s = new Socket(hostname, portno);
154 try {
155 s.setSoTimeout(60000);
156 } catch (SocketException e) {
157 log.warn("Error while settin soTimeout to 60000");
158 }
159 PrintWriter bw = new PrintWriter(new OutputStreamWriter(s
160 .getOutputStream(), Charset.forName("UTF-8")));
161 bw.println("SHUTDOWN " + id);
162 bw.flush();
163
164 BufferedReader br = new BufferedReader(new InputStreamReader(s
165 .getInputStream(), Charset.forName("UTF-8")));
166 String resp = br.readLine();
167 if (resp == null || !resp.startsWith("OK")) {
168 log.error("adaptor unregister error, id: " + id);
169 } else if (resp.startsWith("OK")) {
170 String[] respSplit = resp.split(" ");
171 String newOffset = respSplit[respSplit.length - 1];
172 try {
173 offset = Long.parseLong(newOffset);
174 } catch (NumberFormatException nfe) {
175 log.error("adaptor didn't shutdown gracefully.\n" + nfe);
176 }
177 }
178
179 s.close();
180 }
181
182 public String toString() {
183 String[] namePieces = className.split("\\.");
184 String shortName = namePieces[namePieces.length - 1];
185 return id + " " + shortName + " " + appType + " " + params + " " + offset;
186 }
187 }
188
189 Map<String, ChukwaAgentController.Adaptor> runningAdaptors = new HashMap<String, Adaptor>();
190 Map<String, ChukwaAgentController.Adaptor> runningInstanceAdaptors = new HashMap<String, Adaptor>();
191 Map<String, ChukwaAgentController.Adaptor> pausedAdaptors;
192 String hostname;
193 int portno;
194
/**
 * Creates a controller for the agent at {@code DEFAULT_HOST}:{@code DEFAULT_PORT}
 * and loads the agent's current adaptor list.
 */
public ChukwaAgentController() {
  // Same initialisation as the two-argument constructor, using the defaults.
  this(DEFAULT_HOST, DEFAULT_PORT);
}
202
/**
 * Creates a controller for the agent at the given endpoint and loads the
 * agent's current adaptor list.
 *
 * @param hostname host the agent runs on
 * @param portno port the agent listens on
 */
public ChukwaAgentController(String hostname, int portno) {
  this.pausedAdaptors = new HashMap<String, Adaptor>();
  this.portno = portno;
  this.hostname = hostname;

  // The endpoint must be set before the first LIST request is sent.
  syncWithAgent();
}
210
211 private boolean syncWithAgent() {
212
213 try {
214 runningAdaptors = list();
215 return true;
216 } catch (IOException e) {
217 System.err.println("Error initializing ChukwaClient with list of "
218 + "currently registered adaptors, clearing our local list of adaptors");
219
220
221
222 runningAdaptors = new HashMap<String, ChukwaAgentController.Adaptor>();
223 return false;
224 }
225 }
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240 public String add(String adaptorName, String type, String params, long offset) {
241 return addByName(null, adaptorName, type, params, offset, 20, 15 * 1000);
242
243
244
245
246
247 }
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263 public String addByName(String adaptorID, String adaptorName, String type, String params, long offset,
264 long numRetries, long retryInterval) {
265 ChukwaAgentController.Adaptor adaptor = new ChukwaAgentController.Adaptor(
266 adaptorName, type, params, offset);
267 adaptor.id = adaptorID;
268 if (numRetries >= 0) {
269 try {
270 adaptorID = adaptor.register();
271
272 if (adaptorID != null) {
273 runningAdaptors.put(adaptorID, adaptor);
274 runningInstanceAdaptors.put(adaptorID, adaptor);
275 } else {
276 System.err.println("Failed to successfully add the adaptor in AgentClient, adaptorID returned by add() was negative.");
277 }
278 } catch (IOException ioe) {
279 log.warn("AgentClient failed to contact the agent ("
280 + hostname + ":" + portno + ")");
281
282 log.warn("Scheduling a agent connection retry for adaptor add() in another "
283 + retryInterval
284 + " milliseconds, "
285 + numRetries
286 + " retries remaining");
287
288 Timer addFileTimer = new Timer();
289 addFileTimer.schedule(new AddAdaptorTask(adaptorName, type, params,
290 offset, numRetries - 1, retryInterval), retryInterval);
291 }
292 } else {
293 System.err.println("Giving up on connecting to the local agent");
294 }
295 return adaptorID;
296 }
297
298 public synchronized ChukwaAgentController.Adaptor remove(String adaptorID)
299 throws IOException {
300 syncWithAgent();
301 ChukwaAgentController.Adaptor a = runningAdaptors.remove(adaptorID);
302 if ( a != null ) {
303 a.unregister();
304 }
305 return a;
306
307 }
308
309 public void remove(String className, String appType, String filename)
310 throws IOException {
311 syncWithAgent();
312
313
314
315 for (Adaptor a : runningAdaptors.values()) {
316 if (a.className.equals(className) && a.params.equals(filename)
317 && a.appType.equals(appType)) {
318 remove(a.id);
319 }
320 }
321 }
322
323 public void removeAll() {
324 syncWithAgent();
325 ArrayList<String> keyset = new ArrayList<String>();
326 keyset.addAll( runningAdaptors.keySet());
327
328 for (String id : keyset) {
329 try {
330 remove(id);
331 } catch (IOException ioe) {
332 System.err.println("Error removing an adaptor in removeAll()");
333 ioe.printStackTrace();
334 }
335 log.info("Successfully removed adaptor " + id);
336 }
337 }
338
339 public void removeInstanceAdaptors() {
340
341
342
343
344 for (Adaptor a : runningInstanceAdaptors.values()) {
345 try {
346 remove(a.className, a.appType, a.params);
347 } catch (IOException ioe) {
348 log.warn("Error removing an adaptor in removeInstanceAdaptors()");
349 ioe.printStackTrace();
350 }
351 }
352 }
353
354 Map<String, ChukwaAgentController.Adaptor> list() throws IOException {
355 Socket s = new Socket(hostname, portno);
356 try {
357 s.setSoTimeout(60000);
358 } catch (SocketException e) {
359 log.warn("Error while settin soTimeout to 60000");
360 e.printStackTrace();
361 }
362 PrintWriter bw = new PrintWriter(
363 new OutputStreamWriter(s.getOutputStream(), Charset.forName("UTF-8")));
364
365 bw.println("LIST");
366 bw.flush();
367 BufferedReader br = new BufferedReader(new InputStreamReader(s
368 .getInputStream(), Charset.forName("UTF-8")));
369 String ln;
370 Map<String, Adaptor> listResult = new HashMap<String, Adaptor>();
371 while ((ln = br.readLine()) != null) {
372 if (ln.equals("")) {
373 break;
374 } else {
375 String[] parts = ln.split("\\s+");
376 if (parts.length >= 4) {
377
378 String id = parts[0].substring(0, parts[0].length() - 1);
379
380
381
382
383
384 long offset = Long.parseLong(parts[parts.length - 1]);
385 StringBuilder tmpParams = new StringBuilder();
386 tmpParams.append(parts[3]);
387 for (int i = 4; i < parts.length - 1; i++) {
388 tmpParams.append(" ");
389 tmpParams.append(parts[i]);
390 }
391 listResult.put(id, new Adaptor(id, parts[1], parts[2], tmpParams.toString(),
392 offset));
393 }
394 }
395 }
396 s.close();
397 return listResult;
398 }
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420 public String addFile(String appType, String filename, long numRetries,
421 long retryInterval) {
422 filename = new File(filename).getAbsolutePath();
423
424
425
426
427
428
429 boolean isDuplicate = false;
430 for (Adaptor a : runningAdaptors.values()) {
431 if (a.className.equals(DEFAULT_FILE_TAILER) && a.appType.equals(appType)
432 && a.params.endsWith(filename)) {
433 isDuplicate = true;
434 }
435 }
436 if (!isDuplicate) {
437 return addByName(null, DEFAULT_FILE_TAILER, appType, 0L + " " + filename, 0L,
438 numRetries, retryInterval);
439 } else {
440 log.info("An adaptor for filename \"" + filename
441 + "\", type \"" + appType
442 + "\", exists already, addFile() command aborted");
443 return null;
444 }
445 }
446
447 public String addFile(String appType, String filename) {
448 return addFile(appType, filename, 0, 0);
449 }
450
451
452
453
454
455
456
457
458
459
460
461
462
463 public Collection<String> pauseFile(String appType, String filename)
464 throws IOException {
465 syncWithAgent();
466
467
468
469 List<String> results = new ArrayList<String>();
470 for (Adaptor a : runningAdaptors.values()) {
471 if (a.className.equals(DEFAULT_FILE_TAILER) && a.params.endsWith(filename)
472 && a.appType.equals(appType)) {
473 pausedAdaptors.put(a.id, a);
474 remove(a.id);
475 results.add(a.id);
476 }
477 }
478 return results;
479 }
480
481 public boolean isFilePaused(String appType, String filename) {
482 for (Adaptor a : pausedAdaptors.values()) {
483 if (a.className.equals(DEFAULT_FILE_TAILER) && a.params.endsWith(filename)
484 && a.appType.equals(appType)) {
485 return true;
486 }
487 }
488 return false;
489 }
490
491
492
493
494
495
496
497
498
499
500
501 public Collection<String> resumeFile(String appType, String filename)
502 throws IOException {
503 syncWithAgent();
504
505 List<String> results = new ArrayList<String>();
506 for (Adaptor a : pausedAdaptors.values()) {
507 if (a.className.equals(DEFAULT_FILE_TAILER) && a.params.endsWith(filename)
508 && a.appType.equals(appType)) {
509 String newID = add(DEFAULT_FILE_TAILER, a.appType, a.offset + " "
510 + filename, a.offset);
511 pausedAdaptors.remove(a.id);
512 a.id = newID;
513 results.add(a.id);
514 }
515 }
516 return results;
517 }
518
519 public void removeFile(String appType, String filename) throws IOException {
520 syncWithAgent();
521
522
523
524 for (Adaptor a : runningAdaptors.values()) {
525 if (a.className.equals(DEFAULT_FILE_TAILER) && a.params.endsWith(filename)
526 && a.appType.equals(appType)) {
527 remove(a.id);
528 }
529 }
530 }
531
532
533
534
535
536 public static void main(String[] args) {
537 ChukwaAgentController c = getClient(args);
538 if (numArgs >= 3 && args[0].toLowerCase().equals("addfile")) {
539 doAddFile(c, args[1], args[2]);
540 } else if (numArgs >= 3 && args[0].toLowerCase().equals("removefile")) {
541 doRemoveFile(c, args[1], args[2]);
542 } else if (numArgs >= 1 && args[0].toLowerCase().equals("list")) {
543 doList(c);
544 } else if (numArgs >= 1 && args[0].equalsIgnoreCase("removeall")) {
545 doRemoveAll(c);
546 } else {
547 System.err.println("usage: ChukwaClient addfile <apptype> <filename> [-h hostname] [-p portnumber]");
548 System.err.println(" ChukwaClient removefile adaptorID [-h hostname] [-p portnumber]");
549 System.err.println(" ChukwaClient removefile <apptype> <filename> [-h hostname] [-p portnumber]");
550 System.err.println(" ChukwaClient list [IP] [port]");
551 System.err.println(" ChukwaClient removeAll [IP] [port]");
552 }
553 }
554
555 private static ChukwaAgentController getClient(String[] args) {
556 int portno = 9093;
557 String hostname = "localhost";
558
559 numArgs = args.length;
560
561 for (int i = 0; i < args.length; i++) {
562 if (args[i].equals("-h") && args.length > i + 1) {
563 hostname = args[i + 1];
564 log.debug("Setting hostname to: " + hostname);
565 numArgs -= 2;
566 } else if (args[i].equals("-p") && args.length > i + 1) {
567 portno = Integer.parseInt(args[i + 1]);
568 log.debug("Setting portno to: " + portno);
569 numArgs -= 2;
570 }
571 }
572 return new ChukwaAgentController(hostname, portno);
573 }
574
575 private static String doAddFile(ChukwaAgentController c, String appType,
576 String params) {
577 log.info("Adding adaptor with filename: " + params);
578 String adaptorID = c.addFile(appType, params);
579 if (adaptorID != null) {
580 log.info("Successfully added adaptor, id is:" + adaptorID);
581 } else {
582 log.error("Agent reported failure to add adaptor.");
583 }
584 return adaptorID;
585 }
586
587 private static void doRemoveFile(ChukwaAgentController c, String appType,
588 String params) {
589 try {
590 log.debug("Removing adaptor with filename: " + params);
591 c.removeFile(appType, params);
592 } catch (IOException e) {
593 e.printStackTrace();
594 }
595 }
596
597 private static void doList(ChukwaAgentController c) {
598 try {
599 Iterator<Adaptor> adptrs = c.list().values().iterator();
600 while (adptrs.hasNext()) {
601 log.debug(adptrs.next().toString());
602 }
603 } catch (Exception e) {
604 e.printStackTrace();
605 }
606 }
607
608 private static void doRemoveAll(ChukwaAgentController c) {
609 log.info("Removing all adaptors");
610 c.removeAll();
611 }
612 }