This project has retired. For details please refer to its Attic page.
MetricsAggregation xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.chukwa.database;
19  
20  
21  import java.sql.Connection;
22  import java.sql.DatabaseMetaData;
23  import java.sql.ResultSet;
24  import java.sql.SQLException;
25  import java.sql.Statement;
26  import java.text.SimpleDateFormat;
27  import java.util.ArrayList;
28  import java.util.Date;
29  import java.util.List;
30  import org.apache.commons.logging.Log;
31  import org.apache.commons.logging.LogFactory;
32  
33  public class MetricsAggregation {
34    private static Log log = LogFactory.getLog(MetricsAggregation.class);
35    private static Connection conn = null;
36    private static Statement stmt = null;
37    private static ResultSet rs = null;
38    private static DatabaseConfig mdlConfig;
39  
40    /**
41     * @param args
42     * @throws SQLException
43     */
44    public static void main(String[] args) throws SQLException {
45      mdlConfig = new DatabaseConfig();
46  
47      // Connect to the database
48      String jdbc_url = System.getenv("JDBC_URL_PREFIX")
49          + mdlConfig.get("jdbc.host") + "/" + mdlConfig.get("jdbc.db");
50      if (mdlConfig.get("jdbc.user") != null) {
51        jdbc_url = jdbc_url + "?user=" + mdlConfig.get("jdbc.user");
52        if (mdlConfig.get("jdbc.password") != null) {
53          jdbc_url = jdbc_url + "&password=" + mdlConfig.get("jdbc.password");
54        }
55      }
56      try {
57        // The newInstance() call is a work around for some
58        // broken Java implementations
59        org.apache.hadoop.chukwa.util.DriverManagerUtil.loadDriver().newInstance();
60        log.info("Initialized JDBC URL: " + jdbc_url);
61      } catch (Exception ex) {
62        // handle the error
63        ex.printStackTrace();
64        log.error(ex, ex);
65      }
66      try {
67        conn = org.apache.hadoop.chukwa.util.DriverManagerUtil.getConnection(jdbc_url);
68      } catch (SQLException ex) {
69        ex.printStackTrace();
70        log.error(ex, ex);
71      }
72  
73      // get the latest timestamp for aggregation on this table
74      // Start = latest
75  
76      SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
77  
78      long start = System.currentTimeMillis() - (1000 * 60 * 60 * 24);
79      long end = System.currentTimeMillis() - (1000 * 60 * 10);
80      // retrieve metadata for cluster_system_metrics
81      DatabaseConfig dbConf = new DatabaseConfig();
82      String[] tables = dbConf.findTableName("cluster_system_metrics_2018_week",
83          start, end);
84      for (String table : tables) {
85        System.out.println("Table to aggregate per Ts: " + table);
86        stmt = conn.createStatement();
87        rs = stmt
88            .executeQuery("select table_ts from aggregation_admin_table where table_name=\""
89                + table + "\"");
90        if (rs.next()) {
91          start = rs.getLong(1);
92        } else {
93          start = 0;
94        }
95  
96        end = start + (1000 * 60 * 60 * 1); // do 1 hour aggregation max
97        long now = System.currentTimeMillis();
98        now = now - (1000 * 60 * 10); // wait for 10 minutes
99        end = Math.min(now, end);
100 
101       // TODO REMOVE DEBUG ONLY!
102       end = now;
103 
104       System.out.println("Start Date:" + new Date(start));
105       System.out.println("End Date:" + new Date(end));
106 
107       DatabaseMetaData dbm = conn.getMetaData();
108       rs = dbm.getColumns(null, null, table, null);
109 
110       List<String> cols = new ArrayList<String>();
111       while (rs.next()) {
112         String s = rs.getString(4); // 4 is column name, 5 data type etc.
113         System.out.println("Name: " + s);
114         int type = rs.getInt(5);
115         if (type == java.sql.Types.VARCHAR) {
116           System.out.println("Type: Varchar " + type);
117         } else {
118           cols.add(s);
119           System.out.println("Type: Number " + type);
120         }
121       }// end of while.
122 
123       // build insert into from select query
124       String initTable = table.replace("cluster_", "");
125       StringBuilder sb0 = new StringBuilder();
126       StringBuilder sb = new StringBuilder();
127       sb0.append("insert into ").append(table).append(" (");
128       sb.append(" ( select ");
129       for (int i = 0; i < cols.size(); i++) {
130         sb0.append(cols.get(i));
131         sb.append("avg(").append(cols.get(i)).append(") ");
132         if (i < cols.size() - 1) {
133           sb0.append(",");
134           sb.append(",");
135         }
136       }
137       sb.append(" from ").append(initTable);
138       sb.append(" where timestamp between \"");
139       sb.append(formatter.format(start));
140       sb.append("\" and \"").append(formatter.format(end));
141       sb.append("\" group by timestamp  )");
142 
143       // close fields
144       sb0.append(" )").append(sb);
145       System.out.println(sb0.toString());
146 
147       // run query
148       conn.setAutoCommit(false);
149       stmt = conn.createStatement();
150       stmt.execute(sb0.toString());
151 
152       // update last run
153       stmt = conn.createStatement();
154       stmt.execute("insert into aggregation_admin_table set table_ts=\""
155           + formatter.format(end) + "\" where table_name=\"" + table + "\"");
156       conn.commit();
157     }
158 
159   }
160 
161 }