DatabaseDS xref

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

// Log Event Prototype
// From event_viewer.jsp
package org.apache.hadoop.chukwa.extraction.engine.datasource.database;


import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.text.SimpleDateFormat;
import java.util.LinkedList;
import java.util.List;
import java.util.TreeMap;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
import org.apache.hadoop.chukwa.extraction.engine.Record;
import org.apache.hadoop.chukwa.extraction.engine.SearchResult;
import org.apache.hadoop.chukwa.extraction.engine.Token;
import org.apache.hadoop.chukwa.extraction.engine.datasource.DataSource;
import org.apache.hadoop.chukwa.extraction.engine.datasource.DataSourceException;
import org.apache.hadoop.chukwa.util.ExceptionUtil;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

//import org.apache.hadoop.chukwa.hicc.ClusterConfig;

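/**
 * A DataSource backed by a SQL database: fetches the rows of the table named
 * by the data source whose time column falls within a given range and folds
 * them into a SearchResult.
 */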
public class DatabaseDS implements DataSource {
  private static final Log log = LogFactory.getLog(DatabaseDS.class);

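  /**
   * Queries the table named by {@code dataSource} for rows whose time column
   * lies between {@code t0} and {@code t1} (epoch millis), keeps only rows
   * whose flattened text contains {@code filter} (if one is given), and
   * appends the matches to {@code result}.
   */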
  public SearchResult search(SearchResult result, String cluster,
      String dataSource, long t0, long t1, String filter, Token token)
      throws DataSourceException {
    SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd kk:mm:ss");
    String timeField = null;
    TreeMap<Long, List<Record>> records = result.getRecords();

    if (cluster == null) {
      cluster = "demo";
    }

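    // Pick the timestamp column for the requested table; anything
    // unrecognized falls back to a generic "timestamp" column.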
    if (dataSource.equalsIgnoreCase("MRJob")) {
      timeField = "LAUNCH_TIME";
    } else if (dataSource.equalsIgnoreCase("HodJob")) {
      timeField = "StartTime";
    } else if (dataSource.equalsIgnoreCase("QueueInfo")) {
      timeField = "timestamp";
    } else {
      timeField = "timestamp";
    }
    String startS = formatter.format(t0);
    String endS = formatter.format(t1);
    Connection conn = null;
    Statement stmt = null;
    ResultSet rs = null;
    try {
      String dateclause = timeField + " >= '" + startS + "' and " + timeField
          + " <= '" + endS + "'";

      // ClusterConfig cc = new ClusterConfig();
      String jdbc = ""; // cc.getURL(cluster);

      conn = org.apache.hadoop.chukwa.util.DriverManagerUtil.getConnection(jdbc);

      stmt = conn.createStatement();
      // The table name and date strings are concatenated directly into the
      // SQL; a parameterized variant is sketched at the end of this page.
      String query = "select * from " + dataSource + " where " + dateclause + ";";
      // A SELECT always returns a result set, so a single executeQuery()
      // call suffices.
      rs = stmt.executeQuery(query);
      ResultSetMetaData rmeta = rs.getMetaData();
      int col = rmeta.getColumnCount();
      while (rs.next()) {
        ChukwaRecord event = new ChukwaRecord();
        String cell = "";
        long timestamp = 0;

        // JDBC columns are 1-based and run through getColumnCount()
        // inclusive, so the loop bound must be i <= col.
        for (int i = 1; i <= col; i++) {
          String value = rs.getString(i);
          if (value != null) {
            cell = cell + " " + rmeta.getColumnName(i) + ":" + value;
          }
          if (rmeta.getColumnName(i).equals(timeField)) {
            timestamp = rs.getLong(i);
            event.setTime(timestamp);
          }
        }
        boolean isValid = false;
        if (filter == null || filter.equals("")) {
          isValid = true;
        } else if (cell.contains(filter)) {
          isValid = true;
        }
        if (!isValid) {
          continue;
        }

        event.add(Record.bodyField, cell);
        event.add(Record.sourceField, cluster + "." + dataSource);
        // Group events by timestamp; events sharing a time share a list.
        if (records.containsKey(timestamp)) {
          records.get(timestamp).add(event);
        } else {
          List<Record> list = new LinkedList<Record>();
          list.add(event);
          records.put(event.getTime(), list);
        }
      }
    } catch (SQLException e) {
      log.error(ExceptionUtil.getStackTrace(e));
      throw new DataSourceException(e);
    } finally {
      // Release JDBC resources in reverse order of acquisition; the
      // connection must be closed as well to avoid leaking it.
      if (rs != null) {
        try {
          rs.close();
        } catch (SQLException sqlEx) {
          log.debug(ExceptionUtil.getStackTrace(sqlEx));
        }
        rs = null;
      }
      if (stmt != null) {
        try {
          stmt.close();
        } catch (SQLException sqlEx) {
          log.debug(ExceptionUtil.getStackTrace(sqlEx));
        }
        stmt = null;
      }
      if (conn != null) {
        try {
          conn.close();
        } catch (SQLException sqlEx) {
          log.debug(ExceptionUtil.getStackTrace(sqlEx));
        }
        conn = null;
      }
    }
    return result;
  }

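  /**
   * search() keeps all of its state in local variables, so a single
   * instance can safely be shared across threads.
   */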
  public boolean isThreadSafe() {
    return true;
  }

}
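The query in search() is assembled by string concatenation, including the
caller-supplied table name. As a hardening sketch, the date range can instead
be bound through a PreparedStatement, while the table name, which JDBC cannot
parameterize, is checked against a fixed whitelist. The class name, whitelist,
and helper below are illustrative assumptions, not part of the Chukwa API.

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch: binds the time range as parameters and restricts
// the table name to known values, since identifiers cannot be bound
// through '?' placeholders.
public class ParameterizedSearchSketch {
  private static final List<String> KNOWN_TABLES =
      Arrays.asList("MRJob", "HodJob", "QueueInfo");

  public static ResultSet queryRange(Connection conn, String table,
      String timeField, long t0, long t1) throws SQLException {
    // Reject anything outside the fixed set of tables; timeField is
    // assumed to come from the same fixed mapping used by search().
    if (!KNOWN_TABLES.contains(table)) {
      throw new SQLException("Unknown table: " + table);
    }
    String sql = "select * from " + table + " where " + timeField
        + " >= ? and " + timeField + " <= ?";
    PreparedStatement ps = conn.prepareStatement(sql);
    ps.setTimestamp(1, new Timestamp(t0));
    ps.setTimestamp(2, new Timestamp(t1));
    return ps.executeQuery();
  }
}

The caller is responsible for closing the statement afterwards, e.g. via
rs.getStatement().close(), mirroring the finally block in search() above.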