This project has retired. For details please refer to its
Attic page.
CommitCheckServlet xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.chukwa.datacollection.collector.servlet;
19
20 import java.io.IOException;
21 import java.io.PrintStream;
22 import java.net.URI;
23 import javax.servlet.ServletConfig;
24 import javax.servlet.ServletException;
25 import javax.servlet.http.HttpServlet;
26 import javax.servlet.http.HttpServletRequest;
27 import javax.servlet.http.HttpServletResponse;
28 import org.apache.log4j.Logger;
29 import java.util.*;
30 import org.apache.hadoop.chukwa.datacollection.writer.SeqFileWriter;
31 import org.apache.hadoop.chukwa.extraction.CHUKWA_CONSTANT;
32 import org.apache.hadoop.chukwa.extraction.archive.SinkArchiver;
33 import org.apache.hadoop.conf.Configuration;
34 import org.apache.hadoop.fs.*;
35
36 @Deprecated
37 public class CommitCheckServlet extends HttpServlet {
38
39 private static final long serialVersionUID = -4627538252371890849L;
40
41 protected static Logger log = Logger.getLogger(CommitCheckServlet.class);
42 CommitCheckThread commitCheck;
43 Configuration conf;
44
45 public static final String SCANPERIOD_OPT = "chukwaCollector.asyncAcks.scanperiod";
46
47
48 public static final String PURGEDELAY_OPT = "chukwaCollector.asyncAcks.purgedelay";
49
50
51 public static final String SCANPATHS_OPT = "chukwaCollector.asyncAcks.scanpaths";
52
53 public static final String DEFAULT_PATH = "acks";
/**
 * Creates the servlet; no work is done until {@code init} is called.
 *
 * @param conf configuration later used to locate the filesystem and to
 *             configure the scanning thread
 */
public CommitCheckServlet(Configuration conf) {
  super();
  this.conf = conf;
}
57
58 public void init(ServletConfig servletConf) throws ServletException {
59 log.info("initing commit check servlet");
60 try {
61 FileSystem fs = FileSystem.get(
62 new URI(conf.get("writer.hdfs.filesystem", "file:///")), conf);
63 log.info("commitcheck fs is " + fs.getUri());
64 commitCheck = new CommitCheckThread(conf, fs);
65 commitCheck.start();
66 } catch(Exception e) {
67 log.error("couldn't start CommitCheckServlet", e);
68 throw new ServletException(e);
69 }
70 }
71
72 @Override
73 protected void doTrace(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
74 resp.sendError(HttpServletResponse.SC_METHOD_NOT_ALLOWED);
75 }
76
77 @Override
78 protected void doGet(HttpServletRequest req, HttpServletResponse resp)
79 throws ServletException, IOException {
80
81 PrintStream out = new PrintStream(resp.getOutputStream());
82 resp.setStatus(200);
83
84 out.println("<html><body><h2>Commit status</h2><ul>");
85 for(String s: commitCheck.getLengthList())
86 out.println("<li>" + s + "</li>");
87 out.println("</ul></body></html>");
88 }
89
90
91 @Override
92 public void destroy() {
93 commitCheck.shutdown();
94 }
95
96
97
98
99
100 private static class CommitCheckThread extends Thread implements CHUKWA_CONSTANT {
101 int checkInterval = 1000 * 30;
102 volatile boolean running = true;
103 final Collection<Path> pathsToSearch;
104 final FileSystem fs;
105 final Map<String, Long> lengthTable;
106 final PriorityQueue<PurgeTask> oldEntries;
107 long delayUntilPurge = 1000 * 60 * 60 * 12;
108
109 static class PurgeTask implements Comparable<PurgeTask>{
110 long purgeTime;
111 String toPurge;
112 long len;
113
114 public PurgeTask(String s, long time, long len) {
115 this.toPurge = s;
116 this.purgeTime = time;
117 this.len = len;
118 }
119
120 public int compareTo(PurgeTask p) {
121 if(purgeTime < p.purgeTime)
122 return -1;
123 else if (purgeTime == p.purgeTime)
124 return 0;
125 else
126 return 1;
127 }
128 }
129
130
131 public CommitCheckThread(Configuration conf, FileSystem fs) {
132 this.fs = fs;
133 pathsToSearch = new ArrayList<Path>();
134 lengthTable = new LinkedHashMap<String, Long>();
135 oldEntries = new PriorityQueue<PurgeTask>();
136 checkInterval = conf.getInt(SCANPERIOD_OPT, checkInterval);
137
138 String sinkPath = conf.get(SeqFileWriter.OUTPUT_DIR_OPT, "/chukwa/logs");
139 pathsToSearch.add(new Path(sinkPath));
140
141 String additionalSearchPaths = conf.get(SCANPATHS_OPT, "");
142 String[] paths = additionalSearchPaths.split(",");
143 for(String s: paths)
144 if(s.length() > 1) {
145 Path path = new Path(s);
146 if(!pathsToSearch.contains(path))
147 pathsToSearch.add(path);
148 }
149
150 delayUntilPurge = conf.getLong(PURGEDELAY_OPT, delayUntilPurge);
151 String chukwaRootDir = conf.get(CHUKWA_ROOT_DIR_FIELD, DEFAULT_CHUKWA_ROOT_DIR_NAME);
152 String archivesRootProcessingDir = chukwaRootDir + ARCHIVES_PROCESSING_DIR_NAME;
153 String archivesMRInputDir = archivesRootProcessingDir + ARCHIVES_MR_INPUT_DIR_NAME;
154 pathsToSearch.add(new Path(archivesMRInputDir));
155
156 }
157
158 public void shutdown() {
159 running = false;
160 this.interrupt();
161 }
162
163 public void run() {
164 while(running) {
165 try {
166 Thread.sleep(checkInterval);
167 scanFS();
168 purgeOldEntries();
169 } catch(InterruptedException e) {}
170 catch(IOException e) {
171 log.error("io problem", e);
172 }
173 }
174 }
175
176 private synchronized void purgeOldEntries() {
177 long now = System.currentTimeMillis();
178 PurgeTask p = oldEntries.peek();
179 while(p != null && p.purgeTime < now) {
180 oldEntries.remove();
181 Long curLen = lengthTable.get(p.toPurge);
182 if(curLen != null && p.len >= curLen)
183 lengthTable.remove(p.toPurge);
184 }
185
186 }
187
188 private void scanFS() throws IOException {
189 long nextPurgeTime = System.currentTimeMillis() + delayUntilPurge;
190 for(Path dir: pathsToSearch) {
191 int filesSeen = 0;
192
193 FileStatus[] dataSinkFiles = fs.listStatus(dir, SinkArchiver.DATA_SINK_FILTER);
194 if(dataSinkFiles == null || dataSinkFiles.length == 0)
195 continue;
196
197 synchronized(this) {
198 for(FileStatus fstatus: dataSinkFiles) {
199 filesSeen++;
200 String name = fstatus.getPath().getName();
201 long len = fstatus.getLen();
202 oldEntries.add(new PurgeTask(name, nextPurgeTime, len));
203 lengthTable.put(name, len);
204 }
205 }
206 log.info("scanning fs: " + dir + "; saw "+ filesSeen+ " files");
207 }
208 }
209
210 public synchronized List<String> getLengthList() {
211 ArrayList<String> list = new ArrayList<String>(lengthTable.size());
212 for(Map.Entry<String, Long> e: lengthTable.entrySet()) {
213 list.add(e.getKey() + " " + e.getValue());
214 }
215 return list;
216 }
217
218 }
219
220 }