This project has retired. For details please refer to its
Attic page.
RestAdaptor xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.chukwa.datacollection.adaptor;
20
21 import java.io.FileInputStream;
22 import java.nio.charset.Charset;
23 import java.security.KeyStore;
24 import java.security.SecureRandom;
25 import java.util.Calendar;
26 import java.util.TimeZone;
27 import java.util.Timer;
28 import java.util.TimerTask;
29
30 import org.apache.hadoop.chukwa.ChunkImpl;
31 import org.apache.hadoop.chukwa.datacollection.ChunkReceiver;
32 import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
33 import org.apache.log4j.Logger;
34 import org.apache.hadoop.chukwa.util.ExceptionUtil;
35 import org.apache.hadoop.conf.Configuration;
36
37 import static org.apache.hadoop.chukwa.datacollection.agent.ChukwaConstants.*;
38
39 import com.sun.jersey.api.client.Client;
40 import com.sun.jersey.api.client.WebResource;
41 import com.sun.jersey.api.client.config.ClientConfig;
42 import com.sun.jersey.api.client.config.DefaultClientConfig;
43 import com.sun.jersey.client.urlconnection.HTTPSProperties;
44
45 import javax.net.ssl.SSLContext;
46 import javax.net.ssl.TrustManager;
47 import javax.net.ssl.TrustManagerFactory;
48 import javax.ws.rs.core.MediaType;
49
50 public class RestAdaptor extends AbstractAdaptor {
51
52 private String uri;
53 private long period = 60;
54 private static Logger log = Logger.getLogger(RestAdaptor.class);
55 private WebResource resource;
56 private Client c;
57 private String bean;
58 private Timer timer;
59 private TimerTask runner;
60 private long sendOffset;
61
62 class RestTimer extends TimerTask {
63
64 private ChunkReceiver receiver;
65 private RestAdaptor adaptor;
66
67 RestTimer(ChunkReceiver receiver, RestAdaptor adaptor) {
68 this.receiver = receiver;
69 this.adaptor = adaptor;
70 }
71
72 @Override
73 public void run() {
74 try {
75 resource = c.resource(uri);
76 bean = resource.accept(MediaType.APPLICATION_JSON_TYPE).get(
77 String.class);
78 byte[] data = bean.getBytes(Charset.forName("UTF-8"));
79 sendOffset += data.length;
80 ChunkImpl c = new ChunkImpl(type, "REST", sendOffset, data, adaptor);
81 long rightNow = Calendar.getInstance(TimeZone.getTimeZone("UTC"))
82 .getTimeInMillis();
83 c.addTag("timeStamp=\"" + rightNow + "\"");
84 receiver.add(c);
85 } catch (com.sun.jersey.api.client.ClientHandlerException e) {
86 Throwable t = e.getCause();
87 if (t instanceof java.net.ConnectException) {
88 log.warn("Connect exception trying to connect to " + uri
89 + ". Make sure the service is running");
90 } else {
91 log.error("RestAdaptor: Interrupted exception");
92 log.error(ExceptionUtil.getStackTrace(e));
93 }
94 } catch (Exception e) {
95 log.error("RestAdaptor: Interrupted exception");
96 log.error(ExceptionUtil.getStackTrace(e));
97 }
98 }
99 }
100
101 @Override
102 public String getCurrentStatus() {
103 StringBuilder buffer = new StringBuilder();
104 buffer.append(type);
105 buffer.append(" ");
106 buffer.append(uri);
107 buffer.append(" ");
108 buffer.append(period);
109 return buffer.toString();
110 }
111
112 @Override
113 public long shutdown(AdaptorShutdownPolicy shutdownPolicy)
114 throws AdaptorException {
115 timer.cancel();
116 return sendOffset;
117 }
118
119 @Override
120 public void start(long offset) throws AdaptorException {
121 sendOffset = offset;
122 if (timer == null) {
123 timer = new Timer();
124 runner = new RestTimer(dest, RestAdaptor.this);
125 }
126 timer.scheduleAtFixedRate(runner, 0, period * 1000);
127 }
128
129 @Override
130 public String parseArgs(String s) {
131
132 String[] tokens = s.split(" ");
133 if (tokens.length == 2) {
134 uri = tokens[0];
135 try {
136 period = Integer.parseInt(tokens[1]);
137 } catch (NumberFormatException e) {
138 log.warn("RestAdaptor: incorrect argument for period. Expecting number");
139 return null;
140 }
141 } else {
142 log.warn("bad syntax in RestAdaptor args");
143 return null;
144 }
145 try {
146 initClient();
147 } catch (Exception e) {
148 log.error(ExceptionUtil.getStackTrace(e));
149 return null;
150 }
151 return s;
152 }
153
154 private void initClient() throws Exception {
155 if (uri.contains("https")) {
156 Configuration conf = ChukwaAgent.getAgent().getConfiguration();
157 String trustStoreFile = conf.get(TRUSTSTORE_STORE);
158 String trustStorePw = conf.get(TRUST_PASSWORD);
159 if (trustStoreFile == null || trustStorePw == null) {
160 throw new Exception(
161 "Cannot instantiate RestAdaptor to uri "
162 + uri
163 + " due to missing trust store configurations chukwa.ssl.truststore.store and chukwa.ssl.trust.password");
164 }
165 String trustStoreType = conf.get(TRUSTSTORE_TYPE, DEFAULT_STORE_TYPE);
166 KeyStore trustStore = KeyStore.getInstance(trustStoreType);
167 FileInputStream fis = null;
168 try {
169 fis = new FileInputStream(trustStoreFile);
170 trustStore.load(fis, trustStorePw.toCharArray());
171 } finally {
172 if (fis != null) {
173 fis.close();
174 }
175 }
176 TrustManagerFactory tmf = TrustManagerFactory
177 .getInstance(TrustManagerFactory.getDefaultAlgorithm());
178 tmf.init(trustStore);
179 TrustManager[] trustManagers = tmf.getTrustManagers();
180
181 SSLContext ctx = null;
182 String protocol = conf.get(SSL_PROTOCOL, DEFAULT_SSL_PROTOCOL);
183 ctx = SSLContext.getInstance(protocol);
184 ctx.init(null, trustManagers, new SecureRandom());
185 ClientConfig cc = new DefaultClientConfig();
186 HTTPSProperties props = new HTTPSProperties(null, ctx);
187 cc.getProperties().put(HTTPSProperties.PROPERTY_HTTPS_PROPERTIES, props);
188 c = Client.create(cc);
189 } else {
190 c = Client.create();
191 }
192 }
193
194 }