This project has retired. For details please refer to its Attic page.
RestAdaptor xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.chukwa.datacollection.adaptor;
20  
21  import java.io.FileInputStream;
22  import java.nio.charset.Charset;
23  import java.security.KeyStore;
24  import java.security.SecureRandom;
25  import java.util.Calendar;
26  import java.util.TimeZone;
27  import java.util.Timer;
28  import java.util.TimerTask;
29  
30  import org.apache.hadoop.chukwa.ChunkImpl;
31  import org.apache.hadoop.chukwa.datacollection.ChunkReceiver;
32  import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
33  import org.apache.log4j.Logger;
34  import org.apache.hadoop.chukwa.util.ExceptionUtil;
35  import org.apache.hadoop.conf.Configuration;
36  
37  import static org.apache.hadoop.chukwa.datacollection.agent.ChukwaConstants.*;
38  
39  import com.sun.jersey.api.client.Client;
40  import com.sun.jersey.api.client.WebResource;
41  import com.sun.jersey.api.client.config.ClientConfig;
42  import com.sun.jersey.api.client.config.DefaultClientConfig;
43  import com.sun.jersey.client.urlconnection.HTTPSProperties;
44  
45  import javax.net.ssl.SSLContext;
46  import javax.net.ssl.TrustManager;
47  import javax.net.ssl.TrustManagerFactory;
48  import javax.ws.rs.core.MediaType;
49  
50  public class RestAdaptor extends AbstractAdaptor {
51  
52    private String uri;
53    private long period = 60;
54    private static Logger log = Logger.getLogger(RestAdaptor.class);
55    private WebResource resource;
56    private Client c;
57    private String bean;
58    private Timer timer;
59    private TimerTask runner;
60    private long sendOffset;
61  
62    class RestTimer extends TimerTask {
63  
64      private ChunkReceiver receiver;
65      private RestAdaptor adaptor;
66  
67      RestTimer(ChunkReceiver receiver, RestAdaptor adaptor) {
68        this.receiver = receiver;
69        this.adaptor = adaptor;
70      }
71  
72      @Override
73      public void run() {
74        try {
75          resource = c.resource(uri);
76          bean = resource.accept(MediaType.APPLICATION_JSON_TYPE).get(
77              String.class);
78          byte[] data = bean.getBytes(Charset.forName("UTF-8"));
79          sendOffset += data.length;
80          ChunkImpl c = new ChunkImpl(type, "REST", sendOffset, data, adaptor);
81          long rightNow = Calendar.getInstance(TimeZone.getTimeZone("UTC"))
82              .getTimeInMillis();
83          c.addTag("timeStamp=\"" + rightNow + "\"");
84          receiver.add(c);
85        } catch (com.sun.jersey.api.client.ClientHandlerException e) {
86          Throwable t = e.getCause();
87          if (t instanceof java.net.ConnectException) {
88            log.warn("Connect exception trying to connect to " + uri
89                + ". Make sure the service is running");
90          } else {
91            log.error("RestAdaptor: Interrupted exception");
92            log.error(ExceptionUtil.getStackTrace(e));
93          }
94        } catch (Exception e) {
95          log.error("RestAdaptor: Interrupted exception");
96          log.error(ExceptionUtil.getStackTrace(e));
97        }
98      }
99    }
100 
101   @Override
102   public String getCurrentStatus() {
103     StringBuilder buffer = new StringBuilder();
104     buffer.append(type);
105     buffer.append(" ");
106     buffer.append(uri);
107     buffer.append(" ");
108     buffer.append(period);
109     return buffer.toString();
110   }
111 
112   @Override
113   public long shutdown(AdaptorShutdownPolicy shutdownPolicy)
114       throws AdaptorException {
115     timer.cancel();
116     return sendOffset;
117   }
118 
119   @Override
120   public void start(long offset) throws AdaptorException {
121     sendOffset = offset;
122     if (timer == null) {
123       timer = new Timer();
124       runner = new RestTimer(dest, RestAdaptor.this);
125     }
126     timer.scheduleAtFixedRate(runner, 0, period * 1000);
127   }
128 
129   @Override
130   public String parseArgs(String s) {
131     // RestAdaptor [Host] port uri [interval]
132     String[] tokens = s.split(" ");
133     if (tokens.length == 2) {
134       uri = tokens[0];
135       try {
136         period = Integer.parseInt(tokens[1]);
137       } catch (NumberFormatException e) {
138         log.warn("RestAdaptor: incorrect argument for period. Expecting number");
139         return null;
140       }
141     } else {
142       log.warn("bad syntax in RestAdaptor args");
143       return null;
144     }
145     try {
146       initClient();
147     } catch (Exception e) {
148       log.error(ExceptionUtil.getStackTrace(e));
149       return null;
150     }    
151     return s;
152   }
153   
154   private void initClient() throws Exception {
155     if (uri.contains("https")) {
156       Configuration conf = ChukwaAgent.getAgent().getConfiguration();
157       String trustStoreFile = conf.get(TRUSTSTORE_STORE);
158       String trustStorePw = conf.get(TRUST_PASSWORD);
159       if (trustStoreFile == null || trustStorePw == null) {
160         throw new Exception(
161             "Cannot instantiate RestAdaptor to uri "
162                 + uri
163                 + " due to missing trust store configurations chukwa.ssl.truststore.store and chukwa.ssl.trust.password");
164       }
165       String trustStoreType = conf.get(TRUSTSTORE_TYPE, DEFAULT_STORE_TYPE);
166       KeyStore trustStore = KeyStore.getInstance(trustStoreType);
167       FileInputStream fis = null;
168       try {
169         fis = new FileInputStream(trustStoreFile);
170         trustStore.load(fis, trustStorePw.toCharArray());
171       } finally {
172         if (fis != null) {
173           fis.close();
174         }
175       }
176       TrustManagerFactory tmf = TrustManagerFactory
177           .getInstance(TrustManagerFactory.getDefaultAlgorithm());
178       tmf.init(trustStore);
179       TrustManager[] trustManagers = tmf.getTrustManagers();
180 
181       SSLContext ctx = null;
182       String protocol = conf.get(SSL_PROTOCOL, DEFAULT_SSL_PROTOCOL);
183       ctx = SSLContext.getInstance(protocol);
184       ctx.init(null, trustManagers, new SecureRandom());
185       ClientConfig cc = new DefaultClientConfig();
186       HTTPSProperties props = new HTTPSProperties(null, ctx);
187       cc.getProperties().put(HTTPSProperties.PROPERTY_HTTPS_PROPERTIES, props);
188       c = Client.create(cc);
189     } else {
190       c = Client.create();
191     }
192   }
193 
194 }