1/*2 * Licensed to the Apache Software Foundation (ASF) under one3 * or more contributor license agreements. See the NOTICE file4 * distributed with this work for additional information5 * regarding copyright ownership. The ASF licenses this file6 * to you under the Apache License, Version 2.0 (the7 * "License"); you may not use this file except in compliance8 * with the License. You may obtain a copy of the License at9 *10 * http://www.apache.org/licenses/LICENSE-2.011 *12 * Unless required by applicable law or agreed to in writing, software13 * distributed under the License is distributed on an "AS IS" BASIS,14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.15 * See the License for the specific language governing permissions and16 * limitations under the License.17 */1819package org.apache.hadoop.chukwa.datacollection.sender;
202122import java.io.*;
23import java.util.*;
24import org.apache.hadoop.conf.Configuration;
2526/***27 * An iterator returning a list of Collectors to try. This class is28 * nondeterministic, since it puts collectors back on the list after some29 * period.30 * 31 * No node will be polled more than once per maxRetryRateMs milliseconds.32 * hasNext() will continue return true if you have not called it recently.33 * 34 * 35 */36publicclassRetryListOfCollectorsimplements Iterator<String>, Cloneable {
3738int maxRetryRateMs;
39 List<String> collectors;
40long lastLookAtFirstNode;
41int nextCollector = 0;
42private String portNo;
43 Configuration conf;
44publicstaticfinal String RETRY_RATE_OPT = "chukwaAgent.connector.retryRate";
4546publicRetryListOfCollectors(File collectorFile, Configuration conf)
47throws IOException {
48this(conf);
49try {
50 BufferedReader br = new BufferedReader(new FileReader(collectorFile));
51 String line, parsedline;
52while ((line = br.readLine()) != null) {
53 parsedline = canonicalizeLine(line);
54 collectors.add(parsedline);
55 }
5657 br.close();
58 } catch (FileNotFoundException e) {
59 System.err.println("Error in RetryListOfCollectors() opening file"60 + collectorFile.getCanonicalPath() + ", double check that you have"61 + "set the CHUKWA_CONF_DIR environment variable. Also, ensure file"62 + " exists and is in classpath");
63throw e;
64 } catch (IOException e) {
65 System.err
66 .println("I/O error in RetryListOfcollectors instantiation in readLine() from specified collectors file");
67throw e;
68 }
69 shuffleList();
70 }
7172private String canonicalizeLine(String line) {
73 String parsedline;
74if (!line.contains("://")) {
75// no protocol, assume http76if (line.matches(".*:\\d+.*")) {
77 parsedline = "http://" + line+"/";
78 } else {
79 parsedline = "http://" + line + ":" + portNo;
80 }
81 } else {
82if (line.matches(".*:\\d+.*")) {
83 parsedline = line;
84 } else {
85 parsedline = line + ":" + portNo;
86 }
87 }
88if(!parsedline.matches(".*\\w/.*")) //no resource name89 parsedline = parsedline+"/";
90return parsedline;
91 }
9293/**94 * This is only used for debugging. Possibly it should sanitize urls the same way the other95 * constructor does.96 * @param collectors97 * @param maxRetryRateMs98 */99publicRetryListOfCollectors(final List<String> collectors, Configuration conf) {
100this(conf);
101this.collectors.addAll(collectors);
102//we don't shuffle the list here -- this constructor is only used for test purposes103 }
104105publicRetryListOfCollectors(Configuration conf) {
106 collectors = new ArrayList<String>();
107this.conf = conf;
108 portNo = conf.get("chukwaCollector.http.port", "8080");
109 maxRetryRateMs = conf.getInt(RETRY_RATE_OPT, 15 * 1000);
110 lastLookAtFirstNode = 0;
111 }
112113// for now, use a simple O(n^2) algorithm.114// safe, because we only do this once, and on smallish lists115publicvoid shuffleList() {
116 ArrayList<String> newList = new ArrayList<String>();
117 Random r = new java.util.Random();
118while (!collectors.isEmpty()) {
119int toRemove = r.nextInt(collectors.size());
120 String next = collectors.remove(toRemove);
121 newList.add(next);
122 }
123 collectors = newList;
124 }
125126publicboolean hasNext() {
127return collectors.size() > 0
128 && ((nextCollector != 0) || (System.currentTimeMillis()
129 - lastLookAtFirstNode > maxRetryRateMs));
130 }
131132public String next() {
133if (hasNext()) {
134int currCollector = nextCollector;
135 nextCollector = (nextCollector + 1) % collectors.size();
136if (currCollector == 0)
137 lastLookAtFirstNode = System.currentTimeMillis();
138return collectors.get(currCollector);
139 } else140returnnull;
141 }
142143publicvoid add(String collector) {
144 collectors.add(collector);
145 }
146147publicvoid remove() {
148thrownew UnsupportedOperationException();
149// FIXME: maybe just remove a collector from our list and then150// FIXME: make sure next doesn't break (i.e. reset nextCollector if151// necessary)152 }
153154/**155 * 156 * @return total number of collectors in list157 */158int total() {
159return collectors.size();
160 }
161162publicRetryListOfCollectors clone() {
163try {
164RetryListOfCollectors clone = (RetryListOfCollectors) super.clone();
165return clone;
166 } catch(CloneNotSupportedException e) {
167returnnull;
168 }
169 }
170171 }