This project has retired. For details please refer to its Attic page.
MetricDataLoader xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.chukwa.dataloader;
20  
21  import java.io.IOException;
22  import java.sql.Connection;
23  import java.sql.ResultSet;
24  import java.sql.SQLException;
25  import java.sql.Statement;
26  import java.text.SimpleDateFormat;
27  import java.util.Date;
28  import java.util.HashMap;
29  import java.util.Iterator;
30  import java.util.Map.Entry;
31  import java.util.regex.Matcher;
32  import java.util.regex.Pattern;
33  import java.util.concurrent.Callable;
34  
35  import org.apache.commons.logging.Log;
36  import org.apache.commons.logging.LogFactory;
37  import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
38  import org.apache.hadoop.chukwa.database.DatabaseConfig;
39  import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
40  import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
41  import org.apache.hadoop.chukwa.extraction.engine.RecordUtil;
42  import org.apache.hadoop.chukwa.util.ClusterConfig;
43  import org.apache.hadoop.chukwa.util.DatabaseWriter;
44  import org.apache.hadoop.chukwa.util.ExceptionUtil;
45  import org.apache.hadoop.chukwa.util.RegexUtil;
46  import org.apache.hadoop.fs.FileSystem;
47  import org.apache.hadoop.fs.Path;
48  import org.apache.hadoop.io.SequenceFile;
49  
50  public class MetricDataLoader implements Callable {
51    private static Log log = LogFactory.getLog(MetricDataLoader.class);
52  
53    private Statement stmt = null;
54    private ResultSet rs = null;
55    private DatabaseConfig mdlConfig = null;
56    private HashMap<String, String> normalize = null;
57    private HashMap<String, String> transformer = null;
58    private HashMap<String, Float> conversion = null;
59    private HashMap<String, String> dbTables = null;
60    private HashMap<String, HashMap<String, Integer>> dbSchema = null;
61    private String newSpace = "-";
62    private boolean batchMode = true;
63    private Connection conn = null;
64    private Path source = null;
65  
66    private ChukwaConfiguration conf = null;
67    private FileSystem fs = null;
68    private String jdbc_url = "";
69  
70    public MetricDataLoader(String fileName) throws IOException {
71      conf = new ChukwaConfiguration();
72      fs = FileSystem.get(conf);
73    }
74  
75    /** Creates a new instance of DBWriter 
76     * @param conf Chukwa Configuration
77     * @param fs Hadoop File System
78     * @param fileName Chukwa Sequence file */
79    public MetricDataLoader(ChukwaConfiguration conf, FileSystem fs, String fileName) {
80      source = new Path(fileName);
81      this.conf = conf;
82      this.fs = fs;
83    }
84  
85    private void initEnv(String cluster) throws Exception {
86      mdlConfig = new DatabaseConfig();
87      transformer = mdlConfig.startWith("metric.");
88      conversion = new HashMap<String, Float>();
89      normalize = mdlConfig.startWith("normalize.");
90      dbTables = mdlConfig.startWith("report.db.name.");
91      Iterator<?> entries = mdlConfig.iterator();
92      while (entries.hasNext()) {
93        String entry = entries.next().toString();
94        if (entry.startsWith("conversion.")) {
95          String[] metrics = entry.split("=");
96          try {
97            float convertNumber = Float.parseFloat(metrics[1]);
98            conversion.put(metrics[0], convertNumber);
99          } catch (NumberFormatException ex) {
100           log.error(metrics[0] + " is not a number.");
101         }
102       }
103     }
104     log.debug("cluster name:" + cluster);
105     if (!cluster.equals("")) {
106       ClusterConfig cc = new ClusterConfig();
107       jdbc_url = cc.getURL(cluster);
108     }
109     try {
110       DatabaseWriter dbWriter = new DatabaseWriter(cluster);
111       conn = dbWriter.getConnection();
112     } catch(Exception ex) {
113       throw new Exception("JDBC URL does not exist for:"+jdbc_url);
114     }
115     log.debug("Initialized JDBC URL: " + jdbc_url);
116     HashMap<String, String> dbNames = mdlConfig.startWith("report.db.name.");
117     Iterator<String> ki = dbNames.keySet().iterator();
118     dbSchema = new HashMap<String, HashMap<String, Integer>>();
119     while (ki.hasNext()) {
120       String recordType = ki.next().toString();
121       String table = dbNames.get(recordType);
122       try {
123         ResultSet rs = conn.getMetaData().getColumns(null, null, table+"_template", null);
124         HashMap<String, Integer> tableSchema = new HashMap<String, Integer>();
125         while(rs.next()) {
126           String name = rs.getString("COLUMN_NAME");
127           int type = rs.getInt("DATA_TYPE");
128           tableSchema.put(name, type);
129           StringBuilder metricName = new StringBuilder();
130           metricName.append("metric.");
131           metricName.append(recordType.substring(15));
132           metricName.append(".");
133           metricName.append(name);
134           String mdlKey = metricName.toString().toLowerCase();
135           if(!transformer.containsKey(mdlKey)) {
136             transformer.put(mdlKey, name);
137           }          
138         }
139         rs.close();
140         dbSchema.put(table, tableSchema);
141       } catch (SQLException ex) {
142         log.debug("table: " + table
143           + " template does not exist, MDL will not load data for this table.");
144       }
145     }
146     stmt = conn.createStatement();
147     conn.setAutoCommit(false);
148   }
149 
150   public void interrupt() {
151   }
152 
153   private String escape(String s, String c) {
154 
155     String ns = s.trim();
156     Pattern pattern = Pattern.compile(" +");
157     Matcher matcher = pattern.matcher(ns);
158     String s2 = matcher.replaceAll(c);
159 
160     return s2;
161 
162   }
163 
164   public static String escapeQuotes( String s ) {
165     StringBuffer sb = new StringBuffer(); 
166     int index; 
167     int length = s.length(); 
168     char ch;
169     for( index = 0; index < length; ++index ) {
170       if(( ch = s.charAt( index )) == '\"' ) {
171         sb.append( "\\\"" ); 
172       } else if( ch == '\\' ) {
173         sb.append( "\\\\" ); 
174       } else if( ch == '\'' ) {
175         sb.append( "\\'" );
176       } else {
177         sb.append( ch );
178       }
179     }
180     return( sb.toString()); 
181   }
182   
183   @edu.umd.cs.findbugs.annotations.SuppressWarnings(value =
184       "SQL_NONCONSTANT_STRING_PASSED_TO_EXECUTE", 
185       justification = "Dynamic based upon tables in the database")
186   public boolean run() throws IOException {
187     boolean first=true;
188     log.info("StreamName: " + source.getName());
189     SequenceFile.Reader reader = null;
190 
191     try {
192       // The newInstance() call is a work around for some
193       // broken Java implementations
194       reader = new SequenceFile.Reader(fs, source, conf);
195     } catch (Exception ex) {
196       // handle the error
197       log.error(ex, ex);
198     }
199     long currentTimeMillis = System.currentTimeMillis();
200     boolean isSuccessful = true;
201     String recordType = null;
202 
203     ChukwaRecordKey key = new ChukwaRecordKey();
204     ChukwaRecord record = new ChukwaRecord();
205     String cluster = null;
206     int numOfRecords = 0;
207     try {
208       Pattern p = Pattern.compile("(.*)\\-(\\d+)$");
209       int batch = 0;
210       while (reader !=null && reader.next(key, record)) {
211     	numOfRecords++;
212         if(first) { 
213           try {
214             cluster = RecordUtil.getClusterName(record);
215             initEnv(cluster);
216             first=false;
217           } catch(Exception ex) {
218             log.error("Initialization failed for: "+cluster+".  Please check jdbc configuration.");
219             return false;
220           }
221         }
222         String sqlTime = DatabaseWriter.formatTimeStamp(record.getTime());
223         log.debug("Timestamp: " + record.getTime());
224         log.debug("DataType: " + key.getReduceType());
225 
226         String[] fields = record.getFields();
227         String table = null;
228         String[] priKeys = null;
229         HashMap<String, HashMap<String, String>> hashReport = new HashMap<String, HashMap<String, String>>();
230         StringBuilder normKey = new StringBuilder();
231         String node = record.getValue("csource");
232         recordType = key.getReduceType().toLowerCase();
233         String dbKey = "report.db.name." + recordType;
234         Matcher m = p.matcher(recordType);
235         if (dbTables.containsKey(dbKey)) {
236           String tableName = mdlConfig.get(dbKey);
237           if (!RegexUtil.isRegex(tableName)) {
238             log.error("Error parsing 'tableName' as a regex: "
239                 + RegexUtil.regexError(tableName));
240             return false;
241           }
242           String[] tmp = mdlConfig.findTableName(tableName, record
243               .getTime(), record.getTime());
244           table = tmp[0];
245         } else if(m.matches()) {
246           String timePartition = "_week";
247           int timeSize = Integer.parseInt(m.group(2));
248           if(timeSize == 5) {
249             timePartition = "_month";
250           } else if(timeSize == 30) {
251             timePartition = "_quarter";
252           } else if(timeSize == 180) {
253             timePartition = "_year";
254           } else if(timeSize == 720) {
255             timePartition = "_decade";
256           }
257           int partition = (int) (record.getTime() / timeSize);
258           StringBuilder tmpDbKey = new StringBuilder();
259           tmpDbKey.append("report.db.name.");
260           tmpDbKey.append(m.group(1));
261           if(dbTables.containsKey(tmpDbKey.toString())) {
262             StringBuilder tmpTable = new StringBuilder();
263             tmpTable.append(dbTables.get(tmpDbKey.toString()));
264             tmpTable.append("_");
265             tmpTable.append(partition);
266             tmpTable.append("_");
267             tmpTable.append(timePartition);
268             table = tmpTable.toString();
269           } else {
270             log.debug(tmpDbKey.toString() + " does not exist.");
271             continue;            
272           }
273         } else {
274           log.debug(dbKey + " does not exist.");
275           continue;
276         }
277         log.debug("table name:" + table);
278         try {
279           priKeys = mdlConfig.get("report.db.primary.key." + recordType).split(
280               ",");
281         } catch (Exception nullException) {
282           log.debug(ExceptionUtil.getStackTrace(nullException));
283         }
284         for (String field : fields) {
285           String keyName = escape(field.toLowerCase(), newSpace);
286           String keyValue = escape(record.getValue(field).toLowerCase(),
287               newSpace);
288           StringBuilder buildKey = new StringBuilder();
289           buildKey.append("normalize.");
290           buildKey.append(recordType);
291           buildKey.append(".");
292           buildKey.append(keyName);
293           if (normalize.containsKey(buildKey.toString())) {
294             if (normKey.toString().equals("")) {
295               normKey.append(keyName);
296               normKey.append(".");
297               normKey.append(keyValue);
298             } else {
299               normKey.append(".");
300               normKey.append(keyName);
301               normKey.append(".");
302               normKey.append(keyValue);
303             }
304           }
305           StringBuilder normalizedKey = new StringBuilder();
306           normalizedKey.append("metric.");
307           normalizedKey.append(recordType);
308           normalizedKey.append(".");
309           normalizedKey.append(normKey);
310           if (hashReport.containsKey(node)) {
311             HashMap<String, String> tmpHash = hashReport.get(node);
312             tmpHash.put(normalizedKey.toString(), keyValue);
313             hashReport.put(node, tmpHash);
314           } else {
315             HashMap<String, String> tmpHash = new HashMap<String, String>();
316             tmpHash.put(normalizedKey.toString(), keyValue);
317             hashReport.put(node, tmpHash);
318           }
319         }
320         for (String field : fields) {
321           String valueName = escape(field.toLowerCase(), newSpace);
322           String valueValue = escape(record.getValue(field).toLowerCase(),
323               newSpace);
324           StringBuilder buildKey = new StringBuilder();
325           buildKey.append("metric.");
326           buildKey.append(recordType);
327           buildKey.append(".");
328           buildKey.append(valueName);
329           if (!normKey.toString().equals("")) {
330             buildKey = new StringBuilder();
331             buildKey.append("metric.");
332             buildKey.append(recordType);
333             buildKey.append(".");
334             buildKey.append(normKey);
335             buildKey.append(".");
336             buildKey.append(valueName);
337           }
338           String normalizedKey = buildKey.toString();
339           if (hashReport.containsKey(node)) {
340             HashMap<String, String> tmpHash = hashReport.get(node);
341             tmpHash.put(normalizedKey, valueValue);
342             hashReport.put(node, tmpHash);
343           } else {
344             HashMap<String, String> tmpHash = new HashMap<String, String>();
345             tmpHash.put(normalizedKey, valueValue);
346             hashReport.put(node, tmpHash);
347 
348           }
349 
350         }
351         for(Entry<String, HashMap<String, String>> entry : hashReport.entrySet()) {
352           HashMap<String, String> recordSet = entry.getValue();
353        // Map any primary key that was not included in the report keyName
354           StringBuilder sqlPriKeys = new StringBuilder();
355           try {
356             for (String priKey : priKeys) {
357               if (priKey.equals("timestamp")) {
358                 sqlPriKeys.append(priKey);
359                 sqlPriKeys.append(" = \"");
360                 sqlPriKeys.append(sqlTime);
361                 sqlPriKeys.append("\"");
362               }
363               if (!priKey.equals(priKeys[priKeys.length - 1])) {
364                 sqlPriKeys.append(sqlPriKeys);
365                 sqlPriKeys.append(", ");
366               }
367             }
368           } catch (Exception nullException) {
369             // ignore if primary key is empty
370             log.debug(ExceptionUtil.getStackTrace(nullException));
371           }
372           // Map the hash objects to database table columns
373           StringBuilder sqlValues = new StringBuilder();
374           boolean firstValue = true;
375           for(Entry<String, String> fi : recordSet.entrySet()) {
376             String fieldKey = fi.getKey();
377             String fieldValue = fi.getValue();
378             if (transformer.containsKey(fieldKey) && transformer.get(fieldKey).intern()!="_delete".intern()) {
379               if (!firstValue) {
380                 sqlValues.append(", ");
381               }
382               try {
383                 if (dbSchema.get(dbTables.get(dbKey)).get(
384                     transformer.get(fieldKey)) == java.sql.Types.VARCHAR
385                     || dbSchema.get(dbTables.get(dbKey)).get(
386                         transformer.get(fieldKey)) == java.sql.Types.BLOB) {
387                   String conversionKey = "conversion." + fieldKey;
388                   if (conversion.containsKey(conversionKey)) {
389                     sqlValues.append(transformer.get(fieldKey));
390                     sqlValues.append("=");
391                     sqlValues.append(fieldValue);
392                     sqlValues.append(conversion.get(conversionKey).toString());
393                   } else {
394                     sqlValues.append(transformer.get(fieldKey));
395                     sqlValues.append("=\'");
396                     sqlValues.append(escapeQuotes(fieldValue));
397                     sqlValues.append("\'");
398                   }
399                 } else if (dbSchema.get(dbTables.get(dbKey)).get(
400                     transformer.get(fieldKey)) == java.sql.Types.TIMESTAMP) {
401                   SimpleDateFormat formatter = new SimpleDateFormat(
402                       "yyyy-MM-dd HH:mm:ss");
403                   Date recordDate = new Date();
404                   recordDate.setTime(Long.parseLong(fieldValue));
405                   sqlValues.append(transformer.get(fieldKey));
406                   sqlValues.append("=\"");
407                   sqlValues.append(formatter.format(recordDate));
408                   sqlValues.append("\"");
409                 } else if (dbSchema.get(dbTables.get(dbKey)).get(
410                     transformer.get(fieldKey)) == java.sql.Types.BIGINT
411                     || dbSchema.get(dbTables.get(dbKey)).get(
412                         transformer.get(fieldKey)) == java.sql.Types.TINYINT
413                     || dbSchema.get(dbTables.get(dbKey)).get(
414                         transformer.get(fieldKey)) == java.sql.Types.INTEGER) {
415                   long tmp = 0;
416                   try {
417                     tmp = Long.parseLong(fieldValue);
418                     String conversionKey = "conversion." + fieldKey;
419                     if (conversion.containsKey(conversionKey)) {
420                       tmp = tmp
421                           * Long.parseLong(conversion.get(conversionKey)
422                               .toString());
423                     }
424                   } catch (Exception e) {
425                     tmp = 0;
426                   }
427                   sqlValues.append(transformer.get(fieldKey));
428                   sqlValues.append("=");
429                   sqlValues.append(tmp);
430                 } else {
431                   double tmp = 0;
432                   tmp = Double.parseDouble(fieldValue);
433                   String conversionKey = "conversion." + fieldKey;
434                   if (conversion.containsKey(conversionKey)) {
435                     tmp = tmp
436                         * Double.parseDouble(conversion.get(conversionKey)
437                             .toString());
438                   }
439                   if (Double.isNaN(tmp)) {
440                     tmp = 0;
441                   }
442                   sqlValues.append(transformer.get(fieldKey));
443                   sqlValues.append("=");
444                   sqlValues.append(tmp);
445                 }
446                 firstValue = false;
447               } catch (NumberFormatException ex) {
448                 String conversionKey = "conversion." + fieldKey;
449                 if (conversion.containsKey(conversionKey)) {
450                   sqlValues.append(transformer.get(fieldKey));
451                   sqlValues.append("=");
452                   sqlValues.append(recordSet.get(fieldKey));
453                   sqlValues.append(conversion.get(conversionKey).toString());
454                 } else {
455                   sqlValues.append(transformer.get(fieldKey));
456                   sqlValues.append("='");
457                   sqlValues.append(escapeQuotes(recordSet.get(fieldKey)));
458                   sqlValues.append("'");
459                 }
460                 firstValue = false;
461               } catch (NullPointerException ex) {
462                 log.error("dbKey:" + dbKey + " fieldKey:" + fieldKey
463                     + " does not contain valid MDL structure.");
464               }
465             }
466           }
467           StringBuilder sql = new StringBuilder();
468           if (sqlPriKeys.length() > 0) {
469             sql.append("INSERT INTO ");
470             sql.append(table);
471             sql.append(" SET ");
472             sql.append(sqlPriKeys.toString());
473             sql.append(",");
474             sql.append(sqlValues.toString());
475             sql.append(" ON DUPLICATE KEY UPDATE ");
476             sql.append(sqlPriKeys.toString());
477             sql.append(",");
478             sql.append(sqlValues.toString());
479             sql.append(";");
480           } else {
481             if(sqlValues.length() > 0) {
482               sql.append("INSERT INTO ");
483               sql.append(table);
484               sql.append(" SET ");
485               sql.append(sqlValues.toString());
486               sql.append(" ON DUPLICATE KEY UPDATE ");
487               sql.append(sqlValues.toString());
488               sql.append(";");
489             }
490           }
491           if(sql.length() > 0) {
492             log.trace(sql);
493           
494             if (batchMode) {
495               stmt.addBatch(sql.toString());
496               batch++;
497             } else {
498               stmt.execute(sql.toString());
499             }
500             if (batchMode && batch > 20000) {
501               int[] updateCounts = stmt.executeBatch();
502               log.info("Batch mode inserted=" + updateCounts.length + "records.");
503               batch = 0;
504             }
505           }
506         }
507 
508       }
509 
510       if (batchMode) {
511         int[] updateCounts = stmt.executeBatch();
512         log.info("Batch mode inserted=" + updateCounts.length + "records.");
513       }
514     } catch (SQLException ex) {
515       // handle any errors
516       isSuccessful = false;
517       log.error(ex, ex);
518       log.error("SQLException: " + ex.getMessage());
519       log.error("SQLState: " + ex.getSQLState());
520       log.error("VendorError: " + ex.getErrorCode());
521       // throw an exception up the chain to give the PostProcessorManager a chance to retry
522       throw new IOException (ex);
523     } catch (Exception e) {
524       isSuccessful = false;
525       log.error(ExceptionUtil.getStackTrace(e));
526       // throw an exception up the chain to give the PostProcessorManager a chance to retry
527       throw new IOException (e);
528     } finally {
529       if (batchMode && conn!=null) {
530         try {
531           conn.commit();
532           log.info("batchMode commit done");
533         } catch (SQLException ex) {
534           log.error(ex, ex);
535           log.error("SQLException: " + ex.getMessage());
536           log.error("SQLState: " + ex.getSQLState());
537           log.error("VendorError: " + ex.getErrorCode());
538         }
539       }
540       long latencyMillis = System.currentTimeMillis() - currentTimeMillis;
541       int latencySeconds = ((int) (latencyMillis + 500)) / 1000;
542       String logMsg = (isSuccessful ? "Saved" : "Error occurred in saving");
543       log.info(logMsg + " (" + recordType + ","
544           + cluster + ") " + latencySeconds + " sec. numOfRecords: " + numOfRecords);
545       if (rs != null) {
546         try {
547           rs.close();
548         } catch (SQLException ex) {
549           log.error(ex, ex);
550           log.error("SQLException: " + ex.getMessage());
551           log.error("SQLState: " + ex.getSQLState());
552           log.error("VendorError: " + ex.getErrorCode());
553         }
554         rs = null;
555       }
556       if (stmt != null) {
557         try {
558           stmt.close();
559         } catch (SQLException ex) {
560           log.error(ex, ex);
561           log.error("SQLException: " + ex.getMessage());
562           log.error("SQLState: " + ex.getSQLState());
563           log.error("VendorError: " + ex.getErrorCode());
564         }
565         stmt = null;
566       }
567       if (conn != null) {
568         try {
569           conn.close();
570         } catch (SQLException ex) {
571           log.error(ex, ex);
572           log.error("SQLException: " + ex.getMessage());
573           log.error("SQLState: " + ex.getSQLState());
574           log.error("VendorError: " + ex.getErrorCode());
575         }
576         conn = null;
577       }
578       
579       if (reader != null) {
580         try {
581           reader.close();
582         } catch (Exception e) {
583           log.warn("Could not close SequenceFile.Reader:" ,e);
584         }
585         reader = null;
586       }
587     }
588     return true;
589   }
590 
591   public Boolean call() throws IOException {
592     return run();  
593   }
594   
595 
596   public static void main(String[] args) {
597     try {
598       MetricDataLoader mdl = new MetricDataLoader(args[0]);
599       mdl.run();
600     } catch (Exception e) {
601       e.printStackTrace();
602     }
603   }
604 
605 }