This project has retired. For details please refer to its
Attic page.
DatabaseDS xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package org.apache.hadoop.chukwa.extraction.engine.datasource.database;
22
23
24 import java.sql.Connection;
25 import java.sql.ResultSet;
26 import java.sql.ResultSetMetaData;
27 import java.sql.SQLException;
28 import java.sql.Statement;
29 import java.text.SimpleDateFormat;
30 import java.util.LinkedList;
31 import java.util.List;
32 import java.util.TreeMap;
33 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
34 import org.apache.hadoop.chukwa.extraction.engine.Record;
35 import org.apache.hadoop.chukwa.extraction.engine.SearchResult;
36 import org.apache.hadoop.chukwa.extraction.engine.Token;
37 import org.apache.hadoop.chukwa.extraction.engine.datasource.DataSource;
38 import org.apache.hadoop.chukwa.extraction.engine.datasource.DataSourceException;
39 import org.apache.hadoop.chukwa.util.ExceptionUtil;
40 import org.apache.commons.logging.Log;
41 import org.apache.commons.logging.LogFactory;
42
43
44
45 public class DatabaseDS implements DataSource {
46 private static final Log log = LogFactory.getLog(DatabaseDS.class);
47
48 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value =
49 "SQL_NONCONSTANT_STRING_PASSED_TO_EXECUTE",
50 justification = "Dynamic based upon tables in the database")
51 public SearchResult search(SearchResult result, String cluster,
52 String dataSource, long t0, long t1, String filter, Token token)
53 throws DataSourceException {
54 SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd kk:mm:ss");
55 String timeField = null;
56 TreeMap<Long, List<Record>> records = result.getRecords();
57
58 if (cluster == null) {
59 cluster = "demo";
60 }
61
62 if (dataSource.equalsIgnoreCase("MRJob")) {
63 timeField = "LAUNCH_TIME";
64 } else if (dataSource.equalsIgnoreCase("HodJob")) {
65 timeField = "StartTime";
66 } else {
67 timeField = "timestamp";
68 }
69 String startS = formatter.format(t0);
70 String endS = formatter.format(t1);
71 Statement stmt = null;
72 ResultSet rs = null;
73 try {
74 String dateclause = timeField + " >= '" + startS + "' and " + timeField
75 + " <= '" + endS + "'";
76
77
78 String jdbc = "";
79
80 Connection conn = org.apache.hadoop.chukwa.util.DriverManagerUtil.getConnection(jdbc);
81
82 stmt = conn.createStatement();
83 String query = "";
84 query = "select * from " + dataSource + " where " + dateclause + ";";
85 rs = stmt.executeQuery(query);
86 if (stmt.execute(query)) {
87 rs = stmt.getResultSet();
88 ResultSetMetaData rmeta = rs.getMetaData();
89 int col = rmeta.getColumnCount();
90 while (rs.next()) {
91 ChukwaRecord event = new ChukwaRecord();
92 StringBuilder cell = new StringBuilder();;
93 long timestamp = 0;
94
95 for (int i = 1; i < col; i++) {
96 String value = rs.getString(i);
97 if (value != null) {
98 cell.append(" ");
99 cell.append(rmeta.getColumnName(i));
100 cell.append(":");
101 cell.append(value);
102 }
103 if (rmeta.getColumnName(i).equals(timeField)) {
104 timestamp = rs.getLong(i);
105 event.setTime(timestamp);
106 }
107 }
108 boolean isValid = false;
109 if (filter == null || filter.equals("")) {
110 isValid = true;
111 } else if (cell.indexOf(filter) > 0) {
112 isValid = true;
113 }
114 if (!isValid) {
115 continue;
116 }
117
118 event.add(Record.bodyField, cell.toString());
119 event.add(Record.sourceField, cluster + "." + dataSource);
120 if (records.containsKey(timestamp)) {
121 records.get(timestamp).add(event);
122 } else {
123 List<Record> list = new LinkedList<Record>();
124 list.add(event);
125 records.put(event.getTime(), list);
126 }
127 }
128 }
129 } catch (SQLException e) {
130 e.printStackTrace();
131 throw new DataSourceException(e);
132 } finally {
133 if (rs != null) {
134 try {
135 rs.close();
136 } catch (SQLException sqlEx) {
137 log.debug(ExceptionUtil.getStackTrace(sqlEx));
138 }
139 rs = null;
140 }
141 if (stmt != null) {
142 try {
143 stmt.close();
144 } catch (SQLException sqlEx) {
145 log.debug(ExceptionUtil.getStackTrace(sqlEx));
146 }
147 stmt = null;
148 }
149 }
150 return result;
151 }
152
153 public boolean isThreadSafe() {
154 return true;
155 }
156
157 }