This project has retired. For details please refer to its
Attic page.
JMSAdaptor xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.chukwa.datacollection.adaptor.jms;
19
20 import org.apache.hadoop.chukwa.datacollection.adaptor.AbstractAdaptor;
21 import org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorException;
22 import org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorShutdownPolicy;
23 import org.apache.hadoop.chukwa.Chunk;
24 import org.apache.hadoop.chukwa.ChunkImpl;
25 import org.apache.log4j.Logger;
26 import org.apache.activemq.ActiveMQConnectionFactory;
27
28 import javax.jms.TopicConnection;
29 import javax.jms.TopicSession;
30 import javax.jms.Session;
31 import javax.jms.Topic;
32 import javax.jms.Connection;
33 import javax.jms.MessageListener;
34 import javax.jms.Message;
35 import javax.jms.JMSException;
36 import javax.jms.ConnectionFactory;
37 import javax.jms.QueueConnection;
38 import javax.jms.QueueSession;
39 import javax.jms.Queue;
40 import javax.jms.MessageConsumer;
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70 public class JMSAdaptor extends AbstractAdaptor {
71
72 static Logger log = Logger.getLogger(JMSAdaptor.class);
73
74 ConnectionFactory connectionFactory = null;
75 Connection connection;
76 String brokerURL;
77 String topic;
78 String queue;
79 String selector = null;
80 JMSMessageTransformer transformer;
81
82 volatile long bytesReceived = 0;
83 String status;
84 String source;
85
86 class JMSListener implements MessageListener {
87
88 public void onMessage(Message message) {
89 if (log.isDebugEnabled()) {
90 log.debug("got a JMS message");
91 }
92
93 try {
94
95 byte[] bytes = transformer.transform(message);
96 if (bytes == null) {
97 return;
98 }
99
100 bytesReceived += bytes.length;
101
102 if (log.isDebugEnabled()) {
103 log.debug("Adding Chunk from JMS message: " + new String(bytes));
104 }
105
106 Chunk c = new ChunkImpl(type, source, bytesReceived, bytes, JMSAdaptor.this);
107 dest.add(c);
108
109 } catch (JMSException e) {
110 log.error("can't read JMS messages in " + adaptorID, e);
111 }
112 catch (InterruptedException e) {
113 log.error("can't add JMS messages in " + adaptorID, e);
114 }
115 }
116 }
117
118
119
120
121
122
123
124
125
126 @Override
127 public String parseArgs(String s) {
128 if (log.isDebugEnabled()) {
129 log.debug("Parsing args to initialize adaptor: " + s);
130 }
131
132 String[] tokens = s.split(" ");
133 if (tokens.length < 1) {
134 throw new IllegalArgumentException("Configuration must include brokerURL.");
135 }
136
137 brokerURL = tokens[0];
138
139 if (brokerURL.length() < 6 || brokerURL.indexOf("://") == -1) {
140 throw new IllegalArgumentException("Invalid brokerURL: " + brokerURL);
141 }
142
143 String transformerName = null;
144 String transformerConfs = null;
145 for (int i = 1; i < tokens.length; i++) {
146 String value = tokens[i];
147 if ("-t".equals(value)) {
148 topic = tokens[++i];
149 }
150 else if ("-q".equals(value)) {
151 queue = tokens[++i];
152 }
153 else if ("-s".equals(value) && i <= tokens.length - 1) {
154 selector = tokens[++i];
155
156
157 if (selector.startsWith("\"")) {
158 for(int j = i + 1; j < tokens.length; j++) {
159 selector = selector + " " + tokens[++i];
160 if(tokens[j].endsWith("\"")) {
161 break;
162 }
163 }
164 selector = trimQuotes(selector);
165 }
166 }
167 else if ("-x".equals(value)) {
168 transformerName = tokens[++i];
169 }
170 else if ("-p".equals(value)) {
171 transformerConfs = tokens[++i];
172
173
174 if (transformerConfs.startsWith("\"")) {
175 for(int j = i + 1; j < tokens.length; j++) {
176 transformerConfs = transformerConfs + " " + tokens[++i];
177 if(tokens[j].endsWith("\"")) {
178 break;
179 }
180 }
181 transformerConfs = trimQuotes(transformerConfs);
182 }
183 }
184 }
185
186 if (topic == null && queue == null) {
187 log.error("topicName or queueName must be set");
188 return null;
189 }
190
191 if (topic != null && queue != null) {
192 log.error("Either topicName or queueName must be set, but not both");
193 return null;
194 }
195
196
197 if (transformerName != null) {
198 try {
199 Class classDefinition = Class.forName(transformerName);
200 Object object = classDefinition.newInstance();
201 transformer = (JMSMessageTransformer)object;
202 } catch (Exception e) {
203 log.error("Couldn't find class for transformerName=" + transformerName, e);
204 return null;
205 }
206 }
207 else {
208 transformer = new JMSTextMessageTransformer();
209 }
210
211
212 if (transformerConfs != null) {
213 String result = transformer.parseArgs(transformerConfs);
214 if (result == null) {
215 log.error("JMSMessageTransformer couldn't parse transformer configs: " +
216 transformerConfs);
217 return null;
218 }
219 }
220
221 status = s;
222
223 if(topic != null) {
224 source = "jms:"+brokerURL + ",topic:" + topic;
225 }
226 else if(queue != null) {
227 source = "jms:"+brokerURL + ",queue:" + queue;
228 }
229
230 return s;
231 }
232
233 @Override
234 public void start(long offset) throws AdaptorException {
235
236 try {
237 bytesReceived = offset;
238
239 connectionFactory = initializeConnectionFactory(brokerURL);
240 connection = connectionFactory.createConnection();
241
242 log.info("Starting JMS adaptor: " + adaptorID + " started on brokerURL=" + brokerURL +
243 ", topic=" + topic + ", selector=" + selector +
244 ", offset =" + bytesReceived);
245
246
247 if(topic != null) {
248 initializeTopic(connection, topic, selector, new JMSListener());
249 }
250 else if(queue != null) {
251 initializeQueue(connection, queue, selector, new JMSListener());
252 }
253 connection.start();
254
255 } catch(Exception e) {
256 throw new AdaptorException(e);
257 }
258 }
259
260
261
262
263
264
265 protected ConnectionFactory initializeConnectionFactory(String brokerURL) {
266 return new ActiveMQConnectionFactory(brokerURL);
267 }
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283 @Override
284 public String getCurrentStatus() {
285 return type + " " + status;
286 }
287
288 @Override
289 public long shutdown(AdaptorShutdownPolicy shutdownPolicy)
290 throws AdaptorException {
291 try {
292 connection.close();
293
294 } catch(Exception e) {
295 log.error("Exception closing JMS connection.", e);
296 }
297
298 return bytesReceived;
299 }
300
301 private void initializeTopic(Connection connection,
302 String topic,
303 String selector,
304 JMSListener listener) throws JMSException {
305 TopicSession session = ((TopicConnection)connection).
306 createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
307 Topic jmsTopic = session.createTopic(topic);
308 MessageConsumer consumer = session.createConsumer(jmsTopic, selector, true);
309 consumer.setMessageListener(listener);
310 }
311
312 private void initializeQueue(Connection connection,
313 String topic,
314 String selector,
315 JMSListener listener) throws JMSException {
316 QueueSession session = ((QueueConnection)connection).
317 createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
318 Queue queue = session.createQueue(topic);
319 MessageConsumer consumer = session.createConsumer(queue, selector, true);
320 consumer.setMessageListener(listener);
321 }
322
323 private static String trimQuotes(String value) {
324
325 if (value.charAt(0) == '"') {
326 value = value.substring(1);
327 }
328 if (value.charAt(value.length() - 1) == '"') {
329 value = value.substring(0, value.length() - 1);
330 }
331 return value;
332 }
333
334 }