This project has retired. For details please refer to its
Attic page.
JMXAdaptor xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.chukwa.datacollection.adaptor;
20
21 import java.io.BufferedReader;
22 import java.io.File;
23 import java.io.FileReader;
24 import java.io.IOException;
25 import java.rmi.ConnectException;
26 import java.util.Calendar;
27 import java.util.HashMap;
28 import java.util.Map;
29 import java.util.Set;
30 import java.util.TimeZone;
31 import java.util.Timer;
32 import java.util.TimerTask;
33 import java.util.TreeSet;
34
35 import javax.management.Descriptor;
36 import javax.management.MBeanAttributeInfo;
37 import javax.management.MBeanInfo;
38 import javax.management.MBeanServerConnection;
39 import javax.management.ObjectName;
40 import javax.management.openmbean.CompositeData;
41 import javax.management.openmbean.CompositeType;
42 import javax.management.openmbean.OpenType;
43 import javax.management.openmbean.TabularData;
44 import javax.management.openmbean.TabularType;
45 import javax.management.remote.JMXConnector;
46 import javax.management.remote.JMXConnectorFactory;
47 import javax.management.remote.JMXServiceURL;
48
49 import org.apache.hadoop.chukwa.ChunkImpl;
50 import org.apache.hadoop.chukwa.datacollection.ChunkReceiver;
51 import org.apache.hadoop.chukwa.datacollection.adaptor.AbstractAdaptor;
52 import org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorException;
53 import org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorShutdownPolicy;
54 import org.apache.hadoop.chukwa.util.ExceptionUtil;
55 import org.apache.log4j.Logger;
56 import org.json.simple.JSONObject;
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72 public class JMXAdaptor extends AbstractAdaptor{
73
74 private static Logger log = Logger.getLogger(JMXAdaptor.class);
75 private MBeanServerConnection mbsc = null;
76 private String port ="", server="localhost";
77 private JMXServiceURL url;
78 private JMXConnector jmxc = null;
79 private long period = 10;
80 private Timer timer;
81 private JMXTimer runner;
82 private String pattern = "";
83 long sendOffset = 0;
84 volatile boolean shutdown = false;
85
86
87
88
89
90 public class JMXConnect implements Runnable{
91
92 @Override
93 public void run() {
94 String hadoop_conf = System.getenv("CHUKWA_CONF_DIR");
95 StringBuffer sb = new StringBuffer(hadoop_conf);
96 if(!hadoop_conf.endsWith("/")){
97 sb.append(File.separator);
98 }
99 sb.append("jmxremote.password");
100 String jmx_pw_file = sb.toString();
101 shutdown = false;
102 while(!shutdown){
103 try{
104 BufferedReader br = new BufferedReader(new FileReader(jmx_pw_file));
105 String[] creds = br.readLine().split(" ");
106 Map<String, String[]> env = new HashMap<String, String[]>();
107 env.put(JMXConnector.CREDENTIALS, creds);
108 jmxc = JMXConnectorFactory.connect(url, env);
109 mbsc = jmxc.getMBeanServerConnection();
110 if(timer == null) {
111 timer = new Timer();
112 runner = new JMXTimer(dest, JMXAdaptor.this,mbsc);
113 }
114 timer.scheduleAtFixedRate(runner, 0, period * 1000);
115 shutdown = true;
116 } catch (IOException e) {
117 log.error("IOException in JMXConnect thread prevented connect to JMX on port:"+port+", retrying after 10s");
118 log.error(ExceptionUtil.getStackTrace(e));
119 try {
120 Thread.sleep(10000);
121 } catch (InterruptedException e1) {
122 log.error("JMXConnect thread interrupted in sleep, bailing");
123 shutdown = true;
124 }
125 } catch (Exception e) {
126 log.error("Something bad happened in JMXConnect thread, bailing");
127 log.error(ExceptionUtil.getStackTrace(e));
128 timer.cancel();
129 timer = null;
130 shutdown = true;
131 }
132 }
133 }
134
135 }
136
137
138
139
140
141
142
143
144
145
146 public class JMXTimer extends TimerTask{
147
148 private Logger log = Logger.getLogger(JMXTimer.class);
149 private ChunkReceiver receiver = null;
150 private JMXAdaptor adaptor = null;
151 private MBeanServerConnection mbsc = null;
152
153
154 public JMXTimer(ChunkReceiver receiver, JMXAdaptor adaptor, MBeanServerConnection mbsc){
155 this.receiver = receiver;
156 this.adaptor = adaptor;
157 this.mbsc = mbsc;
158 }
159
160 @SuppressWarnings("unchecked")
161 @Override
162 public void run() {
163 try{
164 ObjectName query = null;
165 if(!pattern.equals("")){
166 query = new ObjectName(pattern);
167 }
168 Set<ObjectName> names = new TreeSet<ObjectName>(mbsc.queryNames(query, null));
169 Object val = null;
170 JSONObject json = new JSONObject();
171
172 for (ObjectName oname: names) {
173 MBeanInfo mbinfo = mbsc.getMBeanInfo(oname);
174 MBeanAttributeInfo [] mbinfos = mbinfo.getAttributes();
175
176 for (MBeanAttributeInfo mb: mbinfos) {
177 try{
178 Descriptor d = mb.getDescriptor();
179 val = mbsc.getAttribute(oname, mb.getName());
180 if(d.getFieldNames().length > 0){
181 OpenType openType = (OpenType)d.getFieldValue("openType");
182
183 if(openType.isArray()){
184 Object[] valarray = (Object[])val;
185 val = Integer.toString(valarray.length);
186 }
187 else if(openType instanceof CompositeType){
188 CompositeData data = (CompositeData)val;
189 val = Integer.toString(data.values().size());
190 }
191 else if(openType instanceof TabularType){
192 TabularData data = (TabularData)val;
193 val = Integer.toString(data.size());
194 }
195
196 }
197 json.put(mb.getName(),val);
198 }
199 catch(Exception e){
200 log.warn("Exception "+ e.getMessage() +" getting attribute - "+mb.getName() + " Descriptor:"+mb.getDescriptor().getFieldNames().length);
201 }
202 }
203 }
204
205 byte[] data = json.toString().getBytes();
206 sendOffset+=data.length;
207 ChunkImpl c = new ChunkImpl(type, "JMX", sendOffset, data, adaptor);
208 long rightNow = Calendar.getInstance(TimeZone.getTimeZone("UTC")).getTimeInMillis();
209 c.addTag("timeStamp=\""+rightNow+"\"");
210 receiver.add(c);
211 }
212 catch(ConnectException e1){
213 log.error("Got connect exception for the existing MBeanServerConnection");
214 log.error(ExceptionUtil.getStackTrace(e1));
215 log.info("Make sure the target process is running. Retrying connection to JMX on port:"+port);
216 timer.cancel();
217 timer = null;
218 Thread connectThread = new Thread(new JMXConnect());
219 connectThread.start();
220 }
221 catch(Exception e){
222 log.error(ExceptionUtil.getStackTrace(e));
223 }
224
225 }
226
227 }
228
229
230 @Override
231 public String getCurrentStatus() {
232 StringBuilder buffer = new StringBuilder();
233 buffer.append(type);
234 buffer.append(" ");
235 buffer.append(server);
236 buffer.append(" ");
237 buffer.append(port);
238 buffer.append(" ");
239 buffer.append(period);
240 buffer.append(" ");
241 buffer.append(pattern);
242 return buffer.toString();
243 }
244
245 @Override
246 public long shutdown(AdaptorShutdownPolicy shutdownPolicy)
247 throws AdaptorException {
248 log.info("Enter Shutdown:" + shutdownPolicy.name()+ " - ObjectId:" + this);
249 try {
250 if(jmxc != null){
251 jmxc.close();
252 }
253 if(timer != null){
254 timer.cancel();
255 }
256 } catch (IOException e) {
257 log.error("JMXAdaptor shutdown failed due to IOException");
258 throw new AdaptorException(ExceptionUtil.getStackTrace(e));
259 } catch (Exception e) {
260 log.error("JMXAdaptor shutdown failed");
261 throw new AdaptorException(ExceptionUtil.getStackTrace(e));
262 }
263
264 shutdown = true;
265 return sendOffset;
266
267
268 }
269
270 @Override
271 public void start(long offset) throws AdaptorException {
272 try {
273 sendOffset = offset;
274 Thread connectThread = new Thread(new JMXConnect());
275 connectThread.start();
276 } catch(Exception e) {
277 log.error("Failed to schedule JMX connect thread");
278 throw new AdaptorException(ExceptionUtil.getStackTrace(e));
279 }
280
281 }
282
283 @Override
284 public String parseArgs(String s) {
285
286 String[] tokens = s.split(" ");
287 if(tokens.length == 4){
288 server = tokens[0];
289 port = tokens[1];
290 period = Integer.parseInt(tokens[2]);
291 pattern = tokens[3];
292 }
293 else if(tokens.length == 3){
294 server = tokens[0];
295 port = tokens[1];
296 pattern = tokens[2];
297 }
298 else{
299 log.warn("bad syntax in JMXAdaptor args");
300 return null;
301 }
302 String url_string = "service:jmx:rmi:///jndi/rmi://"+server+ ":"+port+"/jmxrmi";
303 try{
304 url = new JMXServiceURL(url_string);
305 return s;
306 }
307 catch(Exception e){
308 log.error(ExceptionUtil.getStackTrace(e));
309 }
310 return null;
311 }
312
313 }
314