/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

// Log Event Prototype
// From event_viewer.jsp
package org.apache.hadoop.chukwa.extraction.engine.datasource.database;


import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.text.SimpleDateFormat;
import java.util.LinkedList;
import java.util.List;
import java.util.TreeMap;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
import org.apache.hadoop.chukwa.extraction.engine.Record;
import org.apache.hadoop.chukwa.extraction.engine.SearchResult;
import org.apache.hadoop.chukwa.extraction.engine.Token;
import org.apache.hadoop.chukwa.extraction.engine.datasource.DataSource;
import org.apache.hadoop.chukwa.extraction.engine.datasource.DataSourceException;
import org.apache.hadoop.chukwa.util.ExceptionUtil;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

//import org.apache.hadoop.chukwa.hicc.ClusterConfig;

public class DatabaseDS implements DataSource {
  private static final Log log = LogFactory.getLog(DatabaseDS.class);

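  /**
   * Searches the table named by dataSource for rows whose time column lies
   * in [t0, t1] and, when a non-empty filter is given, whose concatenated
   * row text contains it. Matching rows are appended to result, keyed by
   * the row's timestamp.
   */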
  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value =
      "SQL_NONCONSTANT_STRING_PASSED_TO_EXECUTE",
      justification = "Dynamic based upon tables in the database")
  public SearchResult search(SearchResult result, String cluster,
      String dataSource, long t0, long t1, String filter, Token token)
      throws DataSourceException {
    // HH (0-23) rather than kk (1-24), so midnight formats as 00, not 24.
    SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    String timeField = null;
    TreeMap<Long, List<Record>> records = result.getRecords();

    if (cluster == null) {
      cluster = "demo";
    }

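    // Each source table keys its rows on a different time column.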
    if (dataSource.equalsIgnoreCase("MRJob")) {
      timeField = "LAUNCH_TIME";
    } else if (dataSource.equalsIgnoreCase("HodJob")) {
      timeField = "StartTime";
    } else {
      timeField = "timestamp";
    }
    String startS = formatter.format(t0);
    String endS = formatter.format(t1);
    // Declared outside the try block so the connection can be closed in finally.
    Connection conn = null;
    Statement stmt = null;
    ResultSet rs = null;
    try {
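      // Inclusive time-range predicate over the chosen time column.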
      String dateclause = timeField + " >= '" + startS + "' and " + timeField
          + " <= '" + endS + "'";

      // ClusterConfig cc = new ClusterConfig();
      String jdbc = ""; // cc.getURL(cluster);

      conn = org.apache.hadoop.chukwa.util.DriverManagerUtil.getConnection(jdbc);

      stmt = conn.createStatement();
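      // The table name and time bounds are interpolated into the SQL string;
      // the FindBugs suppression above acknowledges this dynamic query.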
      String query = "select * from " + dataSource + " where " + dateclause + ";";
      // Execute the statement once; the original ran executeQuery and
      // execute back to back, issuing the same query twice.
      rs = stmt.executeQuery(query);
      ResultSetMetaData rmeta = rs.getMetaData();
      int col = rmeta.getColumnCount();
      while (rs.next()) {
        ChukwaRecord event = new ChukwaRecord();
        StringBuilder cell = new StringBuilder();
        long timestamp = 0;

        // JDBC columns are 1-based and run through col inclusive; the
        // original loop stopped at col - 1 and silently dropped the last column.
        for (int i = 1; i <= col; i++) {
          String value = rs.getString(i);
          if (value != null) {
            cell.append(" ");
            cell.append(rmeta.getColumnName(i));
            cell.append(":");
            cell.append(value);
          }
          if (rmeta.getColumnName(i).equals(timeField)) {
            timestamp = rs.getLong(i);
            event.setTime(timestamp);
          }
        }
        boolean isValid = false;
        if (filter == null || filter.equals("")) {
          isValid = true;
        } else if (cell.indexOf(filter) >= 0) { // >= 0 so a match at index 0 counts
          isValid = true;
        }
        if (!isValid) {
          continue;
        }

        event.add(Record.bodyField, cell.toString());
        event.add(Record.sourceField, cluster + "." + dataSource);
        if (records.containsKey(timestamp)) {
          records.get(timestamp).add(event);
        } else {
          List<Record> list = new LinkedList<Record>();
          list.add(event);
          // Key on the same timestamp checked above, not event.getTime().
          records.put(timestamp, list);
        }
      }
    } catch (SQLException e) {
      log.error(ExceptionUtil.getStackTrace(e));
      throw new DataSourceException(e);
    } finally {
      if (rs != null) {
        try {
          rs.close();
        } catch (SQLException sqlEx) {
          log.debug(ExceptionUtil.getStackTrace(sqlEx));
        }
        rs = null;
      }
      if (stmt != null) {
        try {
          stmt.close();
        } catch (SQLException sqlEx) {
          log.debug(ExceptionUtil.getStackTrace(sqlEx));
        }
        stmt = null;
      }
      // Close the connection as well; the original leaked it on every call.
      if (conn != null) {
        try {
          conn.close();
        } catch (SQLException sqlEx) {
          log.debug(ExceptionUtil.getStackTrace(sqlEx));
        }
        conn = null;
      }
    }
    return result;
  }

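  // search() keeps all JDBC state (connection, statement, result set) in
  // method-local variables, so concurrent calls do not share state.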
  public boolean isThreadSafe() {
    return true;
  }

}
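
// A minimal caller sketch. Everything below is illustrative: the concrete
// SearchResult instance and the "mrjob" table name are assumptions, not
// part of this file.
//
//   SearchResult result = ...; // any SearchResult backed by a TreeMap of records
//   DataSource ds = new DatabaseDS();
//   long now = System.currentTimeMillis();
//   ds.search(result, "demo", "mrjob", now - 3600 * 1000L, now, "", null);
//   // result.getRecords() now maps each matching row's timestamp to the
//   // records pulled from the mrjob table over the last hour.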