This project has retired. For details please refer to its
Attic page.
MemLimitQueue xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.chukwa.datacollection.agent;
20
21
22 import java.util.LinkedList;
23 import java.util.List;
24 import java.util.Queue;
25 import org.apache.hadoop.chukwa.Chunk;
26 import org.apache.hadoop.chukwa.datacollection.ChunkQueue;
27 import org.apache.hadoop.chukwa.datacollection.agent.metrics.ChunkQueueMetrics;
28 import org.apache.hadoop.conf.Configuration;
29 import org.apache.log4j.Logger;
30
31
32
33
34
35
36
37
38 public class MemLimitQueue implements ChunkQueue {
39 static Logger log = Logger.getLogger(WaitingQueue.class);
40 static final ChunkQueueMetrics metrics = new ChunkQueueMetrics("chukwaAgent", "chunkQueue");;
41 private Queue<Chunk> queue = new LinkedList<Chunk>();
42 private long dataSize = 0;
43 private long MAX_MEM_USAGE;
44 static final String CHUNK_QUEUE_LIMIT = "chukwaAgent.chunk.queue.limit";
45 static final int QUEUE_SIZE = 10 * 1024 * 1024;
46
47 public MemLimitQueue(Configuration conf) {
48 configure(conf);
49 }
50
51
52
53
54 public void add(Chunk chunk) throws InterruptedException {
55 assert chunk != null : "can't enqueue null chunks";
56 synchronized (this) {
57 while (chunk.getData().length + dataSize > MAX_MEM_USAGE) {
58 try {
59 if(dataSize == 0) {
60 log.error("JUMBO CHUNK SPOTTED: type= " + chunk.getDataType() +
61 " and source =" +chunk.getStreamName());
62 return;
63
64 }
65 metrics.fullQueue.set(1);
66 this.wait();
67 log.info("MemLimitQueue is full [" + dataSize + "]");
68 } catch (InterruptedException e) {
69 }
70 }
71 metrics.fullQueue.set(0);
72 dataSize += chunk.getData().length;
73 queue.add(chunk);
74 metrics.addedChunk.inc();
75 metrics.queueSize.set(queue.size());
76 metrics.dataSize.set(dataSize);
77 this.notifyAll();
78 }
79
80 }
81
82
83
84
85
86 public void collect(List<Chunk> events, int maxSize)
87 throws InterruptedException {
88 synchronized (this) {
89
90 while (queue.isEmpty()) {
91 this.wait();
92 }
93
94 int size = 0;
95 while (!queue.isEmpty() && (size < maxSize)) {
96 Chunk e = this.queue.remove();
97 metrics.removedChunk.inc();
98 int chunkSize = e.getData().length;
99 size += chunkSize;
100 dataSize -= chunkSize;
101 metrics.dataSize.set(dataSize);
102 events.add(e);
103 }
104 metrics.queueSize.set(queue.size());
105 this.notifyAll();
106 }
107
108 if (log.isDebugEnabled()) {
109 log.debug("WaitingQueue.inQueueCount:" + queue.size()
110 + "\tWaitingQueue.collectCount:" + events.size());
111 }
112 }
113
114 public int size() {
115 return queue.size();
116 }
117
118 private void configure(Configuration conf) {
119 MAX_MEM_USAGE = QUEUE_SIZE;
120 if(conf == null){
121 return;
122 }
123 String limit = conf.get(CHUNK_QUEUE_LIMIT);
124 if(limit != null){
125 try{
126 MAX_MEM_USAGE = Integer.parseInt(limit);
127 } catch(NumberFormatException nfe) {
128 log.error("Exception reading property " + CHUNK_QUEUE_LIMIT
129 + ". Defaulting internal queue size to " + QUEUE_SIZE);
130 }
131 }
132 log.info("Using MemLimitQueue limit of " + MAX_MEM_USAGE);
133 }
134 }