This project has retired. For details please refer to its
Attic page.
ServletCollector xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.chukwa.datacollection.collector.servlet;
20
21
22 import java.io.DataInputStream;
23 import java.io.IOException;
24 import java.io.InputStream;
25 import java.io.PrintStream;
26 import java.util.LinkedList;
27 import java.util.List;
28 import java.util.Timer;
29 import java.util.TimerTask;
30
31 import javax.servlet.ServletConfig;
32 import javax.servlet.ServletException;
33 import javax.servlet.ServletOutputStream;
34 import javax.servlet.http.HttpServlet;
35 import javax.servlet.http.HttpServletRequest;
36 import javax.servlet.http.HttpServletResponse;
37
38 import org.apache.hadoop.chukwa.Chunk;
39 import org.apache.hadoop.chukwa.ChunkImpl;
40 import org.apache.hadoop.chukwa.datacollection.writer.ChukwaWriter;
41 import org.apache.hadoop.chukwa.datacollection.writer.SeqFileWriter;
42 import org.apache.hadoop.chukwa.datacollection.writer.WriterException;
43 import org.apache.hadoop.chukwa.util.DaemonWatcher;
44 import org.apache.hadoop.conf.Configuration;
45 import org.apache.hadoop.io.compress.CompressionCodec;
46 import org.apache.hadoop.util.ReflectionUtils;
47 import org.apache.log4j.Logger;
48
49 @Deprecated
50 public class ServletCollector extends HttpServlet {
51
52 static final boolean FANCY_DIAGNOSTICS = false;
53 public static final String PATH = "chukwa";
54
55
56
57 public static final String ACK_PREFIX = "ok: ";
58 org.apache.hadoop.chukwa.datacollection.writer.ChukwaWriter writer = null;
59
60 private static final long serialVersionUID = 6286162898591407111L;
61 Logger log = Logger.getRootLogger();
62
63 static boolean COMPRESS;
64 static String CODEC_NAME;
65 static CompressionCodec codec;
66
67 public void setWriter(ChukwaWriter w) {
68 writer = w;
69 }
70
71 public ChukwaWriter getWriter() {
72 return writer;
73 }
74
75 long statTime = 0L;
76 int numberHTTPConnection = 0;
77 int numberchunks = 0;
78 long lifetimechunks = 0;
79
80 Configuration conf;
81
82 public ServletCollector(Configuration c) {
83 conf = c;
84 }
85
86 public void init(ServletConfig servletConf) throws ServletException {
87
88 log.info("initing servletCollector");
89 if (servletConf == null) {
90 log.fatal("no servlet config");
91 return;
92 }
93
94 Timer statTimer = new Timer();
95 statTimer.schedule(new TimerTask() {
96 public void run() {
97 log.info("stats:ServletCollector,numberHTTPConnection:"
98 + numberHTTPConnection + ",numberchunks:" + numberchunks);
99 statTime = System.currentTimeMillis();
100 numberHTTPConnection = 0;
101 numberchunks = 0;
102 }
103 }, (1000), (60 * 1000));
104
105 if (writer != null) {
106 log.info("writer set up statically, no need for Collector.init() to do it");
107 return;
108 }
109
110 try {
111 String writerClassName = conf.get("chukwaCollector.writerClass",
112 SeqFileWriter.class.getCanonicalName());
113 Class<?> writerClass = Class.forName(writerClassName);
114 if (writerClass != null
115 && ChukwaWriter.class.isAssignableFrom(writerClass))
116 writer = (ChukwaWriter) writerClass.newInstance();
117 } catch (Exception e) {
118 log.warn("failed to use user-chosen writer class, defaulting to SeqFileWriter", e);
119 }
120
121 COMPRESS = conf.getBoolean("chukwaAgent.output.compress", false);
122 if( COMPRESS) {
123 CODEC_NAME = conf.get( "chukwaAgent.output.compression.type", "org.apache.hadoop.io.compress.DefaultCodec");
124 Class<?> codecClass = null;
125 try {
126 codecClass = Class.forName( CODEC_NAME);
127 codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
128 log.info("codec " + CODEC_NAME + " loaded for network compression");
129 } catch (ClassNotFoundException e) {
130 log.warn("failed to create codec " + CODEC_NAME + ". Network compression won't be enabled.", e);
131 COMPRESS = false;
132 }
133 }
134
135
136 try {
137 if (writer == null) {
138 writer = new SeqFileWriter();
139 }
140
141 writer.init(conf);
142 } catch (Throwable e) {
143 log.warn("Exception trying to initialize SeqFileWriter",e);
144 DaemonWatcher.bailout(-1);
145 }
146 }
147
148 @Override
149 protected void doTrace(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
150 resp.sendError(HttpServletResponse.SC_METHOD_NOT_ALLOWED);
151 }
152
153 protected void accept(HttpServletRequest req, HttpServletResponse resp)
154 throws ServletException {
155 numberHTTPConnection++;
156 ServletDiagnostics diagnosticPage = new ServletDiagnostics();
157 final long currentTime = System.currentTimeMillis();
158 try {
159
160 log.debug("new post from " + req.getRemoteHost() + " at " + currentTime);
161 java.io.InputStream in = req.getInputStream();
162
163 ServletOutputStream l_out = resp.getOutputStream();
164
165 DataInputStream di = null;
166 boolean compressNetwork = COMPRESS;
167 if( compressNetwork){
168 InputStream cin = codec.createInputStream( in);
169 di = new DataInputStream(cin);
170 }
171 else {
172 di = new DataInputStream(in);
173 }
174
175 final int numEvents = di.readInt();
176
177
178 if (FANCY_DIAGNOSTICS) {
179 diagnosticPage.sawPost(req.getRemoteHost(), numEvents, currentTime);
180 }
181
182 List<Chunk> events = new LinkedList<Chunk>();
183 StringBuilder sb = new StringBuilder();
184
185 for (int i = 0; i < numEvents; i++) {
186 ChunkImpl logEvent = ChunkImpl.read(di);
187 events.add(logEvent);
188
189 if (FANCY_DIAGNOSTICS) {
190 diagnosticPage.sawChunk(logEvent, i);
191 }
192 }
193
194 int responseStatus = HttpServletResponse.SC_OK;
195
196
197 if (writer != null) {
198 ChukwaWriter.CommitStatus result = writer.add(events);
199
200
201
202 if(result == ChukwaWriter.COMMIT_OK) {
203
204 numberchunks += events.size();
205 lifetimechunks += events.size();
206
207 for(Chunk receivedChunk: events) {
208 sb.append(ACK_PREFIX);
209 sb.append(receivedChunk.getData().length);
210 sb.append(" bytes ending at offset ");
211 sb.append(receivedChunk.getSeqID() - 1).append("\n");
212 }
213 } else if(result instanceof ChukwaWriter.COMMIT_PENDING) {
214
215
216 numberchunks += events.size();
217 lifetimechunks += events.size();
218
219 for(String s: ((ChukwaWriter.COMMIT_PENDING) result).pendingEntries)
220 sb.append(s);
221 } else if(result == ChukwaWriter.COMMIT_FAIL) {
222 sb.append("Commit failed");
223 responseStatus = HttpServletResponse.SC_SERVICE_UNAVAILABLE;
224 }
225
226 l_out.print(sb.toString());
227 } else {
228 l_out.println("can't write: no writer");
229 }
230
231 if (FANCY_DIAGNOSTICS) {
232 diagnosticPage.doneWithPost();
233 }
234
235 resp.setStatus(responseStatus);
236
237 } catch (Throwable e) {
238 log.warn("Exception talking to " + req.getRemoteHost() + " at t="
239 + currentTime, e);
240 throw new ServletException(e);
241 }
242 }
243
244 @Override
245 protected void doPost(HttpServletRequest req, HttpServletResponse resp)
246 throws ServletException, IOException {
247 accept(req, resp);
248 }
249
250 @Override
251 protected void doGet(HttpServletRequest req, HttpServletResponse resp)
252 throws ServletException, IOException {
253
254
255 log.info("new GET from " + req.getRemoteHost() + " at " + System.currentTimeMillis());
256 PrintStream out = new PrintStream(resp.getOutputStream());
257 resp.setStatus(200);
258
259 String pingAtt = req.getParameter("ping");
260 if (pingAtt != null) {
261 out.println("Date:" + statTime);
262 out.println("Now:" + System.currentTimeMillis());
263 out.println("numberHTTPConnection in time window:"
264 + numberHTTPConnection);
265 out.println("numberchunks in time window:" + numberchunks);
266 out.println("lifetimechunks:" + lifetimechunks);
267 } else {
268 out.println("<html><body><h2>Chukwa servlet running</h2>");
269 if (FANCY_DIAGNOSTICS)
270 ServletDiagnostics.printPage(out);
271 out.println("</body></html>");
272 }
273
274 }
275
276 @Override
277 public String getServletInfo() {
278 return "Chukwa Servlet Collector";
279 }
280
281 @Override
282 public void destroy() {
283 try {
284 writer.close();
285 } catch (WriterException e) {
286 log.warn("Exception during close", e);
287 e.printStackTrace();
288 }
289 super.destroy();
290 }
291 }