This project has retired. For details please refer to its Attic page.
RecordDS xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.chukwa.extraction.engine.datasource.record;
20  
21  
22  import java.io.IOException;
23  import java.text.SimpleDateFormat;
24  import java.util.Calendar;
25  import java.util.LinkedList;
26  import java.util.List;
27  import java.util.TreeMap;
28  import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
29  import org.apache.hadoop.chukwa.extraction.engine.Record;
30  import org.apache.hadoop.chukwa.extraction.engine.SearchResult;
31  import org.apache.hadoop.chukwa.extraction.engine.Token;
32  import org.apache.hadoop.chukwa.extraction.engine.datasource.DataSource;
33  import org.apache.hadoop.chukwa.extraction.engine.datasource.DataSourceException;
34  import org.apache.hadoop.chukwa.inputtools.mdl.DataConfig;
35  import org.apache.hadoop.fs.FileSystem;
36  
37  public class RecordDS implements DataSource {
38  
39    private static FileSystem fs = null;
40    private static ChukwaConfiguration conf = null;
41  
42    private static String rootFolder = null;
43    private static DataConfig dataConfig = null;
44  
45    static {
46      dataConfig = new DataConfig();
47      rootFolder = dataConfig.get("chukwa.engine.dsDirectory.rootFolder");
48      conf = new ChukwaConfiguration();
49      try {
50        fs = FileSystem.get(conf);
51      } catch (IOException e) {
52        e.printStackTrace();
53      }
54    }
55  
56    public SearchResult search(SearchResult result, String cluster,
57        String dataSource, long t0, long t1, String filter, Token token)
58        throws DataSourceException {
59  
60      String filePath = rootFolder + "/" + cluster + "/" + dataSource;
61  
62      System.out.println("filePath [" + filePath + "]");
63      Calendar calendar = Calendar.getInstance();
64      calendar.setTimeInMillis(t1);
65  
66      TreeMap<Long, List<Record>> records = result.getRecords();
67      int maxCount = 200;
68      SimpleDateFormat sdf = new java.text.SimpleDateFormat("_yyyyMMdd_HH_");
69      do {
70        System.out.println("start Date [" + calendar.getTime() + "]");
71        String fileName = sdf.format(calendar.getTime());
72        int minutes = calendar.get(Calendar.MINUTE);
73        int dec = minutes / 10;
74        fileName += dec;
75  
76        int m = minutes - (dec * 10);
77        if (m < 5) {
78          fileName += "0.1.evt";
79        } else {
80          fileName += "5.1.evt";
81        }
82  
83        fileName = filePath + "/" + dataSource + fileName;
84  
85        // System.out.println("JB fileName  [" +fileName + "]");
86  
87        try {
88          System.out.println("BEFORE fileName  [" + fileName + "]");
89  
90          // List<Record> evts =
91          // ChukwaFileParser.readData(cluster,dataSource,maxCount, t1, t0,
92          // Long.MAX_VALUE, filter, fileName, fs);
93          List<Record> evts = ChukwaSequenceFileParser.readData(cluster,
94              dataSource, maxCount, t1, t0, Long.MAX_VALUE, filter, fileName, fs,
95              conf);
96  
97          maxCount = maxCount - evts.size();
98          System.out.println("AFTER fileName  [" + fileName + "] count="
99              + evts.size() + " maxCount=" + maxCount);
100         for (Record evt : evts) {
101           System.out.println("AFTER Loop  [" + evt.toString() + "]");
102           long timestamp = evt.getTime();
103           if (records.containsKey(timestamp)) {
104             records.get(timestamp).add(evt);
105           } else {
106             List<Record> list = new LinkedList<Record>();
107             list.add(evt);
108             records.put(timestamp, list);
109           }
110         }
111       } catch (Exception e) {
112         e.printStackTrace();
113       }
114 
115       if (maxCount <= 0) {
116         System.out.println("BREAKING LOOP AFTER [" + fileName + "] maxCount="
117             + maxCount);
118         break;
119       }
120 
121       calendar.add(Calendar.MINUTE, -5);
122 
123       System.out.println("calendar  [" + calendar.getTimeInMillis() + "] ");
124       System.out.println("end       [" + (t0 - 1000 * 60 * 5) + "] ");
125     } while (calendar.getTimeInMillis() > (t0 - 1000 * 60 * 5)); // <= need some
126                                                                  // code here
127     // Need more than this to compute the end
128 
129     return result;
130   }
131 
132   public boolean isThreadSafe() {
133     return true;
134   }
135 }