This project has retired. For details please refer to its Attic page.
ChukwaHBaseStore xref
View Javadoc

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.chukwa.datastore;

import java.io.IOException;
import java.nio.charset.Charset;
import java.util.Calendar;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.hadoop.chukwa.hicc.bean.HeatMapPoint;
import org.apache.hadoop.chukwa.hicc.bean.Heatmap;
import org.apache.hadoop.chukwa.hicc.bean.Series;
import org.apache.hadoop.chukwa.util.ExceptionUtil;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.HTablePool;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.ColumnPrefixFilter;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.filter.RegexStringComparator;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.log4j.Logger;
52  
53  public class ChukwaHBaseStore {
54    private static Configuration hconf = HBaseConfiguration.create();
55    private static HTablePool pool = new HTablePool(hconf, 60);
56    static Logger log = Logger.getLogger(ChukwaHBaseStore.class);
57    
58    public static Series getSeries(String tableName, String rkey, String family, String column,
59        long startTime, long endTime, boolean filterByRowKey) {
60      StringBuilder seriesName = new StringBuilder();
61      seriesName.append(rkey);
62      seriesName.append(":");
63      seriesName.append(family);
64      seriesName.append(":");
65      seriesName.append(column);
66  
67      Series series = new Series(seriesName.toString());
68      try {
69        HTableInterface table = pool.getTable(tableName);
70        Calendar c = Calendar.getInstance();
71        c.setTimeInMillis(startTime);
72        c.set(Calendar.MINUTE, 0);
73        c.set(Calendar.SECOND, 0);
74        c.set(Calendar.MILLISECOND, 0);
75        String startRow = c.getTimeInMillis()+rkey;
76        Scan scan = new Scan();
77        scan.addColumn(family.getBytes(), column.getBytes());
78        scan.setStartRow(startRow.getBytes());
79        scan.setTimeRange(startTime, endTime);
80        scan.setMaxVersions();
81        if(filterByRowKey) {
82          RowFilter rf = new RowFilter(CompareOp.EQUAL, new 
83              RegexStringComparator("[0-9]+-"+rkey+"$")); 
84          scan.setFilter(rf);
85        }
86        ResultScanner results = table.getScanner(scan);
87        Iterator<Result> it = results.iterator();
88        // TODO: Apply discrete wavelet transformation to limit the output
89        // size to 1000 data points for graphing optimization. (i.e jwave)
90        while(it.hasNext()) {
91          Result result = it.next();
92          String temp = new String(result.getValue(family.getBytes(), column.getBytes()));
93          double value = Double.parseDouble(temp);
94          // TODO: Pig Store function does not honor HBase timestamp, hence need to parse rowKey for timestamp.
95          String buf = new String(result.getRow());
96          Long timestamp = Long.parseLong(buf.split("-")[0]);
97          // If Pig Store function can honor HBase timestamp, use the following line is better.
98          // series.add(result.getCellValue().getTimestamp(), value);
99          series.add(timestamp, value);
100       }
101       results.close();
102       table.close();
103     } catch(Exception e) {
104       log.error(ExceptionUtil.getStackTrace(e));
105     }
106     return series;
107   }
108 
109   public static Set<String> getFamilyNames(String tableName) {
110     Set<String> familyNames = new CopyOnWriteArraySet<String>();
111     try {
112       HTableInterface table = pool.getTable(tableName);
113       Set<byte[]> families = table.getTableDescriptor().getFamiliesKeys();
114       for(byte[] name : families) {
115         familyNames.add(new String(name));
116       }
117       table.close();
118     } catch(Exception e) {
119       log.error(ExceptionUtil.getStackTrace(e));
120     }
121     return familyNames;
122     
123   }
124   
125   public static Set<String> getTableNames() {
126     Set<String> tableNames = new CopyOnWriteArraySet<String>();
127     try {
128       HBaseAdmin admin = new HBaseAdmin(hconf);
129       HTableDescriptor[] td = admin.listTables();
130       for(HTableDescriptor table : td) {
131         tableNames.add(new String(table.getName()));
132       }
133     } catch(Exception e) {
134       log.error(ExceptionUtil.getStackTrace(e));
135     }
136     return tableNames;
137   }
138 
139   public static void getColumnNamesHelper(Set<String>columnNames, Iterator<Result> it) {
140     Result result = it.next();
141     if(result!=null) {
142       List<KeyValue> kvList = result.list();
143       for(KeyValue kv : kvList) {
144         columnNames.add(new String(kv.getQualifier()));
145       }
146     }
147   }
148   
149   public static Set<String> getColumnNames(String tableName, String family, long startTime, long endTime, boolean fullScan) {
150     Set<String> columnNames = new CopyOnWriteArraySet<String>();
151     try {
152       HTableInterface table = pool.getTable(tableName);
153       Scan scan = new Scan();
154       if(!fullScan) {
155         // Take sample columns of the recent time.
156         StringBuilder temp = new StringBuilder();
157         temp.append(endTime-300000L);
158         scan.setStartRow(temp.toString().getBytes());
159         temp.setLength(0);
160         temp.append(endTime);
161         scan.setStopRow(temp.toString().getBytes());
162       } else {
163         StringBuilder temp = new StringBuilder();
164         temp.append(startTime);
165         scan.setStartRow(temp.toString().getBytes());
166         temp.setLength(0);
167         temp.append(endTime);
168         scan.setStopRow(temp.toString().getBytes());
169       }
170       scan.addFamily(family.getBytes());
171       ResultScanner results = table.getScanner(scan);
172       Iterator<Result> it = results.iterator();
173       if(fullScan) {
174         while(it.hasNext()) {
175           getColumnNamesHelper(columnNames, it);
176         }        
177       } else {
178         getColumnNamesHelper(columnNames, it);        
179       }
180       results.close();
181       table.close();
182     } catch(Exception e) {
183       log.error(ExceptionUtil.getStackTrace(e));
184     }
185     return columnNames;
186   }
187   
188   public static Set<String> getRowNames(String tableName, String family, String qualifier, long startTime, long endTime, boolean fullScan) {
189     Set<String> rows = new HashSet<String>();
190     HTableInterface table = pool.getTable(tableName);
191     try {
192       Scan scan = new Scan();
193       scan.addColumn(family.getBytes(), qualifier.getBytes());
194       if(!fullScan) {
195         // Take sample columns of the recent time.
196         StringBuilder temp = new StringBuilder();
197         temp.append(endTime-300000L);
198         scan.setStartRow(temp.toString().getBytes());
199         temp.setLength(0);
200         temp.append(endTime);
201         scan.setStopRow(temp.toString().getBytes());
202       } else {
203         StringBuilder temp = new StringBuilder();
204         temp.append(startTime);
205         scan.setStartRow(temp.toString().getBytes());
206         temp.setLength(0);
207         temp.append(endTime);
208         scan.setStopRow(temp.toString().getBytes());
209       }
210       ResultScanner results = table.getScanner(scan);
211       Iterator<Result> it = results.iterator();
212       while(it.hasNext()) {
213         Result result = it.next();
214         String buffer = new String(result.getRow());
215         String[] parts = buffer.split("-", 2);
216         if(!rows.contains(parts[1])) {
217           rows.add(parts[1]);
218         }    
219       }
220       results.close();
221       table.close();
222     } catch(Exception e) {
223       log.error(ExceptionUtil.getStackTrace(e));
224     }
225     return rows;    
226   }
227   
228   public static Set<String> getHostnames(String cluster, long startTime, long endTime, boolean fullScan) {
229     return getRowNames("SystemMetrics","system", "csource", startTime, endTime, fullScan);
230   }
231   
232   public static Set<String> getClusterNames(long startTime, long endTime) {
233     String tableName = "SystemMetrics";
234     String family = "system";
235     String column = "ctags";
236     Set<String> clusters = new HashSet<String>();
237     HTableInterface table = pool.getTable(tableName);
238     Pattern p = Pattern.compile("\\s*cluster=\"(.*?)\"");
239     try {
240       Scan scan = new Scan();
241       scan.addColumn(family.getBytes(), column.getBytes());
242       scan.setTimeRange(startTime, endTime);
243       ResultScanner results = table.getScanner(scan);
244       Iterator<Result> it = results.iterator();
245       while(it.hasNext()) {
246         Result result = it.next();
247         String buffer = new String(result.getValue(family.getBytes(), column.getBytes()));
248         Matcher m = p.matcher(buffer);
249         if(m.matches()) {
250           clusters.add(m.group(1));
251         }
252       }
253       results.close();
254       table.close();
255     } catch(Exception e) {
256       log.error(ExceptionUtil.getStackTrace(e));
257     }
258     return clusters;
259   }
260   
261   public static Heatmap getHeatmap(String tableName, String family, String column, 
262 		  long startTime, long endTime, double max, double scale, int height) {
263 	final long MINUTE = TimeUnit.MINUTES.toMillis(1);
264 	Heatmap heatmap = new Heatmap();
265     HTableInterface table = pool.getTable(tableName);
266     try {
267         Scan scan = new Scan();
268         ColumnPrefixFilter cpf = new ColumnPrefixFilter(column.getBytes());
269         scan.addFamily(family.getBytes());
270         scan.setFilter(cpf);
271     	scan.setTimeRange(startTime, endTime);
272     	scan.setBatch(10000);
273         ResultScanner results = table.getScanner(scan);
274 	    Iterator<Result> it = results.iterator();
275 		int index = 0;
276 		// Series display in y axis
277 		int y = 0;
278 		HashMap<String, Integer> keyMap = new HashMap<String, Integer>();
279 	    while(it.hasNext()) {
280 		  Result result = it.next();
281 		  List<KeyValue> kvList = result.list();
282 	      for(KeyValue kv : kvList) {
283 			String key = parseRowKey(result.getRow());
284 			StringBuilder tmp = new StringBuilder();
285 			tmp.append(key);
286 			tmp.append(":");
287 			tmp.append(new String(kv.getQualifier()));
288 			String seriesName = tmp.toString();
289 			long time = parseTime(result.getRow());
290 			// Time display in x axis
291 			int x = (int) ((time - startTime) / MINUTE);
292 			if(keyMap.containsKey(seriesName)) {
293 			  y = keyMap.get(seriesName);
294 		    } else {
295 			  keyMap.put(seriesName, new Integer(index));
296 		      y = index;
297 		      index++;
298 			}
299 			double v = Double.parseDouble(new String(kv.getValue()));
300 			heatmap.put(x, y, v);
301 			if(v > max) {
302 				max = v;
303 			}
304 	      }
305 	    }
306 	    results.close();
307 	    table.close();
308 	    int radius = height / index;
309 	    // Usually scale max from 0 to 100 for visualization
310 	    heatmap.putMax(scale);
311 	    for(HeatMapPoint point : heatmap.getHeatmap()) {
312 	      double round = point.count / max * scale;
313 	      round = Math.round(round * 100.0) / 100.0;
314 	      point.put(point.x, point.y * radius, round);
315 	    }
316 	    heatmap.putRadius(radius);
317 	    heatmap.putSeries(index);
318 	} catch (IOException e) {
319 	    log.error(ExceptionUtil.getStackTrace(e));
320 	}
321 	return heatmap;
322   }
323   
324   private static String parseRowKey(byte[] row) {
325 	  String key = new String(row);
326 	  String[] parts = key.split("-", 2);
327 	  return parts[1];
328   }
329 
330   private static long parseTime(byte[] row) {
331 	  String key = new String(row);
332 	  String[] parts = key.split("-", 2);
333 	  long time = Long.parseLong(parts[0]);
334 	  return time;
335   }
336 
337 }