This project has retired. For details please refer to its
Attic page.
DataFactory xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.chukwa.datacollection;
20
21
22 import java.io.File;
23 import java.io.IOException;
24 import java.lang.reflect.Constructor;
25 import java.util.Iterator;
26
27 import org.apache.hadoop.conf.*;
28 import org.apache.hadoop.chukwa.Chunk;
29 import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
30 import org.apache.hadoop.chukwa.datacollection.agent.MemLimitQueue;
31 import org.apache.hadoop.chukwa.datacollection.sender.RetryListOfCollectors;
32 import org.apache.log4j.Logger;
33
34 public class DataFactory {
35 static Logger log = Logger.getLogger(DataFactory.class);
36 static final String COLLECTORS_FILENAME = "collectors";
37 static final String CHUNK_QUEUE = "chukwaAgent.chunk.queue";
38
39 protected static final DataFactory dataFactory = new DataFactory();
40 private ChunkQueue chunkQueue = null;
41
42 private String defaultTags = "";
43
44 private DataFactory() {
45 }
46
47 public static DataFactory getInstance() {
48 return dataFactory;
49 }
50
51 public synchronized ChunkQueue getEventQueue() {
52 if (chunkQueue == null) {
53 chunkQueue = createEventQueue();
54 }
55 return chunkQueue;
56 }
57
58 public void put(Chunk c) throws InterruptedException {
59 chunkQueue.add(c);
60 }
61
62 public synchronized ChunkQueue createEventQueue() {
63 Configuration conf = ChukwaAgent.getStaticConfiguration();
64 if(conf == null){
65
66 return new MemLimitQueue(null);
67 }
68 String receiver = conf.get(CHUNK_QUEUE);
69 ChunkQueue queue = null;
70 if(receiver == null){
71 log.warn("Empty configuration for " + CHUNK_QUEUE + ". Defaulting to MemLimitQueue");
72 queue = new MemLimitQueue(conf);
73 return queue;
74 }
75
76 try {
77 Class<?> clazz = Class.forName(receiver);
78 log.info(clazz);
79 if(!ChunkQueue.class.isAssignableFrom(clazz)){
80 throw new Exception(receiver + " is not an instance of ChunkQueue");
81 }
82 try {
83 Constructor<?> ctor = clazz.getConstructor(new Class[]{Configuration.class});
84 queue = (ChunkQueue) ctor.newInstance(conf);
85 } catch(NoSuchMethodException nsme){
86
87 queue = (ChunkQueue) clazz.newInstance();
88 }
89 } catch(Exception e) {
90 log.error("Could not instantiate configured ChunkQueue due to: " + e);
91 log.error("Defaulting to MemLimitQueue");
92 queue = new MemLimitQueue(conf);
93 }
94 return queue;
95 }
96
97 public String getDefaultTags() {
98 return defaultTags;
99 }
100
101 public void setDefaultTags(String tags) {
102 defaultTags = tags;
103 }
104
105 public void addDefaultTag(String tag) {
106 this.defaultTags += " " + tag.trim();
107 }
108
109
110
111
112
113
114
115 public Iterator<String> getCollectorURLs(Configuration conf, String filename) throws IOException {
116 String chukwaHome = System.getenv("CHUKWA_HOME");
117 if (chukwaHome == null) {
118 chukwaHome = ".";
119 }
120
121 if (!chukwaHome.endsWith("/")) {
122 chukwaHome = chukwaHome + File.separator;
123 }
124 log.info("Config - System.getenv(\"CHUKWA_HOME\"): [" + chukwaHome + "]");
125
126 String chukwaConf = System.getenv("CHUKWA_CONF_DIR");
127 if (chukwaConf == null) {
128 chukwaConf = chukwaHome + "conf" + File.separator;
129 }
130
131 log.info("Config - System.getenv(\"chukwaConf\"): [" + chukwaConf + "]");
132
133 log.info("setting up collectors file: " + chukwaConf + File.separator
134 + COLLECTORS_FILENAME);
135 File collectors = new File(chukwaConf + File.separator + filename);
136 try {
137 return new RetryListOfCollectors(collectors, conf);
138 } catch (java.io.IOException e) {
139 log.error("failed to read collectors file: ", e);
140 throw e;
141 }
142 }
143 public Iterator<String> getCollectorURLs(Configuration conf) throws IOException {
144 return getCollectorURLs(conf, "collectors");
145 }
146
147 }