This project has retired. For details please refer to its Attic page.
MetricDataLoader xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.chukwa.dataloader;
20  
21  import java.io.IOException;
22  import java.sql.Connection;
23  import java.sql.ResultSet;
24  import java.sql.SQLException;
25  import java.sql.Statement;
26  import java.text.SimpleDateFormat;
27  import java.util.Date;
28  import java.util.HashMap;
29  import java.util.Iterator;
30  import java.util.regex.Matcher;
31  import java.util.regex.Pattern;
32  import java.util.concurrent.Callable;
33  
34  import org.apache.commons.logging.Log;
35  import org.apache.commons.logging.LogFactory;
36  import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
37  import org.apache.hadoop.chukwa.database.DatabaseConfig;
38  import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
39  import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
40  import org.apache.hadoop.chukwa.extraction.engine.RecordUtil;
41  import org.apache.hadoop.chukwa.util.ClusterConfig;
42  import org.apache.hadoop.chukwa.util.DatabaseWriter;
43  import org.apache.hadoop.chukwa.util.ExceptionUtil;
44  import org.apache.hadoop.chukwa.util.RegexUtil;
45  import org.apache.hadoop.fs.FileSystem;
46  import org.apache.hadoop.fs.Path;
47  import org.apache.hadoop.io.SequenceFile;
48  
49  public class MetricDataLoader implements Callable {
50    private static Log log = LogFactory.getLog(MetricDataLoader.class);
51  
52    private Statement stmt = null;
53    private ResultSet rs = null;
54    private DatabaseConfig mdlConfig = null;
55    private HashMap<String, String> normalize = null;
56    private HashMap<String, String> transformer = null;
57    private HashMap<String, Float> conversion = null;
58    private HashMap<String, String> dbTables = null;
59    private HashMap<String, HashMap<String, Integer>> dbSchema = null;
60    private String newSpace = "-";
61    private boolean batchMode = true;
62    private Connection conn = null;
63    private Path source = null;
64  
65    private static ChukwaConfiguration conf = null;
66    private static FileSystem fs = null;
67    private String jdbc_url = "";
68  
69    /** Creates a new instance of DBWriter */
70    public MetricDataLoader(ChukwaConfiguration conf, FileSystem fs, String fileName) {
71      source = new Path(fileName);
72      this.conf = conf;
73      this.fs = fs;
74    }
75  
76    private void initEnv(String cluster) throws Exception {
77      mdlConfig = new DatabaseConfig();
78      transformer = mdlConfig.startWith("metric.");
79      conversion = new HashMap<String, Float>();
80      normalize = mdlConfig.startWith("normalize.");
81      dbTables = mdlConfig.startWith("report.db.name.");
82      Iterator<?> entries = mdlConfig.iterator();
83      while (entries.hasNext()) {
84        String entry = entries.next().toString();
85        if (entry.startsWith("conversion.")) {
86          String[] metrics = entry.split("=");
87          try {
88            float convertNumber = Float.parseFloat(metrics[1]);
89            conversion.put(metrics[0], convertNumber);
90          } catch (NumberFormatException ex) {
91            log.error(metrics[0] + " is not a number.");
92          }
93        }
94      }
95      log.debug("cluster name:" + cluster);
96      if (!cluster.equals("")) {
97        ClusterConfig cc = new ClusterConfig();
98        jdbc_url = cc.getURL(cluster);
99      }
100     try {
101       DatabaseWriter dbWriter = new DatabaseWriter(cluster);
102       conn = dbWriter.getConnection();
103     } catch(Exception ex) {
104       throw new Exception("JDBC URL does not exist for:"+jdbc_url);
105     }
106     log.debug("Initialized JDBC URL: " + jdbc_url);
107     HashMap<String, String> dbNames = mdlConfig.startWith("report.db.name.");
108     Iterator<String> ki = dbNames.keySet().iterator();
109     dbSchema = new HashMap<String, HashMap<String, Integer>>();
110     while (ki.hasNext()) {
111       String recordType = ki.next().toString();
112       String table = dbNames.get(recordType);
113       try {
114         ResultSet rs = conn.getMetaData().getColumns(null, null, table+"_template", null);
115         HashMap<String, Integer> tableSchema = new HashMap<String, Integer>();
116         while(rs.next()) {
117           String name = rs.getString("COLUMN_NAME");
118           int type = rs.getInt("DATA_TYPE");
119           tableSchema.put(name, type);
120           StringBuilder metricName = new StringBuilder();
121           metricName.append("metric.");
122           metricName.append(recordType.substring(15));
123           metricName.append(".");
124           metricName.append(name);
125           String mdlKey = metricName.toString().toLowerCase();
126           if(!transformer.containsKey(mdlKey)) {
127             transformer.put(mdlKey, name);
128           }          
129         }
130         rs.close();
131         dbSchema.put(table, tableSchema);
132       } catch (SQLException ex) {
133         log.debug("table: " + table
134           + " template does not exist, MDL will not load data for this table.");
135       }
136     }
137     stmt = conn.createStatement();
138     conn.setAutoCommit(false);
139   }
140 
141   public void interrupt() {
142   }
143 
144   private String escape(String s, String c) {
145 
146     String ns = s.trim();
147     Pattern pattern = Pattern.compile(" +");
148     Matcher matcher = pattern.matcher(ns);
149     String s2 = matcher.replaceAll(c);
150 
151     return s2;
152 
153   }
154 
155   public static String escapeQuotes( String s ) {
156     StringBuffer sb = new StringBuffer(); 
157     int index; 
158     int length = s.length(); 
159     char ch;
160     for( index = 0; index < length; ++index ) {
161       if(( ch = s.charAt( index )) == '\"' ) {
162         sb.append( "\\\"" ); 
163       } else if( ch == '\\' ) {
164         sb.append( "\\\\" ); 
165       } else if( ch == '\'' ) {
166         sb.append( "\\'" );
167       } else {
168         sb.append( ch );
169       }
170     }
171     return( sb.toString()); 
172   }
173   
174   public boolean run() throws IOException {
175     boolean first=true;
176     log.info("StreamName: " + source.getName());
177     SequenceFile.Reader reader = null;
178 
179     try {
180       // The newInstance() call is a work around for some
181       // broken Java implementations
182       reader = new SequenceFile.Reader(fs, source, conf);
183     } catch (Exception ex) {
184       // handle the error
185       log.error(ex, ex);
186     }
187     long currentTimeMillis = System.currentTimeMillis();
188     boolean isSuccessful = true;
189     String recordType = null;
190 
191     ChukwaRecordKey key = new ChukwaRecordKey();
192     ChukwaRecord record = new ChukwaRecord();
193     String cluster = null;
194     int numOfRecords = 0;
195     try {
196       Pattern p = Pattern.compile("(.*)\\-(\\d+)$");
197       int batch = 0;
198       while (reader.next(key, record)) {
199     	numOfRecords++;
200         if(first) { 
201           try {
202             cluster = RecordUtil.getClusterName(record);
203             initEnv(cluster);
204             first=false;
205           } catch(Exception ex) {
206             log.error("Initialization failed for: "+cluster+".  Please check jdbc configuration.");
207             return false;
208           }
209         }
210         String sqlTime = DatabaseWriter.formatTimeStamp(record.getTime());
211         log.debug("Timestamp: " + record.getTime());
212         log.debug("DataType: " + key.getReduceType());
213 
214         String[] fields = record.getFields();
215         String table = null;
216         String[] priKeys = null;
217         HashMap<String, HashMap<String, String>> hashReport = new HashMap<String, HashMap<String, String>>();
218         StringBuilder normKey = new StringBuilder();
219         String node = record.getValue("csource");
220         recordType = key.getReduceType().toLowerCase();
221         String dbKey = "report.db.name." + recordType;
222         Matcher m = p.matcher(recordType);
223         if (dbTables.containsKey(dbKey)) {
224           String tableName = mdlConfig.get(dbKey);
225           if (!RegexUtil.isRegex(tableName)) {
226             log.error("Error parsing 'tableName' as a regex: "
227                 + RegexUtil.regexError(tableName));
228             return false;
229           }
230           String[] tmp = mdlConfig.findTableName(tableName, record
231               .getTime(), record.getTime());
232           table = tmp[0];
233         } else if(m.matches()) {
234           String timePartition = "_week";
235           int timeSize = Integer.parseInt(m.group(2));
236           if(timeSize == 5) {
237             timePartition = "_month";
238           } else if(timeSize == 30) {
239             timePartition = "_quarter";
240           } else if(timeSize == 180) {
241             timePartition = "_year";
242           } else if(timeSize == 720) {
243             timePartition = "_decade";
244           }
245           int partition = (int) (record.getTime() / timeSize);
246           StringBuilder tmpDbKey = new StringBuilder();
247           tmpDbKey.append("report.db.name.");
248           tmpDbKey.append(m.group(1));
249           if(dbTables.containsKey(tmpDbKey.toString())) {
250             StringBuilder tmpTable = new StringBuilder();
251             tmpTable.append(dbTables.get(tmpDbKey.toString()));
252             tmpTable.append("_");
253             tmpTable.append(partition);
254             tmpTable.append("_");
255             tmpTable.append(timePartition);
256             table = tmpTable.toString();
257           } else {
258             log.debug(tmpDbKey.toString() + " does not exist.");
259             continue;            
260           }
261         } else {
262           log.debug(dbKey + " does not exist.");
263           continue;
264         }
265         log.debug("table name:" + table);
266         try {
267           priKeys = mdlConfig.get("report.db.primary.key." + recordType).split(
268               ",");
269         } catch (Exception nullException) {
270           log.debug(ExceptionUtil.getStackTrace(nullException));
271         }
272         for (String field : fields) {
273           String keyName = escape(field.toLowerCase(), newSpace);
274           String keyValue = escape(record.getValue(field).toLowerCase(),
275               newSpace);
276           StringBuilder buildKey = new StringBuilder();
277           buildKey.append("normalize.");
278           buildKey.append(recordType);
279           buildKey.append(".");
280           buildKey.append(keyName);
281           if (normalize.containsKey(buildKey.toString())) {
282             if (normKey.toString().equals("")) {
283               normKey.append(keyName);
284               normKey.append(".");
285               normKey.append(keyValue);
286             } else {
287               normKey.append(".");
288               normKey.append(keyName);
289               normKey.append(".");
290               normKey.append(keyValue);
291             }
292           }
293           StringBuilder normalizedKey = new StringBuilder();
294           normalizedKey.append("metric.");
295           normalizedKey.append(recordType);
296           normalizedKey.append(".");
297           normalizedKey.append(normKey);
298           if (hashReport.containsKey(node)) {
299             HashMap<String, String> tmpHash = hashReport.get(node);
300             tmpHash.put(normalizedKey.toString(), keyValue);
301             hashReport.put(node, tmpHash);
302           } else {
303             HashMap<String, String> tmpHash = new HashMap<String, String>();
304             tmpHash.put(normalizedKey.toString(), keyValue);
305             hashReport.put(node, tmpHash);
306           }
307         }
308         for (String field : fields) {
309           String valueName = escape(field.toLowerCase(), newSpace);
310           String valueValue = escape(record.getValue(field).toLowerCase(),
311               newSpace);
312           StringBuilder buildKey = new StringBuilder();
313           buildKey.append("metric.");
314           buildKey.append(recordType);
315           buildKey.append(".");
316           buildKey.append(valueName);
317           if (!normKey.toString().equals("")) {
318             buildKey = new StringBuilder();
319             buildKey.append("metric.");
320             buildKey.append(recordType);
321             buildKey.append(".");
322             buildKey.append(normKey);
323             buildKey.append(".");
324             buildKey.append(valueName);
325           }
326           String normalizedKey = buildKey.toString();
327           if (hashReport.containsKey(node)) {
328             HashMap<String, String> tmpHash = hashReport.get(node);
329             tmpHash.put(normalizedKey, valueValue);
330             hashReport.put(node, tmpHash);
331           } else {
332             HashMap<String, String> tmpHash = new HashMap<String, String>();
333             tmpHash.put(normalizedKey, valueValue);
334             hashReport.put(node, tmpHash);
335 
336           }
337 
338         }
339         Iterator<String> i = hashReport.keySet().iterator();
340         while (i.hasNext()) {
341           Object iteratorNode = i.next();
342           HashMap<String, String> recordSet = hashReport.get(iteratorNode);
343           Iterator<String> fi = recordSet.keySet().iterator();
344           // Map any primary key that was not included in the report keyName
345           StringBuilder sqlPriKeys = new StringBuilder();
346           try {
347             for (String priKey : priKeys) {
348               if (priKey.equals("timestamp")) {
349                 sqlPriKeys.append(priKey);
350                 sqlPriKeys.append(" = \"");
351                 sqlPriKeys.append(sqlTime);
352                 sqlPriKeys.append("\"");
353               }
354               if (!priKey.equals(priKeys[priKeys.length - 1])) {
355                 sqlPriKeys.append(sqlPriKeys);
356                 sqlPriKeys.append(", ");
357               }
358             }
359           } catch (Exception nullException) {
360             // ignore if primary key is empty
361             log.debug(ExceptionUtil.getStackTrace(nullException));
362           }
363           // Map the hash objects to database table columns
364           StringBuilder sqlValues = new StringBuilder();
365           boolean firstValue = true;
366           while (fi.hasNext()) {
367             String fieldKey = fi.next();
368             if (transformer.containsKey(fieldKey) && transformer.get(fieldKey).intern()!="_delete".intern()) {
369               if (!firstValue) {
370                 sqlValues.append(", ");
371               }
372               try {
373                 if (dbSchema.get(dbTables.get(dbKey)).get(
374                     transformer.get(fieldKey)) == java.sql.Types.VARCHAR
375                     || dbSchema.get(dbTables.get(dbKey)).get(
376                         transformer.get(fieldKey)) == java.sql.Types.BLOB) {
377                   String conversionKey = "conversion." + fieldKey;
378                   if (conversion.containsKey(conversionKey)) {
379                     sqlValues.append(transformer.get(fieldKey));
380                     sqlValues.append("=");
381                     sqlValues.append(recordSet.get(fieldKey));
382                     sqlValues.append(conversion.get(conversionKey).toString());
383                   } else {
384                     sqlValues.append(transformer.get(fieldKey));
385                     sqlValues.append("=\'");
386                     sqlValues.append(escapeQuotes(recordSet.get(fieldKey)));
387                     sqlValues.append("\'");
388                   }
389                 } else if (dbSchema.get(dbTables.get(dbKey)).get(
390                     transformer.get(fieldKey)) == java.sql.Types.TIMESTAMP) {
391                   SimpleDateFormat formatter = new SimpleDateFormat(
392                       "yyyy-MM-dd HH:mm:ss");
393                   Date recordDate = new Date();
394                   recordDate.setTime(Long.parseLong(recordSet
395                       .get(fieldKey)));
396                   sqlValues.append(transformer.get(fieldKey));
397                   sqlValues.append("=\"");
398                   sqlValues.append(formatter.format(recordDate));
399                   sqlValues.append("\"");
400                 } else if (dbSchema.get(dbTables.get(dbKey)).get(
401                     transformer.get(fieldKey)) == java.sql.Types.BIGINT
402                     || dbSchema.get(dbTables.get(dbKey)).get(
403                         transformer.get(fieldKey)) == java.sql.Types.TINYINT
404                     || dbSchema.get(dbTables.get(dbKey)).get(
405                         transformer.get(fieldKey)) == java.sql.Types.INTEGER) {
406                   long tmp = 0;
407                   try {
408                     tmp = Long.parseLong(recordSet.get(fieldKey).toString());
409                     String conversionKey = "conversion." + fieldKey;
410                     if (conversion.containsKey(conversionKey)) {
411                       tmp = tmp
412                           * Long.parseLong(conversion.get(conversionKey)
413                               .toString());
414                     }
415                   } catch (Exception e) {
416                     tmp = 0;
417                   }
418                   sqlValues.append(transformer.get(fieldKey));
419                   sqlValues.append("=");
420                   sqlValues.append(tmp);
421                 } else {
422                   double tmp = 0;
423                   tmp = Double.parseDouble(recordSet.get(fieldKey).toString());
424                   String conversionKey = "conversion." + fieldKey;
425                   if (conversion.containsKey(conversionKey)) {
426                     tmp = tmp
427                         * Double.parseDouble(conversion.get(conversionKey)
428                             .toString());
429                   }
430                   if (Double.isNaN(tmp)) {
431                     tmp = 0;
432                   }
433                   sqlValues.append(transformer.get(fieldKey));
434                   sqlValues.append("=");
435                   sqlValues.append(tmp);
436                 }
437                 firstValue = false;
438               } catch (NumberFormatException ex) {
439                 String conversionKey = "conversion." + fieldKey;
440                 if (conversion.containsKey(conversionKey)) {
441                   sqlValues.append(transformer.get(fieldKey));
442                   sqlValues.append("=");
443                   sqlValues.append(recordSet.get(fieldKey));
444                   sqlValues.append(conversion.get(conversionKey).toString());
445                 } else {
446                   sqlValues.append(transformer.get(fieldKey));
447                   sqlValues.append("='");
448                   sqlValues.append(escapeQuotes(recordSet.get(fieldKey)));
449                   sqlValues.append("'");
450                 }
451                 firstValue = false;
452               } catch (NullPointerException ex) {
453                 log.error("dbKey:" + dbKey + " fieldKey:" + fieldKey
454                     + " does not contain valid MDL structure.");
455               }
456             }
457           }
458 
459           StringBuilder sql = new StringBuilder();
460           if (sqlPriKeys.length() > 0) {
461             sql.append("INSERT INTO ");
462             sql.append(table);
463             sql.append(" SET ");
464             sql.append(sqlPriKeys.toString());
465             sql.append(",");
466             sql.append(sqlValues.toString());
467             sql.append(" ON DUPLICATE KEY UPDATE ");
468             sql.append(sqlPriKeys.toString());
469             sql.append(",");
470             sql.append(sqlValues.toString());
471             sql.append(";");
472           } else {
473             if(sqlValues.length() > 0) {
474               sql.append("INSERT INTO ");
475               sql.append(table);
476               sql.append(" SET ");
477               sql.append(sqlValues.toString());
478               sql.append(" ON DUPLICATE KEY UPDATE ");
479               sql.append(sqlValues.toString());
480               sql.append(";");
481             }
482           }
483           if(sql.length() > 0) {
484             log.trace(sql);
485           
486             if (batchMode) {
487               stmt.addBatch(sql.toString());
488               batch++;
489             } else {
490               stmt.execute(sql.toString());
491             }
492             if (batchMode && batch > 20000) {
493               int[] updateCounts = stmt.executeBatch();
494               log.info("Batch mode inserted=" + updateCounts.length + "records.");
495               batch = 0;
496             }
497           }
498         }
499 
500       }
501 
502       if (batchMode) {
503         int[] updateCounts = stmt.executeBatch();
504         log.info("Batch mode inserted=" + updateCounts.length + "records.");
505       }
506     } catch (SQLException ex) {
507       // handle any errors
508       isSuccessful = false;
509       log.error(ex, ex);
510       log.error("SQLException: " + ex.getMessage());
511       log.error("SQLState: " + ex.getSQLState());
512       log.error("VendorError: " + ex.getErrorCode());
513       // throw an exception up the chain to give the PostProcessorManager a chance to retry
514       throw new IOException (ex);
515     } catch (Exception e) {
516       isSuccessful = false;
517       log.error(ExceptionUtil.getStackTrace(e));
518       // throw an exception up the chain to give the PostProcessorManager a chance to retry
519       throw new IOException (e);
520     } finally {
521       if (batchMode && conn!=null) {
522         try {
523           conn.commit();
524           log.info("batchMode commit done");
525         } catch (SQLException ex) {
526           log.error(ex, ex);
527           log.error("SQLException: " + ex.getMessage());
528           log.error("SQLState: " + ex.getSQLState());
529           log.error("VendorError: " + ex.getErrorCode());
530         }
531       }
532       long latencyMillis = System.currentTimeMillis() - currentTimeMillis;
533       int latencySeconds = ((int) (latencyMillis + 500)) / 1000;
534       String logMsg = (isSuccessful ? "Saved" : "Error occurred in saving");
535       log.info(logMsg + " (" + recordType + ","
536           + cluster + ") " + latencySeconds + " sec. numOfRecords: " + numOfRecords);
537       if (rs != null) {
538         try {
539           rs.close();
540         } catch (SQLException ex) {
541           log.error(ex, ex);
542           log.error("SQLException: " + ex.getMessage());
543           log.error("SQLState: " + ex.getSQLState());
544           log.error("VendorError: " + ex.getErrorCode());
545         }
546         rs = null;
547       }
548       if (stmt != null) {
549         try {
550           stmt.close();
551         } catch (SQLException ex) {
552           log.error(ex, ex);
553           log.error("SQLException: " + ex.getMessage());
554           log.error("SQLState: " + ex.getSQLState());
555           log.error("VendorError: " + ex.getErrorCode());
556         }
557         stmt = null;
558       }
559       if (conn != null) {
560         try {
561           conn.close();
562         } catch (SQLException ex) {
563           log.error(ex, ex);
564           log.error("SQLException: " + ex.getMessage());
565           log.error("SQLState: " + ex.getSQLState());
566           log.error("VendorError: " + ex.getErrorCode());
567         }
568         conn = null;
569       }
570       
571       if (reader != null) {
572         try {
573           reader.close();
574         } catch (Exception e) {
575           log.warn("Could not close SequenceFile.Reader:" ,e);
576         }
577         reader = null;
578       }
579     }
580     return true;
581   }
582 
583   public Boolean call() throws IOException {
584     return run();  
585   }
586   
587 
588   public static void main(String[] args) {
589     try {
590       conf = new ChukwaConfiguration();
591       fs = FileSystem.get(conf);
592       MetricDataLoader mdl = new MetricDataLoader(conf, fs, args[0]);
593       mdl.run();
594     } catch (Exception e) {
595       e.printStackTrace();
596     }
597   }
598 
599 }