This project has retired. For details please refer to its
Attic page.
CollectorStub xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.chukwa.datacollection.collector;
20
21
22 import org.mortbay.jetty.*;
23 import org.mortbay.jetty.nio.SelectChannelConnector;
24 import org.mortbay.jetty.servlet.*;
25 import org.apache.hadoop.chukwa.datacollection.collector.servlet.*;
26 import org.apache.hadoop.chukwa.datacollection.connector.http.HttpConnector;
27 import org.apache.hadoop.chukwa.datacollection.writer.*;
28 import org.apache.hadoop.chukwa.util.DaemonWatcher;
29 import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
30 import org.apache.hadoop.conf.Configuration;
31 import org.apache.hadoop.fs.Path;
32 import edu.berkeley.confspell.Checker;
33 import edu.berkeley.confspell.HSlurper;
34 import edu.berkeley.confspell.OptDictionary;
35 import javax.servlet.http.HttpServlet;
36 import java.io.File;
37 import java.util.*;
38
39 @Deprecated
40 public class CollectorStub {
41
42 static int THREADS = 120;
43 public static Server jettyServer = null;
44
45 public static void main(String[] args) {
46
47 DaemonWatcher.createInstance("Collector");
48 try {
49 if (args.length > 0 && (args[0].equalsIgnoreCase("help")|| args[0].equalsIgnoreCase("-help"))) {
50 System.out.println("usage: Normally you should just invoke CollectorStub without arguments.");
51 System.out.println("A number of options can be specified here for debugging or special uses. e.g.: ");
52 System.out.println("Options include:\n\tportno=<#> \n\t" + "writer=pretend | <classname>"
53 + "\n\tservlet=<classname>@path");
54 System.out.println("Command line options will override normal configuration.");
55 System.exit(0);
56 }
57
58 ChukwaConfiguration conf = new ChukwaConfiguration();
59
60 try {
61 Configuration collectorConf = new Configuration(false);
62 collectorConf.addResource(new Path(conf.getChukwaConf() + "/chukwa-common.xml"));
63 collectorConf.addResource(new Path(conf.getChukwaConf() + "/chukwa-collector-conf.xml"));
64 Checker.checkConf(new OptDictionary(new File(new File(conf.getChukwaHome(), "share/chukwa/lib"), "collector.dict")),
65 HSlurper.fromHConf(collectorConf));
66 } catch(Exception e) {e.printStackTrace();}
67
68 int portNum = conf.getInt("chukwaCollector.http.port", 9999);
69 THREADS = conf.getInt("chukwaCollector.http.threads", THREADS);
70
71
72 ChukwaWriter w = null;
73 Map<String, HttpServlet> servletsToAdd = new TreeMap<String, HttpServlet>();
74 ServletCollector servletCollector = new ServletCollector(conf);
75 for(String arg: args) {
76 if(arg.startsWith("writer=")) {
77 String writerCmd = arg.substring("writer=".length());
78 if (writerCmd.equals("pretend") || writerCmd.equals("pretend-quietly")) {
79 boolean verbose = !writerCmd.equals("pretend-quietly");
80 w = new ConsoleWriter(verbose);
81 w.init(conf);
82 servletCollector.setWriter(w);
83 } else
84 conf.set("chukwaCollector.writerClass", writerCmd);
85 } else if(arg.startsWith("servlet=")) {
86 String servletCmd = arg.substring("servlet=".length());
87 String[] halves = servletCmd.split("@");
88 try {
89 Class<?> servletClass = Class.forName(halves[0]);
90 HttpServlet srvlet = (HttpServlet) servletClass.newInstance();
91 if(!halves[1].startsWith("/"))
92 halves[1] = "/" + halves[1];
93 servletsToAdd.put(halves[1], srvlet);
94 } catch(Exception e) {
95 e.printStackTrace();
96 }
97 } else if(arg.startsWith("portno=")) {
98 portNum = Integer.parseInt(arg.substring("portno=".length()));
99 } else {
100 System.out.println("WARNING: unknown command line arg " + arg);
101 System.out.println("Invoke collector with command line arg 'help' for usage");
102 }
103 }
104
105
106 SelectChannelConnector jettyConnector = new SelectChannelConnector();
107 jettyConnector.setLowResourcesConnections(THREADS - 10);
108 jettyConnector.setLowResourceMaxIdleTime(1500);
109 jettyConnector.setPort(portNum);
110
111
112 jettyServer = new Server(portNum);
113 jettyServer.setConnectors(new Connector[] { jettyConnector });
114 org.mortbay.thread.BoundedThreadPool pool = new org.mortbay.thread.BoundedThreadPool();
115 pool.setMaxThreads(THREADS);
116 jettyServer.setThreadPool(pool);
117
118
119 Context root = new Context(jettyServer, "/", Context.SESSIONS);
120 root.addServlet(new ServletHolder(servletCollector), "/*");
121
122 if(conf.getBoolean(HttpConnector.ASYNC_ACKS_OPT, false))
123 root.addServlet(new ServletHolder(new CommitCheckServlet(conf)), "/"+CommitCheckServlet.DEFAULT_PATH);
124
125 if(conf.getBoolean(LogDisplayServlet.ENABLED_OPT, false))
126 root.addServlet(new ServletHolder(new LogDisplayServlet(conf)), "/"+LogDisplayServlet.DEFAULT_PATH);
127
128
129 root.setAllowNullPathInfo(false);
130
131
132 for(Map.Entry<String, HttpServlet> e: servletsToAdd.entrySet()) {
133 root.addServlet(new ServletHolder(e.getValue()), e.getKey());
134 }
135
136
137 jettyServer.start();
138 jettyServer.setStopAtShutdown(true);
139
140 System.out.println("started Chukwa http collector on port " + portNum);
141 System.out.close();
142 System.err.close();
143 } catch (Exception e) {
144 e.printStackTrace();
145 DaemonWatcher.bailout(-1);
146 }
147
148 }
149
150 }