This project has retired. For details please refer to its Attic page.
HttpTriggerAction xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.chukwa.datatrigger;
19  
20  import org.apache.hadoop.fs.FileSystem;
21  import org.apache.hadoop.fs.FileStatus;
22  import org.apache.hadoop.conf.Configuration;
23  import org.apache.commons.logging.Log;
24  import org.apache.commons.logging.LogFactory;
25  
26  import java.io.IOException;
27  import java.io.InputStreamReader;
28  import java.io.BufferedReader;
29  import java.io.OutputStreamWriter;
30  import java.net.URL;
31  import java.net.HttpURLConnection;
32  import java.net.MalformedURLException;
33  import java.util.Map;
34  import java.util.HashMap;
35  
36  /**
37   * Trigger action that makes an HTTP request when executed.
38   * <P>
39   * To use this trigger, two types of configurations must be set. First, this class
40   * must be configured to be invoked for a given trigger event. Second, the
41   * the relevant settings for the HTTP request(s) to be made must be set as
42   * described below.
43   * <P>
44   * The general format of this classes configs is
45   * <code>chukwa.trigger.[eventName].http.[N].[paramName]</code> where
46   * <code>eventName</code> is the name of the event the request values are bound
47   * to (see TriggerEvent), <code>N</code> is a counter for each request configured (starting at 1)
48   * and <code>paramName</code> is the request parameter being set.
49   * <P>
50   * Using the post demux success trigger event as an example, the first request
51   * to be fired would use the following configurations
52   * <ul>
53   * <li><code>chukwa.trigger.post.demux.success.http.1.url</code> - The HTTP url to
54   * invoke.</li>
55   * <li><code>chukwa.trigger.post.demux.success.http.1.method</code> - The HTTP method
56   * (optional, default=GET).</li>
57   * <li><code>chukwa.trigger.post.demux.success.http.1.headers</code> - A comma-delimited
58   * set of HTTP headers (in <code>[headerName]:[headerValue]</code> form) to
59   * include (optional).</li>
60   * <li><code>chukwa.trigger.post.demux.success.http.1.body</code> - The text HTTP body
61   * to include (optional).</li>
62   * <li><code>chukwa.trigger.post.demux.success.http.1.connect.timeout</code> - The
63   * HTTP connection timeout setting in milliseconds (optional, default=5000ms).</li>
64   * <li><code>chukwa.trigger.post.demux.success.http.1.read.timeout</code> - The
65   * HTTP read timeout setting in milliseconds (optional, default=5000ms).</li>
66   * </ul>
67   * @see TriggerAction
68   * @see TriggerEvent
69   */
70  public class HttpTriggerAction implements TriggerAction {
71    protected Log log = LogFactory.getLog(getClass());
72  
73  
74    /**
75     * Iterates over each URL found, fetched other settings and fires and HTTP
76     * request.
77     *
78     * @param conf
79     * @param fs
80     * @param src
81     * @param triggerEvent
82     * @throws IOException
83     */
84    public void execute(Configuration conf, FileSystem fs,
85                        FileStatus[] src, TriggerEvent triggerEvent) throws IOException {
86  
87      if (log.isDebugEnabled()) {
88        for (FileStatus file : src) {
89            log.debug("Execute file: " + file.getPath());
90        }
91      }
92  
93      int reqNumber = 1;
94      URL url = null;
95      while ((url = getUrl(conf, triggerEvent, reqNumber)) != null) {
96  
97        // get settings for this request
98        String method = getMethod(conf, triggerEvent, reqNumber);
99        Map<String, String> headers = getHeaders(conf, triggerEvent, reqNumber);
100       String body = getBody(conf, triggerEvent, reqNumber);
101       int connectTimeout = getConnectTimeout(conf, triggerEvent, reqNumber);
102       int readTimeout = getReadTimeout(conf, triggerEvent, reqNumber);
103 
104       try {
105         // make the request
106         makeHttpRequest(url, method, headers, body, connectTimeout, readTimeout);
107       }
108       catch(Exception e) {
109         log.error("Error making request to " + url, e);
110       }
111       reqNumber++;
112     }
113   }
114 
115   private void makeHttpRequest(URL url, String method,
116                                Map<String, String> headers, String body,
117                                int connectTimeout, int readTimeout) throws IOException {
118     if (url == null) {
119       return;
120     }
121 
122     // initialize the connection
123     HttpURLConnection conn = (HttpURLConnection)url.openConnection();
124     conn.setRequestMethod(method);
125     conn.setDoInput(true);
126     conn.setConnectTimeout(connectTimeout);
127     conn.setReadTimeout(readTimeout);
128 
129     // set headers
130     boolean contentLengthExists = false;
131     if (headers != null) {
132       for (String name: headers.keySet()) {
133         if (log.isDebugEnabled()) {
134           log.debug("Setting header " + name + ": " + headers.get(name));
135         }
136         if (name.equalsIgnoreCase("content-length")) {
137           contentLengthExists = true;
138         }
139         conn.setRequestProperty(name, headers.get(name));
140       }
141     }
142 
143     // set content-length if not already set
144     if (!"GET".equals(method) && !contentLengthExists) {
145       String contentLength = body != null ? String.valueOf(body.length()) : "0";
146       conn.setRequestProperty("Content-Length", contentLength);
147     }
148 
149     // send body if it exists
150     if (body != null) {
151       conn.setDoOutput(true);
152       OutputStreamWriter writer = new OutputStreamWriter(conn.getOutputStream());
153       writer.write(body);
154       writer.flush();
155       writer.close();
156     }
157     else {
158       conn.setDoOutput(false);
159     }
160 
161     // read reponse code/message and dump response
162     log.info("Making HTTP " + method + " to: " + url);
163     int responseCode = conn.getResponseCode();
164     log.info("HTTP Response code: " + responseCode);
165 
166 
167     if (responseCode != 200) {
168       log.info("HTTP Response message: " + conn.getResponseMessage());
169     }
170     else {
171       BufferedReader reader = new BufferedReader(
172                                 new InputStreamReader(conn.getInputStream()));
173       String line;
174       StringBuilder sb = new StringBuilder();
175       while ((line = reader.readLine()) != null) {
176         if(sb.length() > 0) {
177           sb.append("\n");
178         }
179         sb.append(line);
180       }
181       log.info("HTTP Response:\n" + sb);
182 
183       reader.close();
184     }
185 
186     conn.disconnect();
187   }
188 
189   protected URL getUrl(Configuration conf,
190                        TriggerEvent triggerEvent,
191                        int reqNumber) throws MalformedURLException {
192     String urlString = conf.get(getConfigKey(triggerEvent, reqNumber, "url"), null);
193     if (urlString == null) {
194       return null;
195     }
196 
197     return new URL(urlString);
198   }
199 
200   protected String getMethod(Configuration conf,
201                              TriggerEvent triggerEvent,
202                              int reqNumber) {
203     return conf.get(getConfigKey(triggerEvent, reqNumber, "method"), "GET");
204   }
205 
206   protected Map<String, String> getHeaders(Configuration conf,
207                                            TriggerEvent triggerEvent,
208                                            int reqNumber) {
209     Map<String, String> headerMap = new HashMap<String,String>();
210 
211     String headers = conf.get(getConfigKey(triggerEvent, reqNumber, "headers"), null);
212 
213     if (headers != null) {
214       String[] headersSplit = headers.split(",");
215       for (String header : headersSplit) {
216         String[] nvp = header.split(":", 2);
217         if (nvp.length < 2) {
218           log.error("Invalid HTTP header found: " + nvp);
219           continue;
220         }
221         headerMap.put(nvp[0].trim(), nvp[1].trim());
222       }
223     }
224 
225     return headerMap;
226   }
227 
228   protected String getBody(Configuration conf,
229                            TriggerEvent triggerEvent,
230                            int reqNumber) {
231     return conf.get(getConfigKey(triggerEvent, reqNumber, "body"), "GET");
232   }
233 
234   protected int getConnectTimeout(Configuration conf,
235                                  TriggerEvent triggerEvent,
236                                  int reqNumber) {
237     String timeout = conf.get(getConfigKey(triggerEvent, reqNumber, "connect.timeout"), null);
238     return timeout != null ? Integer.parseInt(timeout) : 5000;
239   }
240 
241   
242   protected int getReadTimeout(Configuration conf,
243                               TriggerEvent triggerEvent,
244                               int reqNumber) {
245     String timeout = conf.get(getConfigKey(triggerEvent, reqNumber, "read.timeout"), null);
246     return timeout != null ? Integer.parseInt(timeout) : 5000;
247   }
248 
249   private String getConfigKey(TriggerEvent triggerEvent, int reqNumber, String name) {
250     return triggerEvent.getConfigKeyBase() + ".http." + reqNumber + "." + name;
251   }
252 }