This project has retired. For details please refer to its
Attic page.
RecordDS xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.chukwa.extraction.engine.datasource.record;
20
21
22 import java.io.IOException;
23 import java.text.SimpleDateFormat;
24 import java.util.Calendar;
25 import java.util.LinkedList;
26 import java.util.List;
27 import java.util.TreeMap;
28 import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
29 import org.apache.hadoop.chukwa.extraction.engine.Record;
30 import org.apache.hadoop.chukwa.extraction.engine.SearchResult;
31 import org.apache.hadoop.chukwa.extraction.engine.Token;
32 import org.apache.hadoop.chukwa.extraction.engine.datasource.DataSource;
33 import org.apache.hadoop.chukwa.extraction.engine.datasource.DataSourceException;
34 import org.apache.hadoop.chukwa.inputtools.mdl.DataConfig;
35 import org.apache.hadoop.fs.FileSystem;
36
37 public class RecordDS implements DataSource {
38
39 private static FileSystem fs = null;
40 private static ChukwaConfiguration conf = null;
41
42 private static String rootFolder = null;
43 private static DataConfig dataConfig = null;
44
45 static {
46 dataConfig = new DataConfig();
47 rootFolder = dataConfig.get("chukwa.engine.dsDirectory.rootFolder");
48 conf = new ChukwaConfiguration();
49 try {
50 fs = FileSystem.get(conf);
51 } catch (IOException e) {
52 e.printStackTrace();
53 }
54 }
55
56 public SearchResult search(SearchResult result, String cluster,
57 String dataSource, long t0, long t1, String filter, Token token)
58 throws DataSourceException {
59
60 String filePath = rootFolder + "/" + cluster + "/" + dataSource;
61
62 System.out.println("filePath [" + filePath + "]");
63 Calendar calendar = Calendar.getInstance();
64 calendar.setTimeInMillis(t1);
65
66 TreeMap<Long, List<Record>> records = result.getRecords();
67 int maxCount = 200;
68 SimpleDateFormat sdf = new java.text.SimpleDateFormat("_yyyyMMdd_HH_");
69 do {
70 System.out.println("start Date [" + calendar.getTime() + "]");
71 String fileName = sdf.format(calendar.getTime());
72 int minutes = calendar.get(Calendar.MINUTE);
73 int dec = minutes / 10;
74 fileName += dec;
75
76 int m = minutes - (dec * 10);
77 if (m < 5) {
78 fileName += "0.1.evt";
79 } else {
80 fileName += "5.1.evt";
81 }
82
83 fileName = filePath + "/" + dataSource + fileName;
84
85
86
87 try {
88 System.out.println("BEFORE fileName [" + fileName + "]");
89
90
91
92
93 List<Record> evts = ChukwaSequenceFileParser.readData(cluster,
94 dataSource, maxCount, t1, t0, Long.MAX_VALUE, filter, fileName, fs,
95 conf);
96
97 maxCount = maxCount - evts.size();
98 System.out.println("AFTER fileName [" + fileName + "] count="
99 + evts.size() + " maxCount=" + maxCount);
100 for (Record evt : evts) {
101 System.out.println("AFTER Loop [" + evt.toString() + "]");
102 long timestamp = evt.getTime();
103 if (records.containsKey(timestamp)) {
104 records.get(timestamp).add(evt);
105 } else {
106 List<Record> list = new LinkedList<Record>();
107 list.add(evt);
108 records.put(timestamp, list);
109 }
110 }
111 } catch (Exception e) {
112 e.printStackTrace();
113 }
114
115 if (maxCount <= 0) {
116 System.out.println("BREAKING LOOP AFTER [" + fileName + "] maxCount="
117 + maxCount);
118 break;
119 }
120
121 calendar.add(Calendar.MINUTE, -5);
122
123 System.out.println("calendar [" + calendar.getTimeInMillis() + "] ");
124 System.out.println("end [" + (t0 - 1000 * 60 * 5) + "] ");
125 } while (calendar.getTimeInMillis() > (t0 - 1000 * 60 * 5));
126
127
128
129 return result;
130 }
131
132 public boolean isThreadSafe() {
133 return true;
134 }
135 }