This project has retired. For details please refer to its Attic page.
CharFileTailingAdaptorUTF8NewLineEscaped xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.chukwa.datacollection.adaptor.filetailer;
20  
21  
22  import org.apache.hadoop.chukwa.ChunkImpl;
23  import org.apache.hadoop.chukwa.datacollection.ChunkReceiver;
24  import org.apache.hadoop.chukwa.util.RecordConstants;
25  
26  import java.util.ArrayList;
27  import java.util.Arrays;
28  
29  /**
30   * A subclass of FileTailingAdaptor that reads UTF8/ascii files and splits
31   * records at non-escaped carriage returns
32   * 
33   */
34  public class CharFileTailingAdaptorUTF8NewLineEscaped extends
35      FileTailingAdaptor {
36  
37    private static final char SEPARATOR = '\n';
38  
39    private ArrayList<Integer> offsets = new ArrayList<Integer>();
40  
41    /**
42     * 
43     * Note: this method uses a temporary ArrayList (shared across instances).
44     * This means we're copying ints each time. This could be a performance issue.
45     * Also, 'offsets' never shrinks, and will be of size proportional to the
46     * largest number of lines ever seen in an event.
47     */
48    @Override
49    protected int extractRecords(ChunkReceiver eq, long buffOffsetInFile,
50        byte[] buf) throws InterruptedException {
51      String es = RecordConstants.RECORD_SEPARATOR_ESCAPE_SEQ;
52      for (int i = 0; i < buf.length; ++i) {
53        // if this is a separator
54        if (buf[i] == SEPARATOR) {
55          // if possibly preceded by escape sequence (avoid outOfBounds here)
56          boolean escaped = false; // was it escaped?
57          if (i - es.length() >= 0) {
58            escaped = true; // maybe (at least there was room for an escape
59                            // sequence, so let's check for the e.s.)
60            for (int j = 0; j < es.length(); j++) {
61              if (buf[i - es.length() + j] != es.charAt(j)) {
62                escaped = false;
63              }
64            }
65          }
66          if (!escaped) {
67            offsets.add(i);
68          }
69        }
70      }
71  
72      if (offsets.size() > 0) {
73        int[] offsets_i = new int[offsets.size()];
74        for (int i = 0; i < offsets.size(); i++) {
75          try {
76          offsets_i[i] = offsets.get(i);
77          } catch(NullPointerException e) {
78            // Skip offsets 0 where it can be null.
79          }
80        }
81        // make the stream unique to this adaptor
82        int bytesUsed = 0;
83        if(buf.length==offsets_i[offsets_i.length -1]) {
84          // If Separator is last character of stream,
85          // send the record.
86          bytesUsed = offsets_i[offsets_i.length - 2] + 1;
87        } else {
88          // If the last record is partial read,
89          // truncate the record to the n -1 new line.
90          bytesUsed = offsets_i[offsets_i.length - 1] + 1; // char at last        
91        }
92                                                             // offset uses a byte
93        assert bytesUsed > 0 : " shouldn't send empty events";
94        ChunkImpl chunk = new ChunkImpl(type, toWatch.getAbsolutePath(),
95            buffOffsetInFile + bytesUsed, Arrays.copyOf(buf, bytesUsed), this);
96  
97        chunk.setSeqID(buffOffsetInFile + bytesUsed);
98        chunk.setRecordOffsets(offsets_i);
99        eq.add(chunk);
100 
101       offsets.clear();
102       return bytesUsed;
103     } else
104       return 0;
105   }
106 
107   public String toString() {
108     return "escaped newline CFTA-UTF8";
109   }
110 
111 }