This project has retired. For details please refer to its
Attic page.
DataFactory xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.chukwa.datacollection;
20
21
22 import java.io.File;
23 import java.io.IOException;
24 import java.lang.reflect.Constructor;
25 import java.util.Iterator;
26
27 import org.apache.hadoop.conf.*;
28 import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
29 import org.apache.hadoop.chukwa.datacollection.agent.MemLimitQueue;
30 import org.apache.hadoop.chukwa.datacollection.sender.RetryListOfCollectors;
31 import org.apache.log4j.Logger;
32
33 public class DataFactory {
34 static Logger log = Logger.getLogger(DataFactory.class);
35 static final String COLLECTORS_FILENAME = "collectors";
36 static final String CHUNK_QUEUE = "chukwaAgent.chunk.queue";
37
38 private static DataFactory dataFactory = null;
39 private ChunkQueue chunkQueue = null;
40
41 private String defaultTags = "";
42
43 static {
44 dataFactory = new DataFactory();
45 }
46
47 private DataFactory() {
48 }
49
50 public static DataFactory getInstance() {
51 return dataFactory;
52 }
53
54 public synchronized ChunkQueue getEventQueue() {
55 if (chunkQueue == null) {
56 chunkQueue = createEventQueue();
57 }
58 return chunkQueue;
59 }
60
61 public synchronized ChunkQueue createEventQueue() {
62 Configuration conf = ChukwaAgent.getStaticConfiguration();
63 if(conf == null){
64
65 return new MemLimitQueue(null);
66 }
67 String receiver = conf.get(CHUNK_QUEUE);
68 ChunkQueue queue = null;
69 if(receiver == null){
70 log.warn("Empty configuration for " + CHUNK_QUEUE + ". Defaulting to MemLimitQueue");
71 queue = new MemLimitQueue(conf);
72 return queue;
73 }
74
75 try {
76 Class<?> clazz = Class.forName(receiver);
77 log.info(clazz);
78 if(!ChunkQueue.class.isAssignableFrom(clazz)){
79 throw new Exception(receiver + " is not an instance of ChunkQueue");
80 }
81 try {
82 Constructor<?> ctor = clazz.getConstructor(new Class[]{Configuration.class});
83 queue = (ChunkQueue) ctor.newInstance(conf);
84 } catch(NoSuchMethodException nsme){
85
86 queue = (ChunkQueue) clazz.newInstance();
87 }
88 } catch(Exception e) {
89 log.error("Could not instantiate configured ChunkQueue due to: " + e);
90 log.error("Defaulting to MemLimitQueue");
91 queue = new MemLimitQueue(conf);
92 }
93 return queue;
94 }
95
96 public String getDefaultTags() {
97 return defaultTags;
98 }
99
100 public void addDefaultTag(String tag) {
101 this.defaultTags += " " + tag.trim();
102 }
103
104
105
106
107
108 public Iterator<String> getCollectorURLs(Configuration conf, String filename) throws IOException {
109 String chukwaHome = System.getenv("CHUKWA_HOME");
110 if (chukwaHome == null) {
111 chukwaHome = ".";
112 }
113
114 if (!chukwaHome.endsWith("/")) {
115 chukwaHome = chukwaHome + File.separator;
116 }
117 log.info("Config - System.getenv(\"CHUKWA_HOME\"): [" + chukwaHome + "]");
118
119 String chukwaConf = System.getenv("CHUKWA_CONF_DIR");
120 if (chukwaConf == null) {
121 chukwaConf = chukwaHome + "conf" + File.separator;
122 }
123
124 log.info("Config - System.getenv(\"chukwaConf\"): [" + chukwaConf + "]");
125
126 log.info("setting up collectors file: " + chukwaConf + File.separator
127 + COLLECTORS_FILENAME);
128 File collectors = new File(chukwaConf + File.separator + filename);
129 try {
130 return new RetryListOfCollectors(collectors, conf);
131 } catch (java.io.IOException e) {
132 log.error("failed to read collectors file: ", e);
133 throw e;
134 }
135 }
136 public Iterator<String> getCollectorURLs(Configuration conf) throws IOException {
137 return getCollectorURLs(conf, "collectors");
138 }
139
140 }