This project has retired. For details please refer to its
Attic page.
DatasetMapper xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.chukwa.hicc;
20
21
22 import java.util.TreeMap;
23 import java.util.HashMap;
24 import java.util.ArrayList;
25 import java.util.List;
26 import java.sql.*;
27 import org.apache.commons.logging.Log;
28 import org.apache.commons.logging.LogFactory;
29 import org.apache.hadoop.chukwa.util.ExceptionUtil;
30
31 public class DatasetMapper {
32 private String jdbc;
33 private static Log log = LogFactory.getLog(DatasetMapper.class);
34 private TreeMap<String, TreeMap<String, Double>> dataset;
35 private List<String> labels;
36
37 public DatasetMapper(String jdbc) {
38 this.jdbc = jdbc;
39 this.dataset = new TreeMap<String, TreeMap<String, Double>>();
40 this.labels = new ArrayList<String>();
41 }
42
43 public void execute(String query, boolean groupBySecondColumn,
44 boolean calculateSlope, String formatTime, List<Object> parameters) {
45 dataset.clear();
46 try {
47
48
49 org.apache.hadoop.chukwa.util.DriverManagerUtil.loadDriver().newInstance();
50 } catch (Exception ex) {
51 log.error("failed to load driver", ex);
52
53 }
54 Connection conn = null;
55 PreparedStatement stmt = null;
56 ResultSet rs = null;
57 labels.clear();
58 double max = 0.0;
59 long timeWindowSize=0;
60 long previousTime=0;
61 try {
62 conn = org.apache.hadoop.chukwa.util.DriverManagerUtil.getConnection(jdbc);
63 stmt = conn.prepareStatement(query);
64 if(query.indexOf("?")!=-1) {
65 for(int i=0;i<parameters.size();i++) {
66 int index = i+1;
67 stmt.setObject(index,parameters.get(i));
68 }
69 }
70
71 if (stmt.execute()) {
72 rs = stmt.getResultSet();
73 ResultSetMetaData rmeta = rs.getMetaData();
74 int col = rmeta.getColumnCount();
75 double[] previousArray = new double[col + 1];
76 for (int k = 0; k < col; k++) {
77 previousArray[k] = 0.0;
78 }
79 int i = 0;
80 java.util.TreeMap<String, Double> data = null;
81 HashMap<String, Double> previousHash = new HashMap<String, Double>();
82 HashMap<String, Integer> xAxisMap = new HashMap<String, Integer>();
83 while (rs.next()) {
84 String label = "";
85 if (rmeta.getColumnType(1) == java.sql.Types.TIMESTAMP) {
86 long time = rs.getTimestamp(1).getTime();
87 if(timeWindowSize==0) {
88 timeWindowSize=1;
89 previousTime=time;
90 } else if(time!=previousTime) {
91 timeWindowSize=(time-previousTime)/60000;
92 previousTime=time;
93 }
94 label = "" + time;
95 } else {
96 label = rs.getString(1);
97 }
98 if (!xAxisMap.containsKey(label)) {
99 xAxisMap.put(label, i);
100 labels.add(label);
101 i++;
102 }
103 if (groupBySecondColumn) {
104 String item = rs.getString(2);
105
106 for (int j = 3; j <= col; j++) {
107 item = rs.getString(2) + " " + rmeta.getColumnName(j);
108 data = dataset.get(item);
109 if (data == null) {
110 data = new java.util.TreeMap<String, Double>();
111 }
112 if (calculateSlope) {
113 double current = rs.getDouble(j);
114 double tmp = 0L;
115 if (data.size() > 1) {
116 tmp = (current - previousHash.get(item).doubleValue())/timeWindowSize;
117 if(timeWindowSize<=0) {
118 tmp = Double.NaN;
119 }
120 } else {
121 tmp = 0;
122 }
123 if (tmp < 0) {
124 tmp = Double.NaN;
125 }
126 previousHash.put(item, current);
127 if (tmp > max) {
128 max = tmp;
129 }
130 data.put(label, tmp);
131 } else {
132 double current = rs.getDouble(j);
133 if (current > max) {
134 max = current;
135 }
136 data.put(label, current);
137 }
138 dataset.put(item, data);
139 }
140 } else {
141 for (int j = 2; j <= col; j++) {
142 String item = rmeta.getColumnName(j);
143
144 double current = rs.getDouble(j);
145 if (current > max) {
146 max = current;
147 }
148 data = dataset.get(item);
149 if (data == null) {
150 data = new java.util.TreeMap<String, Double>();
151 }
152 if (calculateSlope) {
153 double tmp = current;
154 if (data.size() > 1) {
155 tmp = (tmp - previousArray[j])/timeWindowSize;
156 if(timeWindowSize<=0) {
157 tmp = Double.NaN;
158 }
159 } else {
160 tmp = 0.0;
161 }
162 if (tmp < 0) {
163 tmp = Double.NaN;
164 }
165 previousArray[j] = current;
166 data.put(label, tmp);
167 } else {
168 data.put(label, current);
169 }
170 dataset.put(item, data);
171 }
172 }
173 }
174 } else {
175 log.error("query is not executed.");
176 }
177
178 } catch (SQLException ex) {
179
180 log.error("SQLException: " + ex.getMessage() + " on query: " + query);
181 log.error("SQLState: " + ex.getSQLState());
182 log.error("VendorError: " + ex.getErrorCode());
183 } catch (Exception ex) {
184 log.debug(ExceptionUtil.getStackTrace(ex));
185 } finally {
186
187
188
189
190 if (rs != null) {
191 try {
192 rs.close();
193 } catch (SQLException sqlEx) {
194 log.debug(ExceptionUtil.getStackTrace(sqlEx));
195 }
196 rs = null;
197 }
198 if (stmt != null) {
199 try {
200 stmt.close();
201 } catch (SQLException sqlEx) {
202 log.debug(ExceptionUtil.getStackTrace(sqlEx));
203 }
204 stmt = null;
205 }
206 if (conn != null) {
207 try {
208 conn.close();
209 } catch (SQLException sqlEx) {
210 log.debug(ExceptionUtil.getStackTrace(sqlEx));
211 }
212 conn = null;
213 }
214 }
215 }
216
217 public List<String> getXAxisMap() {
218 return labels;
219 }
220
221 public TreeMap<String, TreeMap<String, Double>> getDataset() {
222 return dataset;
223 }
224 }