1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.chukwa.datacollection.test;
20
21
22 import org.apache.hadoop.chukwa.Chunk;
23 import org.apache.hadoop.chukwa.datacollection.*;
24 import org.apache.hadoop.chukwa.datacollection.agent.*;
25 import org.apache.hadoop.chukwa.datacollection.connector.Connector;
26
27 import java.nio.charset.Charset;
28 import java.util.*;
29
30
31
32
33
34 public class ConsoleOutConnector extends Thread implements Connector {
35
36 final ChukwaAgent agent;
37 volatile boolean shutdown;
38 final boolean silent;
39
40 public ConsoleOutConnector(ChukwaAgent a) {
41 this(a, false);
42 }
43
44 public ConsoleOutConnector(ChukwaAgent a, boolean silent) {
45 agent = a;
46 this.silent = silent;
47 }
48
49 public void run() {
50 try {
51 System.out.println("console connector started");
52 ChunkQueue eventQueue = DataFactory.getInstance().getEventQueue();
53 if (!silent)
54 System.out.println("-------------------");
55
56 while (!shutdown) {
57 List<Chunk> evts = new ArrayList<Chunk>();
58 eventQueue.collect(evts, 1);
59
60 for (Chunk e : evts) {
61 if (!silent) {
62 System.out.println("Console out connector got event at offset "
63 + e.getSeqID());
64 System.out.println("data type was " + e.getDataType());
65 if (e.getData().length > 1000)
66 System.out.println("data length was " + e.getData().length
67 + ", not printing");
68 else
69 System.out.println(new String(e.getData(), Charset.forName("UTF-8")));
70 }
71
72 agent.reportCommit(e.getInitiator(), e.getSeqID());
73
74 if (!silent)
75 System.out.println("-------------------");
76 }
77 }
78 } catch (InterruptedException e) {
79 }
80 }
81
82 public void shutdown() {
83 shutdown = true;
84 this.interrupt();
85 }
86
87 @Override
88 public void reloadConfiguration() {
89 System.out.println("reloadConfiguration");
90 }
91
92 }