1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.chukwa.datacollection.sender;
20
21
22 import java.io.*;
23 import java.nio.charset.Charset;
24 import java.util.*;
25
26 import org.apache.hadoop.conf.Configuration;
27
28
29
30
31
32
33
34
35
36
37
38 public class RetryListOfCollectors implements Iterator<String>, Cloneable {
39
40 int maxRetryRateMs;
41 List<String> collectors;
42 long lastLookAtFirstNode;
43 int nextCollector = 0;
44 private String portNo;
45 public static final String RETRY_RATE_OPT = "chukwaAgent.connector.retryRate";
46
47 public RetryListOfCollectors(File collectorFile, Configuration conf)
48 throws IOException {
49 this(conf);
50 try {
51 FileInputStream fis = new FileInputStream(collectorFile);
52 BufferedReader br = new BufferedReader(new InputStreamReader(fis, Charset.forName("UTF-8")));
53 String line, parsedline;
54 while ((line = br.readLine()) != null) {
55 parsedline = canonicalizeLine(line);
56 collectors.add(parsedline);
57 }
58
59 br.close();
60 } catch (FileNotFoundException e) {
61 System.err.println("Error in RetryListOfCollectors() opening file"
62 + collectorFile.getCanonicalPath() + ", double check that you have"
63 + "set the CHUKWA_CONF_DIR environment variable. Also, ensure file"
64 + " exists and is in classpath");
65 throw e;
66 } catch (IOException e) {
67 System.err
68 .println("I/O error in RetryListOfcollectors instantiation in readLine() from specified collectors file");
69 throw e;
70 }
71 shuffleList();
72 }
73
74 private String canonicalizeLine(String line) {
75 String parsedline;
76 if (!line.contains("://")) {
77
78 if (line.matches(".*:\\d+.*")) {
79 parsedline = "http://" + line+"/";
80 } else {
81 parsedline = "http://" + line + ":" + portNo;
82 }
83 } else {
84 if (line.matches(".*:\\d+.*")) {
85 parsedline = line;
86 } else {
87 parsedline = line + ":" + portNo;
88 }
89 }
90 if(!parsedline.matches(".*\\w/.*"))
91 parsedline = parsedline+"/";
92 return parsedline;
93 }
94
95
96
97
98
99
100
101 public RetryListOfCollectors(final List<String> collectors, Configuration conf) {
102 this(conf);
103 this.collectors.addAll(collectors);
104
105 }
106
107 public RetryListOfCollectors(Configuration conf) {
108 collectors = new ArrayList<String>();
109 portNo = conf.get("chukwaCollector.http.port", "8080");
110 maxRetryRateMs = conf.getInt(RETRY_RATE_OPT, 15 * 1000);
111 lastLookAtFirstNode = 0;
112 }
113
114
115
116 public void shuffleList() {
117 ArrayList<String> newList = new ArrayList<String>();
118 Random r = new java.util.Random();
119 while (!collectors.isEmpty()) {
120 int toRemove = r.nextInt(collectors.size());
121 String next = collectors.remove(toRemove);
122 newList.add(next);
123 }
124 collectors = newList;
125 }
126
127 public boolean hasNext() {
128 return collectors.size() > 0
129 && ((nextCollector != 0) || (System.currentTimeMillis()
130 - lastLookAtFirstNode > maxRetryRateMs));
131 }
132
133 public String next() {
134 if (hasNext()) {
135 int currCollector = nextCollector;
136 nextCollector = (nextCollector + 1) % collectors.size();
137 if (currCollector == 0)
138 lastLookAtFirstNode = System.currentTimeMillis();
139 return collectors.get(currCollector);
140 } else
141 return null;
142 }
143
144 public void add(String collector) {
145 collectors.add(collector);
146 }
147
148 public void remove() {
149 throw new UnsupportedOperationException();
150
151
152
153 }
154
155
156
157
158
159 int total() {
160 return collectors.size();
161 }
162
163 public RetryListOfCollectors clone() {
164 try {
165 RetryListOfCollectors clone = (RetryListOfCollectors) super.clone();
166 return clone;
167 } catch(CloneNotSupportedException e) {
168 return null;
169 }
170 }
171
172 }