This project has retired. For details please refer to its Attic page.
JMXAdaptor xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.chukwa.datacollection.adaptor;
20  
21  import java.io.BufferedReader;
22  import java.io.File;
23  import java.io.FileInputStream;
24  import java.io.IOException;
25  import java.io.InputStreamReader;
26  import java.nio.charset.Charset;
27  import java.rmi.ConnectException;
28  import java.util.Calendar;
29  import java.util.HashMap;
30  import java.util.Map;
31  import java.util.Set;
32  import java.util.TimeZone;
33  import java.util.Timer;
34  import java.util.TimerTask;
35  import java.util.TreeSet;
36  
37  import javax.management.Descriptor;
38  import javax.management.MBeanAttributeInfo;
39  import javax.management.MBeanInfo;
40  import javax.management.MBeanServerConnection;
41  import javax.management.ObjectName;
42  import javax.management.openmbean.CompositeData;
43  import javax.management.openmbean.CompositeType;
44  import javax.management.openmbean.OpenType;
45  import javax.management.openmbean.TabularData;
46  import javax.management.openmbean.TabularType;
47  import javax.management.remote.JMXConnector;
48  import javax.management.remote.JMXConnectorFactory;
49  import javax.management.remote.JMXServiceURL;
50  
51  import org.apache.hadoop.chukwa.ChunkImpl;
52  import org.apache.hadoop.chukwa.datacollection.ChunkReceiver;
53  import org.apache.hadoop.chukwa.datacollection.adaptor.AbstractAdaptor;
54  import org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorException;
55  import org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorShutdownPolicy;
56  import org.apache.hadoop.chukwa.util.ExceptionUtil;
57  import org.apache.log4j.Logger;
58  import org.json.simple.JSONObject;
59  
60  /**
61   * Query metrics through JMX interface. <br>
62   * 1. Enable remote jmx monitoring for the target
63   * jvm by specifying -Dcom.sun.management.jmxremote.port=jmx_port<br>
64   * 2. Enable authentication with -Dcom.sun.management.jmxremote.authenticate=true <br>
65   *    -Dcom.sun.management.jmxremote.password.file=${CHUKWA_CONF_DIR}/jmxremote.password <br>
66   *    -Dcom.sun.management.jmxremote.access.file=${CHUKWA_CONF_DIR}/jmxremote.access <br>
67   * 3. Optionally specify these jvm options <br>
68   * 	  -Djava.net.preferIPv4Stack=true -Dcom.sun.management.jmxremote.ssl=false <br>
69   * 4. Connect to the jmx agent using jconsole and find out which domain you want to collect data for
70   * 5. Add the jmx adaptor. Ex: To collect metrics from a hadoop datanode that has enabled jmx on 8007, at 60s interval, use command<br>
71   *   "add JMXAdaptor DatanodeProcessor localhost 8007 60 Hadoop:*" <br><br>
72   * Your JMX adaptor is now good to go and will send out the collected metrics as chunks to the collector.
73   */
74  public class JMXAdaptor extends AbstractAdaptor{
75  
76  	private static Logger log = Logger.getLogger(JMXAdaptor.class);
77  	private MBeanServerConnection mbsc = null;
78  	private String port ="", server="localhost";	
79  	private JMXServiceURL url;
80  	private JMXConnector jmxc = null;
81  	private long period = 10;
82  	private Timer timer;
83  	private JMXTimer runner;
84  	private String pattern = "";
85  	long sendOffset = 0;
86  	volatile boolean shutdown = false;
87  	
88  	/**
89  	 * A thread which creates a new connection to JMX and retries every 10s if the connection is not  
90  	 * successful. It uses the credentials specified in $CHUKWA_CONF_DIR/jmxremote.password.
91  	 */
92  	public class JMXConnect implements Runnable{
93  
94  		@Override
95  		public void run() {
96  			String hadoop_conf = System.getenv("CHUKWA_CONF_DIR");
97  			StringBuffer sb = new StringBuffer(hadoop_conf);
98  			if(!hadoop_conf.endsWith("/")){
99  				sb.append(File.separator);
100 			}
101 			sb.append("jmxremote.password");
102 			File jmx_pw_file = new File(sb.toString());
103 			shutdown = false;
104 			while(!shutdown){
105 				try{					
106 					BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(jmx_pw_file.getAbsolutePath()), Charset.forName("UTF-8")));
107 					String buffer = br.readLine();
108 					String[] creds = null;
109 					if(buffer != null ) {
110 					  creds = buffer.split(" ");
111 					}
112 					br.close();
113 					Map<String, String[]> env = new HashMap<String, String[]>();
114 					if(creds!=null) {
115 					  env.put(JMXConnector.CREDENTIALS, creds);
116 					}
117 					jmxc = JMXConnectorFactory.connect(url, env);
118 					mbsc = jmxc.getMBeanServerConnection();							
119 					if(timer == null) {
120 						timer = new Timer();
121 						runner = new JMXTimer(dest, JMXAdaptor.this,mbsc);
122 					}
123 					timer.scheduleAtFixedRate(runner, 0, period * 1000);
124 					shutdown = true;
125 				} catch (IOException e) {
126 					log.error("IOException in JMXConnect thread prevented connect to JMX on port:"+port+", retrying after 10s");
127 					log.error(ExceptionUtil.getStackTrace(e));	
128 					try {
129 						Thread.sleep(10000);
130 					} catch (InterruptedException e1) {
131 						log.error("JMXConnect thread interrupted in sleep, bailing");
132 						shutdown = true;
133 					}
134 				} catch (Exception e) {
135 					log.error("Something bad happened in JMXConnect thread, bailing");
136 					log.error(ExceptionUtil.getStackTrace(e));
137 					timer.cancel();
138 					timer = null;
139 					shutdown = true;
140 				}
141 			}
142 		}
143 		
144 	}
145 	
146 	/**
147 	 * A TimerTask which queries the mbean server for all mbeans that match the pattern specified in
148 	 * the JMXAdaptor arguments, constructs a json object of all data and sends it as a chunk. The 
149 	 * CompositeType, TabularType and Array open mbean types return the numerical values (sizes). 
150 	 * This task is scheduled to run at the interval specified in the adaptor arguments. If the 
151 	 * connection to mbean server is broken, this task cancels the existing timer and tries to 
152 	 * re-connect to the mbean server.  
153 	 */
154 	
155 	public class JMXTimer extends TimerTask{
156 
157 		private Logger log = Logger.getLogger(JMXTimer.class);
158 		private ChunkReceiver receiver = null;
159 		private JMXAdaptor adaptor = null;
160 		private MBeanServerConnection mbsc = null;
161 		//private long sendOffset = 0;
162 		
163 		public JMXTimer(ChunkReceiver receiver, JMXAdaptor adaptor, MBeanServerConnection mbsc){
164 			this.receiver = receiver;
165 			this.adaptor = adaptor;		
166 			this.mbsc = mbsc;
167 		}
168 		
169 		@SuppressWarnings("unchecked")
170 		@Override
171 		public void run() {
172 			try{
173 				ObjectName query = null;
174 				if(!pattern.equals("")){
175 					query = new ObjectName(pattern);			
176 				}
177 				Set<ObjectName> names = new TreeSet<ObjectName>(mbsc.queryNames(query, null));
178 				Object val = null;
179 				JSONObject json = new JSONObject();
180 									
181 				for (ObjectName oname: names) {			
182 					MBeanInfo mbinfo = mbsc.getMBeanInfo(oname);
183 					MBeanAttributeInfo [] mbinfos = mbinfo.getAttributes();						
184 					
185 					for (MBeanAttributeInfo mb: mbinfos) {
186 						try{
187 							Descriptor d = mb.getDescriptor();
188 							val = mbsc.getAttribute(oname, mb.getName());
189 							if(d.getFieldNames().length > 0){ //this is an open mbean
190 								OpenType<?> openType = (OpenType<?>)d.getFieldValue("openType");	
191 								
192 								if(openType.isArray()){									
193 									Object[] valarray = (Object[])val;									
194 									val = Integer.toString(valarray.length);
195 								}
196 								else if(openType instanceof CompositeType){
197 									CompositeData data = (CompositeData)val;
198 									val = Integer.toString(data.values().size());									
199 								}
200 								else if(openType instanceof TabularType){
201 									TabularData data = (TabularData)val;
202 									val = Integer.toString(data.size());
203 								}
204 								//else it is SimpleType									
205 							}
206 							json.put(mb.getName(),val);
207 						}
208 						catch(Exception e){
209 							log.warn("Exception "+ e.getMessage() +" getting attribute - "+mb.getName() + " Descriptor:"+mb.getDescriptor().getFieldNames().length);
210 						}						
211 					}
212 				}
213 				
214 				byte[] data = json.toString().getBytes(Charset.forName("UTF-8"));		
215 				sendOffset+=data.length;				
216 				ChunkImpl c = new ChunkImpl(type, "JMX", sendOffset, data, adaptor);
217 				long rightNow = Calendar.getInstance(TimeZone.getTimeZone("UTC")).getTimeInMillis();
218 				c.addTag("timeStamp=\""+rightNow+"\"");
219 				receiver.add(c);
220 			}
221 			catch(ConnectException e1){
222 				log.error("Got connect exception for the existing MBeanServerConnection");
223 				log.error(ExceptionUtil.getStackTrace(e1));
224 				log.info("Make sure the target process is running. Retrying connection to JMX on port:"+port);
225 				timer.cancel();
226 				timer = null;
227 				Thread connectThread = new Thread(new JMXConnect());
228 				connectThread.start();
229 			}
230 			catch(Exception e){
231 				log.error(ExceptionUtil.getStackTrace(e));
232 			}
233 			
234 		}
235 		
236 	}
237 	
238 	
239 	@Override
240 	public String getCurrentStatus() {
241 		StringBuilder buffer = new StringBuilder();
242 		buffer.append(type);
243 		buffer.append(" ");
244 		buffer.append(server);
245 		buffer.append(" ");
246 		buffer.append(port);
247 		buffer.append(" ");
248 		buffer.append(period);
249 		buffer.append(" ");
250 		buffer.append(pattern);
251 		return buffer.toString();
252 	}
253 	
254 	@Override
255 	public long shutdown(AdaptorShutdownPolicy shutdownPolicy)
256 			throws AdaptorException {
257 		log.info("Enter Shutdown:" + shutdownPolicy.name()+ " - ObjectId:" + this);
258 		try {
259 			if(jmxc != null){
260 				jmxc.close();
261 			}
262 			if(timer != null){
263 				timer.cancel();
264 			}
265 		} catch (IOException e) {
266 			log.error("JMXAdaptor shutdown failed due to IOException");
267 			throw new AdaptorException(ExceptionUtil.getStackTrace(e));
268 		} catch (Exception e) {
269 			log.error("JMXAdaptor shutdown failed");
270 			throw new AdaptorException(ExceptionUtil.getStackTrace(e));
271 		}
272 		//in case the start thread is still retrying
273 		shutdown = true;
274 	    return sendOffset;
275 		
276 		
277 	}
278 
279 	@Override
280 	public void start(long offset) throws AdaptorException {
281 		try {			
282 			sendOffset = offset;
283 			Thread connectThread = new Thread(new JMXConnect());
284 			connectThread.start();			
285 		} catch(Exception e) {
286 			log.error("Failed to schedule JMX connect thread");
287 			throw new AdaptorException(ExceptionUtil.getStackTrace(e));	
288 		}
289 		
290 	}
291 
292 	@Override
293 	public String parseArgs(String s) {
294 		//JMXAdaptor MBeanServer port [interval] DomainNamePattern-Ex:"Hadoop:*"
295 		String[] tokens = s.split(" ");
296 		if(tokens.length == 4){
297 			server = tokens[0];
298 			port = tokens[1];
299 			period = Integer.parseInt(tokens[2]);
300 			pattern = tokens[3];
301 		}
302 		else if(tokens.length == 3){
303 			server = tokens[0];
304 			port = tokens[1];
305 			pattern = tokens[2];
306 		}
307 		else{
308 			log.warn("bad syntax in JMXAdaptor args");
309 			return null;
310 		}
311 		String url_string = "service:jmx:rmi:///jndi/rmi://"+server+ ":"+port+"/jmxrmi";
312 		try{
313 			url = new JMXServiceURL(url_string);			
314 			return s;
315 		}
316 		catch(Exception e){
317 			log.error(ExceptionUtil.getStackTrace(e));
318 		}
319 		return null;		
320 	}
321 	
322 }
323