ServletCollector xref
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
 * implied.  See the License for the specific language governing
 * permissions and limitations under the License.
 */

package org.apache.hadoop.chukwa.datacollection.collector.servlet;

import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.PrintStream;
import java.util.LinkedList;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;

import javax.servlet.ServletConfig;
import javax.servlet.ServletException;
import javax.servlet.ServletOutputStream;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.apache.hadoop.chukwa.Chunk;
import org.apache.hadoop.chukwa.ChunkImpl;
import org.apache.hadoop.chukwa.datacollection.writer.ChukwaWriter;
import org.apache.hadoop.chukwa.datacollection.writer.SeqFileWriter;
import org.apache.hadoop.chukwa.datacollection.writer.WriterException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.log4j.Logger;

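/**
 * Deprecated HTTP collector endpoint: accepts batches of serialized
 * {@link ChunkImpl} objects POSTed by Chukwa agents and hands them to a
 * {@link ChukwaWriter} (by default a {@link SeqFileWriter}).
 */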
@Deprecated
public class ServletCollector extends HttpServlet {

  static final boolean FANCY_DIAGNOSTICS = false;
  public static final String PATH = "chukwa";

  public static final String ACK_PREFIX = "ok: ";
  transient ChukwaWriter writer = null;

  private static final long serialVersionUID = 6286162898591407111L;
  transient Logger log = Logger.getLogger(ServletCollector.class);

  boolean COMPRESS;
  String CODEC_NAME;
  transient CompressionCodec codec;

  public void setWriter(ChukwaWriter w) {
    writer = w;
  }

  public ChukwaWriter getWriter() {
    return writer;
  }

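  // Per-window counters, reset once a minute by the timer started in init();
  // lifetimechunks accumulates for the life of the servlet.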
  long statTime = 0L;
  int numberHTTPConnection = 0;
  int numberchunks = 0;
  long lifetimechunks = 0;

  transient Configuration conf;

  public ServletCollector(Configuration c) {
    conf = c;
  }

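  /**
   * Starts the once-a-minute statistics timer, instantiates the writer class
   * named by chukwaCollector.writerClass (defaulting to SeqFileWriter), and,
   * if chukwaAgent.output.compress is set, loads the configured compression
   * codec for incoming data.
   */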
  public void init(ServletConfig servletConf) throws ServletException {

    log.info("initing servletCollector");
    if (servletConf == null) {
      log.fatal("no servlet config");
      return;
    }

    Timer statTimer = new Timer();
    statTimer.schedule(new TimerTask() {
      public void run() {
        log.info("stats:ServletCollector,numberHTTPConnection:"
            + numberHTTPConnection + ",numberchunks:" + numberchunks);
        statTime = System.currentTimeMillis();
        numberHTTPConnection = 0;
        numberchunks = 0;
      }
    }, 1000, 60 * 1000);

    if (writer != null) {
      log.info("writer set up statically, no need for Collector.init() to do it");
      return;
    }

    try {
      String writerClassName = conf.get("chukwaCollector.writerClass",
          SeqFileWriter.class.getCanonicalName());
      Class<?> writerClass = Class.forName(writerClassName);
      if (writerClass != null
          && ChukwaWriter.class.isAssignableFrom(writerClass))
        writer = (ChukwaWriter) writerClass.newInstance();
    } catch (Exception e) {
      log.warn("failed to use user-chosen writer class, defaulting to SeqFileWriter", e);
    }

    COMPRESS = conf.getBoolean("chukwaAgent.output.compress", false);
    if (COMPRESS) {
      CODEC_NAME = conf.get("chukwaAgent.output.compression.type",
          "org.apache.hadoop.io.compress.DefaultCodec");
      Class<?> codecClass = null;
      try {
        codecClass = Class.forName(CODEC_NAME);
        codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
        log.info("codec " + CODEC_NAME + " loaded for network compression");
      } catch (ClassNotFoundException e) {
        log.warn("failed to create codec " + CODEC_NAME + ". Network compression won't be enabled.", e);
        COMPRESS = false;
      }
    }

    try {
      if (writer == null) {
        writer = new SeqFileWriter();
      }

      writer.init(conf);
    } catch (Throwable e) {
      log.warn("Exception trying to initialize SeqFileWriter", e);
    }
  }

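  // TRACE is not supported; reply with 405 Method Not Allowed.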
  @Override
  protected void doTrace(HttpServletRequest req, HttpServletResponse resp)
      throws ServletException, IOException {
    resp.sendError(HttpServletResponse.SC_METHOD_NOT_ALLOWED);
  }

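  /**
   * Reads one POSTed batch: an int count followed by that many serialized
   * ChunkImpls (optionally codec-compressed), passes them to the writer, and
   * acks each committed chunk with its length and ending sequence offset.
   */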
  protected void accept(HttpServletRequest req, HttpServletResponse resp)
      throws ServletException {
    numberHTTPConnection++;
    final long currentTime = System.currentTimeMillis();
    try {

      log.debug("new post from " + req.getRemoteHost() + " at " + currentTime);
      InputStream in = req.getInputStream();

      ServletOutputStream l_out = resp.getOutputStream();

      DataInputStream di = null;
      boolean compressNetwork = COMPRESS;
      if (compressNetwork) {
        InputStream cin = codec.createInputStream(in);
        di = new DataInputStream(cin);
      } else {
        di = new DataInputStream(in);
      }

      final int numEvents = di.readInt();

      List<Chunk> events = new LinkedList<Chunk>();
      StringBuilder sb = new StringBuilder();

      for (int i = 0; i < numEvents; i++) {
        ChunkImpl logEvent = ChunkImpl.read(di);
        events.add(logEvent);
      }

      int responseStatus = HttpServletResponse.SC_OK;

      if (writer != null) {
        ChukwaWriter.CommitStatus result = writer.add(events);

        if (result == ChukwaWriter.COMMIT_OK) {

          numberchunks += events.size();
          lifetimechunks += events.size();

          for (Chunk receivedChunk : events) {
            sb.append(ACK_PREFIX);
            sb.append(receivedChunk.getData().length);
            sb.append(" bytes ending at offset ");
            sb.append(receivedChunk.getSeqID() - 1).append("\n");
          }
        } else if (result instanceof ChukwaWriter.COMMIT_PENDING) {

          numberchunks += events.size();
          lifetimechunks += events.size();

          for (String s : ((ChukwaWriter.COMMIT_PENDING) result).pendingEntries)
            sb.append(s);
        } else if (result == ChukwaWriter.COMMIT_FAIL) {
          sb.append("Commit failed");
          responseStatus = HttpServletResponse.SC_SERVICE_UNAVAILABLE;
        }

        l_out.print(sb.toString());
      } else {
        l_out.println("can't write: no writer");
      }

      resp.setStatus(responseStatus);

    } catch (Throwable e) {
      log.warn("Exception talking to " + req.getRemoteHost() + " at t="
          + currentTime, e);
      throw new ServletException(e);
    }
  }

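  // Agents deliver chunk batches via POST; delegate to accept().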
  @Override
  protected void doPost(HttpServletRequest req, HttpServletResponse resp)
      throws ServletException, IOException {
    accept(req, resp);
  }

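  /**
   * GET with a "ping" parameter returns the counters for the current stats
   * window; any other GET returns a plain "servlet running" page.
   */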
  @Override
  protected void doGet(HttpServletRequest req, HttpServletResponse resp)
      throws ServletException, IOException {

    log.info("new GET from " + req.getRemoteHost() + " at " + System.currentTimeMillis());
    PrintStream out = new PrintStream(resp.getOutputStream(), true, "UTF-8");
    resp.setStatus(200);

    String pingAtt = req.getParameter("ping");
    if (pingAtt != null) {
      out.println("Date:" + statTime);
      out.println("Now:" + System.currentTimeMillis());
      out.println("numberHTTPConnection in time window:"
          + numberHTTPConnection);
      out.println("numberchunks in time window:" + numberchunks);
      out.println("lifetimechunks:" + lifetimechunks);
    } else {
      out.println("<html><body><h2>Chukwa servlet running</h2>");
      out.println("</body></html>");
    }
  }

  @Override
  public String getServletInfo() {
    return "Chukwa Servlet Collector";
  }

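  // Close the writer when the servlet is taken out of service.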
  @Override
  public void destroy() {
    try {
      writer.close();
    } catch (WriterException e) {
      log.warn("Exception during close", e);
      e.printStackTrace();
    }
    super.destroy();
  }
}