This project has retired. For details please refer to its Attic page.
CollectorStub xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.chukwa.datacollection.collector;
20  
21  
22  import org.mortbay.jetty.*;
23  import org.mortbay.jetty.nio.SelectChannelConnector;
24  import org.mortbay.jetty.servlet.*;
25  import org.apache.hadoop.chukwa.datacollection.collector.servlet.*;
26  import org.apache.hadoop.chukwa.datacollection.connector.http.HttpConnector;
27  import org.apache.hadoop.chukwa.datacollection.writer.*;
28  import org.apache.hadoop.chukwa.util.DaemonWatcher;
29  import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
30  import org.apache.hadoop.conf.Configuration;
31  import org.apache.hadoop.fs.Path;
32  import edu.berkeley.confspell.Checker;
33  import edu.berkeley.confspell.HSlurper;
34  import edu.berkeley.confspell.OptDictionary;
35  import javax.servlet.http.HttpServlet;
36  import java.io.File;
37  import java.util.*;
38  
39  @Deprecated
40  public class CollectorStub {
41  
42    static int THREADS = 120;
43    public static Server jettyServer = null;
44  
45    public static void main(String[] args) {
46  
47      DaemonWatcher.createInstance("Collector");
48      try {
49        if (args.length > 0 && (args[0].equalsIgnoreCase("help")|| args[0].equalsIgnoreCase("-help"))) {
50          System.out.println("usage: Normally you should just invoke CollectorStub without arguments.");
51          System.out.println("A number of options can be specified here for debugging or special uses.  e.g.: ");
52          System.out.println("Options include:\n\tportno=<#> \n\t" + "writer=pretend | <classname>"
53                      + "\n\tservlet=<classname>@path");
54          System.out.println("Command line options will override normal configuration.");
55          System.exit(0);
56        }
57  
58        ChukwaConfiguration conf = new ChukwaConfiguration();
59        
60        try {
61          Configuration collectorConf = new Configuration(false);
62          collectorConf.addResource(new Path(conf.getChukwaConf() + "/chukwa-common.xml"));
63          collectorConf.addResource(new Path(conf.getChukwaConf() + "/chukwa-collector-conf.xml"));
64          Checker.checkConf(new OptDictionary(new File(new File(conf.getChukwaHome(), "share/chukwa/lib"), "collector.dict")),
65              HSlurper.fromHConf(collectorConf));
66        } catch(Exception e) {e.printStackTrace();}
67        
68        int portNum = conf.getInt("chukwaCollector.http.port", 9999);
69        THREADS = conf.getInt("chukwaCollector.http.threads", THREADS);
70  
71        // pick a writer.
72        ChukwaWriter w = null;
73        Map<String, HttpServlet> servletsToAdd = new TreeMap<String, HttpServlet>();
74        ServletCollector servletCollector = new ServletCollector(conf);
75        for(String arg: args) {
76          if(arg.startsWith("writer=")) {       //custom writer class
77            String writerCmd = arg.substring("writer=".length());
78            if (writerCmd.equals("pretend") || writerCmd.equals("pretend-quietly")) {
79              boolean verbose = !writerCmd.equals("pretend-quietly");
80              w = new ConsoleWriter(verbose);
81              w.init(conf);
82              servletCollector.setWriter(w);
83            } else 
84              conf.set("chukwaCollector.writerClass", writerCmd);
85          } else if(arg.startsWith("servlet=")) {     //adding custom servlet
86             String servletCmd = arg.substring("servlet=".length()); 
87             String[] halves = servletCmd.split("@");
88             try {
89               Class<?> servletClass = Class.forName(halves[0]);
90               HttpServlet srvlet = (HttpServlet) servletClass.newInstance();
91               if(!halves[1].startsWith("/"))
92                 halves[1] = "/" + halves[1];
93               servletsToAdd.put(halves[1], srvlet);
94             } catch(Exception e) {
95               e.printStackTrace();
96             }
97          } else if(arg.startsWith("portno=")) {
98            portNum = Integer.parseInt(arg.substring("portno=".length()));
99          } else { //unknown arg
100           System.out.println("WARNING: unknown command line arg " + arg);
101           System.out.println("Invoke collector with command line arg 'help' for usage");
102         }
103       }
104 
105       // Set up jetty connector
106       SelectChannelConnector jettyConnector = new SelectChannelConnector();
107       jettyConnector.setLowResourcesConnections(THREADS - 10);
108       jettyConnector.setLowResourceMaxIdleTime(1500);
109       jettyConnector.setPort(portNum);
110       
111       // Set up jetty server proper, using connector
112       jettyServer = new Server(portNum);
113       jettyServer.setConnectors(new Connector[] { jettyConnector });
114       org.mortbay.thread.BoundedThreadPool pool = new org.mortbay.thread.BoundedThreadPool();
115       pool.setMaxThreads(THREADS);
116       jettyServer.setThreadPool(pool);
117       
118       // Add the collector servlet to server
119       Context root = new Context(jettyServer, "/", Context.SESSIONS);
120       root.addServlet(new ServletHolder(servletCollector), "/*");
121       
122       if(conf.getBoolean(HttpConnector.ASYNC_ACKS_OPT, false))
123         root.addServlet(new ServletHolder(new CommitCheckServlet(conf)), "/"+CommitCheckServlet.DEFAULT_PATH);
124 
125       if(conf.getBoolean(LogDisplayServlet.ENABLED_OPT, false))
126         root.addServlet(new ServletHolder(new LogDisplayServlet(conf)), "/"+LogDisplayServlet.DEFAULT_PATH);
127 
128       
129       root.setAllowNullPathInfo(false);
130 
131       // Add in any user-specified servlets
132       for(Map.Entry<String, HttpServlet> e: servletsToAdd.entrySet()) {
133         root.addServlet(new ServletHolder(e.getValue()), e.getKey());
134       }
135       
136       // And finally, fire up the server
137       jettyServer.start();
138       jettyServer.setStopAtShutdown(true);
139 
140       System.out.println("started Chukwa http collector on port " + portNum);
141       System.out.close();
142       System.err.close();
143     } catch (Exception e) {
144       e.printStackTrace();
145       DaemonWatcher.bailout(-1);
146     }
147 
148   }
149 
150 }