This project has retired. For details please refer to its
Attic page.
LogDisplayServlet xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.chukwa.datacollection.collector.servlet;
19
20 import javax.servlet.ServletConfig;
21 import javax.servlet.ServletException;
22 import javax.servlet.http.HttpServlet;
23 import javax.servlet.http.HttpServletRequest;
24 import javax.servlet.http.HttpServletResponse;
25 import org.apache.log4j.Logger;
26 import java.io.*;
27 import java.security.MessageDigest;
28 import java.security.NoSuchAlgorithmException;
29 import java.util.*;
30 import org.apache.hadoop.chukwa.*;
31 import org.apache.hadoop.chukwa.datacollection.writer.ExtractorWriter;
32 import org.apache.hadoop.conf.Configuration;
33
34 @Deprecated
35 public class LogDisplayServlet extends HttpServlet {
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59 public static final String DEFAULT_PATH = "logs";
60 public static final String ENABLED_OPT = "chukwaCollector.showLogs.enabled";
61 public static final String BUF_SIZE_OPT = "chukwaCollector.showLogs.buffer";
62 long BUF_SIZE = 1024* 1024;
63
64 Configuration conf;
65 Map<String, Deque<Chunk>> chunksBySID = new HashMap<String, Deque<Chunk>>();
66 Queue<String> receivedSIDs = new LinkedList<String>();
67 long totalStoredSize = 0;
68
69 private static final long serialVersionUID = -4602082382919009285L;
70 protected static Logger log = Logger.getLogger(LogDisplayServlet.class);
71
72 public LogDisplayServlet() {
73 conf = new Configuration();
74 ExtractorWriter.recipient = this;
75 }
76
77 public LogDisplayServlet(Configuration c) {
78 conf = c;
79 ExtractorWriter.recipient = this;
80 }
81
82 public void init(ServletConfig servletConf) throws ServletException {
83 BUF_SIZE = conf.getLong(BUF_SIZE_OPT, BUF_SIZE);
84 }
85
86 @Override
87 protected void doTrace(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
88 resp.sendError(HttpServletResponse.SC_METHOD_NOT_ALLOWED);
89 }
90
91 private String getSID(Chunk c) {
92 try {
93 MessageDigest md;
94 md = MessageDigest.getInstance("MD5");
95
96 md.update(c.getSource().getBytes());
97 md.update(c.getStreamName().getBytes());
98 md.update(c.getTags().getBytes());
99 StringBuilder sb = new StringBuilder();
100 byte[] bytes = md.digest();
101 for(int i=0; i < bytes.length; ++i) {
102 if( (bytes[i] & 0xF0) == 0)
103 sb.append('0');
104 sb.append( Integer.toHexString(0xFF & bytes[i]) );
105 }
106 return sb.toString();
107 } catch(NoSuchAlgorithmException n) {
108 log.fatal(n);
109 System.exit(0);
110 return null;
111 }
112 }
113
114
115 private void pruneOldEntries() {
116 while(totalStoredSize > BUF_SIZE) {
117 String queueToPrune = receivedSIDs.remove();
118 Deque<Chunk> stream = chunksBySID.get(queueToPrune);
119 assert !stream.isEmpty() : " expected a chunk in stream with ID " + queueToPrune;
120 Chunk c = stream.poll();
121 if(c != null)
122 totalStoredSize -= c.getData().length;
123 if(stream.isEmpty()) {
124 chunksBySID.remove(queueToPrune);
125 }
126 }
127 }
128
129 public synchronized void add(List<Chunk> chunks) {
130 for(Chunk c : chunks) {
131 String sid = getSID(c);
132 Deque<Chunk> stream = chunksBySID.get(sid);
133 if(stream == null) {
134 stream = new LinkedList<Chunk>();
135 chunksBySID.put(sid, stream);
136 }
137 stream.add(c);
138 receivedSIDs.add(sid);
139 totalStoredSize += c.getData().length;
140 }
141 pruneOldEntries();
142 }
143
144
145 @Override
146 protected synchronized void doGet(HttpServletRequest req, HttpServletResponse resp)
147 throws ServletException, IOException {
148
149 PrintStream out = new PrintStream(new BufferedOutputStream(resp.getOutputStream()));
150 resp.setStatus(200);
151 String path = req.getServletPath();
152 String streamID = req.getParameter("sid");
153 if (streamID != null) {
154 try {
155 Deque<Chunk> chunks = chunksBySID.get(streamID);
156 if(chunks != null) {
157 String streamName = getFriendlyName(chunks.peek());
158 out.println("<html><title>Chukwa:Received Data</title><body><h2>Data from "+ streamName + "</h2>");
159 out.println("<pre>");
160 for(Chunk c: chunks) {
161 out.write(c.getData());
162 }
163 out.println("</pre><hr><a href=\""+path+"\">Back to list of streams</a>");
164 } else
165 out.println("No data");
166 } catch(Exception e) {
167 out.println("<html><body>No data</body></html>");
168 }
169 out.println("</body></html>");
170 } else {
171 out.println("<html><title>Chukwa:Received Data</title><body><h2>Recently-seen streams</h2><ul>");
172 for(Map.Entry<String, Deque<Chunk>> sid: chunksBySID.entrySet())
173 out.println("<li> <a href=\"" + path + "?sid="+sid.getKey() + "\">"+ getFriendlyName(sid.getValue().peek()) + "</a></li>");
174 out.println("</ul></body></html>");
175 }
176 out.flush();
177 }
178
179 private String getFriendlyName(Chunk chunk) {
180 if(chunk != null)
181 return chunk.getTags() + "/" + chunk.getSource() + "/" + chunk.getStreamName();
182 else return "null";
183 }
184
185
186 }