This project has retired. For details please refer to its
Attic page.
HttpTriggerAction xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.chukwa.datatrigger;
19
20 import org.apache.hadoop.fs.FileSystem;
21 import org.apache.hadoop.fs.FileStatus;
22 import org.apache.hadoop.conf.Configuration;
23 import org.apache.commons.logging.Log;
24 import org.apache.commons.logging.LogFactory;
25
26 import java.io.IOException;
27 import java.io.InputStreamReader;
28 import java.io.BufferedReader;
29 import java.io.OutputStreamWriter;
30 import java.net.URL;
31 import java.net.HttpURLConnection;
32 import java.net.MalformedURLException;
33 import java.util.Map;
34 import java.util.HashMap;
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70 public class HttpTriggerAction implements TriggerAction {
71 protected Log log = LogFactory.getLog(getClass());
72
73
74
75
76
77
78
79
80
81
82
83
84 public void execute(Configuration conf, FileSystem fs,
85 FileStatus[] src, TriggerEvent triggerEvent) throws IOException {
86
87 if (log.isDebugEnabled()) {
88 for (FileStatus file : src) {
89 log.debug("Execute file: " + file.getPath());
90 }
91 }
92
93 int reqNumber = 1;
94 URL url = null;
95 while ((url = getUrl(conf, triggerEvent, reqNumber)) != null) {
96
97
98 String method = getMethod(conf, triggerEvent, reqNumber);
99 Map<String, String> headers = getHeaders(conf, triggerEvent, reqNumber);
100 String body = getBody(conf, triggerEvent, reqNumber);
101 int connectTimeout = getConnectTimeout(conf, triggerEvent, reqNumber);
102 int readTimeout = getReadTimeout(conf, triggerEvent, reqNumber);
103
104 try {
105
106 makeHttpRequest(url, method, headers, body, connectTimeout, readTimeout);
107 }
108 catch(Exception e) {
109 log.error("Error making request to " + url, e);
110 }
111 reqNumber++;
112 }
113 }
114
115 private void makeHttpRequest(URL url, String method,
116 Map<String, String> headers, String body,
117 int connectTimeout, int readTimeout) throws IOException {
118 if (url == null) {
119 return;
120 }
121
122
123 HttpURLConnection conn = (HttpURLConnection)url.openConnection();
124 conn.setRequestMethod(method);
125 conn.setDoInput(true);
126 conn.setConnectTimeout(connectTimeout);
127 conn.setReadTimeout(readTimeout);
128
129
130 boolean contentLengthExists = false;
131 if (headers != null) {
132 for (String name: headers.keySet()) {
133 if (log.isDebugEnabled()) {
134 log.debug("Setting header " + name + ": " + headers.get(name));
135 }
136 if (name.equalsIgnoreCase("content-length")) {
137 contentLengthExists = true;
138 }
139 conn.setRequestProperty(name, headers.get(name));
140 }
141 }
142
143
144 if (!"GET".equals(method) && !contentLengthExists) {
145 String contentLength = body != null ? String.valueOf(body.length()) : "0";
146 conn.setRequestProperty("Content-Length", contentLength);
147 }
148
149
150 if (body != null) {
151 conn.setDoOutput(true);
152 OutputStreamWriter writer = new OutputStreamWriter(conn.getOutputStream());
153 writer.write(body);
154 writer.flush();
155 writer.close();
156 }
157 else {
158 conn.setDoOutput(false);
159 }
160
161
162 log.info("Making HTTP " + method + " to: " + url);
163 int responseCode = conn.getResponseCode();
164 log.info("HTTP Response code: " + responseCode);
165
166
167 if (responseCode != 200) {
168 log.info("HTTP Response message: " + conn.getResponseMessage());
169 }
170 else {
171 BufferedReader reader = new BufferedReader(
172 new InputStreamReader(conn.getInputStream()));
173 String line;
174 StringBuilder sb = new StringBuilder();
175 while ((line = reader.readLine()) != null) {
176 if(sb.length() > 0) {
177 sb.append("\n");
178 }
179 sb.append(line);
180 }
181 log.info("HTTP Response:\n" + sb);
182
183 reader.close();
184 }
185
186 conn.disconnect();
187 }
188
189 protected URL getUrl(Configuration conf,
190 TriggerEvent triggerEvent,
191 int reqNumber) throws MalformedURLException {
192 String urlString = conf.get(getConfigKey(triggerEvent, reqNumber, "url"), null);
193 if (urlString == null) {
194 return null;
195 }
196
197 return new URL(urlString);
198 }
199
200 protected String getMethod(Configuration conf,
201 TriggerEvent triggerEvent,
202 int reqNumber) {
203 return conf.get(getConfigKey(triggerEvent, reqNumber, "method"), "GET");
204 }
205
206 protected Map<String, String> getHeaders(Configuration conf,
207 TriggerEvent triggerEvent,
208 int reqNumber) {
209 Map<String, String> headerMap = new HashMap<String,String>();
210
211 String headers = conf.get(getConfigKey(triggerEvent, reqNumber, "headers"), null);
212
213 if (headers != null) {
214 String[] headersSplit = headers.split(",");
215 for (String header : headersSplit) {
216 String[] nvp = header.split(":", 2);
217 if (nvp.length < 2) {
218 log.error("Invalid HTTP header found: " + nvp);
219 continue;
220 }
221 headerMap.put(nvp[0].trim(), nvp[1].trim());
222 }
223 }
224
225 return headerMap;
226 }
227
228 protected String getBody(Configuration conf,
229 TriggerEvent triggerEvent,
230 int reqNumber) {
231 return conf.get(getConfigKey(triggerEvent, reqNumber, "body"), "GET");
232 }
233
234 protected int getConnectTimeout(Configuration conf,
235 TriggerEvent triggerEvent,
236 int reqNumber) {
237 String timeout = conf.get(getConfigKey(triggerEvent, reqNumber, "connect.timeout"), null);
238 return timeout != null ? Integer.parseInt(timeout) : 5000;
239 }
240
241
242 protected int getReadTimeout(Configuration conf,
243 TriggerEvent triggerEvent,
244 int reqNumber) {
245 String timeout = conf.get(getConfigKey(triggerEvent, reqNumber, "read.timeout"), null);
246 return timeout != null ? Integer.parseInt(timeout) : 5000;
247 }
248
249 private String getConfigKey(TriggerEvent triggerEvent, int reqNumber, String name) {
250 return triggerEvent.getConfigKeyBase() + ".http." + reqNumber + "." + name;
251 }
252 }