This project has retired. For details please refer to its Attic page.
CommitCheckServlet xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.chukwa.datacollection.collector.servlet;
19  
20  import java.io.IOException;
21  import java.io.PrintStream;
22  import java.net.URI;
23  import javax.servlet.ServletConfig;
24  import javax.servlet.ServletException;
25  import javax.servlet.http.HttpServlet;
26  import javax.servlet.http.HttpServletRequest;
27  import javax.servlet.http.HttpServletResponse;
28  import org.apache.log4j.Logger;
29  import java.util.*;
30  import org.apache.hadoop.chukwa.datacollection.writer.SeqFileWriter;
31  import org.apache.hadoop.chukwa.extraction.CHUKWA_CONSTANT;
32  import org.apache.hadoop.chukwa.extraction.archive.SinkArchiver;
33  import org.apache.hadoop.conf.Configuration;
34  import org.apache.hadoop.fs.*;
35  
36  @Deprecated
37  public class CommitCheckServlet extends HttpServlet {
38  
39    private static final long serialVersionUID = -4627538252371890849L;
40    
41    protected static Logger log = Logger.getLogger(CommitCheckServlet.class);
42    CommitCheckThread commitCheck;
43    Configuration conf;
44      //interval at which to scan the filesystem, ms
45    public static final String SCANPERIOD_OPT = "chukwaCollector.asyncAcks.scanperiod";
46    
47      //interval at which to discard seen files, ms
48    public static final String PURGEDELAY_OPT = "chukwaCollector.asyncAcks.purgedelay"; 
49      
50    //list of dirs to search, separated by commas
51    public static final String SCANPATHS_OPT = "chukwaCollector.asyncAcks.scanpaths";
52      
53    public static final String DEFAULT_PATH = "acks"; //path to this servlet on collector
54    public CommitCheckServlet(Configuration conf) {
55      this.conf = conf;
56    }
57    
58    public void init(ServletConfig servletConf) throws ServletException {
59      log.info("initing commit check servlet");
60      try {
61        FileSystem fs = FileSystem.get(
62            new URI(conf.get("writer.hdfs.filesystem", "file:///")), conf);
63        log.info("commitcheck fs is " + fs.getUri());
64        commitCheck = new CommitCheckThread(conf, fs);
65        commitCheck.start();
66      } catch(Exception e) {
67        log.error("couldn't start CommitCheckServlet", e);
68        throw new ServletException(e);
69      }
70    }
71  
72    @Override
73    protected void doTrace(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { 
74      resp.sendError(HttpServletResponse.SC_METHOD_NOT_ALLOWED); 
75    }
76    
77    @Override
78    protected void doGet(HttpServletRequest req, HttpServletResponse resp)
79        throws ServletException, IOException  {
80    
81      PrintStream out = new PrintStream(resp.getOutputStream());
82      resp.setStatus(200);
83  
84      out.println("<html><body><h2>Commit status</h2><ul>");
85      for(String s: commitCheck.getLengthList()) 
86        out.println("<li>" + s + "</li>");
87      out.println("</ul></body></html>");
88    }
89    
90  
91    @Override
92    public void destroy() {
93      commitCheck.shutdown();
94    }
95    
96    /**
97     * Ideally, we'd use zookeeper to monitor archiver/demux rotation.
98     * For now, instead, we'll just do an ls in a bunch of places.
99     */
100   private static class CommitCheckThread extends Thread implements CHUKWA_CONSTANT {
101     int checkInterval = 1000 * 30;
102     volatile boolean running = true;
103     final Collection<Path> pathsToSearch;
104     final FileSystem fs;
105     final Map<String, Long> lengthTable;
106     final PriorityQueue<PurgeTask> oldEntries;
107     long delayUntilPurge = 1000 * 60 * 60 * 12;
108     
109     static class PurgeTask implements Comparable<PurgeTask>{
110       long purgeTime;
111       String toPurge;
112       long len;
113       
114       public PurgeTask(String s, long time, long len) {
115         this.toPurge = s;
116         this.purgeTime = time;
117         this.len = len;
118       }
119       
120       public int compareTo(PurgeTask p) {
121         if(purgeTime < p.purgeTime)
122           return -1;
123         else if (purgeTime == p.purgeTime)
124           return 0;
125         else
126           return 1;
127       }
128     }
129     
130     
131     public CommitCheckThread(Configuration conf, FileSystem fs) {
132       this.fs = fs;
133       pathsToSearch = new ArrayList<Path>();
134       lengthTable = new LinkedHashMap<String, Long>();
135       oldEntries = new PriorityQueue<PurgeTask>();
136       checkInterval = conf.getInt(SCANPERIOD_OPT, checkInterval);
137       
138       String sinkPath = conf.get(SeqFileWriter.OUTPUT_DIR_OPT, "/chukwa/logs");
139       pathsToSearch.add(new Path(sinkPath));
140       
141       String additionalSearchPaths = conf.get(SCANPATHS_OPT, "");
142       String[] paths = additionalSearchPaths.split(",");
143       for(String s: paths)
144         if(s.length() > 1) {
145           Path path = new Path(s);
146           if(!pathsToSearch.contains(path))
147             pathsToSearch.add(path);
148         }
149       
150       delayUntilPurge = conf.getLong(PURGEDELAY_OPT, delayUntilPurge);
151       String chukwaRootDir = conf.get(CHUKWA_ROOT_DIR_FIELD, DEFAULT_CHUKWA_ROOT_DIR_NAME);
152       String archivesRootProcessingDir = chukwaRootDir + ARCHIVES_PROCESSING_DIR_NAME;
153       String archivesMRInputDir = archivesRootProcessingDir + ARCHIVES_MR_INPUT_DIR_NAME;
154       pathsToSearch.add(new Path(archivesMRInputDir));
155       //TODO: set checkInterval using conf
156     }
157     
158     public void shutdown() {
159       running = false;
160       this.interrupt();
161     }
162     
163     public void run() {
164       while(running) {
165         try {
166           Thread.sleep(checkInterval);
167           scanFS();
168           purgeOldEntries();
169         } catch(InterruptedException e) {}
170           catch(IOException e) {
171            log.error("io problem", e);
172         }
173       }
174    }
175 
176     private synchronized void purgeOldEntries() {
177       long now = System.currentTimeMillis();
178       PurgeTask p = oldEntries.peek();
179       while(p != null && p.purgeTime < now) {
180         oldEntries.remove();
181         Long curLen = lengthTable.get(p.toPurge);
182         if(curLen != null && p.len >= curLen)
183           lengthTable.remove(p.toPurge);
184       }
185       
186     }
187 
188     private void scanFS() throws IOException {
189       long nextPurgeTime = System.currentTimeMillis() + delayUntilPurge;
190       for(Path dir: pathsToSearch) {
191         int filesSeen = 0;
192         
193         FileStatus[] dataSinkFiles = fs.listStatus(dir, SinkArchiver.DATA_SINK_FILTER);
194         if(dataSinkFiles == null || dataSinkFiles.length == 0)
195           continue;
196         
197         synchronized(this) {
198           for(FileStatus fstatus: dataSinkFiles) {
199             filesSeen++;
200             String name = fstatus.getPath().getName();
201             long len = fstatus.getLen();
202             oldEntries.add(new PurgeTask(name, nextPurgeTime, len));
203             lengthTable.put(name, len);
204           }
205         }
206         log.info("scanning fs: " + dir + "; saw "+ filesSeen+ " files");
207       }
208     }
209 
210     public synchronized List<String> getLengthList() {
211       ArrayList<String> list = new ArrayList<String>(lengthTable.size());
212       for(Map.Entry<String, Long> e: lengthTable.entrySet()) {
213         list.add(e.getKey() + " " + e.getValue());
214       }
215       return list;
216     }
217     
218   }
219 
220 }