This project has been retired. For details, please refer to its
Attic page.
ChukwaRecordDataSource xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.chukwa.extraction.engine.datasource.record;
19
20
21 import java.io.IOException;
22 import java.text.SimpleDateFormat;
23 import java.util.ArrayList;
24 import java.util.Calendar;
25 import java.util.Date;
26 import java.util.Iterator;
27 import java.util.LinkedList;
28 import java.util.List;
29 import java.util.Map;
30 import java.util.Map.Entry;
31 import java.util.TreeMap;
32
33 import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
34 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
35 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
36 import org.apache.hadoop.chukwa.extraction.engine.ChukwaSearchResult;
37 import org.apache.hadoop.chukwa.extraction.engine.Record;
38 import org.apache.hadoop.chukwa.extraction.engine.SearchResult;
39 import org.apache.hadoop.chukwa.extraction.engine.Token;
40 import org.apache.hadoop.chukwa.extraction.engine.datasource.DataSource;
41 import org.apache.hadoop.chukwa.extraction.engine.datasource.DataSourceException;
42 import org.apache.hadoop.chukwa.inputtools.mdl.DataConfig;
43 import org.apache.hadoop.fs.FileSystem;
44 import org.apache.hadoop.fs.Path;
45 import org.apache.hadoop.io.SequenceFile;
46 import org.apache.log4j.Logger;
47
48 public class ChukwaRecordDataSource implements DataSource {
49
50
51
52 static Logger log = Logger.getLogger(ChukwaRecordDataSource.class);
53
54 private static final int dayFolder = 100;
55 private static final int hourFolder = 200;
56 private static final int rawFolder = 300;
57
58 static final String[] raws = { "0", "5", "10", "15", "20", "25", "30", "35",
59 "40", "45", "50", "55" };
60
61 private static FileSystem fs = null;
62 private static ChukwaConfiguration conf = null;
63
64 private static String rootDsFolder = null;
65 private static DataConfig dataConfig = null;
66
67 static {
68 dataConfig = new DataConfig();
69 rootDsFolder = dataConfig.get("chukwa.engine.dsDirectory.rootFolder");
70 conf = new ChukwaConfiguration();
71 try {
72 fs = FileSystem.get(conf);
73 } catch (IOException e) {
74 e.printStackTrace();
75 }
76 }
77
78 @Override
79 public boolean isThreadSafe() {
80 return true;
81 }
82
83 @Override
84 public SearchResult search(SearchResult result, String cluster,
85 String dataSource, long t0, long t1, String filter, Token token)
86 throws DataSourceException {
87 String filePath = rootDsFolder + "/" + cluster + "/";
88
89 log.debug("filePath [" + filePath + "]");
90 Calendar calendar = Calendar.getInstance();
91 calendar.setTimeInMillis(t0);
92 SimpleDateFormat sdf = new java.text.SimpleDateFormat("yyyyMMdd");
93 int maxCount = 200;
94
95 List<Record> records = new ArrayList<Record>();
96
97 ChukwaDSInternalResult res = new ChukwaDSInternalResult();
98
99 if (token != null) {
100
101
102 try {
103 String[] vars = token.key.split("\\|");
104 res.day = vars[0];
105 res.hour = Integer.parseInt(vars[1]);
106 res.rawIndex = Integer.parseInt(vars[2]);
107 res.spill = Integer.parseInt(vars[3]);
108 res.currentTs = Long.parseLong(vars[4]);
109 res.position = Long.parseLong(vars[5]);
110 res.fileName = vars[5];
111 log.info("Token is not null! :" + token.key);
112 } catch (Exception e) {
113 log.error("Incalid Key: [" + token.key + "] exception: ", e);
114 }
115 } else {
116 log.debug("Token is null!");
117 }
118
119 try {
120 do {
121 log.debug("start Date [" + calendar.getTime() + "]");
122 String workingDay = sdf.format(calendar.getTime());
123 int workingHour = calendar.get(Calendar.HOUR_OF_DAY);
124 int startRawIndex = 0;
125 if (token != null) {
126 workingDay = res.day;
127 workingHour = res.hour;
128 startRawIndex = res.rawIndex;
129 } else {
130 token = new Token();
131 }
132
133 log.debug("workingDay " + workingDay);
134 log.debug("workingHour " + workingHour);
135
136 if (exist(dayFolder, filePath, dataSource, workingDay, null, null)) {
137
138 if (containsRotateFlag(dayFolder, filePath, dataSource, workingDay,
139 null)) {
140
141
142 log.debug("fs.exists(workingDayRotatePath) ");
143 extractRecords(res, ChukwaRecordDataSource.dayFolder, filePath,
144 dataSource, workingDay, null, -1, token, records, maxCount, t0,
145 t1, filter);
146 maxCount = maxCount - records.size();
147 if ((maxCount <= 0) || (res.currentTs > t1)) {
148 break;
149 }
150
151 }
152 else
153 {
154 log.debug("check for hours");
155 for (int hour = 0; hour < 24; hour++) {
156 if (workingDay.equals(res.day) && hour < workingHour) {
157 continue;
158 }
159 log.debug(" Hour? -->" + filePath + dataSource + "/"
160 + workingDay + "/" + hour);
161 if (exist(dayFolder, filePath, dataSource, workingDay, "" + hour,
162 null)) {
163 if (containsRotateFlag(dayFolder, filePath, dataSource,
164 workingDay, "" + hour)) {
165
166
167 extractRecords(res, ChukwaRecordDataSource.hourFolder,
168 filePath, dataSource, workingDay, "" + hour, -1, token,
169 records, maxCount, t0, t1, filter);
170 } else
171 {
172 log.debug("Working on Raw");
173
174 for (int rawIndex = startRawIndex; rawIndex < 12; rawIndex++) {
175
176
177
178 if (exist(dayFolder, filePath, dataSource, workingDay, ""
179 + hour, raws[rawIndex])) {
180 extractRecords(res, ChukwaRecordDataSource.rawFolder,
181 filePath, dataSource, workingDay, "" + hour,
182 rawIndex, token, records, maxCount, t0, t1, filter);
183 maxCount = maxCount - records.size();
184 if ((maxCount <= 0) || (res.currentTs > t1)) {
185 break;
186 }
187 } else {
188 log.debug("<<<<<<<<<Working on Raw Not exist--> "
189 + filePath + dataSource + "/" + workingDay + "/"
190 + workingHour + "/" + raws[rawIndex]);
191 }
192 res.spill = 1;
193 }
194 }
195 }
196
197
198 maxCount = maxCount - records.size();
199 if ((maxCount <= 0) || (res.currentTs > t1)) {
200 break;
201 }
202
203 }
204 }
205 }
206
207 maxCount = maxCount - records.size();
208 if ((maxCount <= 0) || (res.currentTs > t1)) {
209 break;
210 }
211
212
213 calendar.add(Calendar.DAY_OF_MONTH, +1);
214 calendar.set(Calendar.HOUR_OF_DAY, 0);
215 calendar.set(Calendar.MINUTE, 0);
216 calendar.set(Calendar.SECOND, 0);
217
218 } while (calendar.getTimeInMillis() < t1);
219
220 } catch (Exception e) {
221 e.printStackTrace();
222 throw new DataSourceException(e);
223 }
224
225 TreeMap<Long, List<Record>> recordsInResult = result.getRecords();
226 for (Record record : records) {
227 long timestamp = record.getTime();
228 if (recordsInResult.containsKey(timestamp)) {
229 recordsInResult.get(timestamp).add(record);
230 } else {
231 List<Record> list = new LinkedList<Record>();
232 list.add(record);
233 recordsInResult.put(timestamp, list);
234 }
235 }
236 result.setToken(token);
237 return result;
238
239 }
240
241 public void extractRecords(ChukwaDSInternalResult res, int directoryType,
242 String rootFolder, String dataSource, String day, String hour,
243 int rawIndex, Token token, List<Record> records, int maxRows, long t0,
244 long t1, String filter) throws Exception {
245
246
247 int spill = res.spill;
248
249 boolean workdone = false;
250 do {
251 String fileName = buildFileName(directoryType, rootFolder, dataSource,
252 spill, day, hour, rawIndex);
253 log.debug("extractRecords : " + fileName);
254
255 if (fs.exists(new Path(fileName))) {
256 readData(res, token, fileName, maxRows, t0, t1, filter);
257 res.spill = spill;
258 List<Record> localRecords = res.records;
259 log.debug("localRecords size : " + localRecords.size());
260 maxRows = maxRows - localRecords.size();
261 if (maxRows <= 0) {
262 workdone = true;
263 }
264 records.addAll(localRecords);
265 log.debug("AFTER fileName [" + fileName + "] count="
266 + localRecords.size() + " maxCount=" + maxRows);
267 spill++;
268 } else {
269
270 workdone = true;
271 }
272 } while (!workdone);
273 token.key = day + "|" + hour + "|" + rawIndex + "|" + spill + "|"
274 + res.currentTs + "|" + res.position + "|" + res.fileName;
275 }
276
277 public void readData(ChukwaDSInternalResult res, Token token,
278 String fileName, int maxRows, long t0, long t1, String filter)
279 throws Exception {
280 List<Record> records = new LinkedList<Record>();
281 res.records = records;
282 SequenceFile.Reader r = null;
283 if (filter != null) {
284 filter = filter.toLowerCase();
285 }
286
287 try {
288
289 if (!fs.exists(new Path(fileName))) {
290 log.debug("fileName not there!");
291 return;
292 }
293 log.debug("Parser Open [" + fileName + "]");
294
295 long timestamp = 0;
296 int listSize = 0;
297 ChukwaRecordKey key = new ChukwaRecordKey();
298 ChukwaRecord record = new ChukwaRecord();
299
300 r = new SequenceFile.Reader(fs, new Path(fileName), conf);
301
302 log.debug("readData Open2 [" + fileName + "]");
303 if ((fileName.equals(res.fileName)) && (res.position != -1)) {
304 r.seek(res.position);
305 }
306 res.fileName = fileName;
307
308 while (r.next(key, record)) {
309 if (record != null) {
310 res.position = r.getPosition();
311
312 timestamp = record.getTime();
313 res.currentTs = timestamp;
314 log.debug("\nSearch for startDate: " + new Date(t0) + " is :"
315 + new Date(timestamp));
316
317 if (timestamp < t0) {
318
319 continue;
320 } else if (timestamp < t1) {
321 log.debug("In Range: " + record.toString());
322 boolean valid = false;
323
324 if ((filter == null || filter.equals(""))) {
325 valid = true;
326 } else if (isValid(record, filter)) {
327 valid = true;
328 }
329
330 if (valid) {
331 records.add(record);
332 record = new ChukwaRecord();
333 listSize = records.size();
334 if (listSize >= maxRows) {
335
336
337 token.key = key.getKey();
338 token.hasMore = true;
339 break;
340 }
341 } else {
342 log.debug("In Range ==================>>>>>>>>> OUT Regex: "
343 + record);
344 }
345 } else {
346 log.debug("Line out of range. Stopping now: " + record);
347
348 token.key = key.getKey();
349 token.hasMore = false;
350 break;
351 }
352 }
353 }
354
355 } catch (IOException e) {
356 e.printStackTrace();
357 } finally {
358 try {
359 r.close();
360 } catch (Exception e) {
361 }
362 }
363 }
364
365 public boolean containsRotateFlag(int directoryType, String rootFolder,
366 String dataSource, String workingDay, String workingHour)
367 throws Exception {
368 boolean contains = false;
369 switch (directoryType) {
370 case ChukwaRecordDataSource.dayFolder:
371
372 contains = fs.exists(new Path(rootFolder + dataSource + "/" + workingDay
373 + "/rotateDone"));
374 break;
375
376 case ChukwaRecordDataSource.hourFolder:
377
378 contains = fs.exists(new Path(rootFolder + dataSource + "/" + workingDay
379 + "/" + workingHour + "/rotateDone"));
380 break;
381 default:
382 contains = fs.exists(new Path(rootFolder + dataSource + "/" + workingDay
383 + "/rotateDone"));
384 break;
385 }
386 return contains;
387 }
388
389 public boolean exist(int directoryType, String rootFolder, String dataSource,
390 String workingDay, String workingHour, String raw) throws Exception {
391 boolean contains = false;
392 switch (directoryType) {
393 case ChukwaRecordDataSource.dayFolder:
394
395 contains = fs
396 .exists(new Path(rootFolder + dataSource + "/" + workingDay));
397 break;
398
399 case ChukwaRecordDataSource.hourFolder:
400
401 contains = fs.exists(new Path(rootFolder + dataSource + "/" + workingDay
402 + "/" + workingHour));
403 break;
404 case ChukwaRecordDataSource.rawFolder:
405
406 contains = fs.exists(new Path(rootFolder + dataSource + "/" + workingDay
407 + "/" + workingHour + "/" + raw));
408 break;
409 default:
410 contains = fs
411 .exists(new Path(rootFolder + dataSource + "/" + workingDay));
412 break;
413 }
414 return contains;
415 }
416
417 protected boolean isValid(ChukwaRecord record, String filter) {
418 String[] fields = record.getFields();
419 for (String field : fields) {
420 if (record.getValue(field).toLowerCase().indexOf(filter) >= 0) {
421 return true;
422 }
423 }
424 return false;
425 }
426
427 public String buildFileName(int directoryType, String rootFolder,
428 String dataSource, int spill, String day, String hour, int rawIndex) {
429 String fileName = null;
430
431
432
433 switch (directoryType) {
434 case ChukwaRecordDataSource.dayFolder:
435
436 fileName = rootFolder + "/" + dataSource + "/" + day + "/" + dataSource
437 + "_" + day + "." + spill + ".evt";
438 break;
439
440 case ChukwaRecordDataSource.hourFolder:
441
442 fileName = rootFolder + "/" + dataSource + "/" + day + "/" + hour + "/"
443 + dataSource + "_" + day + "_" + hour + "." + spill + ".evt";
444 break;
445
446 case ChukwaRecordDataSource.rawFolder:
447
448 fileName = rootFolder + "/" + dataSource + "/" + day + "/" + hour + "/"
449 + raws[rawIndex] + "/" + dataSource + "_" + day + "_" + hour + "_"
450 + raws[rawIndex] + "." + spill + ".evt";
451 break;
452 default:
453 fileName = rootFolder + "/" + dataSource + "/" + day + "/" + dataSource
454 + "_" + day + "." + spill + ".evt";
455 break;
456 }
457 log.debug("buildFileName :" + fileName);
458 return fileName;
459 }
460
461 public static void main(String[] args) throws DataSourceException {
462 ChukwaRecordDataSource ds = new ChukwaRecordDataSource();
463 SearchResult result = new ChukwaSearchResult();
464 result.setRecords(new TreeMap<Long, List<Record>>());
465 String cluster = args[0];
466 String dataSource = args[1];
467 long t0 = Long.parseLong(args[2]);
468 long t1 = Long.parseLong(args[3]);
469 String filter = null;
470 Token token = null;
471
472 if (args.length >= 5 && !args[4].equalsIgnoreCase("null")) {
473 filter = args[4];
474 }
475 if (args.length == 6) {
476 token = new Token();
477 token.key = args[5];
478 System.out.println("token :" + token.key);
479 }
480
481 System.out.println("cluster :" + cluster);
482 System.out.println("dataSource :" + dataSource);
483 System.out.println("t0 :" + t0);
484 System.out.println("t1 :" + t1);
485 System.out.println("filter :" + filter);
486
487 ds.search(result, cluster, dataSource, t0, t1, filter, token);
488 TreeMap<Long, List<Record>> records = result.getRecords();
489 for(Entry<Long, List<Record>> entry : records.entrySet()) {
490 long ts = entry.getKey();
491 System.out.println("\n\nTimestamp: " + new Date(ts));
492 List<Record> list = entry.getValue();
493 for (int i = 0; i < list.size(); i++) {
494 System.out.println(list.get(i));
495 }
496 }
497
498 if (result.getToken() != null) {
499 System.out.println("Key -->" + result.getToken().key);
500 }
501 }
502 }