This project has retired. For details please refer to its Attic page.
DatasetMapper xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.chukwa.hicc;
20  
21  
22  import java.util.TreeMap;
23  import java.util.HashMap;
24  import java.util.ArrayList;
25  import java.util.List;
26  import java.sql.*;
27  import org.apache.commons.logging.Log;
28  import org.apache.commons.logging.LogFactory;
29  import org.apache.hadoop.chukwa.util.ExceptionUtil;
30  
31  public class DatasetMapper {
32    private String jdbc;
33    private static Log log = LogFactory.getLog(DatasetMapper.class);
34    private TreeMap<String, TreeMap<String, Double>> dataset;
35    private List<String> labels;
36  
37    public DatasetMapper(String jdbc) {
38      this.jdbc = jdbc;
39      this.dataset = new TreeMap<String, TreeMap<String, Double>>();
40      this.labels = new ArrayList<String>();
41    }
42  
43    public void execute(String query, boolean groupBySecondColumn,
44        boolean calculateSlope, String formatTime, List<Object> parameters) {
45      dataset.clear();
46      try {
47        // The newInstance() call is a work around for some
48        // broken Java implementations
49        org.apache.hadoop.chukwa.util.DriverManagerUtil.loadDriver().newInstance();
50      } catch (Exception ex) {
51        log.error("failed to load driver", ex);
52        // handle the error
53      }
54      Connection conn = null;
55      PreparedStatement stmt = null;
56      ResultSet rs = null;
57      labels.clear();
58      double max = 0.0;
59      long timeWindowSize=0;
60      long previousTime=0;
61      try {
62        conn = org.apache.hadoop.chukwa.util.DriverManagerUtil.getConnection(jdbc);
63        stmt = conn.prepareStatement(query);
64        if(query.indexOf("?")!=-1) {
65          for(int i=0;i<parameters.size();i++) {
66            int index = i+1;
67            stmt.setObject(index,parameters.get(i));
68          }
69        }
70        // rs = stmt.executeQuery(query);
71        if (stmt.execute()) {
72          rs = stmt.getResultSet();
73          ResultSetMetaData rmeta = rs.getMetaData();
74          int col = rmeta.getColumnCount();
75          double[] previousArray = new double[col + 1];
76          for (int k = 0; k < col; k++) {
77            previousArray[k] = 0.0;
78          }
79          int i = 0;
80          java.util.TreeMap<String, Double> data = null;
81          HashMap<String, Double> previousHash = new HashMap<String, Double>();
82          HashMap<String, Integer> xAxisMap = new HashMap<String, Integer>();
83          while (rs.next()) {
84            String label = "";
85            if (rmeta.getColumnType(1) == java.sql.Types.TIMESTAMP) {
86              long time = rs.getTimestamp(1).getTime();
87              if(timeWindowSize==0) {
88                timeWindowSize=1;
89                previousTime=time;
90              } else if(time!=previousTime) {
91                timeWindowSize=(time-previousTime)/60000;
92                previousTime=time;
93              }
94              label = "" + time;
95            } else {
96              label = rs.getString(1);
97            }
98            if (!xAxisMap.containsKey(label)) {
99              xAxisMap.put(label, i);
100             labels.add(label);
101             i++;
102           }
103           if (groupBySecondColumn) {
104             String item = rs.getString(2);
105             // Get the data from the row using the series column
106             for (int j = 3; j <= col; j++) {
107               item = rs.getString(2) + " " + rmeta.getColumnName(j);
108               data = dataset.get(item);
109               if (data == null) {
110                 data = new java.util.TreeMap<String, Double>();
111               }
112               if (calculateSlope) {
113                 double current = rs.getDouble(j);
114                 double tmp = 0L;
115                 if (data.size() > 1) {
116                   tmp = (current - previousHash.get(item).doubleValue())/timeWindowSize;
117                   if(timeWindowSize<=0) {
118                     tmp = Double.NaN;
119                   }
120                 } else {
121                   tmp = 0;
122                 }
123                 if (tmp < 0) {
124                   tmp = Double.NaN;
125                 }
126                 previousHash.put(item, current);
127                 if (tmp > max) {
128                   max = tmp;
129                 }
130                 data.put(label, tmp);
131               } else {
132                 double current = rs.getDouble(j);
133                 if (current > max) {
134                   max = current;
135                 }
136                 data.put(label, current);
137               }
138               dataset.put(item, data);
139             }
140           } else {
141             for (int j = 2; j <= col; j++) {
142               String item = rmeta.getColumnName(j);
143               // Get the data from the row using the column name
144               double current = rs.getDouble(j);
145               if (current > max) {
146                 max = current;
147               }
148               data = dataset.get(item);
149               if (data == null) {
150                 data = new java.util.TreeMap<String, Double>();
151               }
152               if (calculateSlope) {
153                 double tmp = current;
154                 if (data.size() > 1) {
155                   tmp = (tmp - previousArray[j])/timeWindowSize;
156                   if(timeWindowSize<=0) {
157                     tmp = Double.NaN;
158                   }
159                 } else {
160                   tmp = 0.0;
161                 }
162                 if (tmp < 0) {
163                   tmp = Double.NaN;
164                 }
165                 previousArray[j] = current;
166                 data.put(label, tmp);
167               } else {
168                 data.put(label, current);
169               }
170               dataset.put(item, data);
171             }
172           }
173         }
174       } else {
175         log.error("query is not executed.");
176       }
177       // Now do something with the ResultSet ....
178     } catch (SQLException ex) {
179       // handle any errors
180       log.error("SQLException: " + ex.getMessage() + " on query: " + query);
181       log.error("SQLState: " + ex.getSQLState());
182       log.error("VendorError: " + ex.getErrorCode());
183     } catch (Exception ex) {
184       log.debug(ExceptionUtil.getStackTrace(ex));
185     } finally {
186       // it is a good idea to release
187       // resources in a finally{} block
188       // in reverse-order of their creation
189       // if they are no-longer needed
190       if (rs != null) {
191         try {
192           rs.close();
193         } catch (SQLException sqlEx) {
194           log.debug(ExceptionUtil.getStackTrace(sqlEx));
195         }
196         rs = null;
197       }
198       if (stmt != null) {
199         try {
200           stmt.close();
201         } catch (SQLException sqlEx) {
202           log.debug(ExceptionUtil.getStackTrace(sqlEx));
203         }
204         stmt = null;
205       }
206       if (conn != null) {
207         try {
208           conn.close();
209         } catch (SQLException sqlEx) {
210           log.debug(ExceptionUtil.getStackTrace(sqlEx));
211         }
212         conn = null;
213       }
214     }
215   }
216 
217   public List<String> getXAxisMap() {
218     return labels;
219   }
220 
221   public TreeMap<String, TreeMap<String, Double>> getDataset() {
222     return dataset;
223   }
224 }