This project has retired. For details please refer to its Attic page.
MemLimitQueue xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.chukwa.datacollection.agent;
20  
21  
22  import java.util.LinkedList;
23  import java.util.List;
24  import java.util.Queue;
25  import org.apache.hadoop.chukwa.Chunk;
26  import org.apache.hadoop.chukwa.datacollection.ChunkQueue;
27  import org.apache.hadoop.chukwa.datacollection.agent.metrics.ChunkQueueMetrics;
28  import org.apache.hadoop.conf.Configuration;
29  import org.apache.log4j.Logger;
30  
31  /**
32   * An event queue that blocks once a fixed upper limit of data is enqueued.
33   * 
34   * For now, uses the size of the data field. Should really use
35   * estimatedSerializedSize()?
36   * 
37   */
38  public class MemLimitQueue implements ChunkQueue {
39    static Logger log = Logger.getLogger(WaitingQueue.class);
40    static final ChunkQueueMetrics metrics = new ChunkQueueMetrics("chukwaAgent", "chunkQueue");;
41    private Queue<Chunk> queue = new LinkedList<Chunk>();
42    private long dataSize = 0;
43    private long MAX_MEM_USAGE;
44    static final String CHUNK_QUEUE_LIMIT = "chukwaAgent.chunk.queue.limit";
45    static final int QUEUE_SIZE = 10 * 1024 * 1024;
46  
47    public MemLimitQueue(Configuration conf) {
48      configure(conf);
49    }
50  
51    /**
52     * @see org.apache.hadoop.chukwa.datacollection.ChunkQueue#add(org.apache.hadoop.chukwa.Chunk)
53     */
54    public void add(Chunk chunk) throws InterruptedException {
55      assert chunk != null : "can't enqueue null chunks";
56      synchronized (this) {
57        while (chunk.getData().length + dataSize > MAX_MEM_USAGE) {
58          try {
59            if(dataSize == 0) { //queue is empty, but data is still too big
60              log.error("JUMBO CHUNK SPOTTED: type= " + chunk.getDataType() + 
61                  " and source =" +chunk.getStreamName()); 
62              return; //return without sending; otherwise we'd deadlock.
63              //this error should probably be fatal; there's no way to recover.
64            }
65            metrics.fullQueue.set(1);
66            this.wait();
67            log.info("MemLimitQueue is full [" + dataSize + "]");
68          } catch (InterruptedException e) {
69          }
70        }
71        metrics.fullQueue.set(0);
72        dataSize += chunk.getData().length;
73        queue.add(chunk);
74        metrics.addedChunk.inc();
75        metrics.queueSize.set(queue.size());
76        metrics.dataSize.set(dataSize);
77        this.notifyAll();
78      }
79  
80    }
81  
82    /**
83     * @see org.apache.hadoop.chukwa.datacollection.ChunkQueue#collect(java.util.List,
84     *      int)
85     */
86    public void collect(List<Chunk> events, int maxSize)
87        throws InterruptedException {
88      synchronized (this) {
89        // we can't just say queue.take() here, since we're holding a lock.
90        while (queue.isEmpty()) {
91          this.wait();
92        }
93  
94        int size = 0;
95        while (!queue.isEmpty() && (size < maxSize)) {
96          Chunk e = this.queue.remove();
97          metrics.removedChunk.inc();
98          int chunkSize = e.getData().length;
99          size += chunkSize;
100         dataSize -= chunkSize;
101         metrics.dataSize.set(dataSize);
102         events.add(e);
103       }
104       metrics.queueSize.set(queue.size());
105       this.notifyAll();
106     }
107 
108     if (log.isDebugEnabled()) {
109       log.debug("WaitingQueue.inQueueCount:" + queue.size()
110           + "\tWaitingQueue.collectCount:" + events.size());
111     }
112   }
113 
114   public int size() {
115     return queue.size();
116   }
117   
118   private void configure(Configuration conf) {
119     MAX_MEM_USAGE = QUEUE_SIZE;
120     if(conf == null){
121       return;
122     }
123     String limit = conf.get(CHUNK_QUEUE_LIMIT);
124     if(limit != null){
125       try{
126         MAX_MEM_USAGE = Integer.parseInt(limit);
127       } catch(NumberFormatException nfe) {
128         log.error("Exception reading property " + CHUNK_QUEUE_LIMIT
129             + ". Defaulting internal queue size to " + QUEUE_SIZE);
130       }
131     }
132     log.info("Using MemLimitQueue limit of " + MAX_MEM_USAGE);
133   }
134 }