This project has retired. For details please refer to its
Attic page.
ChukwaSequenceFileParser xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.chukwa.extraction.engine.datasource.record;
20
21
22 import java.io.IOException;
23 import java.util.Date;
24 import java.util.LinkedList;
25 import java.util.List;
26 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
27 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
28 import org.apache.hadoop.chukwa.extraction.engine.Record;
29 import org.apache.hadoop.conf.Configuration;
30 import org.apache.hadoop.fs.FileSystem;
31 import org.apache.hadoop.fs.Path;
32 import org.apache.hadoop.io.SequenceFile;
33
34 public class ChukwaSequenceFileParser {
35
36 public static List<Record> readData(String cluster, String dataSource,
37 int maxRows, long t1, long t0, long maxOffset, String filter,
38 String fileName, FileSystem fs, Configuration conf)
39 throws MalformedFileFormat {
40
41
42 List<Record> records = new LinkedList<Record>();
43 SequenceFile.Reader r = null;
44
45 int lineCount = 0;
46 if (filter != null) {
47 filter = filter.toLowerCase();
48 }
49
50 try {
51
52 if (!fs.exists(new Path(fileName))) {
53 System.out.println("fileName not there!");
54 return records;
55 }
56 System.out.println("NameNodeParser Open [" + fileName + "]");
57
58 r = new SequenceFile.Reader(fs, new Path(fileName), conf);
59 System.out.println("NameNodeParser Open2 [" + fileName + "]");
60
61 long timestamp = 0;
62 int listSize = 0;
63
64 long offset = 0;
65
66
67 ChukwaRecordKey key = new ChukwaRecordKey();
68 ChukwaRecord record = new ChukwaRecord();
69
70 while (r.next(key, record)) {
71 lineCount++;
72
73 System.out.println("NameNodeParser Line ["
74 + record.getValue(Record.bodyField) + "]");
75
76 if (record != null) {
77 timestamp = record.getTime();
78 if (timestamp < t0) {
79 System.out.println("Line not in range. Skipping: "
80 + record.getValue(Record.bodyField));
81 System.out.println("Search for: " + new Date(t0) + " is :"
82 + new Date(timestamp));
83 continue;
84 } else if ((timestamp < t1) && (offset < maxOffset))
85
86 {
87
88 System.out
89 .println("In Range: " + record.getValue(Record.bodyField));
90 boolean valid = false;
91
92 if ((filter == null || filter.equals(""))) {
93 valid = true;
94 } else if (isValid(record, filter)) {
95 valid = true;
96 }
97
98 if (valid) {
99 records.add(record);
100 record = new ChukwaRecord();
101 listSize = records.size();
102 if (listSize > maxRows) {
103 records.remove(0);
104 System.out.println("==========>>>>>REMOVING: "
105 + record.getValue(Record.bodyField));
106 }
107 } else {
108 System.out
109 .println("In Range ==================>>>>>>>>> OUT Regex: "
110 + record.getValue(Record.bodyField));
111 }
112
113 } else {
114 System.out.println("Line out of range. Stopping now: "
115 + record.getValue(Record.bodyField));
116 break;
117 }
118 }
119
120 }
121 } catch (IOException e) {
122 e.printStackTrace();
123 } finally {
124 System.out.println("File: " + fileName + " Line count: " + lineCount);
125 if (r != null) {
126 try {
127 r.close();
128 } catch (IOException e) {
129 }
130 }
131
132 }
133 return records;
134 }
135
136 protected static boolean isValid(ChukwaRecord record, String filter) {
137 String[] fields = record.getFields();
138 for (String field : fields) {
139 if (record.getValue(field).toLowerCase().indexOf(filter) >= 0) {
140 return true;
141 }
142 }
143 return false;
144 }
145 }