This project has been retired. For details, please refer to its Attic page.
ServletCollector xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.chukwa.datacollection.collector.servlet;
20  
21  
22  import java.io.DataInputStream;
23  import java.io.IOException;
24  import java.io.InputStream;
25  import java.io.PrintStream;
26  import java.util.LinkedList;
27  import java.util.List;
28  import java.util.Timer;
29  import java.util.TimerTask;
30  
31  import javax.servlet.ServletConfig;
32  import javax.servlet.ServletException;
33  import javax.servlet.ServletOutputStream;
34  import javax.servlet.http.HttpServlet;
35  import javax.servlet.http.HttpServletRequest;
36  import javax.servlet.http.HttpServletResponse;
37  
38  import org.apache.hadoop.chukwa.Chunk;
39  import org.apache.hadoop.chukwa.ChunkImpl;
40  import org.apache.hadoop.chukwa.datacollection.writer.ChukwaWriter;
41  import org.apache.hadoop.chukwa.datacollection.writer.SeqFileWriter;
42  import org.apache.hadoop.chukwa.datacollection.writer.WriterException;
43  import org.apache.hadoop.conf.Configuration;
44  import org.apache.hadoop.io.compress.CompressionCodec;
45  import org.apache.hadoop.util.ReflectionUtils;
46  import org.apache.log4j.Logger;
47  
48  @Deprecated
49  public class ServletCollector extends HttpServlet {
50  
51    static final boolean FANCY_DIAGNOSTICS = false;
52    public static final String PATH = "chukwa";
53    /**
54     * If a chunk is committed; then the ack will start with the following string.
55     */
56    public static final String ACK_PREFIX = "ok: ";
57    transient ChukwaWriter writer = null;
58  
59    private static final long serialVersionUID = 6286162898591407111L;
60    transient Logger log = Logger.getLogger(ServletCollector.class);
61    
62    boolean COMPRESS;
63    String CODEC_NAME;
64    transient CompressionCodec codec;
65  
66    public void setWriter(ChukwaWriter w) {
67      writer = w;
68    }
69    
70    public ChukwaWriter getWriter() {
71      return writer;
72    }
73  
74    long statTime = 0L;
75    int numberHTTPConnection = 0;
76    int numberchunks = 0;
77    long lifetimechunks = 0;
78  
79    transient Configuration conf;
80  
81    public ServletCollector(Configuration c) {
82      conf = c;
83    }
84  
85    public void init(ServletConfig servletConf) throws ServletException {
86  
87      log.info("initing servletCollector");
88      if (servletConf == null) {
89        log.fatal("no servlet config");
90        return;
91      }
92  
93      Timer statTimer = new Timer();
94      statTimer.schedule(new TimerTask() {
95        public void run() {
96          log.info("stats:ServletCollector,numberHTTPConnection:"
97              + numberHTTPConnection + ",numberchunks:" + numberchunks);
98          statTime = System.currentTimeMillis();
99          numberHTTPConnection = 0;
100         numberchunks = 0;
101       }
102     }, (1000), (60 * 1000));
103 
104     if (writer != null) {
105       log.info("writer set up statically, no need for Collector.init() to do it");
106       return;
107     }
108 
109     try {
110       String writerClassName = conf.get("chukwaCollector.writerClass",
111           SeqFileWriter.class.getCanonicalName());
112       Class<?> writerClass = Class.forName(writerClassName);
113       if (writerClass != null
114           && ChukwaWriter.class.isAssignableFrom(writerClass))
115         writer = (ChukwaWriter) writerClass.newInstance();
116     } catch (Exception e) {
117       log.warn("failed to use user-chosen writer class, defaulting to SeqFileWriter", e);
118     }
119 
120     COMPRESS = conf.getBoolean("chukwaAgent.output.compress", false);
121     if( COMPRESS) {
122 	    CODEC_NAME = conf.get( "chukwaAgent.output.compression.type", "org.apache.hadoop.io.compress.DefaultCodec");
123 	    Class<?> codecClass = null;
124 	    try {
125 			codecClass = Class.forName( CODEC_NAME);
126 			codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
127 			log.info("codec " + CODEC_NAME + " loaded for network compression");
128 		} catch (ClassNotFoundException e) {
129 			log.warn("failed to create codec " + CODEC_NAME + ". Network compression won't be enabled.", e);
130 			COMPRESS = false;
131 		}
132     }
133     
134     // We default to here if the pipeline construction failed or didn't happen.
135     try {
136       if (writer == null) {
137         writer =  new SeqFileWriter();
138       }
139       
140       writer.init(conf);
141     } catch (Throwable e) {
142       log.warn("Exception trying to initialize SeqFileWriter",e);
143     }
144   }
145 
146   @Override
147   protected void doTrace(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { 
148     resp.sendError(HttpServletResponse.SC_METHOD_NOT_ALLOWED); 
149   }
150 
151   protected void accept(HttpServletRequest req, HttpServletResponse resp)
152       throws ServletException {
153     numberHTTPConnection++;
154     final long currentTime = System.currentTimeMillis();
155     try {
156 
157       log.debug("new post from " + req.getRemoteHost() + " at " + currentTime);
158       java.io.InputStream in = req.getInputStream();
159 
160       ServletOutputStream l_out = resp.getOutputStream();
161       
162       DataInputStream di = null;
163       boolean compressNetwork = COMPRESS;
164       if( compressNetwork){
165           InputStream cin = codec.createInputStream( in);
166           di = new DataInputStream(cin);
167       }
168       else {
169     	  di = new DataInputStream(in);
170       }
171 
172       final int numEvents = di.readInt();
173       // log.info("saw " + numEvents+ " in request");
174 
175       List<Chunk> events = new LinkedList<Chunk>();
176       StringBuilder sb = new StringBuilder();
177 
178       for (int i = 0; i < numEvents; i++) {
179         ChunkImpl logEvent = ChunkImpl.read(di);
180         events.add(logEvent);
181 
182       }
183 
184       int responseStatus = HttpServletResponse.SC_OK;
185 
186       // write new data to data sync file
187       if (writer != null) {
188         ChukwaWriter.CommitStatus result = writer.add(events);
189 
190         // this is where we ACK this connection
191 
192         if(result == ChukwaWriter.COMMIT_OK) {
193           // only count the chunks if result is commit or commit pending
194           numberchunks += events.size();
195           lifetimechunks += events.size();
196 
197           for(Chunk receivedChunk: events) {
198             sb.append(ACK_PREFIX);
199             sb.append(receivedChunk.getData().length);
200             sb.append(" bytes ending at offset ");
201             sb.append(receivedChunk.getSeqID() - 1).append("\n");
202           }
203         } else if(result instanceof ChukwaWriter.COMMIT_PENDING) {
204 
205           // only count the chunks if result is commit or commit pending
206           numberchunks += events.size();
207           lifetimechunks += events.size();
208 
209           for(String s: ((ChukwaWriter.COMMIT_PENDING) result).pendingEntries)
210             sb.append(s);
211         } else if(result == ChukwaWriter.COMMIT_FAIL) {
212           sb.append("Commit failed");
213           responseStatus = HttpServletResponse.SC_SERVICE_UNAVAILABLE;
214         }
215 
216         l_out.print(sb.toString());
217       } else {
218         l_out.println("can't write: no writer");
219       }
220 
221       resp.setStatus(responseStatus);
222 
223     } catch (Throwable e) {
224       log.warn("Exception talking to " + req.getRemoteHost() + " at t="
225           + currentTime, e);
226       throw new ServletException(e);
227     }
228   }
229 
230   @Override
231   protected void doPost(HttpServletRequest req, HttpServletResponse resp)
232       throws ServletException, IOException {
233     accept(req, resp);
234   }
235 
236   @Override
237   protected void doGet(HttpServletRequest req, HttpServletResponse resp)
238       throws ServletException, IOException {
239 
240 
241     log.info("new GET from " + req.getRemoteHost() + " at " + System.currentTimeMillis());
242     PrintStream out = new PrintStream(resp.getOutputStream(), true, "UTF-8");
243     resp.setStatus(200);
244 
245     String pingAtt = req.getParameter("ping");
246     if (pingAtt != null) {
247       out.println("Date:" + statTime);
248       out.println("Now:" + System.currentTimeMillis());
249       out.println("numberHTTPConnection in time window:"
250           + numberHTTPConnection);
251       out.println("numberchunks in time window:" + numberchunks);
252       out.println("lifetimechunks:" + lifetimechunks);
253     } else {
254       out.println("<html><body><h2>Chukwa servlet running</h2>");
255       out.println("</body></html>");
256     }
257 
258   }
259 
260   @Override
261   public String getServletInfo() {
262     return "Chukwa Servlet Collector";
263   }
264 
265   @Override
266   public void destroy() {
267     try {
268       writer.close();
269     } catch (WriterException e) {
270       log.warn("Exception during close", e);
271       e.printStackTrace();
272     }
273     super.destroy();
274   }
275 }