1/*2 * Licensed to the Apache Software Foundation (ASF) under one3 * or more contributor license agreements. See the NOTICE file4 * distributed with this work for additional information5 * regarding copyright ownership. The ASF licenses this file6 * to you under the Apache License, Version 2.0 (the7 * "License"); you may not use this file except in compliance8 * with the License. You may obtain a copy of the License at9 *10 * http://www.apache.org/licenses/LICENSE-2.011 *12 * Unless required by applicable law or agreed to in writing, software13 * distributed under the License is distributed on an "AS IS" BASIS,14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.15 * See the License for the specific language governing permissions and16 * limitations under the License.17 */1819package org.apache.hadoop.chukwa.datacollection.sender;
202122import java.io.*;
23import java.nio.charset.Charset;
24import java.util.*;
2526import org.apache.hadoop.conf.Configuration;
2728/***29 * An iterator returning a list of Collectors to try. This class is30 * nondeterministic, since it puts collectors back on the list after some31 * period.32 * 33 * No node will be polled more than once per maxRetryRateMs milliseconds.34 * hasNext() will continue return true if you have not called it recently.35 * 36 * 37 */38publicclassRetryListOfCollectorsimplements Iterator<String>, Cloneable {
3940int maxRetryRateMs;
41 List<String> collectors;
42long lastLookAtFirstNode;
43int nextCollector = 0;
44private String portNo;
45publicstaticfinal String RETRY_RATE_OPT = "chukwaAgent.connector.retryRate";
4647publicRetryListOfCollectors(File collectorFile, Configuration conf)
48throws IOException {
49this(conf);
50try {
51 FileInputStream fis = new FileInputStream(collectorFile);
52 BufferedReader br = new BufferedReader(new InputStreamReader(fis, Charset.forName("UTF-8")));
53 String line, parsedline;
54while ((line = br.readLine()) != null) {
55 parsedline = canonicalizeLine(line);
56 collectors.add(parsedline);
57 }
5859 br.close();
60 } catch (FileNotFoundException e) {
61 System.err.println("Error in RetryListOfCollectors() opening file"62 + collectorFile.getCanonicalPath() + ", double check that you have"63 + "set the CHUKWA_CONF_DIR environment variable. Also, ensure file"64 + " exists and is in classpath");
65throw e;
66 } catch (IOException e) {
67 System.err
68 .println("I/O error in RetryListOfcollectors instantiation in readLine() from specified collectors file");
69throw e;
70 }
71 shuffleList();
72 }
7374private String canonicalizeLine(String line) {
75 String parsedline;
76if (!line.contains("://")) {
77// no protocol, assume http78if (line.matches(".*:\\d+.*")) {
79 parsedline = "http://" + line+"/";
80 } else {
81 parsedline = "http://" + line + ":" + portNo;
82 }
83 } else {
84if (line.matches(".*:\\d+.*")) {
85 parsedline = line;
86 } else {
87 parsedline = line + ":" + portNo;
88 }
89 }
90if(!parsedline.matches(".*\\w/.*")) //no resource name91 parsedline = parsedline+"/";
92return parsedline;
93 }
9495/**96 * This is only used for debugging. Possibly it should sanitize urls the same way the other97 * constructor does.98 * @param collectors is list of collector hostname99 * @param conf is Chukwa configuration100 */101publicRetryListOfCollectors(final List<String> collectors, Configuration conf) {
102this(conf);
103this.collectors.addAll(collectors);
104//we don't shuffle the list here -- this constructor is only used for test purposes105 }
106107publicRetryListOfCollectors(Configuration conf) {
108 collectors = new ArrayList<String>();
109 portNo = conf.get("chukwaCollector.http.port", "8080");
110 maxRetryRateMs = conf.getInt(RETRY_RATE_OPT, 15 * 1000);
111 lastLookAtFirstNode = 0;
112 }
113114// for now, use a simple O(n^2) algorithm.115// safe, because we only do this once, and on smallish lists116publicvoid shuffleList() {
117 ArrayList<String> newList = new ArrayList<String>();
118 Random r = new java.util.Random();
119while (!collectors.isEmpty()) {
120int toRemove = r.nextInt(collectors.size());
121 String next = collectors.remove(toRemove);
122 newList.add(next);
123 }
124 collectors = newList;
125 }
126127publicboolean hasNext() {
128return collectors.size() > 0
129 && ((nextCollector != 0) || (System.currentTimeMillis()
130 - lastLookAtFirstNode > maxRetryRateMs));
131 }
132133public String next() {
134if (hasNext()) {
135int currCollector = nextCollector;
136 nextCollector = (nextCollector + 1) % collectors.size();
137if (currCollector == 0)
138 lastLookAtFirstNode = System.currentTimeMillis();
139return collectors.get(currCollector);
140 } else141returnnull;
142 }
143144publicvoid add(String collector) {
145 collectors.add(collector);
146 }
147148publicvoid remove() {
149thrownew UnsupportedOperationException();
150// FIXME: maybe just remove a collector from our list and then151// FIXME: make sure next doesn't break (i.e. reset nextCollector if152// necessary)153 }
154155/**156 * 157 * @return total number of collectors in list158 */159int total() {
160return collectors.size();
161 }
162163publicRetryListOfCollectors clone() {
164try {
165RetryListOfCollectors clone = (RetryListOfCollectors) super.clone();
166return clone;
167 } catch(CloneNotSupportedException e) {
168returnnull;
169 }
170 }
171172 }