This project has retired. For details please refer to its Attic page.
JMSAdaptor xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.chukwa.datacollection.adaptor.jms;
19  
20  import org.apache.hadoop.chukwa.datacollection.adaptor.AbstractAdaptor;
21  import org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorException;
22  import org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorShutdownPolicy;
23  import org.apache.hadoop.chukwa.Chunk;
24  import org.apache.hadoop.chukwa.ChunkImpl;
25  import org.apache.log4j.Logger;
26  import org.apache.activemq.ActiveMQConnectionFactory;
27  
28  import javax.jms.TopicConnection;
29  import javax.jms.TopicSession;
30  import javax.jms.Session;
31  import javax.jms.Topic;
32  import javax.jms.Connection;
33  import javax.jms.MessageListener;
34  import javax.jms.Message;
35  import javax.jms.JMSException;
36  import javax.jms.ConnectionFactory;
37  import javax.jms.QueueConnection;
38  import javax.jms.QueueSession;
39  import javax.jms.Queue;
40  import javax.jms.MessageConsumer;
41  
42  /**
43   * Adaptor that is able to listen to a JMS topic or queue for messages, receive
44   * the message, and transform it to a Chukwa chunk. Transformation is handled by
45   * a JMSMessageTransformer. The default JMSMessageTransformer used is the
46   * JMSTextMessageTransformer.
47   * <P>
48   * This adaptor is added to an Agent like so:
49   * <code>
50   * add JMSAdaptor <dataType> <brokerURL> <-t <topicName>|-q <queueName>> [-s <JMSSelector>]
51   *  [-x <transformerName>] [-p <transformerConfigs>] <offset>
52   * </code>
53   * <ul>
54   * <li><code>dataType</code> - The chukwa data type.</li>
55   * <li><code>brokerURL</code> - The JMS broker URL to bind to.</li>
56   * <li><code>topicName</code> - The JMS topic to listen on.</li>
57   * <li><code>queueName</code> - The JMS queue to listen on.</li>
58   * <li><code>JMSSelector</code> - The JMS selector to filter with. Surround
59   * with quotes if selector contains multiple words.</li>
60   * <li><code>transformerName</code> - Class name of the JMSMessageTransformer to
61   * use.</li>
62   * <li><code>transformerConfigs</code> - Properties to be passed to the
63   * JMSMessageTransformer to use.  Surround with quotes if configs contain
64   * multiple words.</li>
65   * </ul>
66   *
67   * @see JMSMessageTransformer
68   * @see JMSTextMessageTransformer
69   */
70  public class JMSAdaptor extends AbstractAdaptor {
71  
72    static Logger log = Logger.getLogger(JMSAdaptor.class);
73  
74    ConnectionFactory connectionFactory = null;
75    Connection connection;
76    String brokerURL;
77    String topic;
78    String queue;
79    String selector = null;
80    JMSMessageTransformer transformer;
81  
82    volatile long bytesReceived = 0;
83    String status; // used to write checkpoint info. See getStatus() below
84    String source; // added to the chunk to identify the stream
85  
86    class JMSListener implements MessageListener {
87  
88      public void onMessage(Message message) {
89        if (log.isDebugEnabled()) {
90          log.debug("got a JMS message");
91        }
92        
93        try {
94  
95          byte[] bytes = transformer.transform(message);
96          if (bytes == null) {
97            return;
98          }
99  
100         bytesReceived += bytes.length;
101 
102         if (log.isDebugEnabled()) {
103           log.debug("Adding Chunk from JMS message: " + new String(bytes));
104         }
105 
106         Chunk c = new ChunkImpl(type, source, bytesReceived, bytes, JMSAdaptor.this);
107         dest.add(c);
108 
109       } catch (JMSException e) {
110         log.error("can't read JMS messages in " + adaptorID, e);
111       }
112       catch (InterruptedException e) {
113         log.error("can't add JMS messages in " + adaptorID, e);
114       }
115     }
116   }
117 
118   /**
119    * This adaptor received configuration like this:
120    * <brokerURL> <-t <topicName>|-q <queueName>> [-s <JMSSelector>] [-x <transformerName>]
121    * [-p <transformerProperties>]
122    *
123    * @param s
124    * @return
125    */
126   @Override
127   public String parseArgs(String s) {
128     if (log.isDebugEnabled()) {
129       log.debug("Parsing args to initialize adaptor: " + s);
130     }
131 
132     String[] tokens = s.split(" ");
133     if (tokens.length < 1) {
134       throw new IllegalArgumentException("Configuration must include brokerURL.");
135     }
136 
137     brokerURL = tokens[0];
138 
139     if (brokerURL.length() < 6 || brokerURL.indexOf("://") == -1) {
140       throw new IllegalArgumentException("Invalid brokerURL: " + brokerURL);
141     }
142 
143     String transformerName = null;
144     String transformerConfs = null;
145     for (int i = 1; i < tokens.length; i++) {
146       String value = tokens[i];
147       if ("-t".equals(value)) {
148         topic = tokens[++i];
149       }
150       else if ("-q".equals(value)) {
151         queue = tokens[++i];
152       }
153       else if ("-s".equals(value) && i <= tokens.length - 1) {
154         selector = tokens[++i];
155 
156         // selector can have multiple words
157         if (selector.startsWith("\"")) {
158           for(int j = i + 1; j < tokens.length; j++) {
159             selector = selector + " " + tokens[++i];
160             if(tokens[j].endsWith("\"")) {
161               break;
162             }
163           }
164           selector = trimQuotes(selector);
165         }
166       }
167       else if ("-x".equals(value)) {
168         transformerName = tokens[++i];
169       }
170       else if ("-p".equals(value)) {
171         transformerConfs = tokens[++i];
172 
173         // transformerConfs can have multiple words
174         if (transformerConfs.startsWith("\"")) {
175           for(int j = i + 1; j < tokens.length; j++) {
176             transformerConfs = transformerConfs + " " + tokens[++i];
177             if(tokens[j].endsWith("\"")) {
178               break;
179             }
180           }
181           transformerConfs = trimQuotes(transformerConfs);
182         }
183       }
184     }
185 
186     if (topic == null && queue == null) {
187       log.error("topicName or queueName must be set");
188       return null;
189     }
190 
191     if (topic != null && queue != null) {
192       log.error("Either topicName or queueName must be set, but not both");
193       return null;
194     }
195 
196     // create transformer
197     if (transformerName != null) {
198       try {
199         Class classDefinition = Class.forName(transformerName);
200         Object object = classDefinition.newInstance();
201         transformer = (JMSMessageTransformer)object;
202       } catch (Exception e) {
203         log.error("Couldn't find class for transformerName=" + transformerName, e);
204         return null;
205       }
206     }
207     else {
208       transformer = new JMSTextMessageTransformer();
209     }
210 
211     // configure transformer
212     if (transformerConfs != null) {
213       String result = transformer.parseArgs(transformerConfs);
214       if (result == null) {
215         log.error("JMSMessageTransformer couldn't parse transformer configs: " +
216                 transformerConfs);
217         return null;
218       }
219     }
220 
221     status = s;
222 
223     if(topic != null) {
224       source = "jms:"+brokerURL + ",topic:" + topic;
225     }
226     else if(queue != null) {
227       source = "jms:"+brokerURL + ",queue:" + queue;
228     }
229 
230     return s;
231   }
232 
233   @Override
234   public void start(long offset) throws AdaptorException {
235 
236     try {
237       bytesReceived = offset;
238 
239       connectionFactory = initializeConnectionFactory(brokerURL);
240       connection = connectionFactory.createConnection();
241 
242       log.info("Starting JMS adaptor: " + adaptorID + " started on brokerURL=" + brokerURL +
243               ", topic=" + topic + ", selector=" + selector +
244               ", offset =" + bytesReceived);
245 
246       // this is where different initialization could be used for a queue
247       if(topic != null) {
248         initializeTopic(connection, topic, selector, new JMSListener());
249       }
250       else if(queue != null) {
251         initializeQueue(connection, queue, selector, new JMSListener());
252       }
253       connection.start();
254 
255     } catch(Exception e) {
256       throw new AdaptorException(e);
257     }
258   }
259 
260   /**
261    * Override this to initialize with a different connection factory.
262    * @param brokerURL
263    * @return
264    */
265   protected ConnectionFactory initializeConnectionFactory(String brokerURL) {
266     return new ActiveMQConnectionFactory(brokerURL);
267   }
268 
269   /**
270    * Status is used to write checkpoints. Checkpoints are written as:
271    * ADD <adaptorKey> = <adaptorClass> <currentStatus> <offset>
272    *
273    * Once they're reloaded, adaptors are re-initialized with
274    * <adaptorClass> <currentStatus> <offset>
275    *
276    * While doing so, this gets passed by to the parseArgs method:
277    * <currentStatus>
278    *
279    * Without the first token in <currentStatus>, which is expected to be <dataType>.
280    *
281    * @return
282    */
283   @Override
284   public String getCurrentStatus() {
285     return type + " " + status;
286   }
287 
288   @Override
289   public long shutdown(AdaptorShutdownPolicy shutdownPolicy)
290       throws AdaptorException {
291     try {
292       connection.close();
293 
294     } catch(Exception e) {
295       log.error("Exception closing JMS connection.", e);
296     }
297 
298     return bytesReceived;
299   }
300 
301   private void initializeTopic(Connection connection,
302                                       String topic,
303                                       String selector,
304                                       JMSListener listener) throws JMSException {
305     TopicSession session = ((TopicConnection)connection).
306             createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
307     Topic jmsTopic = session.createTopic(topic);
308     MessageConsumer consumer = session.createConsumer(jmsTopic, selector, true);
309     consumer.setMessageListener(listener);
310   }
311 
312   private void initializeQueue(Connection connection,
313                                       String topic,
314                                       String selector,
315                                       JMSListener listener) throws JMSException {
316     QueueSession session = ((QueueConnection)connection).
317             createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
318     Queue queue = session.createQueue(topic);
319     MessageConsumer consumer = session.createConsumer(queue, selector, true);
320     consumer.setMessageListener(listener);
321   }
322 
323   private static String trimQuotes(String value) {
324     // trim leading and trailing quotes
325     if (value.charAt(0) == '"') {
326       value = value.substring(1);
327     }
328     if (value.charAt(value.length() - 1) == '"') {
329       value = value.substring(0, value.length() - 1);
330     }
331     return value;
332   }
333 
334 }