This project has retired. For details please refer to its Attic page.
DataFactory xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.chukwa.datacollection;
20  
21  
22  import java.io.File;
23  import java.io.IOException;
24  import java.lang.reflect.Constructor;
25  import java.util.Iterator;
26  
27  import org.apache.hadoop.conf.*;
28  import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
29  import org.apache.hadoop.chukwa.datacollection.agent.MemLimitQueue;
30  import org.apache.hadoop.chukwa.datacollection.sender.RetryListOfCollectors;
31  import org.apache.log4j.Logger;
32  
33  public class DataFactory {
34    static Logger log = Logger.getLogger(DataFactory.class);
35    static final String COLLECTORS_FILENAME = "collectors";
36    static final String CHUNK_QUEUE = "chukwaAgent.chunk.queue";
37    
38    private static DataFactory dataFactory = null;
39    private ChunkQueue chunkQueue = null;
40  
41    private String defaultTags = "";
42    
43    static {
44      dataFactory = new DataFactory();
45    }
46  
47    private DataFactory() {
48    }
49  
50    public static DataFactory getInstance() {
51      return dataFactory;
52    }
53  
54    public synchronized ChunkQueue getEventQueue() {
55      if (chunkQueue == null) {
56        chunkQueue = createEventQueue();
57      }
58      return chunkQueue;
59    }
60    
61    public synchronized ChunkQueue createEventQueue() {
62      Configuration conf = ChukwaAgent.getStaticConfiguration();
63      if(conf == null){
64      //Must be a unit test, use default queue with default configuration
65        return new MemLimitQueue(null);
66      }
67      String receiver = conf.get(CHUNK_QUEUE);
68      ChunkQueue queue = null;
69      if(receiver == null){
70        log.warn("Empty configuration for " + CHUNK_QUEUE + ". Defaulting to MemLimitQueue");
71        queue = new MemLimitQueue(conf);
72        return queue;
73      }
74      
75      try {
76        Class<?> clazz = Class.forName(receiver);
77        log.info(clazz);
78        if(!ChunkQueue.class.isAssignableFrom(clazz)){
79          throw new Exception(receiver + " is not an instance of ChunkQueue");
80        }
81        try {
82          Constructor<?> ctor = clazz.getConstructor(new Class[]{Configuration.class});
83          queue = (ChunkQueue) ctor.newInstance(conf);
84        } catch(NoSuchMethodException nsme){
85          //Queue implementations which take no configuration parameter
86          queue = (ChunkQueue) clazz.newInstance();
87        }
88      } catch(Exception e) {
89        log.error("Could not instantiate configured ChunkQueue due to: " + e);
90        log.error("Defaulting to MemLimitQueue");
91        queue = new MemLimitQueue(conf);
92      }
93      return queue;
94    }
95  
96    public String getDefaultTags() {
97      return defaultTags;
98    }
99    
100   public void addDefaultTag(String tag) {
101     this.defaultTags += " " + tag.trim();
102   }
103   
104   /**
105    * @return empty list if file does not exist
106    * @throws IOException on other error
107    */
108   public Iterator<String> getCollectorURLs(Configuration conf, String filename) throws IOException {
109     String chukwaHome = System.getenv("CHUKWA_HOME");
110     if (chukwaHome == null) {
111       chukwaHome = ".";
112     }
113 
114     if (!chukwaHome.endsWith("/")) {
115       chukwaHome = chukwaHome + File.separator;
116     }
117     log.info("Config - System.getenv(\"CHUKWA_HOME\"): [" + chukwaHome + "]");
118 
119     String chukwaConf = System.getenv("CHUKWA_CONF_DIR");
120     if (chukwaConf == null) {
121       chukwaConf = chukwaHome + "conf" + File.separator;
122     }
123 
124     log.info("Config - System.getenv(\"chukwaConf\"): [" + chukwaConf + "]");
125 
126     log.info("setting up collectors file: " + chukwaConf + File.separator
127         + COLLECTORS_FILENAME);
128     File collectors = new File(chukwaConf + File.separator + filename);
129     try {
130       return new RetryListOfCollectors(collectors, conf);
131     } catch (java.io.IOException e) {
132       log.error("failed to read collectors file: ", e);
133       throw e;
134     }
135   }
136   public Iterator<String> getCollectorURLs(Configuration conf) throws IOException {
137     return getCollectorURLs(conf, "collectors");
138   }
139 
140 }