This project has retired. For details please refer to its Attic page.
Aggregator xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.chukwa.database;
20  
21  import java.io.BufferedReader;
22  import java.io.File;
23  import java.io.FileReader;
24  import java.io.IOException;
25  import java.text.ParsePosition;
26  import java.text.SimpleDateFormat;
27  import java.util.Calendar;
28  import java.util.Date;
29  import org.apache.commons.logging.Log;
30  import org.apache.commons.logging.LogFactory;
31  import org.apache.hadoop.chukwa.util.DatabaseWriter;
32  
33  @SuppressWarnings("unused")
34  public class Aggregator {
35  
36    private static Log log = LogFactory.getLog(Aggregator.class);
37    private String table = null;
38    private String jdbc = null;
39    private int[] intervals;
40    private long current = 0;
41    private static DatabaseWriter db = null;
42  
43    public Aggregator() {
44      Calendar now = Calendar.getInstance();
45      current = now.getTimeInMillis();
46    }
47  
48    public static String getContents(File aFile) {
49      StringBuffer contents = new StringBuffer();
50      try {
51        BufferedReader input = new BufferedReader(new FileReader(aFile));
52        try {
53          String line = null; // not declared within while loop
54          while ((line = input.readLine()) != null) {
55            contents.append(line);
56            contents.append(System.getProperty("line.separator"));
57          }
58        } finally {
59          input.close();
60        }
61      } catch (IOException ex) {
62        ex.printStackTrace();
63      }
64      return contents.toString();
65    }
66  
67    public void process(long start, long end, String query) throws Throwable {
68      try {
69        Macro macroProcessor = new Macro(start, end, query);
70        query = macroProcessor.toString();
71        db.execute(query);
72      } catch(Exception e) {
73        log.error("Query: "+query);
74        throw new Exception("Aggregation failed for: "+query);
75      }
76    }
77  
78    public void process(String query) throws Throwable {
79      long start = current;
80      long end = current;
81      process(current, current, query);
82    }
83  
84    public void setWriter(DatabaseWriter dbw) {
85      db = dbw;
86    }
87  
88    public static void main(String[] args) {
89      long startTime = 0;
90      long endTime = 0;
91      long aggregatorStart = Calendar.getInstance().getTimeInMillis();
92      long longest = 0;
93      if(args.length>=4) {
94        ParsePosition pp = new ParsePosition(0);
95        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm");
96        String buffer = args[0]+" "+args[1];
97        Date tmp = format.parse(buffer, pp);
98        startTime = tmp.getTime();
99        buffer = args[2]+" "+args[3];
100       pp = new ParsePosition(0);
101       tmp = format.parse(buffer, pp);
102       endTime = tmp.getTime();
103     }
104     String longQuery = null;
105     log.info("Aggregator started.");
106     String cluster = System.getProperty("CLUSTER");
107     if (cluster == null) {
108       cluster = "unknown";
109     }
110     db = new DatabaseWriter(cluster);
111     String queries = Aggregator.getContents(new File(System
112         .getenv("CHUKWA_CONF_DIR")
113         + File.separator + "aggregator.sql"));
114     String[] query = queries.split("\n");
115     while(startTime<=endTime) {
116       for (int i = 0; i < query.length; i++) {
117         if (query[i].indexOf("#") == 0) {
118           log.debug("skipping: " + query[i]);
119         } else if(!query[i].equals("")) {
120           Aggregator dba = new Aggregator();
121           long start = Calendar.getInstance().getTimeInMillis();
122           try {
123             if(startTime!=0 && endTime!=0) {
124               dba.process(startTime, startTime, query[i]);
125             } else {
126               dba.process(query[i]);
127             }
128           } catch(Throwable e) {
129             log.error("Invalid query:"+query[i]);
130           }
131           long end = Calendar.getInstance().getTimeInMillis();
132           long duration = end - start;
133           if (duration >= longest) {
134             longest = duration;
135             longQuery = query[i];
136           }
137         }
138       }
139       startTime = startTime + 5*60000;
140     }
141     db.close();
142     long aggregatorEnd = Calendar.getInstance().getTimeInMillis();
143     log.info("Longest running query: " + longQuery + " (" + (double) longest
144         / 1000 + " seconds)");
145     log.info("Total running time: ("+(double) (aggregatorEnd-aggregatorStart)/1000+" seconds)");
146     log.info("Aggregator finished.");
147   }
148 
149 }