This project has retired. For details please refer to its
Attic page.
NonBlockingMemLimitQueue xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.chukwa.datacollection.agent;
20
21 import java.util.LinkedList;
22 import java.util.List;
23 import java.util.Queue;
24
25 import org.apache.hadoop.chukwa.Chunk;
26 import org.apache.hadoop.chukwa.datacollection.ChunkQueue;
27 import org.apache.hadoop.chukwa.datacollection.agent.metrics.ChunkQueueMetrics;
28 import org.apache.hadoop.conf.Configuration;
29 import org.apache.log4j.Logger;
30
31
32
33
34
35
36
37
38
39 public class NonBlockingMemLimitQueue implements ChunkQueue {
40 static Logger log = Logger.getLogger(NonBlockingMemLimitQueue.class);
41 static final ChunkQueueMetrics metrics = new ChunkQueueMetrics("chukwaAgent",
42 "chunkQueue");
43 static final String CHUNK_QUEUE_LIMIT = "chukwaAgent.chunk.queue.limit";
44 static final int QUEUE_SIZE = 10 * 1024 * 1024;
45 private Queue<Chunk> queue = new LinkedList<Chunk>();
46 private long dataSize = 0;
47 private long MAX_MEM_USAGE;
48
49 public NonBlockingMemLimitQueue(Configuration conf) {
50 configure(conf);
51 }
52
53
54
55
56 public void add(Chunk chunk) throws InterruptedException {
57 assert chunk != null : "can't enqueue null chunks";
58 int chunkSize = chunk.getData().length;
59 synchronized (this) {
60 if (chunkSize + dataSize > MAX_MEM_USAGE) {
61 if (dataSize == 0) {
62 log.error("JUMBO CHUNK SPOTTED: type= " + chunk.getDataType()
63 + " and source =" + chunk.getStreamName());
64 return;
65
66
67 } else {
68 metrics.fullQueue.set(1);
69 log.warn("Discarding chunk due to NonBlockingMemLimitQueue full [" + dataSize
70 + "]");
71 return;
72 }
73 }
74 metrics.fullQueue.set(0);
75 dataSize += chunk.getData().length;
76 queue.add(chunk);
77 metrics.addedChunk.inc();
78 metrics.queueSize.set(queue.size());
79 metrics.dataSize.set(dataSize);
80 this.notifyAll();
81 }
82 }
83
84
85
86
87
88 public void collect(List<Chunk> events, int maxSize)
89 throws InterruptedException {
90 synchronized (this) {
91
92 while (queue.isEmpty()) {
93 this.wait();
94 }
95
96 int size = 0;
97 while (!queue.isEmpty() && (size < maxSize)) {
98 Chunk e = this.queue.remove();
99 metrics.removedChunk.inc();
100 int chunkSize = e.getData().length;
101 size += chunkSize;
102 dataSize -= chunkSize;
103 metrics.dataSize.set(dataSize);
104 events.add(e);
105 }
106 metrics.queueSize.set(queue.size());
107 this.notifyAll();
108 }
109
110 if (log.isDebugEnabled()) {
111 log.debug("WaitingQueue.inQueueCount:" + queue.size()
112 + "\tWaitingQueue.collectCount:" + events.size());
113 }
114 }
115
116 public int size() {
117 return queue.size();
118 }
119
120 private void configure(Configuration conf) {
121 MAX_MEM_USAGE = QUEUE_SIZE;
122 if(conf == null){
123 return;
124 }
125 String limit = conf.get(CHUNK_QUEUE_LIMIT);
126 if(limit != null){
127 try{
128 MAX_MEM_USAGE = Integer.parseInt(limit);
129 } catch(NumberFormatException nfe) {
130 log.error("Exception reading property " + CHUNK_QUEUE_LIMIT
131 + ". Defaulting internal queue size to " + QUEUE_SIZE);
132 }
133 }
134 log.info("Using NonBlockingMemLimitQueue limit of " + MAX_MEM_USAGE);
135 }
136 }