This project has retired. For details please refer to its Attic page.
ChukwaFileParser xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.chukwa.extraction.engine.datasource.record;
20  
21  
22  import java.io.IOException;
23  import java.util.LinkedList;
24  import java.util.List;
25  import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
26  import org.apache.hadoop.chukwa.extraction.engine.Record;
27  import org.apache.hadoop.fs.FSDataInputStream;
28  import org.apache.hadoop.fs.FileSystem;
29  import org.apache.hadoop.fs.Path;
30  
31  public class ChukwaFileParser {
32    static final int timestampField = 0;
33  
34    @SuppressWarnings("deprecation")
35    public static List<Record> readData(String cluster, String dataSource,
36        int maxRows, long t1, long t0, long maxOffset, String filter,
37        String fileName, FileSystem fs) throws MalformedFileFormat {
38  
39      // String source = "NameNode." + fileName;
40      List<Record> records = new LinkedList<Record>();
41      FSDataInputStream dataIS = null;
42      int lineCount = 0;
43  
44      try {
45  
46        if (!fs.exists(new Path(fileName))) {
47          System.out.println("fileName not there!");
48          return records;
49        }
50        System.out.println("NameNodeParser Open [" + fileName + "]");
51  
52        dataIS = fs.open(new Path(fileName));
53        System.out.println("NameNodeParser Open2 [" + fileName + "]");
54  
55        long timestamp = 0;
56        int listSize = 0;
57        String line = null;
58        String[] data = null;
59        long offset = 0;
60  
61        do {
62          offset = dataIS.getPos();
63  
64          // Need TODO something here
65          // if (offset > maxOffset)
66          // {
67          // break;
68          // }
69  
70          line = dataIS.readLine();
71          lineCount++;
72          // System.out.println("NameNodeParser Line [" +line + "]");
73          if (line != null) {
74  
75            // empty lines
76            if (line.length() < 14) {
77              // System.out.println("NameNodeParser Line < 14! [" +line + "]");
78              continue;
79            }
80            // System.out.println("Line [" +line + "]");
81            data = line.split("\t");// Default separator for TextOutputFormat!
82  
83            try {
84              timestamp = Long.parseLong(data[timestampField]);
85  
86            } catch (Exception e) {
87              e.printStackTrace();
88              // throw new MalformedFileFormat(e);
89            }
90            if (timestamp < t0) {
91              // System.out.println("Line not in range. Skipping: " +line);
92              // System.out.println("Search for: " + new Date(t0) + " is :" + new
93              // Date(timestamp));
94              continue;
95            } else if ((timestamp < t1) && (offset < maxOffset)) // JB (epochTS <
96                                                                 // maxDate)
97            {
98  
99              // System.out.println("In Range: " + line);
100             boolean valid = false;
101 
102             if ((filter == null || filter.equals(""))) {
103               valid = true;
104             } else if (line.indexOf(filter) > 0) {
105               valid = true;
106             }
107 
108             if (valid) {
109               // System.out.println("In Range In Filter: " + line);
110               ChukwaRecord record = new ChukwaRecord();
111               record.setTime(timestamp);
112               record.add("offset", "" + offset);
113               record.add(Record.bodyField, data[1]);
114               record.add(Record.sourceField, dataSource);
115 
116               records.add(record);
117               listSize = records.size();
118               if (listSize > maxRows) {
119                 records.remove(0);
120                 // System.out.println("==========>>>>>REMOVING: " + e);
121               }
122             } else {
123               // System.out.println(
124               // "In Range ==================>>>>>>>>> OUT Regex: " + line);
125             }
126 
127           } else {
128             // System.out.println("Line out of range. Stopping now: " +line);
129             break;
130           }
131         }
132 
133       } while (line != null);
134     } catch (IOException e) {
135       e.printStackTrace();
136     } finally {
137       System.out.println("File: " + fileName + " Line count: " + lineCount);
138       try {
139         if(dataIS != null) {
140           dataIS.close();
141         }
142       } catch (IOException e) {
143       }
144     }
145     return records;
146   }
147 }