This project has retired. For details please refer to its Attic page.
RetryListOfCollectors xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.chukwa.datacollection.sender;
20  
21  
22  import java.io.*;
23  import java.nio.charset.Charset;
24  import java.util.*;
25  
26  import org.apache.hadoop.conf.Configuration;
27  
28  /***
29   * An iterator returning a list of Collectors to try. This class is
30   * nondeterministic, since it puts collectors back on the list after some
31   * period.
32   * 
33   * No node will be polled more than once per maxRetryRateMs milliseconds.
34   * hasNext() will continue return true if you have not called it recently.
35   * 
36   * 
37   */
38  public class RetryListOfCollectors implements Iterator<String>, Cloneable {
39  
40    int maxRetryRateMs;
41    List<String> collectors;
42    long lastLookAtFirstNode;
43    int nextCollector = 0;
44    private String portNo;
45    public static final String RETRY_RATE_OPT = "chukwaAgent.connector.retryRate";
46  
47    public RetryListOfCollectors(File collectorFile, Configuration conf)
48        throws IOException {
49      this(conf);
50      try {
51        FileInputStream fis = new FileInputStream(collectorFile);
52        BufferedReader br = new BufferedReader(new InputStreamReader(fis, Charset.forName("UTF-8")));
53        String line, parsedline;
54        while ((line = br.readLine()) != null) {
55          parsedline = canonicalizeLine(line);
56          collectors.add(parsedline);
57        }
58        
59        br.close();
60      } catch (FileNotFoundException e) {
61        System.err.println("Error in RetryListOfCollectors() opening file"
62              + collectorFile.getCanonicalPath() + ", double check that you have"
63              + "set the CHUKWA_CONF_DIR environment variable. Also, ensure file"
64              + " exists and is in classpath");
65        throw e;
66      } catch (IOException e) {
67        System.err
68            .println("I/O error in RetryListOfcollectors instantiation in readLine() from specified collectors file");
69        throw e;
70      }
71      shuffleList();
72    }
73  
74    private String canonicalizeLine(String line) {
75      String parsedline;
76      if (!line.contains("://")) {
77        // no protocol, assume http
78        if (line.matches(".*:\\d+.*")) {
79          parsedline = "http://" + line+"/";
80        } else {
81          parsedline = "http://" + line + ":" + portNo;
82        }
83      } else {
84        if (line.matches(".*:\\d+.*")) {
85          parsedline = line;
86        } else {
87          parsedline = line + ":" + portNo;
88        }
89      }
90      if(!parsedline.matches(".*\\w/.*")) //no resource name
91        parsedline = parsedline+"/";
92      return parsedline;
93    }
94  
95    /**
96     * This is only used for debugging. Possibly it should sanitize urls the same way the other
97     * constructor does.
98     * @param collectors is list of collector hostname
99     * @param conf is Chukwa configuration
100    */
101   public RetryListOfCollectors(final List<String> collectors, Configuration conf) {
102     this(conf);
103     this.collectors.addAll(collectors);
104     //we don't shuffle the list here -- this constructor is only used for test purposes
105   }
106   
107   public RetryListOfCollectors(Configuration conf) {
108     collectors = new ArrayList<String>();
109     portNo = conf.get("chukwaCollector.http.port", "8080");
110     maxRetryRateMs = conf.getInt(RETRY_RATE_OPT, 15 * 1000);
111     lastLookAtFirstNode = 0;
112   }
113 
114   // for now, use a simple O(n^2) algorithm.
115   // safe, because we only do this once, and on smallish lists
116   public void shuffleList() {
117     ArrayList<String> newList = new ArrayList<String>();
118     Random r = new java.util.Random();
119     while (!collectors.isEmpty()) {
120       int toRemove = r.nextInt(collectors.size());
121       String next = collectors.remove(toRemove);
122       newList.add(next);
123     }
124     collectors = newList;
125   }
126 
127   public boolean hasNext() {
128     return collectors.size() > 0
129         && ((nextCollector != 0) || (System.currentTimeMillis()
130             - lastLookAtFirstNode > maxRetryRateMs));
131   }
132 
133   public String next() {
134     if (hasNext()) {
135       int currCollector = nextCollector;
136       nextCollector = (nextCollector + 1) % collectors.size();
137       if (currCollector == 0)
138         lastLookAtFirstNode = System.currentTimeMillis();
139       return collectors.get(currCollector);
140     } else
141       return null;
142   }
143 
144   public void add(String collector) {
145     collectors.add(collector);
146   }
147 
148   public void remove() {
149     throw new UnsupportedOperationException();
150     // FIXME: maybe just remove a collector from our list and then
151     // FIXME: make sure next doesn't break (i.e. reset nextCollector if
152     // necessary)
153   }
154 
155   /**
156    * 
157    * @return total number of collectors in list
158    */
159   int total() {
160     return collectors.size();
161   }
162   
163   public RetryListOfCollectors clone() {
164     try {
165       RetryListOfCollectors clone = (RetryListOfCollectors) super.clone();
166       return clone;
167     } catch(CloneNotSupportedException e) {
168       return null;
169     }
170   }
171 
172 }