This project has retired. For details please refer to its
Attic page.
MetricDataLoader xref
19 package org.apache.hadoop.chukwa.dataloader;
21 import;
22 import java.sql.Connection;
23 import java.sql.ResultSet;
24 import java.sql.SQLException;
25 import java.sql.Statement;
26 import java.text.SimpleDateFormat;
27 import java.util.Date;
28 import java.util.HashMap;
29 import java.util.Iterator;
30 import java.util.Map.Entry;
31 import java.util.regex.Matcher;
32 import java.util.regex.Pattern;
33 import java.util.concurrent.Callable;
35 import org.apache.commons.logging.Log;
36 import org.apache.commons.logging.LogFactory;
37 import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
38 import org.apache.hadoop.chukwa.database.DatabaseConfig;
39 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
40 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
41 import org.apache.hadoop.chukwa.extraction.engine.RecordUtil;
42 import org.apache.hadoop.chukwa.util.ClusterConfig;
43 import org.apache.hadoop.chukwa.util.DatabaseWriter;
44 import org.apache.hadoop.chukwa.util.ExceptionUtil;
45 import org.apache.hadoop.chukwa.util.RegexUtil;
46 import org.apache.hadoop.fs.FileSystem;
47 import org.apache.hadoop.fs.Path;
48 import;
50 public class MetricDataLoader implements Callable {
51 private static Log log = LogFactory.getLog(MetricDataLoader.class);
53 private Statement stmt = null;
54 private ResultSet rs = null;
55 private DatabaseConfig mdlConfig = null;
56 private HashMap<String, String> normalize = null;
57 private HashMap<String, String> transformer = null;
58 private HashMap<String, Float> conversion = null;
59 private HashMap<String, String> dbTables = null;
60 private HashMap<String, HashMap<String, Integer>> dbSchema = null;
61 private String newSpace = "-";
62 private boolean batchMode = true;
63 private Connection conn = null;
64 private Path source = null;
66 private ChukwaConfiguration conf = null;
67 private FileSystem fs = null;
68 private String jdbc_url = "";
/**
 * Creates a loader for the given sequence file using a freshly created
 * {@link ChukwaConfiguration} and the file system obtained from it.
 *
 * @param fileName path of the sequence file to load
 * @throws IOException if the file system cannot be obtained
 */
public MetricDataLoader(String fileName) throws IOException {
  // The file name was previously ignored, leaving source null so that
  // run() failed on its first statement (source.getName()).
  source = new Path(fileName);
  conf = new ChukwaConfiguration();
  fs = FileSystem.get(conf);
}
/**
 * Creates a loader that reads the given sequence file through a caller-supplied
 * configuration and file system.
 *
 * @param conf configuration handed to the sequence file reader
 * @param fs file system that holds the sequence file
 * @param fileName path of the sequence file to load
 */
public MetricDataLoader(ChukwaConfiguration conf, FileSystem fs, String fileName) {
  this.conf = conf;
  this.fs = fs;
  this.source = new Path(fileName);
}
85 private void initEnv(String cluster) throws Exception {
86 mdlConfig = new DatabaseConfig();
87 transformer = mdlConfig.startWith("metric.");
88 conversion = new HashMap<String, Float>();
89 normalize = mdlConfig.startWith("normalize.");
90 dbTables = mdlConfig.startWith("");
91 Iterator<?> entries = mdlConfig.iterator();
92 while (entries.hasNext()) {
93 String entry =;
94 if (entry.startsWith("conversion.")) {
95 String[] metrics = entry.split("=");
96 try {
97 float convertNumber = Float.parseFloat(metrics[1]);
98 conversion.put(metrics[0], convertNumber);
99 } catch (NumberFormatException ex) {
100 log.error(metrics[0] + " is not a number.");
101 }
102 }
103 }
104 log.debug("cluster name:" + cluster);
105 if (!cluster.equals("")) {
106 ClusterConfig cc = new ClusterConfig();
107 jdbc_url = cc.getURL(cluster);
108 }
109 try {
110 DatabaseWriter dbWriter = new DatabaseWriter(cluster);
111 conn = dbWriter.getConnection();
112 } catch(Exception ex) {
113 throw new Exception("JDBC URL does not exist for:"+jdbc_url);
114 }
115 log.debug("Initialized JDBC URL: " + jdbc_url);
116 HashMap<String, String> dbNames = mdlConfig.startWith("");
117 Iterator<String> ki = dbNames.keySet().iterator();
118 dbSchema = new HashMap<String, HashMap<String, Integer>>();
119 while (ki.hasNext()) {
120 String recordType =;
121 String table = dbNames.get(recordType);
122 try {
123 ResultSet rs = conn.getMetaData().getColumns(null, null, table+"_template", null);
124 HashMap<String, Integer> tableSchema = new HashMap<String, Integer>();
125 while( {
126 String name = rs.getString("COLUMN_NAME");
127 int type = rs.getInt("DATA_TYPE");
128 tableSchema.put(name, type);
129 StringBuilder metricName = new StringBuilder();
130 metricName.append("metric.");
131 metricName.append(recordType.substring(15));
132 metricName.append(".");
133 metricName.append(name);
134 String mdlKey = metricName.toString().toLowerCase();
135 if(!transformer.containsKey(mdlKey)) {
136 transformer.put(mdlKey, name);
137 }
138 }
139 rs.close();
140 dbSchema.put(table, tableSchema);
141 } catch (SQLException ex) {
142 log.debug("table: " + table
143 + " template does not exist, MDL will not load data for this table.");
144 }
145 }
146 stmt = conn.createStatement();
147 conn.setAutoCommit(false);
148 }
150 public void interrupt() {
151 }
153 private String escape(String s, String c) {
155 String ns = s.trim();
156 Pattern pattern = Pattern.compile(" +");
157 Matcher matcher = pattern.matcher(ns);
158 String s2 = matcher.replaceAll(c);
160 return s2;
162 }
164 public static String escapeQuotes( String s ) {
165 StringBuffer sb = new StringBuffer();
166 int index;
167 int length = s.length();
168 char ch;
169 for( index = 0; index < length; ++index ) {
170 if(( ch = s.charAt( index )) == '\"' ) {
171 sb.append( "\\\"" );
172 } else if( ch == '\\' ) {
173 sb.append( "\\\\" );
174 } else if( ch == '\'' ) {
175 sb.append( "\\'" );
176 } else {
177 sb.append( ch );
178 }
179 }
180 return( sb.toString());
181 }
183 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value =
185 justification = "Dynamic based upon tables in the database")
186 public boolean run() throws IOException {
187 boolean first=true;
188"StreamName: " + source.getName());
189 SequenceFile.Reader reader = null;
191 try {
194 reader = new SequenceFile.Reader(fs, source, conf);
195 } catch (Exception ex) {
197 log.error(ex, ex);
198 }
199 long currentTimeMillis = System.currentTimeMillis();
200 boolean isSuccessful = true;
201 String recordType = null;
203 ChukwaRecordKey key = new ChukwaRecordKey();
204 ChukwaRecord record = new ChukwaRecord();
205 String cluster = null;
206 int numOfRecords = 0;
207 try {
208 Pattern p = Pattern.compile("(.*)\\-(\\d+)$");
209 int batch = 0;
210 while (reader !=null &&, record)) {
211 numOfRecords++;
212 if(first) {
213 try {
214 cluster = RecordUtil.getClusterName(record);
215 initEnv(cluster);
216 first=false;
217 } catch(Exception ex) {
218 log.error("Initialization failed for: "+cluster+". Please check jdbc configuration.");
219 return false;
220 }
221 }
222 String sqlTime = DatabaseWriter.formatTimeStamp(record.getTime());
223 log.debug("Timestamp: " + record.getTime());
224 log.debug("DataType: " + key.getReduceType());
226 String[] fields = record.getFields();
227 String table = null;
228 String[] priKeys = null;
229 HashMap<String, HashMap<String, String>> hashReport = new HashMap<String, HashMap<String, String>>();
230 StringBuilder normKey = new StringBuilder();
231 String node = record.getValue("csource");
232 recordType = key.getReduceType().toLowerCase();
233 String dbKey = "" + recordType;
234 Matcher m = p.matcher(recordType);
235 if (dbTables.containsKey(dbKey)) {
236 String tableName = mdlConfig.get(dbKey);
237 if (!RegexUtil.isRegex(tableName)) {
238 log.error("Error parsing 'tableName' as a regex: "
239 + RegexUtil.regexError(tableName));
240 return false;
241 }
242 String[] tmp = mdlConfig.findTableName(tableName, record
243 .getTime(), record.getTime());
244 table = tmp[0];
245 } else if(m.matches()) {
246 String timePartition = "_week";
247 int timeSize = Integer.parseInt(;
248 if(timeSize == 5) {
249 timePartition = "_month";
250 } else if(timeSize == 30) {
251 timePartition = "_quarter";
252 } else if(timeSize == 180) {
253 timePartition = "_year";
254 } else if(timeSize == 720) {
255 timePartition = "_decade";
256 }
257 int partition = (int) (record.getTime() / timeSize);
258 StringBuilder tmpDbKey = new StringBuilder();
259 tmpDbKey.append("");
260 tmpDbKey.append(;
261 if(dbTables.containsKey(tmpDbKey.toString())) {
262 StringBuilder tmpTable = new StringBuilder();
263 tmpTable.append(dbTables.get(tmpDbKey.toString()));
264 tmpTable.append("_");
265 tmpTable.append(partition);
266 tmpTable.append("_");
267 tmpTable.append(timePartition);
268 table = tmpTable.toString();
269 } else {
270 log.debug(tmpDbKey.toString() + " does not exist.");
271 continue;
272 }
273 } else {
274 log.debug(dbKey + " does not exist.");
275 continue;
276 }
277 log.debug("table name:" + table);
278 try {
279 priKeys = mdlConfig.get("report.db.primary.key." + recordType).split(
280 ",");
281 } catch (Exception nullException) {
282 log.debug(ExceptionUtil.getStackTrace(nullException));
283 }
284 for (String field : fields) {
285 String keyName = escape(field.toLowerCase(), newSpace);
286 String keyValue = escape(record.getValue(field).toLowerCase(),
287 newSpace);
288 StringBuilder buildKey = new StringBuilder();
289 buildKey.append("normalize.");
290 buildKey.append(recordType);
291 buildKey.append(".");
292 buildKey.append(keyName);
293 if (normalize.containsKey(buildKey.toString())) {
294 if (normKey.toString().equals("")) {
295 normKey.append(keyName);
296 normKey.append(".");
297 normKey.append(keyValue);
298 } else {
299 normKey.append(".");
300 normKey.append(keyName);
301 normKey.append(".");
302 normKey.append(keyValue);
303 }
304 }
305 StringBuilder normalizedKey = new StringBuilder();
306 normalizedKey.append("metric.");
307 normalizedKey.append(recordType);
308 normalizedKey.append(".");
309 normalizedKey.append(normKey);
310 if (hashReport.containsKey(node)) {
311 HashMap<String, String> tmpHash = hashReport.get(node);
312 tmpHash.put(normalizedKey.toString(), keyValue);
313 hashReport.put(node, tmpHash);
314 } else {
315 HashMap<String, String> tmpHash = new HashMap<String, String>();
316 tmpHash.put(normalizedKey.toString(), keyValue);
317 hashReport.put(node, tmpHash);
318 }
319 }
320 for (String field : fields) {
321 String valueName = escape(field.toLowerCase(), newSpace);
322 String valueValue = escape(record.getValue(field).toLowerCase(),
323 newSpace);
324 StringBuilder buildKey = new StringBuilder();
325 buildKey.append("metric.");
326 buildKey.append(recordType);
327 buildKey.append(".");
328 buildKey.append(valueName);
329 if (!normKey.toString().equals("")) {
330 buildKey = new StringBuilder();
331 buildKey.append("metric.");
332 buildKey.append(recordType);
333 buildKey.append(".");
334 buildKey.append(normKey);
335 buildKey.append(".");
336 buildKey.append(valueName);
337 }
338 String normalizedKey = buildKey.toString();
339 if (hashReport.containsKey(node)) {
340 HashMap<String, String> tmpHash = hashReport.get(node);
341 tmpHash.put(normalizedKey, valueValue);
342 hashReport.put(node, tmpHash);
343 } else {
344 HashMap<String, String> tmpHash = new HashMap<String, String>();
345 tmpHash.put(normalizedKey, valueValue);
346 hashReport.put(node, tmpHash);
348 }
350 }
351 for(Entry<String, HashMap<String, String>> entry : hashReport.entrySet()) {
352 HashMap<String, String> recordSet = entry.getValue();
354 StringBuilder sqlPriKeys = new StringBuilder();
355 try {
356 for (String priKey : priKeys) {
357 if (priKey.equals("timestamp")) {
358 sqlPriKeys.append(priKey);
359 sqlPriKeys.append(" = \"");
360 sqlPriKeys.append(sqlTime);
361 sqlPriKeys.append("\"");
362 }
363 if (!priKey.equals(priKeys[priKeys.length - 1])) {
364 sqlPriKeys.append(sqlPriKeys);
365 sqlPriKeys.append(", ");
366 }
367 }
368 } catch (Exception nullException) {
370 log.debug(ExceptionUtil.getStackTrace(nullException));
371 }
373 StringBuilder sqlValues = new StringBuilder();
374 boolean firstValue = true;
375 for(Entry<String, String> fi : recordSet.entrySet()) {
376 String fieldKey = fi.getKey();
377 String fieldValue = fi.getValue();
378 if (transformer.containsKey(fieldKey) && transformer.get(fieldKey).intern()!="_delete".intern()) {
379 if (!firstValue) {
380 sqlValues.append(", ");
381 }
382 try {
383 if (dbSchema.get(dbTables.get(dbKey)).get(
384 transformer.get(fieldKey)) == java.sql.Types.VARCHAR
385 || dbSchema.get(dbTables.get(dbKey)).get(
386 transformer.get(fieldKey)) == java.sql.Types.BLOB) {
387 String conversionKey = "conversion." + fieldKey;
388 if (conversion.containsKey(conversionKey)) {
389 sqlValues.append(transformer.get(fieldKey));
390 sqlValues.append("=");
391 sqlValues.append(fieldValue);
392 sqlValues.append(conversion.get(conversionKey).toString());
393 } else {
394 sqlValues.append(transformer.get(fieldKey));
395 sqlValues.append("=\'");
396 sqlValues.append(escapeQuotes(fieldValue));
397 sqlValues.append("\'");
398 }
399 } else if (dbSchema.get(dbTables.get(dbKey)).get(
400 transformer.get(fieldKey)) == java.sql.Types.TIMESTAMP) {
401 SimpleDateFormat formatter = new SimpleDateFormat(
402 "yyyy-MM-dd HH:mm:ss");
403 Date recordDate = new Date();
404 recordDate.setTime(Long.parseLong(fieldValue));
405 sqlValues.append(transformer.get(fieldKey));
406 sqlValues.append("=\"");
407 sqlValues.append(formatter.format(recordDate));
408 sqlValues.append("\"");
409 } else if (dbSchema.get(dbTables.get(dbKey)).get(
410 transformer.get(fieldKey)) == java.sql.Types.BIGINT
411 || dbSchema.get(dbTables.get(dbKey)).get(
412 transformer.get(fieldKey)) == java.sql.Types.TINYINT
413 || dbSchema.get(dbTables.get(dbKey)).get(
414 transformer.get(fieldKey)) == java.sql.Types.INTEGER) {
415 long tmp = 0;
416 try {
417 tmp = Long.parseLong(fieldValue);
418 String conversionKey = "conversion." + fieldKey;
419 if (conversion.containsKey(conversionKey)) {
420 tmp = tmp
421 * Long.parseLong(conversion.get(conversionKey)
422 .toString());
423 }
424 } catch (Exception e) {
425 tmp = 0;
426 }
427 sqlValues.append(transformer.get(fieldKey));
428 sqlValues.append("=");
429 sqlValues.append(tmp);
430 } else {
431 double tmp = 0;
432 tmp = Double.parseDouble(fieldValue);
433 String conversionKey = "conversion." + fieldKey;
434 if (conversion.containsKey(conversionKey)) {
435 tmp = tmp
436 * Double.parseDouble(conversion.get(conversionKey)
437 .toString());
438 }
439 if (Double.isNaN(tmp)) {
440 tmp = 0;
441 }
442 sqlValues.append(transformer.get(fieldKey));
443 sqlValues.append("=");
444 sqlValues.append(tmp);
445 }
446 firstValue = false;
447 } catch (NumberFormatException ex) {
448 String conversionKey = "conversion." + fieldKey;
449 if (conversion.containsKey(conversionKey)) {
450 sqlValues.append(transformer.get(fieldKey));
451 sqlValues.append("=");
452 sqlValues.append(recordSet.get(fieldKey));
453 sqlValues.append(conversion.get(conversionKey).toString());
454 } else {
455 sqlValues.append(transformer.get(fieldKey));
456 sqlValues.append("='");
457 sqlValues.append(escapeQuotes(recordSet.get(fieldKey)));
458 sqlValues.append("'");
459 }
460 firstValue = false;
461 } catch (NullPointerException ex) {
462 log.error("dbKey:" + dbKey + " fieldKey:" + fieldKey
463 + " does not contain valid MDL structure.");
464 }
465 }
466 }
467 StringBuilder sql = new StringBuilder();
468 if (sqlPriKeys.length() > 0) {
469 sql.append("INSERT INTO ");
470 sql.append(table);
471 sql.append(" SET ");
472 sql.append(sqlPriKeys.toString());
473 sql.append(",");
474 sql.append(sqlValues.toString());
475 sql.append(" ON DUPLICATE KEY UPDATE ");
476 sql.append(sqlPriKeys.toString());
477 sql.append(",");
478 sql.append(sqlValues.toString());
479 sql.append(";");
480 } else {
481 if(sqlValues.length() > 0) {
482 sql.append("INSERT INTO ");
483 sql.append(table);
484 sql.append(" SET ");
485 sql.append(sqlValues.toString());
486 sql.append(" ON DUPLICATE KEY UPDATE ");
487 sql.append(sqlValues.toString());
488 sql.append(";");
489 }
490 }
491 if(sql.length() > 0) {
492 log.trace(sql);
494 if (batchMode) {
495 stmt.addBatch(sql.toString());
496 batch++;
497 } else {
498 stmt.execute(sql.toString());
499 }
500 if (batchMode && batch > 20000) {
501 int[] updateCounts = stmt.executeBatch();
502"Batch mode inserted=" + updateCounts.length + "records.");
503 batch = 0;
504 }
505 }
506 }
508 }
510 if (batchMode) {
511 int[] updateCounts = stmt.executeBatch();
512"Batch mode inserted=" + updateCounts.length + "records.");
513 }
514 } catch (SQLException ex) {
516 isSuccessful = false;
517 log.error(ex, ex);
518 log.error("SQLException: " + ex.getMessage());
519 log.error("SQLState: " + ex.getSQLState());
520 log.error("VendorError: " + ex.getErrorCode());
522 throw new IOException (ex);
523 } catch (Exception e) {
524 isSuccessful = false;
525 log.error(ExceptionUtil.getStackTrace(e));
527 throw new IOException (e);
528 } finally {
529 if (batchMode && conn!=null) {
530 try {
531 conn.commit();
532"batchMode commit done");
533 } catch (SQLException ex) {
534 log.error(ex, ex);
535 log.error("SQLException: " + ex.getMessage());
536 log.error("SQLState: " + ex.getSQLState());
537 log.error("VendorError: " + ex.getErrorCode());
538 }
539 }
540 long latencyMillis = System.currentTimeMillis() - currentTimeMillis;
541 int latencySeconds = ((int) (latencyMillis + 500)) / 1000;
542 String logMsg = (isSuccessful ? "Saved" : "Error occurred in saving");
543 + " (" + recordType + ","
544 + cluster + ") " + latencySeconds + " sec. numOfRecords: " + numOfRecords);
545 if (rs != null) {
546 try {
547 rs.close();
548 } catch (SQLException ex) {
549 log.error(ex, ex);
550 log.error("SQLException: " + ex.getMessage());
551 log.error("SQLState: " + ex.getSQLState());
552 log.error("VendorError: " + ex.getErrorCode());
553 }
554 rs = null;
555 }
556 if (stmt != null) {
557 try {
558 stmt.close();
559 } catch (SQLException ex) {
560 log.error(ex, ex);
561 log.error("SQLException: " + ex.getMessage());
562 log.error("SQLState: " + ex.getSQLState());
563 log.error("VendorError: " + ex.getErrorCode());
564 }
565 stmt = null;
566 }
567 if (conn != null) {
568 try {
569 conn.close();
570 } catch (SQLException ex) {
571 log.error(ex, ex);
572 log.error("SQLException: " + ex.getMessage());
573 log.error("SQLState: " + ex.getSQLState());
574 log.error("VendorError: " + ex.getErrorCode());
575 }
576 conn = null;
577 }
579 if (reader != null) {
580 try {
581 reader.close();
582 } catch (Exception e) {
583 log.warn("Could not close SequenceFile.Reader:" ,e);
584 }
585 reader = null;
586 }
587 }
588 return true;
589 }
591 public Boolean call() throws IOException {
592 return run();
593 }
596 public static void main(String[] args) {
597 try {
598 MetricDataLoader mdl = new MetricDataLoader(args[0]);
600 } catch (Exception e) {
601 e.printStackTrace();
602 }
603 }
605 }