This project has been retired. For details, please refer to its
Attic page.
Aggregator xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.chukwa.database;
20
21 import java.io.BufferedReader;
22 import java.io.File;
23 import java.io.FileInputStream;
24 import java.io.FileReader;
25 import java.io.IOException;
26 import java.io.InputStreamReader;
27 import java.nio.charset.Charset;
28 import java.text.ParsePosition;
29 import java.text.SimpleDateFormat;
30 import java.util.Calendar;
31 import java.util.Date;
32
33 import org.apache.commons.logging.Log;
34 import org.apache.commons.logging.LogFactory;
35 import org.apache.hadoop.chukwa.util.DatabaseWriter;
36
37 @SuppressWarnings("unused")
38 public class Aggregator {
39
40 private static Log log = LogFactory.getLog(Aggregator.class);
41 private String table = null;
42 private String jdbc = null;
43 private int[] intervals;
44 private long current = 0;
45 private DatabaseWriter db = null;
46
47 public Aggregator() {
48 Calendar now = Calendar.getInstance();
49 current = now.getTimeInMillis();
50 }
51
52 public static String getContents(File aFile) {
53 StringBuffer contents = new StringBuffer();
54 try {
55 BufferedReader input = new BufferedReader(new InputStreamReader(new FileInputStream(aFile.getAbsolutePath()), Charset.forName("UTF-8")));
56 try {
57 String line = null;
58 while ((line = input.readLine()) != null) {
59 contents.append(line);
60 contents.append(System.getProperty("line.separator"));
61 }
62 } finally {
63 input.close();
64 }
65 } catch (IOException ex) {
66 ex.printStackTrace();
67 }
68 return contents.toString();
69 }
70
71 public void process(long start, long end, String query) throws Throwable {
72 try {
73 Macro macroProcessor = new Macro(start, end, query);
74 query = macroProcessor.toString();
75 db.execute(query);
76 } catch(Exception e) {
77 log.error("Query: "+query);
78 throw new Exception("Aggregation failed for: "+query);
79 } finally {
80 db.close();
81 }
82 }
83
84 public void process(String query) throws Throwable {
85 long start = current;
86 long end = current;
87 process(current, current, query);
88 }
89
90 public void setWriter(DatabaseWriter dbw) {
91 db = dbw;
92 }
93
94 public static void main(String[] args) {
95 long startTime = 0;
96 long endTime = 0;
97 long aggregatorStart = Calendar.getInstance().getTimeInMillis();
98 long longest = 0;
99 if(args.length>=4) {
100 ParsePosition pp = new ParsePosition(0);
101 SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm");
102 String buffer = args[0]+" "+args[1];
103 Date tmp = format.parse(buffer, pp);
104 startTime = tmp.getTime();
105 buffer = args[2]+" "+args[3];
106 pp = new ParsePosition(0);
107 tmp = format.parse(buffer, pp);
108 endTime = tmp.getTime();
109 }
110 String longQuery = null;
111 log.info("Aggregator started.");
112 String cluster = System.getProperty("CLUSTER");
113 if (cluster == null) {
114 cluster = "unknown";
115 }
116 String queries = Aggregator.getContents(new File(System
117 .getenv("CHUKWA_CONF_DIR")
118 + File.separator + "aggregator.sql"));
119 String[] query = queries.split("\n");
120 while(startTime<=endTime) {
121 for (int i = 0; i < query.length; i++) {
122 if (query[i].indexOf("#") == 0) {
123 log.debug("skipping: " + query[i]);
124 } else if(!query[i].equals("")) {
125 Aggregator dba = new Aggregator();
126 dba.setWriter(new DatabaseWriter(cluster));
127 long start = Calendar.getInstance().getTimeInMillis();
128 try {
129 if(startTime!=0 && endTime!=0) {
130 dba.process(startTime, startTime, query[i]);
131 } else {
132 dba.process(query[i]);
133 }
134 } catch(Throwable e) {
135 log.error("Invalid query:"+query[i]);
136 }
137 long end = Calendar.getInstance().getTimeInMillis();
138 long duration = end - start;
139 if (duration >= longest) {
140 longest = duration;
141 longQuery = query[i];
142 }
143 }
144 }
145 startTime = startTime + 5*60000;
146 }
147 long aggregatorEnd = Calendar.getInstance().getTimeInMillis();
148 log.info("Longest running query: " + longQuery + " (" + (double) longest
149 / 1000 + " seconds)");
150 log.info("Total running time: ("+(double) (aggregatorEnd-aggregatorStart)/1000+" seconds)");
151 log.info("Aggregator finished.");
152 }
153
154 }