This project has retired. For details please refer to its
Attic page.
ChukwaFileParser xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.chukwa.extraction.engine.datasource.record;
20
21
22 import java.io.IOException;
23 import java.util.LinkedList;
24 import java.util.List;
25 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
26 import org.apache.hadoop.chukwa.extraction.engine.Record;
27 import org.apache.hadoop.fs.FSDataInputStream;
28 import org.apache.hadoop.fs.FileSystem;
29 import org.apache.hadoop.fs.Path;
30
31 public class ChukwaFileParser {
32 static final int timestampField = 0;
33
34 @SuppressWarnings("deprecation")
35 public static List<Record> readData(String cluster, String dataSource,
36 int maxRows, long t1, long t0, long maxOffset, String filter,
37 String fileName, FileSystem fs) throws MalformedFileFormat {
38
39
40 List<Record> records = new LinkedList<Record>();
41 FSDataInputStream dataIS = null;
42 int lineCount = 0;
43
44 try {
45
46 if (!fs.exists(new Path(fileName))) {
47 System.out.println("fileName not there!");
48 return records;
49 }
50 System.out.println("NameNodeParser Open [" + fileName + "]");
51
52 dataIS = fs.open(new Path(fileName));
53 System.out.println("NameNodeParser Open2 [" + fileName + "]");
54
55 long timestamp = 0;
56 int listSize = 0;
57 String line = null;
58 String[] data = null;
59 long offset = 0;
60
61 do {
62 offset = dataIS.getPos();
63
64
65
66
67
68
69
70 line = dataIS.readLine();
71 lineCount++;
72
73 if (line != null) {
74
75
76 if (line.length() < 14) {
77
78 continue;
79 }
80
81 data = line.split("\t");
82
83 try {
84 timestamp = Long.parseLong(data[timestampField]);
85
86 } catch (Exception e) {
87 e.printStackTrace();
88
89 }
90 if (timestamp < t0) {
91
92
93
94 continue;
95 } else if ((timestamp < t1) && (offset < maxOffset))
96
97 {
98
99
100 boolean valid = false;
101
102 if ((filter == null || filter.equals(""))) {
103 valid = true;
104 } else if (line.indexOf(filter) > 0) {
105 valid = true;
106 }
107
108 if (valid) {
109
110 ChukwaRecord record = new ChukwaRecord();
111 record.setTime(timestamp);
112 record.add("offset", "" + offset);
113 record.add(Record.bodyField, data[1]);
114 record.add(Record.sourceField, dataSource);
115
116 records.add(record);
117 listSize = records.size();
118 if (listSize > maxRows) {
119 records.remove(0);
120
121 }
122 } else {
123
124
125 }
126
127 } else {
128
129 break;
130 }
131 }
132
133 } while (line != null);
134 } catch (IOException e) {
135 e.printStackTrace();
136 } finally {
137 System.out.println("File: " + fileName + " Line count: " + lineCount);
138 try {
139 if(dataIS != null) {
140 dataIS.close();
141 }
142 } catch (IOException e) {
143 }
144 }
145 return records;
146 }
147 }