This project has retired. For details please refer to its
Attic page.
ChukwaRecordDataSource xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.chukwa.extraction.engine.datasource.record;
19
20
21 import java.io.IOException;
22 import java.text.SimpleDateFormat;
23 import java.util.ArrayList;
24 import java.util.Calendar;
25 import java.util.Date;
26 import java.util.Iterator;
27 import java.util.LinkedList;
28 import java.util.List;
29 import java.util.TreeMap;
30 import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
31 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
32 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
33 import org.apache.hadoop.chukwa.extraction.engine.ChukwaSearchResult;
34 import org.apache.hadoop.chukwa.extraction.engine.Record;
35 import org.apache.hadoop.chukwa.extraction.engine.SearchResult;
36 import org.apache.hadoop.chukwa.extraction.engine.Token;
37 import org.apache.hadoop.chukwa.extraction.engine.datasource.DataSource;
38 import org.apache.hadoop.chukwa.extraction.engine.datasource.DataSourceException;
39 import org.apache.hadoop.chukwa.inputtools.mdl.DataConfig;
40 import org.apache.hadoop.fs.FileSystem;
41 import org.apache.hadoop.fs.Path;
42 import org.apache.hadoop.io.SequenceFile;
43 import org.apache.log4j.Logger;
44
45 public class ChukwaRecordDataSource implements DataSource {
46
47
48
49 static Logger log = Logger.getLogger(ChukwaRecordDataSource.class);
50
51 private static final int dayFolder = 100;
52 private static final int hourFolder = 200;
53 private static final int rawFolder = 300;
54
55 static final String[] raws = { "0", "5", "10", "15", "20", "25", "30", "35",
56 "40", "45", "50", "55" };
57
58 private static FileSystem fs = null;
59 private static ChukwaConfiguration conf = null;
60
61 private static String rootDsFolder = null;
62 private static DataConfig dataConfig = null;
63
64 static {
65 dataConfig = new DataConfig();
66 rootDsFolder = dataConfig.get("chukwa.engine.dsDirectory.rootFolder");
67 conf = new ChukwaConfiguration();
68 try {
69 fs = FileSystem.get(conf);
70 } catch (IOException e) {
71 e.printStackTrace();
72 }
73 }
74
75 @Override
76 public boolean isThreadSafe() {
77 return true;
78 }
79
80 @Override
81 public SearchResult search(SearchResult result, String cluster,
82 String dataSource, long t0, long t1, String filter, Token token)
83 throws DataSourceException {
84 String filePath = rootDsFolder + "/" + cluster + "/";
85
86 log.debug("filePath [" + filePath + "]");
87 Calendar calendar = Calendar.getInstance();
88 calendar.setTimeInMillis(t0);
89 SimpleDateFormat sdf = new java.text.SimpleDateFormat("yyyyMMdd");
90 int maxCount = 200;
91
92 List<Record> records = new ArrayList<Record>();
93
94 ChukwaDSInternalResult res = new ChukwaDSInternalResult();
95
96 if (token != null) {
97
98
99 try {
100 String[] vars = token.key.split("\\|");
101 res.day = vars[0];
102 res.hour = Integer.parseInt(vars[1]);
103 res.rawIndex = Integer.parseInt(vars[2]);
104 res.spill = Integer.parseInt(vars[3]);
105 res.currentTs = Long.parseLong(vars[4]);
106 res.position = Long.parseLong(vars[5]);
107 res.fileName = vars[5];
108 log.info("Token is not null! :" + token.key);
109 } catch (Exception e) {
110 log.error("Incalid Key: [" + token.key + "] exception: ", e);
111 }
112 } else {
113 log.debug("Token is null!");
114 }
115
116 try {
117 do {
118 log.debug("start Date [" + calendar.getTime() + "]");
119 String workingDay = sdf.format(calendar.getTime());
120 int workingHour = calendar.get(Calendar.HOUR_OF_DAY);
121 int startRawIndex = 0;
122 if (token != null) {
123 workingDay = res.day;
124 workingHour = res.hour;
125 startRawIndex = res.rawIndex;
126 } else {
127 token = new Token();
128 }
129
130 log.debug("workingDay " + workingDay);
131 log.debug("workingHour " + workingHour);
132
133 if (exist(dayFolder, filePath, dataSource, workingDay, null, null)) {
134
135 if (containsRotateFlag(dayFolder, filePath, dataSource, workingDay,
136 null)) {
137
138
139 log.debug("fs.exists(workingDayRotatePath) ");
140 extractRecords(res, ChukwaRecordDataSource.dayFolder, filePath,
141 dataSource, workingDay, null, -1, token, records, maxCount, t0,
142 t1, filter);
143 maxCount = maxCount - records.size();
144 if ((maxCount <= 0) || (res.currentTs > t1)) {
145 break;
146 }
147
148 }
149 else
150 {
151 log.debug("check for hours");
152 for (int hour = 0; hour < 24; hour++) {
153 if (workingDay == res.day && hour < workingHour) {
154 continue;
155 }
156 log.debug(" Hour? -->" + filePath + dataSource + "/"
157 + workingDay + "/" + hour);
158 if (exist(dayFolder, filePath, dataSource, workingDay, "" + hour,
159 null)) {
160 if (containsRotateFlag(dayFolder, filePath, dataSource,
161 workingDay, "" + hour)) {
162
163
164 extractRecords(res, ChukwaRecordDataSource.hourFolder,
165 filePath, dataSource, workingDay, "" + hour, -1, token,
166 records, maxCount, t0, t1, filter);
167 } else
168 {
169 log.debug("Working on Raw");
170
171 for (int rawIndex = startRawIndex; rawIndex < 12; rawIndex++) {
172
173
174
175 if (exist(dayFolder, filePath, dataSource, workingDay, ""
176 + hour, raws[rawIndex])) {
177 extractRecords(res, ChukwaRecordDataSource.rawFolder,
178 filePath, dataSource, workingDay, "" + hour,
179 rawIndex, token, records, maxCount, t0, t1, filter);
180 maxCount = maxCount - records.size();
181 if ((maxCount <= 0) || (res.currentTs > t1)) {
182 break;
183 }
184 } else {
185 log.debug("<<<<<<<<<Working on Raw Not exist--> "
186 + filePath + dataSource + "/" + workingDay + "/"
187 + workingHour + "/" + raws[rawIndex]);
188 }
189 res.spill = 1;
190 }
191 }
192 }
193
194
195 maxCount = maxCount - records.size();
196 if ((maxCount <= 0) || (res.currentTs > t1)) {
197 break;
198 }
199
200 }
201 }
202 }
203
204 maxCount = maxCount - records.size();
205 if ((maxCount <= 0) || (res.currentTs > t1)) {
206 break;
207 }
208
209
210 calendar.add(Calendar.DAY_OF_MONTH, +1);
211 calendar.set(Calendar.HOUR_OF_DAY, 0);
212 calendar.set(Calendar.MINUTE, 0);
213 calendar.set(Calendar.SECOND, 0);
214
215 } while (calendar.getTimeInMillis() < t1);
216
217 } catch (Exception e) {
218 e.printStackTrace();
219 throw new DataSourceException(e);
220 }
221
222 TreeMap<Long, List<Record>> recordsInResult = result.getRecords();
223 for (Record record : records) {
224 long timestamp = record.getTime();
225 if (recordsInResult.containsKey(timestamp)) {
226 recordsInResult.get(timestamp).add(record);
227 } else {
228 List<Record> list = new LinkedList<Record>();
229 list.add(record);
230 recordsInResult.put(timestamp, list);
231 }
232 }
233 result.setToken(token);
234 return result;
235
236 }
237
238 public void extractRecords(ChukwaDSInternalResult res, int directoryType,
239 String rootFolder, String dataSource, String day, String hour,
240 int rawIndex, Token token, List<Record> records, int maxRows, long t0,
241 long t1, String filter) throws Exception {
242
243
244 int spill = res.spill;
245
246 boolean workdone = false;
247 do {
248 String fileName = buildFileName(directoryType, rootFolder, dataSource,
249 spill, day, hour, rawIndex);
250 log.debug("extractRecords : " + fileName);
251
252 if (fs.exists(new Path(fileName))) {
253 readData(res, token, fileName, maxRows, t0, t1, filter);
254 res.spill = spill;
255 List<Record> localRecords = res.records;
256 log.debug("localRecords size : " + localRecords.size());
257 maxRows = maxRows - localRecords.size();
258 if (maxRows <= 0) {
259 workdone = true;
260 }
261 records.addAll(localRecords);
262 log.debug("AFTER fileName [" + fileName + "] count="
263 + localRecords.size() + " maxCount=" + maxRows);
264 spill++;
265 } else {
266
267 workdone = true;
268 }
269 } while (!workdone);
270 token.key = day + "|" + hour + "|" + rawIndex + "|" + spill + "|"
271 + res.currentTs + "|" + res.position + "|" + res.fileName;
272 }
273
274 public void readData(ChukwaDSInternalResult res, Token token,
275 String fileName, int maxRows, long t0, long t1, String filter)
276 throws Exception {
277 List<Record> records = new LinkedList<Record>();
278 res.records = records;
279 SequenceFile.Reader r = null;
280 if (filter != null) {
281 filter = filter.toLowerCase();
282 }
283
284 try {
285
286 if (!fs.exists(new Path(fileName))) {
287 log.debug("fileName not there!");
288 return;
289 }
290 log.debug("Parser Open [" + fileName + "]");
291
292 long timestamp = 0;
293 int listSize = 0;
294 ChukwaRecordKey key = new ChukwaRecordKey();
295 ChukwaRecord record = new ChukwaRecord();
296
297 r = new SequenceFile.Reader(fs, new Path(fileName), conf);
298
299 log.debug("readData Open2 [" + fileName + "]");
300 if ((fileName.equals(res.fileName)) && (res.position != -1)) {
301 r.seek(res.position);
302 }
303 res.fileName = fileName;
304
305 while (r.next(key, record)) {
306 if (record != null) {
307 res.position = r.getPosition();
308
309 timestamp = record.getTime();
310 res.currentTs = timestamp;
311 log.debug("\nSearch for startDate: " + new Date(t0) + " is :"
312 + new Date(timestamp));
313
314 if (timestamp < t0) {
315
316 continue;
317 } else if (timestamp < t1) {
318 log.debug("In Range: " + record.toString());
319 boolean valid = false;
320
321 if ((filter == null || filter.equals(""))) {
322 valid = true;
323 } else if (isValid(record, filter)) {
324 valid = true;
325 }
326
327 if (valid) {
328 records.add(record);
329 record = new ChukwaRecord();
330 listSize = records.size();
331 if (listSize >= maxRows) {
332
333
334 token.key = key.getKey();
335 token.hasMore = true;
336 break;
337 }
338 } else {
339 log.debug("In Range ==================>>>>>>>>> OUT Regex: "
340 + record);
341 }
342 } else {
343 log.debug("Line out of range. Stopping now: " + record);
344
345 token.key = key.getKey();
346 token.hasMore = false;
347 break;
348 }
349 }
350 }
351
352 } catch (Exception e) {
353 e.printStackTrace();
354 } finally {
355 try {
356 r.close();
357 } catch (Exception e) {
358 }
359 }
360 }
361
362 public boolean containsRotateFlag(int directoryType, String rootFolder,
363 String dataSource, String workingDay, String workingHour)
364 throws Exception {
365 boolean contains = false;
366 switch (directoryType) {
367 case ChukwaRecordDataSource.dayFolder:
368
369 contains = fs.exists(new Path(rootFolder + dataSource + "/" + workingDay
370 + "/rotateDone"));
371 break;
372
373 case ChukwaRecordDataSource.hourFolder:
374
375 contains = fs.exists(new Path(rootFolder + dataSource + "/" + workingDay
376 + "/" + workingHour + "/rotateDone"));
377 break;
378
379 }
380 return contains;
381 }
382
383 public boolean exist(int directoryType, String rootFolder, String dataSource,
384 String workingDay, String workingHour, String raw) throws Exception {
385 boolean contains = false;
386 switch (directoryType) {
387 case ChukwaRecordDataSource.dayFolder:
388
389 contains = fs
390 .exists(new Path(rootFolder + dataSource + "/" + workingDay));
391 break;
392
393 case ChukwaRecordDataSource.hourFolder:
394
395 contains = fs.exists(new Path(rootFolder + dataSource + "/" + workingDay
396 + "/" + workingHour));
397 break;
398 case ChukwaRecordDataSource.rawFolder:
399
400 contains = fs.exists(new Path(rootFolder + dataSource + "/" + workingDay
401 + "/" + workingHour + "/" + raw));
402 break;
403
404 }
405 return contains;
406 }
407
408 protected boolean isValid(ChukwaRecord record, String filter) {
409 String[] fields = record.getFields();
410 for (String field : fields) {
411 if (record.getValue(field).toLowerCase().indexOf(filter) >= 0) {
412 return true;
413 }
414 }
415 return false;
416 }
417
418 public String buildFileName(int directoryType, String rootFolder,
419 String dataSource, int spill, String day, String hour, int rawIndex) {
420 String fileName = null;
421
422
423
424 switch (directoryType) {
425 case ChukwaRecordDataSource.dayFolder:
426
427 fileName = rootFolder + "/" + dataSource + "/" + day + "/" + dataSource
428 + "_" + day + "." + spill + ".evt";
429 break;
430
431 case ChukwaRecordDataSource.hourFolder:
432
433 fileName = rootFolder + "/" + dataSource + "/" + day + "/" + hour + "/"
434 + dataSource + "_" + day + "_" + hour + "." + spill + ".evt";
435 break;
436
437 case ChukwaRecordDataSource.rawFolder:
438
439 fileName = rootFolder + "/" + dataSource + "/" + day + "/" + hour + "/"
440 + raws[rawIndex] + "/" + dataSource + "_" + day + "_" + hour + "_"
441 + raws[rawIndex] + "." + spill + ".evt";
442 break;
443 }
444 log.debug("buildFileName :" + fileName);
445 return fileName;
446 }
447
448 public static void main(String[] args) throws DataSourceException {
449 ChukwaRecordDataSource ds = new ChukwaRecordDataSource();
450 SearchResult result = new ChukwaSearchResult();
451 result.setRecords(new TreeMap<Long, List<Record>>());
452 String cluster = args[0];
453 String dataSource = args[1];
454 long t0 = Long.parseLong(args[2]);
455 long t1 = Long.parseLong(args[3]);
456 String filter = null;
457 Token token = null;
458
459 if (args.length >= 5 && !args[4].equalsIgnoreCase("null")) {
460 filter = args[4];
461 }
462 if (args.length == 6) {
463 token = new Token();
464 token.key = args[5];
465 System.out.println("token :" + token.key);
466 }
467
468 System.out.println("cluster :" + cluster);
469 System.out.println("dataSource :" + dataSource);
470 System.out.println("t0 :" + t0);
471 System.out.println("t1 :" + t1);
472 System.out.println("filter :" + filter);
473
474 ds.search(result, cluster, dataSource, t0, t1, filter, token);
475 TreeMap<Long, List<Record>> records = result.getRecords();
476 Iterator<Long> it = records.keySet().iterator();
477
478 while (it.hasNext()) {
479 long ts = it.next();
480 System.out.println("\n\nTimestamp: " + new Date(ts));
481 List<Record> list = records.get(ts);
482 for (int i = 0; i < list.size(); i++) {
483 System.out.println(list.get(i));
484 }
485 }
486
487 if (result.getToken() != null) {
488 System.out.println("Key -->" + result.getToken().key);
489 }
490 }
491 }