This project has retired. For details please refer to its
Attic page.
JMXAdaptor xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.chukwa.datacollection.adaptor;
20
21 import java.io.BufferedReader;
22 import java.io.File;
23 import java.io.FileInputStream;
24 import java.io.IOException;
25 import java.io.InputStreamReader;
26 import java.nio.charset.Charset;
27 import java.rmi.ConnectException;
28 import java.util.Calendar;
29 import java.util.HashMap;
30 import java.util.Map;
31 import java.util.Set;
32 import java.util.TimeZone;
33 import java.util.Timer;
34 import java.util.TimerTask;
35 import java.util.TreeSet;
36
37 import javax.management.Descriptor;
38 import javax.management.MBeanAttributeInfo;
39 import javax.management.MBeanInfo;
40 import javax.management.MBeanServerConnection;
41 import javax.management.ObjectName;
42 import javax.management.openmbean.CompositeData;
43 import javax.management.openmbean.CompositeType;
44 import javax.management.openmbean.OpenType;
45 import javax.management.openmbean.TabularData;
46 import javax.management.openmbean.TabularType;
47 import javax.management.remote.JMXConnector;
48 import javax.management.remote.JMXConnectorFactory;
49 import javax.management.remote.JMXServiceURL;
50
51 import org.apache.hadoop.chukwa.ChunkImpl;
52 import org.apache.hadoop.chukwa.datacollection.ChunkReceiver;
53 import org.apache.hadoop.chukwa.datacollection.adaptor.AbstractAdaptor;
54 import org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorException;
55 import org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorShutdownPolicy;
56 import org.apache.hadoop.chukwa.util.ExceptionUtil;
57 import org.apache.log4j.Logger;
58 import org.json.simple.JSONObject;
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74 public class JMXAdaptor extends AbstractAdaptor{
75
76 private static Logger log = Logger.getLogger(JMXAdaptor.class);
77 private MBeanServerConnection mbsc = null;
78 private String port ="", server="localhost";
79 private JMXServiceURL url;
80 private JMXConnector jmxc = null;
81 private long period = 10;
82 private Timer timer;
83 private JMXTimer runner;
84 private String pattern = "";
85 long sendOffset = 0;
86 volatile boolean shutdown = false;
87
88
89
90
91
92 public class JMXConnect implements Runnable{
93
94 @Override
95 public void run() {
96 String hadoop_conf = System.getenv("CHUKWA_CONF_DIR");
97 StringBuffer sb = new StringBuffer(hadoop_conf);
98 if(!hadoop_conf.endsWith("/")){
99 sb.append(File.separator);
100 }
101 sb.append("jmxremote.password");
102 File jmx_pw_file = new File(sb.toString());
103 shutdown = false;
104 while(!shutdown){
105 try{
106 BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(jmx_pw_file.getAbsolutePath()), Charset.forName("UTF-8")));
107 String buffer = br.readLine();
108 String[] creds = null;
109 if(buffer != null ) {
110 creds = buffer.split(" ");
111 }
112 br.close();
113 Map<String, String[]> env = new HashMap<String, String[]>();
114 if(creds!=null) {
115 env.put(JMXConnector.CREDENTIALS, creds);
116 }
117 jmxc = JMXConnectorFactory.connect(url, env);
118 mbsc = jmxc.getMBeanServerConnection();
119 if(timer == null) {
120 timer = new Timer();
121 runner = new JMXTimer(dest, JMXAdaptor.this,mbsc);
122 }
123 timer.scheduleAtFixedRate(runner, 0, period * 1000);
124 shutdown = true;
125 } catch (IOException e) {
126 log.error("IOException in JMXConnect thread prevented connect to JMX on port:"+port+", retrying after 10s");
127 log.error(ExceptionUtil.getStackTrace(e));
128 try {
129 Thread.sleep(10000);
130 } catch (InterruptedException e1) {
131 log.error("JMXConnect thread interrupted in sleep, bailing");
132 shutdown = true;
133 }
134 } catch (Exception e) {
135 log.error("Something bad happened in JMXConnect thread, bailing");
136 log.error(ExceptionUtil.getStackTrace(e));
137 timer.cancel();
138 timer = null;
139 shutdown = true;
140 }
141 }
142 }
143
144 }
145
146
147
148
149
150
151
152
153
154
155 public class JMXTimer extends TimerTask{
156
157 private Logger log = Logger.getLogger(JMXTimer.class);
158 private ChunkReceiver receiver = null;
159 private JMXAdaptor adaptor = null;
160 private MBeanServerConnection mbsc = null;
161
162
163 public JMXTimer(ChunkReceiver receiver, JMXAdaptor adaptor, MBeanServerConnection mbsc){
164 this.receiver = receiver;
165 this.adaptor = adaptor;
166 this.mbsc = mbsc;
167 }
168
169 @SuppressWarnings("unchecked")
170 @Override
171 public void run() {
172 try{
173 ObjectName query = null;
174 if(!pattern.equals("")){
175 query = new ObjectName(pattern);
176 }
177 Set<ObjectName> names = new TreeSet<ObjectName>(mbsc.queryNames(query, null));
178 Object val = null;
179 JSONObject json = new JSONObject();
180
181 for (ObjectName oname: names) {
182 MBeanInfo mbinfo = mbsc.getMBeanInfo(oname);
183 MBeanAttributeInfo [] mbinfos = mbinfo.getAttributes();
184
185 for (MBeanAttributeInfo mb: mbinfos) {
186 try{
187 Descriptor d = mb.getDescriptor();
188 val = mbsc.getAttribute(oname, mb.getName());
189 if(d.getFieldNames().length > 0){
190 OpenType<?> openType = (OpenType<?>)d.getFieldValue("openType");
191
192 if(openType.isArray()){
193 Object[] valarray = (Object[])val;
194 val = Integer.toString(valarray.length);
195 }
196 else if(openType instanceof CompositeType){
197 CompositeData data = (CompositeData)val;
198 val = Integer.toString(data.values().size());
199 }
200 else if(openType instanceof TabularType){
201 TabularData data = (TabularData)val;
202 val = Integer.toString(data.size());
203 }
204
205 }
206 json.put(mb.getName(),val);
207 }
208 catch(Exception e){
209 log.warn("Exception "+ e.getMessage() +" getting attribute - "+mb.getName() + " Descriptor:"+mb.getDescriptor().getFieldNames().length);
210 }
211 }
212 }
213
214 byte[] data = json.toString().getBytes(Charset.forName("UTF-8"));
215 sendOffset+=data.length;
216 ChunkImpl c = new ChunkImpl(type, "JMX", sendOffset, data, adaptor);
217 long rightNow = Calendar.getInstance(TimeZone.getTimeZone("UTC")).getTimeInMillis();
218 c.addTag("timeStamp=\""+rightNow+"\"");
219 receiver.add(c);
220 }
221 catch(ConnectException e1){
222 log.error("Got connect exception for the existing MBeanServerConnection");
223 log.error(ExceptionUtil.getStackTrace(e1));
224 log.info("Make sure the target process is running. Retrying connection to JMX on port:"+port);
225 timer.cancel();
226 timer = null;
227 Thread connectThread = new Thread(new JMXConnect());
228 connectThread.start();
229 }
230 catch(Exception e){
231 log.error(ExceptionUtil.getStackTrace(e));
232 }
233
234 }
235
236 }
237
238
239 @Override
240 public String getCurrentStatus() {
241 StringBuilder buffer = new StringBuilder();
242 buffer.append(type);
243 buffer.append(" ");
244 buffer.append(server);
245 buffer.append(" ");
246 buffer.append(port);
247 buffer.append(" ");
248 buffer.append(period);
249 buffer.append(" ");
250 buffer.append(pattern);
251 return buffer.toString();
252 }
253
254 @Override
255 public long shutdown(AdaptorShutdownPolicy shutdownPolicy)
256 throws AdaptorException {
257 log.info("Enter Shutdown:" + shutdownPolicy.name()+ " - ObjectId:" + this);
258 try {
259 if(jmxc != null){
260 jmxc.close();
261 }
262 if(timer != null){
263 timer.cancel();
264 }
265 } catch (IOException e) {
266 log.error("JMXAdaptor shutdown failed due to IOException");
267 throw new AdaptorException(ExceptionUtil.getStackTrace(e));
268 } catch (Exception e) {
269 log.error("JMXAdaptor shutdown failed");
270 throw new AdaptorException(ExceptionUtil.getStackTrace(e));
271 }
272
273 shutdown = true;
274 return sendOffset;
275
276
277 }
278
279 @Override
280 public void start(long offset) throws AdaptorException {
281 try {
282 sendOffset = offset;
283 Thread connectThread = new Thread(new JMXConnect());
284 connectThread.start();
285 } catch(Exception e) {
286 log.error("Failed to schedule JMX connect thread");
287 throw new AdaptorException(ExceptionUtil.getStackTrace(e));
288 }
289
290 }
291
292 @Override
293 public String parseArgs(String s) {
294
295 String[] tokens = s.split(" ");
296 if(tokens.length == 4){
297 server = tokens[0];
298 port = tokens[1];
299 period = Integer.parseInt(tokens[2]);
300 pattern = tokens[3];
301 }
302 else if(tokens.length == 3){
303 server = tokens[0];
304 port = tokens[1];
305 pattern = tokens[2];
306 }
307 else{
308 log.warn("bad syntax in JMXAdaptor args");
309 return null;
310 }
311 String url_string = "service:jmx:rmi:///jndi/rmi://"+server+ ":"+port+"/jmxrmi";
312 try{
313 url = new JMXServiceURL(url_string);
314 return s;
315 }
316 catch(Exception e){
317 log.error(ExceptionUtil.getStackTrace(e));
318 }
319 return null;
320 }
321
322 }
323