/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.chukwa.datacollection.test;
202122import org.apache.hadoop.chukwa.Chunk;
23import org.apache.hadoop.chukwa.datacollection.*;
24import org.apache.hadoop.chukwa.datacollection.agent.*;
25import org.apache.hadoop.chukwa.datacollection.connector.Connector;
2627import java.nio.charset.Charset;
28import java.util.*;
2930/**31 * Output events to stdout. Intended for debugging use.32 * 33 */34publicclassConsoleOutConnectorextends Thread implementsConnector {
3536finalChukwaAgent agent;
37volatileboolean shutdown;
38finalboolean silent;
3940publicConsoleOutConnector(ChukwaAgent a) {
41this(a, false);
42 }
4344publicConsoleOutConnector(ChukwaAgent a, boolean silent) {
45 agent = a;
46this.silent = silent;
47 }
4849publicvoid run() {
50try {
51 System.out.println("console connector started");
52ChunkQueue eventQueue = DataFactory.getInstance().getEventQueue();
53if (!silent)
54 System.out.println("-------------------");
5556while (!shutdown) {
57 List<Chunk> evts = new ArrayList<Chunk>();
58 eventQueue.collect(evts, 1);
5960for (Chunk e : evts) {
61if (!silent) {
62 System.out.println("Console out connector got event at offset "63 + e.getSeqID());
64 System.out.println("data type was " + e.getDataType());
65if (e.getData().length > 1000)
66 System.out.println("data length was " + e.getData().length
67 + ", not printing");
68else69 System.out.println(new String(e.getData(), Charset.forName("UTF-8")));
70 }
7172 agent.reportCommit(e.getInitiator(), e.getSeqID());
7374if (!silent)
75 System.out.println("-------------------");
76 }
77 }
78 } catch (InterruptedException e) {
79 } // thread is about to exit anyway80 }
8182publicvoid shutdown() {
83 shutdown = true;
84this.interrupt();
85 }
8687 @Override
88publicvoid reloadConfiguration() {
89 System.out.println("reloadConfiguration");
90 }
9192 }