This project has retired. For details please refer to its
Attic page.
ChukwaHBaseStore xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.chukwa.datastore;
19
20 import java.io.IOException;
21 import java.util.Calendar;
22 import java.util.HashMap;
23 import java.util.HashSet;
24 import java.util.Iterator;
25 import java.util.List;
26 import java.util.Set;
27 import java.util.concurrent.CopyOnWriteArraySet;
28 import java.util.concurrent.TimeUnit;
29 import java.util.regex.Matcher;
30 import java.util.regex.Pattern;
31
32 import org.apache.hadoop.chukwa.hicc.bean.HeatMapPoint;
33 import org.apache.hadoop.chukwa.hicc.bean.Heatmap;
34 import org.apache.hadoop.chukwa.hicc.bean.Series;
35 import org.apache.hadoop.chukwa.util.ExceptionUtil;
36
37 import org.apache.hadoop.hbase.HBaseConfiguration;
38 import org.apache.hadoop.conf.Configuration;
39 import org.apache.hadoop.hbase.HTableDescriptor;
40 import org.apache.hadoop.hbase.KeyValue;
41 import org.apache.hadoop.hbase.client.HBaseAdmin;
42 import org.apache.hadoop.hbase.client.HTableInterface;
43 import org.apache.hadoop.hbase.client.HTablePool;
44 import org.apache.hadoop.hbase.client.Result;
45 import org.apache.hadoop.hbase.client.ResultScanner;
46 import org.apache.hadoop.hbase.client.Scan;
47 import org.apache.hadoop.hbase.filter.ColumnPrefixFilter;
48 import org.apache.hadoop.hbase.filter.RowFilter;
49 import org.apache.hadoop.hbase.filter.RegexStringComparator;
50 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
51 import org.apache.log4j.Logger;
52
53 public class ChukwaHBaseStore {
54 private static Configuration hconf = HBaseConfiguration.create();
55 private static HTablePool pool = new HTablePool(hconf, 60);
56 static Logger log = Logger.getLogger(ChukwaHBaseStore.class);
57
58 public static Series getSeries(String tableName, String rkey, String family, String column,
59 long startTime, long endTime, boolean filterByRowKey) {
60 StringBuilder seriesName = new StringBuilder();
61 seriesName.append(rkey);
62 seriesName.append(":");
63 seriesName.append(family);
64 seriesName.append(":");
65 seriesName.append(column);
66
67 Series series = new Series(seriesName.toString());
68 try {
69 HTableInterface table = pool.getTable(tableName);
70 Calendar c = Calendar.getInstance();
71 c.setTimeInMillis(startTime);
72 c.set(Calendar.MINUTE, 0);
73 c.set(Calendar.SECOND, 0);
74 c.set(Calendar.MILLISECOND, 0);
75 String startRow = c.getTimeInMillis()+rkey;
76 Scan scan = new Scan();
77 scan.addColumn(family.getBytes(), column.getBytes());
78 scan.setStartRow(startRow.getBytes());
79 scan.setTimeRange(startTime, endTime);
80 scan.setMaxVersions();
81 if(filterByRowKey) {
82 RowFilter rf = new RowFilter(CompareOp.EQUAL, new
83 RegexStringComparator("[0-9]+-"+rkey+"$"));
84 scan.setFilter(rf);
85 }
86 ResultScanner results = table.getScanner(scan);
87 Iterator<Result> it = results.iterator();
88
89
90 while(it.hasNext()) {
91 Result result = it.next();
92 String temp = new String(result.getValue(family.getBytes(), column.getBytes()));
93 double value = Double.parseDouble(temp);
94
95 String buf = new String(result.getRow());
96 Long timestamp = Long.parseLong(buf.split("-")[0]);
97
98
99 series.add(timestamp, value);
100 }
101 results.close();
102 table.close();
103 } catch(Exception e) {
104 log.error(ExceptionUtil.getStackTrace(e));
105 }
106 return series;
107 }
108
109 public static Set<String> getFamilyNames(String tableName) {
110 Set<String> familyNames = new CopyOnWriteArraySet<String>();
111 try {
112 HTableInterface table = pool.getTable(tableName);
113 Set<byte[]> families = table.getTableDescriptor().getFamiliesKeys();
114 for(byte[] name : families) {
115 familyNames.add(new String(name));
116 }
117 table.close();
118 } catch(Exception e) {
119 log.error(ExceptionUtil.getStackTrace(e));
120 }
121 return familyNames;
122
123 }
124
125 public static Set<String> getTableNames() {
126 Set<String> tableNames = new CopyOnWriteArraySet<String>();
127 try {
128 HBaseAdmin admin = new HBaseAdmin(hconf);
129 HTableDescriptor[] td = admin.listTables();
130 for(HTableDescriptor table : td) {
131 tableNames.add(new String(table.getName()));
132 }
133 } catch(Exception e) {
134 log.error(ExceptionUtil.getStackTrace(e));
135 }
136 return tableNames;
137 }
138
139 public static void getColumnNamesHelper(Set<String>columnNames, Iterator<Result> it) {
140 Result result = it.next();
141 if(result!=null) {
142 List<KeyValue> kvList = result.list();
143 for(KeyValue kv : kvList) {
144 columnNames.add(new String(kv.getQualifier()));
145 }
146 }
147 }
148
149 public static Set<String> getColumnNames(String tableName, String family, long startTime, long endTime, boolean fullScan) {
150 Set<String> columnNames = new CopyOnWriteArraySet<String>();
151 try {
152 HTableInterface table = pool.getTable(tableName);
153 Scan scan = new Scan();
154 if(!fullScan) {
155
156 StringBuilder temp = new StringBuilder();
157 temp.append(endTime-300000L);
158 scan.setStartRow(temp.toString().getBytes());
159 temp.setLength(0);
160 temp.append(endTime);
161 scan.setStopRow(temp.toString().getBytes());
162 } else {
163 StringBuilder temp = new StringBuilder();
164 temp.append(startTime);
165 scan.setStartRow(temp.toString().getBytes());
166 temp.setLength(0);
167 temp.append(endTime);
168 scan.setStopRow(temp.toString().getBytes());
169 }
170 scan.addFamily(family.getBytes());
171 ResultScanner results = table.getScanner(scan);
172 Iterator<Result> it = results.iterator();
173 if(fullScan) {
174 while(it.hasNext()) {
175 getColumnNamesHelper(columnNames, it);
176 }
177 } else {
178 getColumnNamesHelper(columnNames, it);
179 }
180 results.close();
181 table.close();
182 } catch(Exception e) {
183 log.error(ExceptionUtil.getStackTrace(e));
184 }
185 return columnNames;
186 }
187
188 public static Set<String> getRowNames(String tableName, String family, String qualifier, long startTime, long endTime, boolean fullScan) {
189 Set<String> rows = new HashSet<String>();
190 HTableInterface table = pool.getTable(tableName);
191 try {
192 Scan scan = new Scan();
193 scan.addColumn(family.getBytes(), qualifier.getBytes());
194 if(!fullScan) {
195
196 StringBuilder temp = new StringBuilder();
197 temp.append(endTime-300000L);
198 scan.setStartRow(temp.toString().getBytes());
199 temp.setLength(0);
200 temp.append(endTime);
201 scan.setStopRow(temp.toString().getBytes());
202 } else {
203 StringBuilder temp = new StringBuilder();
204 temp.append(startTime);
205 scan.setStartRow(temp.toString().getBytes());
206 temp.setLength(0);
207 temp.append(endTime);
208 scan.setStopRow(temp.toString().getBytes());
209 }
210 ResultScanner results = table.getScanner(scan);
211 Iterator<Result> it = results.iterator();
212 while(it.hasNext()) {
213 Result result = it.next();
214 String buffer = new String(result.getRow());
215 String[] parts = buffer.split("-", 2);
216 if(!rows.contains(parts[1])) {
217 rows.add(parts[1]);
218 }
219 }
220 results.close();
221 table.close();
222 } catch(Exception e) {
223 log.error(ExceptionUtil.getStackTrace(e));
224 }
225 return rows;
226 }
227
228 public static Set<String> getHostnames(String cluster, long startTime, long endTime, boolean fullScan) {
229 return getRowNames("SystemMetrics","system", "csource", startTime, endTime, fullScan);
230 }
231
232 public static Set<String> getClusterNames(long startTime, long endTime) {
233 String tableName = "SystemMetrics";
234 String family = "system";
235 String column = "ctags";
236 Set<String> clusters = new HashSet<String>();
237 HTableInterface table = pool.getTable(tableName);
238 Pattern p = Pattern.compile("\\s*cluster=\"(.*?)\"");
239 try {
240 Scan scan = new Scan();
241 scan.addColumn(family.getBytes(), column.getBytes());
242 scan.setTimeRange(startTime, endTime);
243 ResultScanner results = table.getScanner(scan);
244 Iterator<Result> it = results.iterator();
245 while(it.hasNext()) {
246 Result result = it.next();
247 String buffer = new String(result.getValue(family.getBytes(), column.getBytes()));
248 Matcher m = p.matcher(buffer);
249 if(m.matches()) {
250 clusters.add(m.group(1));
251 }
252 }
253 results.close();
254 table.close();
255 } catch(Exception e) {
256 log.error(ExceptionUtil.getStackTrace(e));
257 }
258 return clusters;
259 }
260
261 public static Heatmap getHeatmap(String tableName, String family, String column,
262 long startTime, long endTime, double max, double scale, int height) {
263 final long MINUTE = TimeUnit.MINUTES.toMillis(1);
264 Heatmap heatmap = new Heatmap();
265 HTableInterface table = pool.getTable(tableName);
266 try {
267 Scan scan = new Scan();
268 ColumnPrefixFilter cpf = new ColumnPrefixFilter(column.getBytes());
269 scan.addFamily(family.getBytes());
270 scan.setFilter(cpf);
271 scan.setTimeRange(startTime, endTime);
272 scan.setBatch(10000);
273 ResultScanner results = table.getScanner(scan);
274 Iterator<Result> it = results.iterator();
275 int index = 0;
276
277 int y = 0;
278 HashMap<String, Integer> keyMap = new HashMap<String, Integer>();
279 while(it.hasNext()) {
280 Result result = it.next();
281 List<KeyValue> kvList = result.list();
282 for(KeyValue kv : kvList) {
283 String key = parseRowKey(result.getRow());
284 StringBuilder tmp = new StringBuilder();
285 tmp.append(key);
286 tmp.append(":");
287 tmp.append(new String(kv.getQualifier()));
288 String seriesName = tmp.toString();
289 long time = parseTime(result.getRow());
290
291 int x = (int) ((time - startTime) / MINUTE);
292 if(keyMap.containsKey(seriesName)) {
293 y = keyMap.get(seriesName);
294 } else {
295 keyMap.put(seriesName, new Integer(index));
296 y = index;
297 index++;
298 }
299 double v = Double.parseDouble(new String(kv.getValue()));
300 heatmap.put(x, y, v);
301 if(v > max) {
302 max = v;
303 }
304 }
305 }
306 results.close();
307 table.close();
308 int radius = height / index;
309
310 heatmap.putMax(scale);
311 for(HeatMapPoint point : heatmap.getHeatmap()) {
312 double round = point.count / max * scale;
313 round = Math.round(round * 100.0) / 100.0;
314 point.put(point.x, point.y * radius, round);
315 }
316 heatmap.putRadius(radius);
317 heatmap.putSeries(index);
318 } catch (IOException e) {
319 log.error(ExceptionUtil.getStackTrace(e));
320 }
321 return heatmap;
322 }
323
324 private static String parseRowKey(byte[] row) {
325 String key = new String(row);
326 String[] parts = key.split("-", 2);
327 return parts[1];
328 }
329
330 private static long parseTime(byte[] row) {
331 String key = new String(row);
332 String[] parts = key.split("-", 2);
333 long time = Long.parseLong(parts[0]);
334 return time;
335 }
336
337 }