This project has retired. For details please refer to its Attic page.
RestAdaptor xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.chukwa.datacollection.adaptor;
20  
21  import java.util.Calendar;
22  import java.util.TimeZone;
23  import java.util.Timer;
24  import java.util.TimerTask;
25  
26  import org.apache.hadoop.chukwa.ChunkImpl;
27  import org.apache.hadoop.chukwa.datacollection.ChunkReceiver;
28  import org.apache.log4j.Logger;
29  import org.apache.hadoop.chukwa.util.ExceptionUtil;
30  
31  import com.sun.jersey.api.client.Client;
32  import com.sun.jersey.api.client.WebResource;
33  
34  import javax.ws.rs.core.MediaType;
35  
36  public class RestAdaptor extends AbstractAdaptor {
37  
38    private String uri;
39    private long period = 60;
40    private static Logger log = Logger.getLogger(RestAdaptor.class);
41    private WebResource resource;
42    private Client c;
43    private String bean;
44    private Timer timer;
45    private TimerTask runner;
46    private long sendOffset;
47  
48    class RestTimer extends TimerTask {
49  
50      private ChunkReceiver receiver;
51      private RestAdaptor adaptor;
52  
53      RestTimer(ChunkReceiver receiver, RestAdaptor adaptor) {
54        this.receiver = receiver;
55        this.adaptor = adaptor;
56      }
57  
58      @Override
59      public void run() {
60        try {
61          resource = c.resource(uri);
62          bean = resource.accept(MediaType.APPLICATION_JSON_TYPE).get(
63              String.class);
64          byte[] data = bean.getBytes();
65          sendOffset += data.length;
66          ChunkImpl c = new ChunkImpl(type, "REST", sendOffset, data, adaptor);
67          long rightNow = Calendar.getInstance(TimeZone.getTimeZone("UTC"))
68              .getTimeInMillis();
69          c.addTag("timeStamp=\"" + rightNow + "\"");
70          receiver.add(c);
71        } catch (com.sun.jersey.api.client.ClientHandlerException e) {
72          Throwable t = e.getCause();
73          if (t instanceof java.net.ConnectException) {
74            log.warn("Connect exception trying to connect to " + uri
75                + ". Make sure the service is running");
76          } else {
77            log.error("RestAdaptor: Interrupted exception");
78            log.error(ExceptionUtil.getStackTrace(e));
79          }
80        } catch (Exception e) {
81          log.error("RestAdaptor: Interrupted exception");
82          log.error(ExceptionUtil.getStackTrace(e));
83        }
84      }
85    }
86  
87    @Override
88    public String getCurrentStatus() {
89      StringBuilder buffer = new StringBuilder();
90      buffer.append(type);
91      buffer.append(" ");
92      buffer.append(uri);
93      buffer.append(" ");
94      buffer.append(period);
95      return buffer.toString();
96    }
97  
98    @Override
99    public long shutdown(AdaptorShutdownPolicy shutdownPolicy)
100       throws AdaptorException {
101     timer.cancel();
102     return sendOffset;
103   }
104 
105   @Override
106   public void start(long offset) throws AdaptorException {
107     sendOffset = offset;
108     if (timer == null) {
109       timer = new Timer();
110       runner = new RestTimer(dest, RestAdaptor.this);
111     }
112     timer.scheduleAtFixedRate(runner, 0, period * 1000);
113   }
114 
115   @Override
116   public String parseArgs(String s) {
117     // RestAdaptor [Host] port uri [interval]
118     String[] tokens = s.split(" ");
119     if (tokens.length == 2) {
120       uri = tokens[0];
121       try {
122         period = Integer.parseInt(tokens[1]);
123       } catch (NumberFormatException e) {
124         log.warn("RestAdaptor: incorrect argument for period. Expecting number");
125         return null;
126       }
127     } else {
128       log.warn("bad syntax in RestAdaptor args");
129       return null;
130     }
131     c = Client.create();
132     return s;
133   }
134 
135 }