1/*2 * Licensed to the Apache Software Foundation (ASF) under one3 * or more contributor license agreements. See the NOTICE file4 * distributed with this work for additional information5 * regarding copyright ownership. The ASF licenses this file6 * to you under the Apache License, Version 2.0 (the7 * "License"); you may not use this file except in compliance8 * with the License. You may obtain a copy of the License at9 *10 * http://www.apache.org/licenses/LICENSE-2.011 *12 * Unless required by applicable law or agreed to in writing, software13 * distributed under the License is distributed on an "AS IS" BASIS,14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.15 * See the License for the specific language governing permissions and16 * limitations under the License.17 */18package org.apache.hadoop.chukwa.datacollection.connector;
192021import org.apache.hadoop.chukwa.Chunk;
22import org.apache.hadoop.chukwa.datacollection.*;
23import java.util.*;
2425publicclassChunkCatcherConnectorimplementsConnector {
2627ChunkQueue eq;
2829 Timer tm;
3031classInterruptorextends TimerTask {
32 Thread targ;
33volatileboolean deactivate = false;
34Interruptor(Thread t) {
35 targ =t;
36 }
3738publicsynchronizedvoid run() {
39if(!deactivate)
40 targ.interrupt();
41 }
42 };
4344publicvoid start() {
45 eq = DataFactory.getInstance().getEventQueue();
46 tm = new Timer();
47 }
4849publicChunk waitForAChunk(long ms) {
5051 ArrayList<Chunk> chunks = new ArrayList<Chunk>();
52Interruptor i = newInterruptor(Thread.currentThread());
53if(ms > 0)
54 tm.schedule(i, ms);
55try {
56 eq.collect(chunks, 1);
57synchronized(i) {
58 i.deactivate = true;
59 }
60 } catch(InterruptedException e) {
61 Thread.interrupted();
62returnnull;
63 }
64return chunks.get(0);
65 }
6667publicChunk waitForAChunk() throws InterruptedException {
68returnthis.waitForAChunk(0);//wait forever by default69 }
7071publicvoid shutdown() {
72 tm.cancel();
73 }
7475 @Override
76publicvoid reloadConfiguration() {
77 System.out.println("reloadConfiguration");
78 }
7980publicvoid clear() throws InterruptedException {
81 ArrayList<Chunk> list = new ArrayList<Chunk>();
82while(eq.size() > 0)
83 eq.collect(list, 1);
84 }
8586 }