This project has retired. For details please refer to its Attic page.
DataFactory xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.chukwa.datacollection;
20  
21  
22  import java.io.File;
23  import java.io.IOException;
24  import java.lang.reflect.Constructor;
25  import java.util.Iterator;
26  
27  import org.apache.hadoop.conf.*;
28  import org.apache.hadoop.chukwa.Chunk;
29  import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
30  import org.apache.hadoop.chukwa.datacollection.agent.MemLimitQueue;
31  import org.apache.hadoop.chukwa.datacollection.sender.RetryListOfCollectors;
32  import org.apache.log4j.Logger;
33  
34  public class DataFactory {
35    static Logger log = Logger.getLogger(DataFactory.class);
36    static final String COLLECTORS_FILENAME = "collectors";
37    static final String CHUNK_QUEUE = "chukwaAgent.chunk.queue";
38    
39    protected static final DataFactory dataFactory = new DataFactory();
40    private ChunkQueue chunkQueue = null;
41  
42    private String defaultTags = "";
43    
44    private DataFactory() {
45    }
46  
47    public static DataFactory getInstance() {
48      return dataFactory;
49    }
50  
51    public synchronized ChunkQueue getEventQueue() {
52      if (chunkQueue == null) {
53        chunkQueue = createEventQueue();
54      }
55      return chunkQueue;
56    }
57  
58    public void put(Chunk c) throws InterruptedException {
59      chunkQueue.add(c);
60    }
61  
62    public synchronized ChunkQueue createEventQueue() {
63      Configuration conf = ChukwaAgent.getStaticConfiguration();
64      if(conf == null){
65      //Must be a unit test, use default queue with default configuration
66        return new MemLimitQueue(null);
67      }
68      String receiver = conf.get(CHUNK_QUEUE);
69      ChunkQueue queue = null;
70      if(receiver == null){
71        log.warn("Empty configuration for " + CHUNK_QUEUE + ". Defaulting to MemLimitQueue");
72        queue = new MemLimitQueue(conf);
73        return queue;
74      }
75      
76      try {
77        Class<?> clazz = Class.forName(receiver);
78        log.info(clazz);
79        if(!ChunkQueue.class.isAssignableFrom(clazz)){
80          throw new Exception(receiver + " is not an instance of ChunkQueue");
81        }
82        try {
83          Constructor<?> ctor = clazz.getConstructor(new Class[]{Configuration.class});
84          queue = (ChunkQueue) ctor.newInstance(conf);
85        } catch(NoSuchMethodException nsme){
86          //Queue implementations which take no configuration parameter
87          queue = (ChunkQueue) clazz.newInstance();
88        }
89      } catch(Exception e) {
90        log.error("Could not instantiate configured ChunkQueue due to: " + e);
91        log.error("Defaulting to MemLimitQueue");
92        queue = new MemLimitQueue(conf);
93      }
94      return queue;
95    }
96  
97    public String getDefaultTags() {
98      return defaultTags;
99    }
100 
101   public void setDefaultTags(String tags) {
102     defaultTags = tags;
103   }
104 
105   public void addDefaultTag(String tag) {
106     this.defaultTags += " " + tag.trim();
107   }
108   
109   /**
110    * @param conf is Chukwa configuration
111    * @param filename is collector list
112    * @return empty list if file does not exist
113    * @throws IOException on other error
114    */
115   public Iterator<String> getCollectorURLs(Configuration conf, String filename) throws IOException {
116     String chukwaHome = System.getenv("CHUKWA_HOME");
117     if (chukwaHome == null) {
118       chukwaHome = ".";
119     }
120 
121     if (!chukwaHome.endsWith("/")) {
122       chukwaHome = chukwaHome + File.separator;
123     }
124     log.info("Config - System.getenv(\"CHUKWA_HOME\"): [" + chukwaHome + "]");
125 
126     String chukwaConf = System.getenv("CHUKWA_CONF_DIR");
127     if (chukwaConf == null) {
128       chukwaConf = chukwaHome + "conf" + File.separator;
129     }
130 
131     log.info("Config - System.getenv(\"chukwaConf\"): [" + chukwaConf + "]");
132 
133     log.info("setting up collectors file: " + chukwaConf + File.separator
134         + COLLECTORS_FILENAME);
135     File collectors = new File(chukwaConf + File.separator + filename);
136     try {
137       return new RetryListOfCollectors(collectors, conf);
138     } catch (java.io.IOException e) {
139       log.error("failed to read collectors file: ", e);
140       throw e;
141     }
142   }
143   public Iterator<String> getCollectorURLs(Configuration conf) throws IOException {
144     return getCollectorURLs(conf, "collectors");
145   }
146 
147 }