This project has retired. For details please refer to its
Attic page.
MetricsAggregation xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.chukwa.database;
19
20
21 import java.sql.Connection;
22 import java.sql.DatabaseMetaData;
23 import java.sql.ResultSet;
24 import java.sql.SQLException;
25 import java.sql.Statement;
26 import java.text.SimpleDateFormat;
27 import java.util.ArrayList;
28 import java.util.Date;
29 import java.util.List;
30 import org.apache.commons.logging.Log;
31 import org.apache.commons.logging.LogFactory;
32
33 public class MetricsAggregation {
34 private static Log log = LogFactory.getLog(MetricsAggregation.class);
35 private static Connection conn = null;
36 private static Statement stmt = null;
37 private static ResultSet rs = null;
38 private static DatabaseConfig mdlConfig;
39
40
41
42
43
44 public static void main(String[] args) throws SQLException {
45 mdlConfig = new DatabaseConfig();
46
47
48 String jdbc_url = System.getenv("JDBC_URL_PREFIX")
49 + mdlConfig.get("jdbc.host") + "/" + mdlConfig.get("jdbc.db");
50 if (mdlConfig.get("jdbc.user") != null) {
51 jdbc_url = jdbc_url + "?user=" + mdlConfig.get("jdbc.user");
52 if (mdlConfig.get("jdbc.password") != null) {
53 jdbc_url = jdbc_url + "&password=" + mdlConfig.get("jdbc.password");
54 }
55 }
56 try {
57
58
59 org.apache.hadoop.chukwa.util.DriverManagerUtil.loadDriver().newInstance();
60 log.info("Initialized JDBC URL: " + jdbc_url);
61 } catch (Exception ex) {
62
63 ex.printStackTrace();
64 log.error(ex, ex);
65 }
66 try {
67 conn = org.apache.hadoop.chukwa.util.DriverManagerUtil.getConnection(jdbc_url);
68 } catch (SQLException ex) {
69 ex.printStackTrace();
70 log.error(ex, ex);
71 }
72
73
74
75
76 SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
77
78 long start = System.currentTimeMillis() - (1000 * 60 * 60 * 24);
79 long end = System.currentTimeMillis() - (1000 * 60 * 10);
80
81 DatabaseConfig dbConf = new DatabaseConfig();
82 String[] tables = dbConf.findTableName("cluster_system_metrics_2018_week",
83 start, end);
84 for (String table : tables) {
85 System.out.println("Table to aggregate per Ts: " + table);
86 stmt = conn.createStatement();
87 rs = stmt
88 .executeQuery("select table_ts from aggregation_admin_table where table_name=\""
89 + table + "\"");
90 if (rs.next()) {
91 start = rs.getLong(1);
92 } else {
93 start = 0;
94 }
95
96 end = start + (1000 * 60 * 60 * 1);
97 long now = System.currentTimeMillis();
98 now = now - (1000 * 60 * 10);
99 end = Math.min(now, end);
100
101
102 end = now;
103
104 System.out.println("Start Date:" + new Date(start));
105 System.out.println("End Date:" + new Date(end));
106
107 DatabaseMetaData dbm = conn.getMetaData();
108 rs = dbm.getColumns(null, null, table, null);
109
110 List<String> cols = new ArrayList<String>();
111 while (rs.next()) {
112 String s = rs.getString(4);
113 System.out.println("Name: " + s);
114 int type = rs.getInt(5);
115 if (type == java.sql.Types.VARCHAR) {
116 System.out.println("Type: Varchar " + type);
117 } else {
118 cols.add(s);
119 System.out.println("Type: Number " + type);
120 }
121 }
122
123
124 String initTable = table.replace("cluster_", "");
125 StringBuilder sb0 = new StringBuilder();
126 StringBuilder sb = new StringBuilder();
127 sb0.append("insert into ").append(table).append(" (");
128 sb.append(" ( select ");
129 for (int i = 0; i < cols.size(); i++) {
130 sb0.append(cols.get(i));
131 sb.append("avg(").append(cols.get(i)).append(") ");
132 if (i < cols.size() - 1) {
133 sb0.append(",");
134 sb.append(",");
135 }
136 }
137 sb.append(" from ").append(initTable);
138 sb.append(" where timestamp between \"");
139 sb.append(formatter.format(start));
140 sb.append("\" and \"").append(formatter.format(end));
141 sb.append("\" group by timestamp )");
142
143
144 sb0.append(" )").append(sb);
145 System.out.println(sb0.toString());
146
147
148 conn.setAutoCommit(false);
149 stmt = conn.createStatement();
150 stmt.execute(sb0.toString());
151
152
153 stmt = conn.createStatement();
154 stmt.execute("insert into aggregation_admin_table set table_ts=\""
155 + formatter.format(end) + "\" where table_name=\"" + table + "\"");
156 conn.commit();
157 }
158
159 }
160
161 }