This project has retired. For details please refer to its Attic page.
RetryListOfCollectors xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.chukwa.datacollection.sender;
20  
21  
22  import java.io.*;
23  import java.util.*;
24  import org.apache.hadoop.conf.Configuration;
25  
26  /***
27   * An iterator returning a list of Collectors to try. This class is
28   * nondeterministic, since it puts collectors back on the list after some
29   * period.
30   * 
31   * No node will be polled more than once per maxRetryRateMs milliseconds.
32   * hasNext() will continue return true if you have not called it recently.
33   * 
34   * 
35   */
36  public class RetryListOfCollectors implements Iterator<String>, Cloneable {
37  
38    int maxRetryRateMs;
39    List<String> collectors;
40    long lastLookAtFirstNode;
41    int nextCollector = 0;
42    private String portNo;
43    Configuration conf;
44    public static final String RETRY_RATE_OPT = "chukwaAgent.connector.retryRate";
45  
46    public RetryListOfCollectors(File collectorFile, Configuration conf)
47        throws IOException {
48      this(conf);
49      try {
50        BufferedReader br = new BufferedReader(new FileReader(collectorFile));
51        String line, parsedline;
52        while ((line = br.readLine()) != null) {
53          parsedline = canonicalizeLine(line);
54          collectors.add(parsedline);
55        }
56        
57        br.close();
58      } catch (FileNotFoundException e) {
59        System.err.println("Error in RetryListOfCollectors() opening file"
60              + collectorFile.getCanonicalPath() + ", double check that you have"
61              + "set the CHUKWA_CONF_DIR environment variable. Also, ensure file"
62              + " exists and is in classpath");
63        throw e;
64      } catch (IOException e) {
65        System.err
66            .println("I/O error in RetryListOfcollectors instantiation in readLine() from specified collectors file");
67        throw e;
68      }
69      shuffleList();
70    }
71  
72    private String canonicalizeLine(String line) {
73      String parsedline;
74      if (!line.contains("://")) {
75        // no protocol, assume http
76        if (line.matches(".*:\\d+.*")) {
77          parsedline = "http://" + line+"/";
78        } else {
79          parsedline = "http://" + line + ":" + portNo;
80        }
81      } else {
82        if (line.matches(".*:\\d+.*")) {
83          parsedline = line;
84        } else {
85          parsedline = line + ":" + portNo;
86        }
87      }
88      if(!parsedline.matches(".*\\w/.*")) //no resource name
89        parsedline = parsedline+"/";
90      return parsedline;
91    }
92  
93    /**
94     * This is only used for debugging. Possibly it should sanitize urls the same way the other
95     * constructor does.
96     * @param collectors
97     * @param maxRetryRateMs
98     */
99    public RetryListOfCollectors(final List<String> collectors, Configuration conf) {
100     this(conf);
101     this.collectors.addAll(collectors);
102     //we don't shuffle the list here -- this constructor is only used for test purposes
103   }
104   
105   public RetryListOfCollectors(Configuration conf) {
106     collectors = new ArrayList<String>();
107     this.conf = conf;
108     portNo = conf.get("chukwaCollector.http.port", "8080");
109     maxRetryRateMs = conf.getInt(RETRY_RATE_OPT, 15 * 1000);
110     lastLookAtFirstNode = 0;
111   }
112 
113   // for now, use a simple O(n^2) algorithm.
114   // safe, because we only do this once, and on smallish lists
115   public void shuffleList() {
116     ArrayList<String> newList = new ArrayList<String>();
117     Random r = new java.util.Random();
118     while (!collectors.isEmpty()) {
119       int toRemove = r.nextInt(collectors.size());
120       String next = collectors.remove(toRemove);
121       newList.add(next);
122     }
123     collectors = newList;
124   }
125 
126   public boolean hasNext() {
127     return collectors.size() > 0
128         && ((nextCollector != 0) || (System.currentTimeMillis()
129             - lastLookAtFirstNode > maxRetryRateMs));
130   }
131 
132   public String next() {
133     if (hasNext()) {
134       int currCollector = nextCollector;
135       nextCollector = (nextCollector + 1) % collectors.size();
136       if (currCollector == 0)
137         lastLookAtFirstNode = System.currentTimeMillis();
138       return collectors.get(currCollector);
139     } else
140       return null;
141   }
142 
143   public void add(String collector) {
144     collectors.add(collector);
145   }
146 
147   public void remove() {
148     throw new UnsupportedOperationException();
149     // FIXME: maybe just remove a collector from our list and then
150     // FIXME: make sure next doesn't break (i.e. reset nextCollector if
151     // necessary)
152   }
153 
154   /**
155    * 
156    * @return total number of collectors in list
157    */
158   int total() {
159     return collectors.size();
160   }
161   
162   public RetryListOfCollectors clone() {
163     try {
164       RetryListOfCollectors clone = (RetryListOfCollectors) super.clone();
165       return clone;
166     } catch(CloneNotSupportedException e) {
167       return null;
168     }
169   }
170 
171 }