This project has retired. For details please refer to its
Attic page.
RestAdaptor xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.chukwa.datacollection.adaptor;
20
21 import java.util.Calendar;
22 import java.util.TimeZone;
23 import java.util.Timer;
24 import java.util.TimerTask;
25
26 import org.apache.hadoop.chukwa.ChunkImpl;
27 import org.apache.hadoop.chukwa.datacollection.ChunkReceiver;
28 import org.apache.log4j.Logger;
29 import org.apache.hadoop.chukwa.util.ExceptionUtil;
30
31 import com.sun.jersey.api.client.Client;
32 import com.sun.jersey.api.client.WebResource;
33
34 import javax.ws.rs.core.MediaType;
35
36 public class RestAdaptor extends AbstractAdaptor {
37
38 private String uri;
39 private long period = 60;
40 private static Logger log = Logger.getLogger(RestAdaptor.class);
41 private WebResource resource;
42 private Client c;
43 private String bean;
44 private Timer timer;
45 private TimerTask runner;
46 private long sendOffset;
47
48 class RestTimer extends TimerTask {
49
50 private ChunkReceiver receiver;
51 private RestAdaptor adaptor;
52
53 RestTimer(ChunkReceiver receiver, RestAdaptor adaptor) {
54 this.receiver = receiver;
55 this.adaptor = adaptor;
56 }
57
58 @Override
59 public void run() {
60 try {
61 resource = c.resource(uri);
62 bean = resource.accept(MediaType.APPLICATION_JSON_TYPE).get(
63 String.class);
64 byte[] data = bean.getBytes();
65 sendOffset += data.length;
66 ChunkImpl c = new ChunkImpl(type, "REST", sendOffset, data, adaptor);
67 long rightNow = Calendar.getInstance(TimeZone.getTimeZone("UTC"))
68 .getTimeInMillis();
69 c.addTag("timeStamp=\"" + rightNow + "\"");
70 receiver.add(c);
71 } catch (com.sun.jersey.api.client.ClientHandlerException e) {
72 Throwable t = e.getCause();
73 if (t instanceof java.net.ConnectException) {
74 log.warn("Connect exception trying to connect to " + uri
75 + ". Make sure the service is running");
76 } else {
77 log.error("RestAdaptor: Interrupted exception");
78 log.error(ExceptionUtil.getStackTrace(e));
79 }
80 } catch (Exception e) {
81 log.error("RestAdaptor: Interrupted exception");
82 log.error(ExceptionUtil.getStackTrace(e));
83 }
84 }
85 }
86
87 @Override
88 public String getCurrentStatus() {
89 StringBuilder buffer = new StringBuilder();
90 buffer.append(type);
91 buffer.append(" ");
92 buffer.append(uri);
93 buffer.append(" ");
94 buffer.append(period);
95 return buffer.toString();
96 }
97
98 @Override
99 public long shutdown(AdaptorShutdownPolicy shutdownPolicy)
100 throws AdaptorException {
101 timer.cancel();
102 return sendOffset;
103 }
104
105 @Override
106 public void start(long offset) throws AdaptorException {
107 sendOffset = offset;
108 if (timer == null) {
109 timer = new Timer();
110 runner = new RestTimer(dest, RestAdaptor.this);
111 }
112 timer.scheduleAtFixedRate(runner, 0, period * 1000);
113 }
114
115 @Override
116 public String parseArgs(String s) {
117
118 String[] tokens = s.split(" ");
119 if (tokens.length == 2) {
120 uri = tokens[0];
121 try {
122 period = Integer.parseInt(tokens[1]);
123 } catch (NumberFormatException e) {
124 log.warn("RestAdaptor: incorrect argument for period. Expecting number");
125 return null;
126 }
127 } else {
128 log.warn("bad syntax in RestAdaptor args");
129 return null;
130 }
131 c = Client.create();
132 return s;
133 }
134
135 }