This project has retired. For details please refer to its Attic page.
Aggregator xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.chukwa.database;
20  
21  import java.io.BufferedReader;
22  import java.io.File;
23  import java.io.FileInputStream;
24  import java.io.FileReader;
25  import java.io.IOException;
26  import java.io.InputStreamReader;
27  import java.nio.charset.Charset;
28  import java.text.ParsePosition;
29  import java.text.SimpleDateFormat;
30  import java.util.Calendar;
31  import java.util.Date;
32  
33  import org.apache.commons.logging.Log;
34  import org.apache.commons.logging.LogFactory;
35  import org.apache.hadoop.chukwa.util.DatabaseWriter;
36  
37  @SuppressWarnings("unused")
38  public class Aggregator {
39  
40    private static Log log = LogFactory.getLog(Aggregator.class);
41    private String table = null;
42    private String jdbc = null;
43    private int[] intervals;
44    private long current = 0;
45    private DatabaseWriter db = null;
46  
47    public Aggregator() {
48      Calendar now = Calendar.getInstance();
49      current = now.getTimeInMillis();
50    }
51  
52    public static String getContents(File aFile) {
53      StringBuffer contents = new StringBuffer();
54      try {
55        BufferedReader input = new BufferedReader(new InputStreamReader(new FileInputStream(aFile.getAbsolutePath()), Charset.forName("UTF-8")));
56        try {
57          String line = null; // not declared within while loop
58          while ((line = input.readLine()) != null) {
59            contents.append(line);
60            contents.append(System.getProperty("line.separator"));
61          }
62        } finally {
63          input.close();
64        }
65      } catch (IOException ex) {
66        ex.printStackTrace();
67      }
68      return contents.toString();
69    }
70  
71    public void process(long start, long end, String query) throws Throwable {
72      try {
73        Macro macroProcessor = new Macro(start, end, query);
74        query = macroProcessor.toString();
75        db.execute(query);
76      } catch(Exception e) {
77        log.error("Query: "+query);
78        throw new Exception("Aggregation failed for: "+query);
79      } finally {
80        db.close();
81      }
82    }
83  
84    public void process(String query) throws Throwable {
85      long start = current;
86      long end = current;
87      process(current, current, query);
88    }
89  
90    public void setWriter(DatabaseWriter dbw) {
91      db = dbw;
92    }
93  
94    public static void main(String[] args) {
95      long startTime = 0;
96      long endTime = 0;
97      long aggregatorStart = Calendar.getInstance().getTimeInMillis();
98      long longest = 0;
99      if(args.length>=4) {
100       ParsePosition pp = new ParsePosition(0);
101       SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm");
102       String buffer = args[0]+" "+args[1];
103       Date tmp = format.parse(buffer, pp);
104       startTime = tmp.getTime();
105       buffer = args[2]+" "+args[3];
106       pp = new ParsePosition(0);
107       tmp = format.parse(buffer, pp);
108       endTime = tmp.getTime();
109     }
110     String longQuery = null;
111     log.info("Aggregator started.");
112     String cluster = System.getProperty("CLUSTER");
113     if (cluster == null) {
114       cluster = "unknown";
115     }
116     String queries = Aggregator.getContents(new File(System
117         .getenv("CHUKWA_CONF_DIR")
118         + File.separator + "aggregator.sql"));
119     String[] query = queries.split("\n");
120     while(startTime<=endTime) {
121       for (int i = 0; i < query.length; i++) {
122         if (query[i].indexOf("#") == 0) {
123           log.debug("skipping: " + query[i]);
124         } else if(!query[i].equals("")) {
125           Aggregator dba = new Aggregator();
126           dba.setWriter(new DatabaseWriter(cluster));
127           long start = Calendar.getInstance().getTimeInMillis();
128           try {
129             if(startTime!=0 && endTime!=0) {
130               dba.process(startTime, startTime, query[i]);
131             } else {
132               dba.process(query[i]);
133             }
134           } catch(Throwable e) {
135             log.error("Invalid query:"+query[i]);
136           }
137           long end = Calendar.getInstance().getTimeInMillis();
138           long duration = end - start;
139           if (duration >= longest) {
140             longest = duration;
141             longQuery = query[i];
142           }
143         }
144       }
145       startTime = startTime + 5*60000;
146     }
147     long aggregatorEnd = Calendar.getInstance().getTimeInMillis();
148     log.info("Longest running query: " + longQuery + " (" + (double) longest
149         / 1000 + " seconds)");
150     log.info("Total running time: ("+(double) (aggregatorEnd-aggregatorStart)/1000+" seconds)");
151     log.info("Aggregator finished.");
152   }
153 
154 }