ServletCollector xref

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.chukwa.datacollection.collector.servlet;

import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.PrintStream;
import java.util.LinkedList;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;

import javax.servlet.ServletConfig;
import javax.servlet.ServletException;
import javax.servlet.ServletOutputStream;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.apache.hadoop.chukwa.Chunk;
import org.apache.hadoop.chukwa.ChunkImpl;
import org.apache.hadoop.chukwa.datacollection.writer.ChukwaWriter;
import org.apache.hadoop.chukwa.datacollection.writer.SeqFileWriter;
import org.apache.hadoop.chukwa.datacollection.writer.WriterException;
import org.apache.hadoop.chukwa.util.DaemonWatcher;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.log4j.Logger;

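/**
 * HTTP endpoint that accepts batches of serialized {@link Chunk}s POSTed by
 * Chukwa agents, hands them to the configured {@link ChukwaWriter}, and
 * returns one ack line per committed chunk (see {@link #ACK_PREFIX}).
 */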
@Deprecated
public class ServletCollector extends HttpServlet {

  static final boolean FANCY_DIAGNOSTICS = false;
  public static final String PATH = "chukwa";
  /**
   * If a chunk is committed, the ack for it will start with this string.
   */
  public static final String ACK_PREFIX = "ok: ";
  ChukwaWriter writer = null;

  private static final long serialVersionUID = 6286162898591407111L;
  Logger log = Logger.getRootLogger(); // .getLogger(ServletCollector.class);

  static boolean COMPRESS;
  static String CODEC_NAME;
  static CompressionCodec codec;

  public void setWriter(ChukwaWriter w) {
    writer = w;
  }

  public ChukwaWriter getWriter() {
    return writer;
  }

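  // One-minute rolling stats, reset by the timer in init();
  // lifetimechunks accumulates across windows.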
  long statTime = 0L;
  int numberHTTPConnection = 0;
  int numberchunks = 0;
  long lifetimechunks = 0;

  Configuration conf;

  public ServletCollector(Configuration c) {
    conf = c;
  }

  public void init(ServletConfig servletConf) throws ServletException {

    log.info("initializing ServletCollector");
    if (servletConf == null) {
      log.fatal("no servlet config");
      return;
    }

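    // Once a minute, log the per-window connection/chunk counters and reset
    // them; statTime records when the window last rolled over.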
    Timer statTimer = new Timer();
    statTimer.schedule(new TimerTask() {
      public void run() {
        log.info("stats:ServletCollector,numberHTTPConnection:"
            + numberHTTPConnection + ",numberchunks:" + numberchunks);
        statTime = System.currentTimeMillis();
        numberHTTPConnection = 0;
        numberchunks = 0;
      }
    }, 1000, 60 * 1000);

    if (writer != null) {
      log.info("writer set up statically, no need for Collector.init() to do it");
      return;
    }

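    // Instantiate the writer class named by chukwaCollector.writerClass via
    // reflection; on failure, fall through to the SeqFileWriter default below.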
    try {
      String writerClassName = conf.get("chukwaCollector.writerClass",
          SeqFileWriter.class.getCanonicalName());
      Class<?> writerClass = Class.forName(writerClassName);
      if (writerClass != null
          && ChukwaWriter.class.isAssignableFrom(writerClass))
        writer = (ChukwaWriter) writerClass.newInstance();
    } catch (Exception e) {
      log.warn("failed to use user-chosen writer class, defaulting to SeqFileWriter", e);
    }

    COMPRESS = conf.getBoolean("chukwaAgent.output.compress", false);
    if (COMPRESS) {
      CODEC_NAME = conf.get("chukwaAgent.output.compression.type",
          "org.apache.hadoop.io.compress.DefaultCodec");
      Class<?> codecClass = null;
      try {
        codecClass = Class.forName(CODEC_NAME);
        codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
        log.info("codec " + CODEC_NAME + " loaded for network compression");
      } catch (ClassNotFoundException e) {
        log.warn("failed to create codec " + CODEC_NAME
            + ". Network compression won't be enabled.", e);
        COMPRESS = false;
      }
    }

    // Fall back to SeqFileWriter if no writer was configured or if the
    // reflection-based construction above failed.
    try {
      if (writer == null) {
        writer = new SeqFileWriter();
      }

      writer.init(conf);
    } catch (Throwable e) {
      log.warn("Exception trying to initialize SeqFileWriter", e);
      DaemonWatcher.bailout(-1);
    }
  }

  @Override
  protected void doTrace(HttpServletRequest req, HttpServletResponse resp)
      throws ServletException, IOException {
    resp.sendError(HttpServletResponse.SC_METHOD_NOT_ALLOWED);
  }

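  // A POST body is an int chunk count followed by that many serialized
  // ChunkImpl records (decoded below via ChunkImpl.read()), the whole stream
  // optionally wrapped in the configured compression codec.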
  protected void accept(HttpServletRequest req, HttpServletResponse resp)
      throws ServletException {
    numberHTTPConnection++;
    ServletDiagnostics diagnosticPage = new ServletDiagnostics();
    final long currentTime = System.currentTimeMillis();
    try {

      log.debug("new post from " + req.getRemoteHost() + " at " + currentTime);
      InputStream in = req.getInputStream();

      ServletOutputStream l_out = resp.getOutputStream();

      DataInputStream di = null;
      boolean compressNetwork = COMPRESS;
      if (compressNetwork) {
        InputStream cin = codec.createInputStream(in);
        di = new DataInputStream(cin);
      } else {
        di = new DataInputStream(in);
      }

      final int numEvents = di.readInt();
      // log.info("saw " + numEvents + " in request");

      if (FANCY_DIAGNOSTICS) {
        diagnosticPage.sawPost(req.getRemoteHost(), numEvents, currentTime);
      }

      List<Chunk> events = new LinkedList<Chunk>();
      StringBuilder sb = new StringBuilder();

      for (int i = 0; i < numEvents; i++) {
        ChunkImpl logEvent = ChunkImpl.read(di);
        events.add(logEvent);

        if (FANCY_DIAGNOSTICS) {
          diagnosticPage.sawChunk(logEvent, i);
        }
      }

      int responseStatus = HttpServletResponse.SC_OK;

      // write new data to data sync file
      if (writer != null) {
        ChukwaWriter.CommitStatus result = writer.add(events);

        // this is where we ACK this connection

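        // COMMIT_OK and COMMIT_PENDING both count toward the chunk stats;
        // only COMMIT_FAIL degrades the HTTP status to 503.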
        if (result == ChukwaWriter.COMMIT_OK) {
          // only count the chunks if result is commit or commit pending
          numberchunks += events.size();
          lifetimechunks += events.size();

          for (Chunk receivedChunk : events) {
            sb.append(ACK_PREFIX);
            sb.append(receivedChunk.getData().length);
            sb.append(" bytes ending at offset ");
            sb.append(receivedChunk.getSeqID() - 1).append("\n");
          }
        } else if (result instanceof ChukwaWriter.COMMIT_PENDING) {

          // only count the chunks if result is commit or commit pending
          numberchunks += events.size();
          lifetimechunks += events.size();

          for (String s : ((ChukwaWriter.COMMIT_PENDING) result).pendingEntries)
            sb.append(s);
        } else if (result == ChukwaWriter.COMMIT_FAIL) {
          sb.append("Commit failed");
          responseStatus = HttpServletResponse.SC_SERVICE_UNAVAILABLE;
        }

        l_out.print(sb.toString());
      } else {
        l_out.println("can't write: no writer");
      }

      if (FANCY_DIAGNOSTICS) {
        diagnosticPage.doneWithPost();
      }

      resp.setStatus(responseStatus);

    } catch (Throwable e) {
      log.warn("Exception talking to " + req.getRemoteHost() + " at t="
          + currentTime, e);
      throw new ServletException(e);
    }
  }

  @Override
  protected void doPost(HttpServletRequest req, HttpServletResponse resp)
      throws ServletException, IOException {
    accept(req, resp);
  }

  @Override
  protected void doGet(HttpServletRequest req, HttpServletResponse resp)
      throws ServletException, IOException {

    log.info("new GET from " + req.getRemoteHost() + " at "
        + System.currentTimeMillis());
    PrintStream out = new PrintStream(resp.getOutputStream());
    resp.setStatus(HttpServletResponse.SC_OK);

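    // "?ping" requests get the plain-text counters; anything else gets a
    // minimal HTML status page.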
    String pingAtt = req.getParameter("ping");
    if (pingAtt != null) {
      out.println("Date:" + statTime);
      out.println("Now:" + System.currentTimeMillis());
      out.println("numberHTTPConnection in time window:"
          + numberHTTPConnection);
      out.println("numberchunks in time window:" + numberchunks);
      out.println("lifetimechunks:" + lifetimechunks);
    } else {
      out.println("<html><body><h2>Chukwa servlet running</h2>");
      if (FANCY_DIAGNOSTICS)
        ServletDiagnostics.printPage(out);
      out.println("</body></html>");
    }
  }

  @Override
  public String getServletInfo() {
    return "Chukwa Servlet Collector";
  }

  @Override
  public void destroy() {
    try {
      // Guard against an NPE: init() can leave writer null if construction failed.
      if (writer != null) {
        writer.close();
      }
    } catch (WriterException e) {
      log.warn("Exception during close", e);
      e.printStackTrace();
    }
    super.destroy();
  }
}
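
A minimal client-side sketch of the framing accept() expects: an int count,
then that many serialized chunks. This is illustrative only; the class name
and URL are hypothetical, and it assumes Chunk's write(DataOutput) is the
counterpart of the ChunkImpl.read() call above. If the collector runs with
chukwaAgent.output.compress=true, the stream would additionally need to be
wrapped in the matching codec's output stream.

import java.io.DataOutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.List;

import org.apache.hadoop.chukwa.Chunk;

public class CollectorClientSketch {
  public static void post(String collectorUrl, List<Chunk> chunks)
      throws Exception {
    // e.g. collectorUrl = "http://collectorhost:8080/chukwa" (hypothetical)
    HttpURLConnection conn =
        (HttpURLConnection) new URL(collectorUrl).openConnection();
    conn.setRequestMethod("POST");
    conn.setDoOutput(true);
    DataOutputStream out = new DataOutputStream(conn.getOutputStream());
    out.writeInt(chunks.size());   // matches di.readInt() in accept()
    for (Chunk c : chunks) {
      c.write(out);                // assumed counterpart of ChunkImpl.read(di)
    }
    out.close();
    // The response body carries one "ok: ..." ack line per committed chunk.
    conn.getInputStream().close();
  }
}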