This project has retired. For details please refer to its Attic page.
JMXAdaptor xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.chukwa.datacollection.adaptor;
20  
21  import java.io.BufferedReader;
22  import java.io.File;
23  import java.io.FileReader;
24  import java.io.IOException;
25  import java.rmi.ConnectException;
26  import java.util.Calendar;
27  import java.util.HashMap;
28  import java.util.Map;
29  import java.util.Set;
30  import java.util.TimeZone;
31  import java.util.Timer;
32  import java.util.TimerTask;
33  import java.util.TreeSet;
34  
35  import javax.management.Descriptor;
36  import javax.management.MBeanAttributeInfo;
37  import javax.management.MBeanInfo;
38  import javax.management.MBeanServerConnection;
39  import javax.management.ObjectName;
40  import javax.management.openmbean.CompositeData;
41  import javax.management.openmbean.CompositeType;
42  import javax.management.openmbean.OpenType;
43  import javax.management.openmbean.TabularData;
44  import javax.management.openmbean.TabularType;
45  import javax.management.remote.JMXConnector;
46  import javax.management.remote.JMXConnectorFactory;
47  import javax.management.remote.JMXServiceURL;
48  
49  import org.apache.hadoop.chukwa.ChunkImpl;
50  import org.apache.hadoop.chukwa.datacollection.ChunkReceiver;
51  import org.apache.hadoop.chukwa.datacollection.adaptor.AbstractAdaptor;
52  import org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorException;
53  import org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorShutdownPolicy;
54  import org.apache.hadoop.chukwa.util.ExceptionUtil;
55  import org.apache.log4j.Logger;
56  import org.json.simple.JSONObject;
57  
58  /**
59   * Query metrics through JMX interface. <br>
60   * 1. Enable remote jmx monitoring for the target
61   * jvm by specifying -Dcom.sun.management.jmxremote.port=jmx_port<br>
62   * 2. Enable authentication with -Dcom.sun.management.jmxremote.authenticate=true <br>
63   *    -Dcom.sun.management.jmxremote.password.file=${CHUKWA_CONF_DIR}/jmxremote.password <br>
64   *    -Dcom.sun.management.jmxremote.access.file=${CHUKWA_CONF_DIR}/jmxremote.access <br>
65   * 3. Optionally specify these jvm options <br>
66   * 	  -Djava.net.preferIPv4Stack=true -Dcom.sun.management.jmxremote.ssl=false <br>
67   * 4. Connect to the jmx agent using jconsole and find out which domain you want to collect data for
68   * 5. Add the jmx adaptor. Ex: To collect metrics from a hadoop datanode that has enabled jmx on 8007, at 60s interval, use command<br>
69   *   "add JMXAdaptor DatanodeProcessor localhost 8007 60 Hadoop:*" <br><br>
70   * Your JMX adaptor is now good to go and will send out the collected metrics as chunks to the collector.
71   */
72  public class JMXAdaptor extends AbstractAdaptor{
73  
74  	private static Logger log = Logger.getLogger(JMXAdaptor.class);
75  	private MBeanServerConnection mbsc = null;
76  	private String port ="", server="localhost";	
77  	private JMXServiceURL url;
78  	private JMXConnector jmxc = null;
79  	private long period = 10;
80  	private Timer timer;
81  	private JMXTimer runner;
82  	private String pattern = "";
83  	long sendOffset = 0;
84  	volatile boolean shutdown = false;
85  	
86  	/**
87  	 * A thread which creates a new connection to JMX and retries every 10s if the connection is not  
88  	 * successful. It uses the credentials specified in $CHUKWA_CONF_DIR/jmxremote.password.
89  	 */
90  	public class JMXConnect implements Runnable{
91  
92  		@Override
93  		public void run() {
94  			String hadoop_conf = System.getenv("CHUKWA_CONF_DIR");
95  			StringBuffer sb = new StringBuffer(hadoop_conf);
96  			if(!hadoop_conf.endsWith("/")){
97  				sb.append(File.separator);
98  			}
99  			sb.append("jmxremote.password");
100 			String jmx_pw_file = sb.toString();
101 			shutdown = false;
102 			while(!shutdown){
103 				try{					
104 					BufferedReader br = new BufferedReader(new FileReader(jmx_pw_file));
105 					String[] creds = br.readLine().split(" ");
106 					Map<String, String[]> env = new HashMap<String, String[]>();			
107 					env.put(JMXConnector.CREDENTIALS, creds);
108 					jmxc = JMXConnectorFactory.connect(url, env);
109 					mbsc = jmxc.getMBeanServerConnection();							
110 					if(timer == null) {
111 						timer = new Timer();
112 						runner = new JMXTimer(dest, JMXAdaptor.this,mbsc);
113 					}
114 					timer.scheduleAtFixedRate(runner, 0, period * 1000);
115 					shutdown = true;
116 				} catch (IOException e) {
117 					log.error("IOException in JMXConnect thread prevented connect to JMX on port:"+port+", retrying after 10s");
118 					log.error(ExceptionUtil.getStackTrace(e));	
119 					try {
120 						Thread.sleep(10000);
121 					} catch (InterruptedException e1) {
122 						log.error("JMXConnect thread interrupted in sleep, bailing");
123 						shutdown = true;
124 					}
125 				} catch (Exception e) {
126 					log.error("Something bad happened in JMXConnect thread, bailing");
127 					log.error(ExceptionUtil.getStackTrace(e));
128 					timer.cancel();
129 					timer = null;
130 					shutdown = true;
131 				}						
132 			}
133 		}
134 		
135 	}
136 	
137 	/**
138 	 * A TimerTask which queries the mbean server for all mbeans that match the pattern specified in
139 	 * the JMXAdaptor arguments, constructs a json object of all data and sends it as a chunk. The 
140 	 * CompositeType, TabularType and Array open mbean types return the numerical values (sizes). 
141 	 * This task is scheduled to run at the interval specified in the adaptor arguments. If the 
142 	 * connection to mbean server is broken, this task cancels the existing timer and tries to 
143 	 * re-connect to the mbean server.  
144 	 */
145 	
146 	public class JMXTimer extends TimerTask{
147 
148 		private Logger log = Logger.getLogger(JMXTimer.class);
149 		private ChunkReceiver receiver = null;
150 		private JMXAdaptor adaptor = null;
151 		private MBeanServerConnection mbsc = null;
152 		//private long sendOffset = 0;
153 		
154 		public JMXTimer(ChunkReceiver receiver, JMXAdaptor adaptor, MBeanServerConnection mbsc){
155 			this.receiver = receiver;
156 			this.adaptor = adaptor;		
157 			this.mbsc = mbsc;
158 		}
159 		
160 		@SuppressWarnings("unchecked")
161 		@Override
162 		public void run() {
163 			try{
164 				ObjectName query = null;
165 				if(!pattern.equals("")){
166 					query = new ObjectName(pattern);			
167 				}
168 				Set<ObjectName> names = new TreeSet<ObjectName>(mbsc.queryNames(query, null));
169 				Object val = null;
170 				JSONObject json = new JSONObject();
171 									
172 				for (ObjectName oname: names) {			
173 					MBeanInfo mbinfo = mbsc.getMBeanInfo(oname);
174 					MBeanAttributeInfo [] mbinfos = mbinfo.getAttributes();						
175 					
176 					for (MBeanAttributeInfo mb: mbinfos) {
177 						try{
178 							Descriptor d = mb.getDescriptor();
179 							val = mbsc.getAttribute(oname, mb.getName());
180 							if(d.getFieldNames().length > 0){ //this is an open mbean
181 								OpenType openType = (OpenType)d.getFieldValue("openType");	
182 								
183 								if(openType.isArray()){									
184 									Object[] valarray = (Object[])val;									
185 									val = Integer.toString(valarray.length);
186 								}
187 								else if(openType instanceof CompositeType){
188 									CompositeData data = (CompositeData)val;
189 									val = Integer.toString(data.values().size());									
190 								}
191 								else if(openType instanceof TabularType){
192 									TabularData data = (TabularData)val;
193 									val = Integer.toString(data.size());
194 								}
195 								//else it is SimpleType									
196 							}
197 							json.put(mb.getName(),val);
198 						}
199 						catch(Exception e){
200 							log.warn("Exception "+ e.getMessage() +" getting attribute - "+mb.getName() + " Descriptor:"+mb.getDescriptor().getFieldNames().length);
201 						}						
202 					}
203 				}
204 				
205 				byte[] data = json.toString().getBytes();		
206 				sendOffset+=data.length;				
207 				ChunkImpl c = new ChunkImpl(type, "JMX", sendOffset, data, adaptor);
208 				long rightNow = Calendar.getInstance(TimeZone.getTimeZone("UTC")).getTimeInMillis();
209 				c.addTag("timeStamp=\""+rightNow+"\"");
210 				receiver.add(c);
211 			}
212 			catch(ConnectException e1){
213 				log.error("Got connect exception for the existing MBeanServerConnection");
214 				log.error(ExceptionUtil.getStackTrace(e1));
215 				log.info("Make sure the target process is running. Retrying connection to JMX on port:"+port);
216 				timer.cancel();
217 				timer = null;
218 				Thread connectThread = new Thread(new JMXConnect());
219 				connectThread.start();
220 			}
221 			catch(Exception e){
222 				log.error(ExceptionUtil.getStackTrace(e));
223 			}
224 			
225 		}
226 		
227 	}
228 	
229 	
230 	@Override
231 	public String getCurrentStatus() {
232 		StringBuilder buffer = new StringBuilder();
233 		buffer.append(type);
234 		buffer.append(" ");
235 		buffer.append(server);
236 		buffer.append(" ");
237 		buffer.append(port);
238 		buffer.append(" ");
239 		buffer.append(period);
240 		buffer.append(" ");
241 		buffer.append(pattern);
242 		return buffer.toString();
243 	}
244 	
245 	@Override
246 	public long shutdown(AdaptorShutdownPolicy shutdownPolicy)
247 			throws AdaptorException {
248 		log.info("Enter Shutdown:" + shutdownPolicy.name()+ " - ObjectId:" + this);
249 		try {
250 			if(jmxc != null){
251 				jmxc.close();
252 			}
253 			if(timer != null){
254 				timer.cancel();
255 			}
256 		} catch (IOException e) {
257 			log.error("JMXAdaptor shutdown failed due to IOException");
258 			throw new AdaptorException(ExceptionUtil.getStackTrace(e));
259 		} catch (Exception e) {
260 			log.error("JMXAdaptor shutdown failed");
261 			throw new AdaptorException(ExceptionUtil.getStackTrace(e));
262 		}
263 		//in case the start thread is still retrying
264 		shutdown = true;
265 	    return sendOffset;
266 		
267 		
268 	}
269 
270 	@Override
271 	public void start(long offset) throws AdaptorException {
272 		try {			
273 			sendOffset = offset;
274 			Thread connectThread = new Thread(new JMXConnect());
275 			connectThread.start();			
276 		} catch(Exception e) {
277 			log.error("Failed to schedule JMX connect thread");
278 			throw new AdaptorException(ExceptionUtil.getStackTrace(e));	
279 		}
280 		
281 	}
282 
283 	@Override
284 	public String parseArgs(String s) {
285 		//JMXAdaptor MBeanServer port [interval] DomainNamePattern-Ex:"Hadoop:*"
286 		String[] tokens = s.split(" ");
287 		if(tokens.length == 4){
288 			server = tokens[0];
289 			port = tokens[1];
290 			period = Integer.parseInt(tokens[2]);
291 			pattern = tokens[3];
292 		}
293 		else if(tokens.length == 3){
294 			server = tokens[0];
295 			port = tokens[1];
296 			pattern = tokens[2];
297 		}
298 		else{
299 			log.warn("bad syntax in JMXAdaptor args");
300 			return null;
301 		}
302 		String url_string = "service:jmx:rmi:///jndi/rmi://"+server+ ":"+port+"/jmxrmi";
303 		try{
304 			url = new JMXServiceURL(url_string);			
305 			return s;
306 		}
307 		catch(Exception e){
308 			log.error(ExceptionUtil.getStackTrace(e));
309 		}
310 		return null;		
311 	}
312 	
313 }
314