This project has been retired. For details, please refer to its
Attic page.
Aggregator xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.chukwa.database;
20
21 import java.io.BufferedReader;
22 import java.io.File;
23 import java.io.FileReader;
24 import java.io.IOException;
25 import java.text.ParsePosition;
26 import java.text.SimpleDateFormat;
27 import java.util.Calendar;
28 import java.util.Date;
29 import org.apache.commons.logging.Log;
30 import org.apache.commons.logging.LogFactory;
31 import org.apache.hadoop.chukwa.util.DatabaseWriter;
32
33 @SuppressWarnings("unused")
34 public class Aggregator {
35
36 private static Log log = LogFactory.getLog(Aggregator.class);
37 private String table = null;
38 private String jdbc = null;
39 private int[] intervals;
40 private long current = 0;
41 private static DatabaseWriter db = null;
42
43 public Aggregator() {
44 Calendar now = Calendar.getInstance();
45 current = now.getTimeInMillis();
46 }
47
48 public static String getContents(File aFile) {
49 StringBuffer contents = new StringBuffer();
50 try {
51 BufferedReader input = new BufferedReader(new FileReader(aFile));
52 try {
53 String line = null;
54 while ((line = input.readLine()) != null) {
55 contents.append(line);
56 contents.append(System.getProperty("line.separator"));
57 }
58 } finally {
59 input.close();
60 }
61 } catch (IOException ex) {
62 ex.printStackTrace();
63 }
64 return contents.toString();
65 }
66
67 public void process(long start, long end, String query) throws Throwable {
68 try {
69 Macro macroProcessor = new Macro(start, end, query);
70 query = macroProcessor.toString();
71 db.execute(query);
72 } catch(Exception e) {
73 log.error("Query: "+query);
74 throw new Exception("Aggregation failed for: "+query);
75 }
76 }
77
78 public void process(String query) throws Throwable {
79 long start = current;
80 long end = current;
81 process(current, current, query);
82 }
83
84 public void setWriter(DatabaseWriter dbw) {
85 db = dbw;
86 }
87
88 public static void main(String[] args) {
89 long startTime = 0;
90 long endTime = 0;
91 long aggregatorStart = Calendar.getInstance().getTimeInMillis();
92 long longest = 0;
93 if(args.length>=4) {
94 ParsePosition pp = new ParsePosition(0);
95 SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm");
96 String buffer = args[0]+" "+args[1];
97 Date tmp = format.parse(buffer, pp);
98 startTime = tmp.getTime();
99 buffer = args[2]+" "+args[3];
100 pp = new ParsePosition(0);
101 tmp = format.parse(buffer, pp);
102 endTime = tmp.getTime();
103 }
104 String longQuery = null;
105 log.info("Aggregator started.");
106 String cluster = System.getProperty("CLUSTER");
107 if (cluster == null) {
108 cluster = "unknown";
109 }
110 db = new DatabaseWriter(cluster);
111 String queries = Aggregator.getContents(new File(System
112 .getenv("CHUKWA_CONF_DIR")
113 + File.separator + "aggregator.sql"));
114 String[] query = queries.split("\n");
115 while(startTime<=endTime) {
116 for (int i = 0; i < query.length; i++) {
117 if (query[i].indexOf("#") == 0) {
118 log.debug("skipping: " + query[i]);
119 } else if(!query[i].equals("")) {
120 Aggregator dba = new Aggregator();
121 long start = Calendar.getInstance().getTimeInMillis();
122 try {
123 if(startTime!=0 && endTime!=0) {
124 dba.process(startTime, startTime, query[i]);
125 } else {
126 dba.process(query[i]);
127 }
128 } catch(Throwable e) {
129 log.error("Invalid query:"+query[i]);
130 }
131 long end = Calendar.getInstance().getTimeInMillis();
132 long duration = end - start;
133 if (duration >= longest) {
134 longest = duration;
135 longQuery = query[i];
136 }
137 }
138 }
139 startTime = startTime + 5*60000;
140 }
141 db.close();
142 long aggregatorEnd = Calendar.getInstance().getTimeInMillis();
143 log.info("Longest running query: " + longQuery + " (" + (double) longest
144 / 1000 + " seconds)");
145 log.info("Total running time: ("+(double) (aggregatorEnd-aggregatorStart)/1000+" seconds)");
146 log.info("Aggregator finished.");
147 }
148
149 }