This project has been retired. For details, please refer to its
Attic page.
DatabaseDS xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package org.apache.hadoop.chukwa.extraction.engine.datasource.database;
22
23
24 import java.sql.Connection;
25 import java.sql.ResultSet;
26 import java.sql.ResultSetMetaData;
27 import java.sql.SQLException;
28 import java.sql.Statement;
29 import java.text.SimpleDateFormat;
30 import java.util.LinkedList;
31 import java.util.List;
32 import java.util.TreeMap;
33 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
34 import org.apache.hadoop.chukwa.extraction.engine.Record;
35 import org.apache.hadoop.chukwa.extraction.engine.SearchResult;
36 import org.apache.hadoop.chukwa.extraction.engine.Token;
37 import org.apache.hadoop.chukwa.extraction.engine.datasource.DataSource;
38 import org.apache.hadoop.chukwa.extraction.engine.datasource.DataSourceException;
39 import org.apache.hadoop.chukwa.util.ExceptionUtil;
40 import org.apache.commons.logging.Log;
41 import org.apache.commons.logging.LogFactory;
42
43
44
45 public class DatabaseDS implements DataSource {
46 private static final Log log = LogFactory.getLog(DatabaseDS.class);
47
48 public SearchResult search(SearchResult result, String cluster,
49 String dataSource, long t0, long t1, String filter, Token token)
50 throws DataSourceException {
51 SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd kk:mm:ss");
52 String timeField = null;
53 TreeMap<Long, List<Record>> records = result.getRecords();
54
55 if (cluster == null) {
56 cluster = "demo";
57 }
58
59 if (dataSource.equalsIgnoreCase("MRJob")) {
60 timeField = "LAUNCH_TIME";
61 } else if (dataSource.equalsIgnoreCase("HodJob")) {
62 timeField = "StartTime";
63 } else if (dataSource.equalsIgnoreCase("QueueInfo")) {
64 timeField = "timestamp";
65 } else {
66 timeField = "timestamp";
67 }
68 String startS = formatter.format(t0);
69 String endS = formatter.format(t1);
70 Statement stmt = null;
71 ResultSet rs = null;
72 try {
73 String dateclause = timeField + " >= '" + startS + "' and " + timeField
74 + " <= '" + endS + "'";
75
76
77 String jdbc = "";
78
79 Connection conn = org.apache.hadoop.chukwa.util.DriverManagerUtil.getConnection(jdbc);
80
81 stmt = conn.createStatement();
82 String query = "";
83 query = "select * from " + dataSource + " where " + dateclause + ";";
84 rs = stmt.executeQuery(query);
85 if (stmt.execute(query)) {
86 rs = stmt.getResultSet();
87 ResultSetMetaData rmeta = rs.getMetaData();
88 int col = rmeta.getColumnCount();
89 while (rs.next()) {
90 ChukwaRecord event = new ChukwaRecord();
91 String cell = "";
92 long timestamp = 0;
93
94 for (int i = 1; i < col; i++) {
95 String value = rs.getString(i);
96 if (value != null) {
97 cell = cell + " " + rmeta.getColumnName(i) + ":" + value;
98 }
99 if (rmeta.getColumnName(i).equals(timeField)) {
100 timestamp = rs.getLong(i);
101 event.setTime(timestamp);
102 }
103 }
104 boolean isValid = false;
105 if (filter == null || filter.equals("")) {
106 isValid = true;
107 } else if (cell.indexOf(filter) > 0) {
108 isValid = true;
109 }
110 if (!isValid) {
111 continue;
112 }
113
114 event.add(Record.bodyField, cell);
115 event.add(Record.sourceField, cluster + "." + dataSource);
116 if (records.containsKey(timestamp)) {
117 records.get(timestamp).add(event);
118 } else {
119 List<Record> list = new LinkedList<Record>();
120 list.add(event);
121 records.put(event.getTime(), list);
122 }
123 }
124 }
125 } catch (SQLException e) {
126 e.printStackTrace();
127 throw new DataSourceException(e);
128 } finally {
129 if (rs != null) {
130 try {
131 rs.close();
132 } catch (SQLException sqlEx) {
133 log.debug(ExceptionUtil.getStackTrace(sqlEx));
134 }
135 rs = null;
136 }
137 if (stmt != null) {
138 try {
139 stmt.close();
140 } catch (SQLException sqlEx) {
141 log.debug(ExceptionUtil.getStackTrace(sqlEx));
142 }
143 stmt = null;
144 }
145 }
146 return result;
147 }
148
149 public boolean isThreadSafe() {
150 return true;
151 }
152
153 }