1/*2 * Licensed to the Apache Software Foundation (ASF) under one3 * or more contributor license agreements. See the NOTICE file4 * distributed with this work for additional information5 * regarding copyright ownership. The ASF licenses this file6 * to you under the Apache License, Version 2.0 (the7 * "License"); you may not use this file except in compliance8 * with the License. You may obtain a copy of the License at9 *10 * http://www.apache.org/licenses/LICENSE-2.011 *12 * Unless required by applicable law or agreed to in writing, software13 * distributed under the License is distributed on an "AS IS" BASIS,14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.15 * See the License for the specific language governing permissions and16 * limitations under the License.17 */1819package org.apache.hadoop.chukwa.datacollection.agent;
202122import java.util.List;
23import java.util.concurrent.BlockingQueue;
24import java.util.concurrent.LinkedBlockingQueue;
25import org.apache.hadoop.chukwa.Chunk;
26import org.apache.hadoop.chukwa.datacollection.ChunkQueue;
27import org.apache.log4j.Logger;
2829publicclassWaitingQueueimplementsChunkQueue {
3031static Logger log = Logger.getLogger(WaitingQueue.class);
32private BlockingQueue<Chunk> queue = new LinkedBlockingQueue<Chunk>(5);
3334publicvoid add(Chunk event) {
35try {
36this.queue.put(event);
37 } catch (InterruptedException e) {
38 }// return upwards39 }
4041publicvoid add(List<Chunk> events) {
42this.queue.addAll(events);
4344 }
4546publicvoid collect(List<Chunk> events, int maxCount) {
47// Workaround to block on the queue48try {
49 events.add(this.queue.take());
50 } catch (InterruptedException e) {
51 }
52this.queue.drainTo(events, maxCount - 1);
5354 System.out.println("collect [" + Thread.currentThread().getName() + "] ["55 + events.size() + "]");
5657if (log.isDebugEnabled()) {
58 log.debug("WaitingQueue.inQueueCount:" + queue.size()
59 + "\tWaitingQueue.collectCount:" + events.size());
60 }
61 }
6263publicint size() {
64return queue.size();
65 }
6667 }