MetricDataLoader xref
package org.apache.hadoop.chukwa.dataloader;

import java.io.IOException;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.concurrent.Callable;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
import org.apache.hadoop.chukwa.database.DatabaseConfig;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
import org.apache.hadoop.chukwa.extraction.engine.RecordUtil;
import org.apache.hadoop.chukwa.util.ClusterConfig;
import org.apache.hadoop.chukwa.util.DatabaseWriter;
import org.apache.hadoop.chukwa.util.ExceptionUtil;
import org.apache.hadoop.chukwa.util.RegexUtil;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;

/**
 * MetricDataLoader reads ChukwaRecords from a sequence file and inserts them
 * into the reporting database tables defined by the MDL configuration.
 */
public class MetricDataLoader implements Callable {
  private static Log log = LogFactory.getLog(MetricDataLoader.class);

  private Statement stmt = null;
  private ResultSet rs = null;
  private DatabaseConfig mdlConfig = null;
  private HashMap<String, String> normalize = null;
  private HashMap<String, String> transformer = null;
  private HashMap<String, Float> conversion = null;
  private HashMap<String, String> dbTables = null;
  private HashMap<String, HashMap<String, Integer>> dbSchema = null;
  private String newSpace = "-";
  private boolean batchMode = true;
  private Connection conn = null;
  private Path source = null;

  private static ChukwaConfiguration conf = null;
  private static FileSystem fs = null;
  private String jdbc_url = "";

  public MetricDataLoader(ChukwaConfiguration conf, FileSystem fs, String fileName) {
    source = new Path(fileName);
    this.conf = conf;
    this.fs = fs;
  }

  /**
   * Loads the MDL configuration, opens a JDBC connection for the given
   * cluster, and caches the column types of each target table's template.
   */
  private void initEnv(String cluster) throws Exception {
    mdlConfig = new DatabaseConfig();
    transformer = mdlConfig.startWith("metric.");
    conversion = new HashMap<String, Float>();
    normalize = mdlConfig.startWith("normalize.");
    dbTables = mdlConfig.startWith("report.db.name.");
    Iterator<?> entries = mdlConfig.iterator();
    while (entries.hasNext()) {
      String entry = entries.next().toString();
      if (entry.startsWith("conversion.")) {
        String[] metrics = entry.split("=");
        try {
          float convertNumber = Float.parseFloat(metrics[1]);
          conversion.put(metrics[0], convertNumber);
        } catch (NumberFormatException ex) {
          log.error(metrics[0] + " is not a number.");
        }
      }
    }
    log.debug("cluster name:" + cluster);
    if (!cluster.equals("")) {
      ClusterConfig cc = new ClusterConfig();
      jdbc_url = cc.getURL(cluster);
    }
    try {
      DatabaseWriter dbWriter = new DatabaseWriter(cluster);
      conn = dbWriter.getConnection();
    } catch (Exception ex) {
      throw new Exception("JDBC URL does not exist for:" + jdbc_url);
    }
    log.debug("Initialized JDBC URL: " + jdbc_url);
    HashMap<String, String> dbNames = mdlConfig.startWith("report.db.name.");
    Iterator<String> ki = dbNames.keySet().iterator();
    dbSchema = new HashMap<String, HashMap<String, Integer>>();
    while (ki.hasNext()) {
      String recordType = ki.next().toString();
      String table = dbNames.get(recordType);
      try {
        ResultSet rs = conn.getMetaData().getColumns(null, null, table + "_template", null);
        HashMap<String, Integer> tableSchema = new HashMap<String, Integer>();
        while (rs.next()) {
          String name = rs.getString("COLUMN_NAME");
          int type = rs.getInt("DATA_TYPE");
          tableSchema.put(name, type);
          StringBuilder metricName = new StringBuilder();
          metricName.append("metric.");
          metricName.append(recordType.substring(15));
          metricName.append(".");
          metricName.append(name);
          String mdlKey = metricName.toString().toLowerCase();
          if (!transformer.containsKey(mdlKey)) {
            transformer.put(mdlKey, name);
          }
        }
        rs.close();
        dbSchema.put(table, tableSchema);
      } catch (SQLException ex) {
        log.debug("table: " + table
            + " template does not exist, MDL will not load data for this table.");
      }
    }
    stmt = conn.createStatement();
    conn.setAutoCommit(false);
  }

  public void interrupt() {
  }

  private String escape(String s, String c) {
    String ns = s.trim();
    Pattern pattern = Pattern.compile(" +");
    Matcher matcher = pattern.matcher(ns);
    String s2 = matcher.replaceAll(c);
    return s2;
  }

  public static String escapeQuotes(String s) {
    StringBuffer sb = new StringBuffer();
    int index;
    int length = s.length();
    char ch;
    for (index = 0; index < length; ++index) {
      if ((ch = s.charAt(index)) == '\"') {
        sb.append("\\\"");
      } else if (ch == '\\') {
        sb.append("\\\\");
      } else if (ch == '\'') {
        sb.append("\\'");
      } else {
        sb.append(ch);
      }
    }
    return (sb.toString());
  }

  /**
   * Reads every record in the source sequence file, builds an
   * INSERT ... ON DUPLICATE KEY UPDATE statement per node, and executes the
   * statements, in batches when batchMode is enabled.
   */
  public boolean run() throws IOException {
    boolean first = true;
    log.info("StreamName: " + source.getName());
    SequenceFile.Reader reader = null;

    try {
      reader = new SequenceFile.Reader(fs, source, conf);
    } catch (Exception ex) {
      log.error(ex, ex);
    }
    long currentTimeMillis = System.currentTimeMillis();
    boolean isSuccessful = true;
    String recordType = null;

    ChukwaRecordKey key = new ChukwaRecordKey();
    ChukwaRecord record = new ChukwaRecord();
    String cluster = null;
    int numOfRecords = 0;
    try {
      Pattern p = Pattern.compile("(.*)\\-(\\d+)$");
      int batch = 0;
      while (reader.next(key, record)) {
        numOfRecords++;
        if (first) {
          try {
            cluster = RecordUtil.getClusterName(record);
            initEnv(cluster);
            first = false;
          } catch (Exception ex) {
            log.error("Initialization failed for: " + cluster + ". Please check jdbc configuration.");
            return false;
          }
        }
        String sqlTime = DatabaseWriter.formatTimeStamp(record.getTime());
        log.debug("Timestamp: " + record.getTime());
        log.debug("DataType: " + key.getReduceType());

        String[] fields = record.getFields();
        String table = null;
        String[] priKeys = null;
        HashMap<String, HashMap<String, String>> hashReport = new HashMap<String, HashMap<String, String>>();
        StringBuilder normKey = new StringBuilder();
        String node = record.getValue("csource");
        recordType = key.getReduceType().toLowerCase();
        String dbKey = "report.db.name." + recordType;
        Matcher m = p.matcher(recordType);
        if (dbTables.containsKey(dbKey)) {
          String tableName = mdlConfig.get(dbKey);
          if (!RegexUtil.isRegex(tableName)) {
            log.error("Error parsing 'tableName' as a regex: "
                + RegexUtil.regexError(tableName));
            return false;
          }
          String[] tmp = mdlConfig.findTableName(tableName, record.getTime(), record.getTime());
          table = tmp[0];
        } else if (m.matches()) {
          String timePartition = "_week";
          int timeSize = Integer.parseInt(m.group(2));
          if (timeSize == 5) {
            timePartition = "_month";
          } else if (timeSize == 30) {
            timePartition = "_quarter";
          } else if (timeSize == 180) {
            timePartition = "_year";
          } else if (timeSize == 720) {
            timePartition = "_decade";
          }
          int partition = (int) (record.getTime() / timeSize);
          StringBuilder tmpDbKey = new StringBuilder();
          tmpDbKey.append("report.db.name.");
          tmpDbKey.append(m.group(1));
          if (dbTables.containsKey(tmpDbKey.toString())) {
            StringBuilder tmpTable = new StringBuilder();
            tmpTable.append(dbTables.get(tmpDbKey.toString()));
            tmpTable.append("_");
            tmpTable.append(partition);
            tmpTable.append("_");
            tmpTable.append(timePartition);
            table = tmpTable.toString();
          } else {
            log.debug(tmpDbKey.toString() + " does not exist.");
            continue;
          }
        } else {
          log.debug(dbKey + " does not exist.");
          continue;
        }
        log.debug("table name:" + table);
        try {
          priKeys = mdlConfig.get("report.db.primary.key." + recordType).split(",");
        } catch (Exception nullException) {
          log.debug(ExceptionUtil.getStackTrace(nullException));
        }
        for (String field : fields) {
          String keyName = escape(field.toLowerCase(), newSpace);
          String keyValue = escape(record.getValue(field).toLowerCase(), newSpace);
          StringBuilder buildKey = new StringBuilder();
          buildKey.append("normalize.");
          buildKey.append(recordType);
          buildKey.append(".");
          buildKey.append(keyName);
          if (normalize.containsKey(buildKey.toString())) {
            if (normKey.toString().equals("")) {
              normKey.append(keyName);
              normKey.append(".");
              normKey.append(keyValue);
            } else {
              normKey.append(".");
              normKey.append(keyName);
              normKey.append(".");
              normKey.append(keyValue);
            }
          }
          StringBuilder normalizedKey = new StringBuilder();
          normalizedKey.append("metric.");
          normalizedKey.append(recordType);
          normalizedKey.append(".");
          normalizedKey.append(normKey);
          if (hashReport.containsKey(node)) {
            HashMap<String, String> tmpHash = hashReport.get(node);
            tmpHash.put(normalizedKey.toString(), keyValue);
            hashReport.put(node, tmpHash);
          } else {
            HashMap<String, String> tmpHash = new HashMap<String, String>();
            tmpHash.put(normalizedKey.toString(), keyValue);
            hashReport.put(node, tmpHash);
          }
        }
        for (String field : fields) {
          String valueName = escape(field.toLowerCase(), newSpace);
          String valueValue = escape(record.getValue(field).toLowerCase(), newSpace);
          StringBuilder buildKey = new StringBuilder();
          buildKey.append("metric.");
          buildKey.append(recordType);
          buildKey.append(".");
          buildKey.append(valueName);
          if (!normKey.toString().equals("")) {
            buildKey = new StringBuilder();
            buildKey.append("metric.");
            buildKey.append(recordType);
            buildKey.append(".");
            buildKey.append(normKey);
            buildKey.append(".");
            buildKey.append(valueName);
          }
          String normalizedKey = buildKey.toString();
          if (hashReport.containsKey(node)) {
            HashMap<String, String> tmpHash = hashReport.get(node);
            tmpHash.put(normalizedKey, valueValue);
            hashReport.put(node, tmpHash);
          } else {
            HashMap<String, String> tmpHash = new HashMap<String, String>();
            tmpHash.put(normalizedKey, valueValue);
            hashReport.put(node, tmpHash);
          }
        }
        Iterator<String> i = hashReport.keySet().iterator();
        while (i.hasNext()) {
          Object iteratorNode = i.next();
          HashMap<String, String> recordSet = hashReport.get(iteratorNode);
          Iterator<String> fi = recordSet.keySet().iterator();

          StringBuilder sqlPriKeys = new StringBuilder();
          try {
            for (String priKey : priKeys) {
              if (priKey.equals("timestamp")) {
                sqlPriKeys.append(priKey);
                sqlPriKeys.append(" = \"");
                sqlPriKeys.append(sqlTime);
                sqlPriKeys.append("\"");
              }
              if (!priKey.equals(priKeys[priKeys.length - 1])) {
                sqlPriKeys.append(sqlPriKeys);
                sqlPriKeys.append(", ");
              }
            }
          } catch (Exception nullException) {
            log.debug(ExceptionUtil.getStackTrace(nullException));
          }

          StringBuilder sqlValues = new StringBuilder();
          boolean firstValue = true;
          while (fi.hasNext()) {
            String fieldKey = fi.next();
            if (transformer.containsKey(fieldKey)
                && transformer.get(fieldKey).intern() != "_delete".intern()) {
              if (!firstValue) {
                sqlValues.append(", ");
              }
              try {
                if (dbSchema.get(dbTables.get(dbKey)).get(
                    transformer.get(fieldKey)) == java.sql.Types.VARCHAR
                    || dbSchema.get(dbTables.get(dbKey)).get(
                        transformer.get(fieldKey)) == java.sql.Types.BLOB) {
                  String conversionKey = "conversion." + fieldKey;
                  if (conversion.containsKey(conversionKey)) {
                    sqlValues.append(transformer.get(fieldKey));
                    sqlValues.append("=");
                    sqlValues.append(recordSet.get(fieldKey));
                    sqlValues.append(conversion.get(conversionKey).toString());
                  } else {
                    sqlValues.append(transformer.get(fieldKey));
                    sqlValues.append("='");
                    sqlValues.append(escapeQuotes(recordSet.get(fieldKey)));
                    sqlValues.append("'");
                  }
                } else if (dbSchema.get(dbTables.get(dbKey)).get(
                    transformer.get(fieldKey)) == java.sql.Types.TIMESTAMP) {
                  SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                  Date recordDate = new Date();
                  recordDate.setTime(Long.parseLong(recordSet.get(fieldKey)));
                  sqlValues.append(transformer.get(fieldKey));
                  sqlValues.append("=\"");
                  sqlValues.append(formatter.format(recordDate));
                  sqlValues.append("\"");
                } else if (dbSchema.get(dbTables.get(dbKey)).get(
                    transformer.get(fieldKey)) == java.sql.Types.BIGINT
                    || dbSchema.get(dbTables.get(dbKey)).get(
                        transformer.get(fieldKey)) == java.sql.Types.TINYINT
                    || dbSchema.get(dbTables.get(dbKey)).get(
                        transformer.get(fieldKey)) == java.sql.Types.INTEGER) {
                  long tmp = 0;
                  try {
                    tmp = Long.parseLong(recordSet.get(fieldKey).toString());
                    String conversionKey = "conversion." + fieldKey;
                    if (conversion.containsKey(conversionKey)) {
                      tmp = tmp * Long.parseLong(conversion.get(conversionKey).toString());
                    }
                  } catch (Exception e) {
                    tmp = 0;
                  }
                  sqlValues.append(transformer.get(fieldKey));
                  sqlValues.append("=");
                  sqlValues.append(tmp);
                } else {
                  double tmp = 0;
                  tmp = Double.parseDouble(recordSet.get(fieldKey).toString());
                  String conversionKey = "conversion." + fieldKey;
                  if (conversion.containsKey(conversionKey)) {
                    tmp = tmp * Double.parseDouble(conversion.get(conversionKey).toString());
                  }
                  if (Double.isNaN(tmp)) {
                    tmp = 0;
                  }
                  sqlValues.append(transformer.get(fieldKey));
                  sqlValues.append("=");
                  sqlValues.append(tmp);
                }
                firstValue = false;
              } catch (NumberFormatException ex) {
                String conversionKey = "conversion." + fieldKey;
                if (conversion.containsKey(conversionKey)) {
                  sqlValues.append(transformer.get(fieldKey));
                  sqlValues.append("=");
                  sqlValues.append(recordSet.get(fieldKey));
                  sqlValues.append(conversion.get(conversionKey).toString());
                } else {
                  sqlValues.append(transformer.get(fieldKey));
                  sqlValues.append("='");
                  sqlValues.append(escapeQuotes(recordSet.get(fieldKey)));
                  sqlValues.append("'");
                }
                firstValue = false;
              } catch (NullPointerException ex) {
                log.error("dbKey:" + dbKey + " fieldKey:" + fieldKey
                    + " does not contain valid MDL structure.");
              }
            }
          }

          StringBuilder sql = new StringBuilder();
          if (sqlPriKeys.length() > 0) {
            sql.append("INSERT INTO ");
            sql.append(table);
            sql.append(" SET ");
            sql.append(sqlPriKeys.toString());
            sql.append(",");
            sql.append(sqlValues.toString());
            sql.append(" ON DUPLICATE KEY UPDATE ");
            sql.append(sqlPriKeys.toString());
            sql.append(",");
            sql.append(sqlValues.toString());
            sql.append(";");
          } else {
            if (sqlValues.length() > 0) {
              sql.append("INSERT INTO ");
              sql.append(table);
              sql.append(" SET ");
              sql.append(sqlValues.toString());
              sql.append(" ON DUPLICATE KEY UPDATE ");
              sql.append(sqlValues.toString());
              sql.append(";");
            }
          }
          if (sql.length() > 0) {
            log.trace(sql);

            if (batchMode) {
              stmt.addBatch(sql.toString());
              batch++;
            } else {
              stmt.execute(sql.toString());
            }
            if (batchMode && batch > 20000) {
              int[] updateCounts = stmt.executeBatch();
              log.info("Batch mode inserted=" + updateCounts.length + " records.");
              batch = 0;
            }
          }
        }
      }

      if (batchMode) {
        int[] updateCounts = stmt.executeBatch();
        log.info("Batch mode inserted=" + updateCounts.length + " records.");
      }
    } catch (SQLException ex) {
      isSuccessful = false;
      log.error(ex, ex);
      log.error("SQLException: " + ex.getMessage());
      log.error("SQLState: " + ex.getSQLState());
      log.error("VendorError: " + ex.getErrorCode());
      throw new IOException(ex);
    } catch (Exception e) {
      isSuccessful = false;
      log.error(ExceptionUtil.getStackTrace(e));
      throw new IOException(e);
    } finally {
      if (batchMode && conn != null) {
        try {
          conn.commit();
          log.info("batchMode commit done");
        } catch (SQLException ex) {
          log.error(ex, ex);
          log.error("SQLException: " + ex.getMessage());
          log.error("SQLState: " + ex.getSQLState());
          log.error("VendorError: " + ex.getErrorCode());
        }
      }
      long latencyMillis = System.currentTimeMillis() - currentTimeMillis;
      int latencySeconds = ((int) (latencyMillis + 500)) / 1000;
      String logMsg = (isSuccessful ? "Saved" : "Error occurred in saving");
      log.info(logMsg + " (" + recordType + "," + cluster + ") " + latencySeconds
          + " sec. numOfRecords: " + numOfRecords);
      if (rs != null) {
        try {
          rs.close();
        } catch (SQLException ex) {
          log.error(ex, ex);
          log.error("SQLException: " + ex.getMessage());
          log.error("SQLState: " + ex.getSQLState());
          log.error("VendorError: " + ex.getErrorCode());
        }
        rs = null;
      }
      if (stmt != null) {
        try {
          stmt.close();
        } catch (SQLException ex) {
          log.error(ex, ex);
          log.error("SQLException: " + ex.getMessage());
          log.error("SQLState: " + ex.getSQLState());
          log.error("VendorError: " + ex.getErrorCode());
        }
        stmt = null;
      }
      if (conn != null) {
        try {
          conn.close();
        } catch (SQLException ex) {
          log.error(ex, ex);
          log.error("SQLException: " + ex.getMessage());
          log.error("SQLState: " + ex.getSQLState());
          log.error("VendorError: " + ex.getErrorCode());
        }
        conn = null;
      }

      if (reader != null) {
        try {
          reader.close();
        } catch (Exception e) {
          log.warn("Could not close SequenceFile.Reader:", e);
        }
        reader = null;
      }
    }
    return true;
  }

  public Boolean call() throws IOException {
    return run();
  }

  public static void main(String[] args) {
    try {
      conf = new ChukwaConfiguration();
      fs = FileSystem.get(conf);
      MetricDataLoader mdl = new MetricDataLoader(conf, fs, args[0]);
      mdl.run();
    } catch (Exception e) {
      e.printStackTrace();
    }
  }

}
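
Because MetricDataLoader implements Callable and its call() returns a Boolean, it is meant to be submitted to a thread pool rather than invoked inline (the main method above is only a single-file entry point). The sketch below shows one way a caller might drive several loaders concurrently; the driver class name MdlSubmitExample, the pool size, and the idea of passing sequence-file paths as command-line arguments are illustrative assumptions, not part of the Chukwa source.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
import org.apache.hadoop.chukwa.dataloader.MetricDataLoader;
import org.apache.hadoop.fs.FileSystem;

/** Hypothetical driver: submits one MetricDataLoader per sequence file to a thread pool. */
public class MdlSubmitExample {
  public static void main(String[] args) throws Exception {
    ChukwaConfiguration conf = new ChukwaConfiguration();
    FileSystem fs = FileSystem.get(conf);
    // Pool size of 4 is an arbitrary choice for this example.
    ExecutorService pool = Executors.newFixedThreadPool(4);
    List<Future<?>> results = new ArrayList<Future<?>>();
    for (String fileName : args) {
      // Each task reads one sequence file and loads its records into the database.
      results.add(pool.submit(new MetricDataLoader(conf, fs, fileName)));
    }
    for (Future<?> result : results) {
      // call() returns Boolean; true means the file was loaded without error.
      System.out.println("Loaded: " + result.get());
    }
    pool.shutdown();
  }
}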