
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.chukwa;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.hadoop.chukwa.datacollection.DataFactory;
import org.apache.hadoop.chukwa.datacollection.adaptor.Adaptor;

public class ChunkImpl implements org.apache.hadoop.io.Writable, Chunk {
  public static int PROTOCOL_VERSION = 1;

  protected DataFactory dataFactory = DataFactory.getInstance();
  private String source = "";
  private String streamName = "";
  private String dataType = "";
  private String tags = "";
  private byte[] data = null;
  private int[] recordEndOffsets;
  private int protocolVersion = 1;
  private String debuggingInfo = "";

  private transient Adaptor initiator;
  long seqID;

  private static String localHostAddr;
  static {
    try {
      setHostAddress(InetAddress.getLocalHost().getHostName());
    } catch (UnknownHostException e) {
      setHostAddress("localhost");
    }
  }

  public static void setHostAddress(String host) {
    ChunkImpl.localHostAddr = host;
  }

  public static ChunkImpl getBlankChunk() {
    return new ChunkImpl();
  }

  ChunkImpl() {
  }

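  /**
   * Note that the Adaptor argument becomes the chunk's initiator; the
   * chunk's source field is set to this host's name, not to the adaptor.
   */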
  public ChunkImpl(String dataType, String streamName, long seq, byte[] data,
                   Adaptor source) {
    this.seqID = seq;
    this.source = localHostAddr;
    this.tags = dataFactory.getDefaultTags();
    this.streamName = streamName;
    this.dataType = dataType;
    this.data = data.clone(); // defensive copy
    this.initiator = source;
  }

  /**
   * @see org.apache.hadoop.chukwa.Chunk#getData()
   */
  public byte[] getData() {
    return data.clone();
  }

  /**
   * @see org.apache.hadoop.chukwa.Chunk#setData(byte[])
   */
  public void setData(byte[] logEvent) {
    this.data = logEvent.clone();
  }

  /**
   * @see org.apache.hadoop.chukwa.Chunk#getStreamName()
   */
  public String getStreamName() {
    return streamName;
  }

  public void setStreamName(String logApplication) {
    this.streamName = logApplication;
  }

  public String getSource() {
    return source;
  }

  public void setSource(String logSource) {
    this.source = logSource;
  }

  public String getDebugInfo() {
    return debuggingInfo;
  }

  public void setDebugInfo(String a) {
    this.debuggingInfo = a;
  }

  /**
   * @see org.apache.hadoop.chukwa.Chunk#getSeqID()
   */
  public long getSeqID() {
    return seqID;
  }

  public void setSeqID(long l) {
    seqID = l;
  }

  public int getProtocolVersion() {
    return protocolVersion;
  }

  public void setProtocolVersion(int pv) {
    this.protocolVersion = pv;
  }

  public Adaptor getInitiator() {
    return initiator;
  }

  public void setInitiator(Adaptor a) {
    initiator = a;
  }

  public void setLogSource() {
    source = localHostAddr;
  }

  public int[] getRecordOffsets() {
    if (recordEndOffsets == null)
      recordEndOffsets = new int[] { data.length - 1 };
    return recordEndOffsets.clone();
  }

  public void setRecordOffsets(int[] offsets) {
    recordEndOffsets = offsets.clone();
  }

  public String getDataType() {
    return dataType;
  }

  public void setDataType(String t) {
    dataType = t;
  }

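  /**
   * Appends to the space-separated tag list. For a tag to be retrievable
   * with getTag(), it should take the name="value" form that method parses.
   */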
  @Override
  public void addTag(String tag) {
    this.tags += " " + tag;
  }

  /**
   * @see org.apache.hadoop.chukwa.Chunk#getTags()
   */
  public String getTags() {
    return tags;
  }

  /**
   * @see org.apache.hadoop.chukwa.Chunk#getTag(java.lang.String)
   */
  public String getTag(String tagName) {
    Pattern tagPattern = Pattern.compile("\\b" + tagName + "=\"([^\"]*)\"");
    if (tags != null) {
      Matcher matcher = tagPattern.matcher(tags);
      if (matcher.find()) {
        return matcher.group(1);
      }
    }
    return null;
  }
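
  /*
   * Wire format shared by readFields() and write(): the protocol version
   * (int), the sequence ID (long), then source, tags, stream name, data
   * type, and debugging info as UTF strings, then the record-end-offset
   * array (a count followed by that many ints), and finally the data bytes
   * up to and including the last offset.
   */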

  /**
   * @see org.apache.hadoop.io.Writable#readFields(java.io.DataInput)
   */
  public void readFields(DataInput in) throws IOException {
    setProtocolVersion(in.readInt());
    if (protocolVersion != PROTOCOL_VERSION) {
      throw new IOException(
          "Protocol version mismatch; dropping data. source version: "
              + protocolVersion + ", collector version: " + PROTOCOL_VERSION);
    }
    setSeqID(in.readLong());
    setSource(in.readUTF());
    tags = in.readUTF(); // no public set method here
    setStreamName(in.readUTF());
    setDataType(in.readUTF());
    setDebugInfo(in.readUTF());

    int numRecords = in.readInt();
    recordEndOffsets = new int[numRecords];
    for (int i = 0; i < numRecords; ++i)
      recordEndOffsets[i] = in.readInt();
    data = new byte[recordEndOffsets[recordEndOffsets.length - 1] + 1];
    in.readFully(data);
  }

  /**
   * @see org.apache.hadoop.io.Writable#write(java.io.DataOutput)
   */
  public void write(DataOutput out) throws IOException {
    out.writeInt(PROTOCOL_VERSION);
    out.writeLong(seqID);
    out.writeUTF(source);
    out.writeUTF(tags);
    out.writeUTF(streamName);
    out.writeUTF(dataType);
    out.writeUTF(debuggingInfo);

    if (recordEndOffsets == null)
      recordEndOffsets = new int[] { data.length - 1 };

    out.writeInt(recordEndOffsets.length);
    for (int i = 0; i < recordEndOffsets.length; ++i)
      out.writeInt(recordEndOffsets[i]);

    // the byte at the last offset is valid, hence the +1
    out.write(data, 0, recordEndOffsets[recordEndOffsets.length - 1] + 1);
  }

  public static ChunkImpl read(DataInput in) throws IOException {
    ChunkImpl w = new ChunkImpl();
    w.readFields(in);
    return w;
  }

  // FIXME: should do something better here, but this is OK for debugging
  @Override
  public String toString() {
    return source + ":" + streamName + ":" + new String(data) + "/" + seqID;
  }

  /**
   * @see org.apache.hadoop.chukwa.Chunk#getSerializedSizeEstimate()
   */
  public int getSerializedSizeEstimate() {
    int size = 2 * (source.length() + streamName.length() + dataType.length()
        + tags.length() + debuggingInfo.length()); // pessimistic: two bytes per UTF char
    size += data.length + 4;
    if (recordEndOffsets == null)
      size += 8;
    else
      size += 4 * (recordEndOffsets.length + 1); // +1 for length of array
    size += 8; // seqID, a long
    return size;
  }

  public void setRecordOffsets(java.util.Collection<Integer> carriageReturns) {
    recordEndOffsets = new int[carriageReturns.size()];
    int i = 0;
    for (Integer offset : carriageReturns)
      recordEndOffsets[i++] = offset;
  }

  public int getLength() {
    return data.length;
  }

}
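
/*
 * A minimal usage sketch, not part of the original source: it round-trips a
 * chunk through the Writable methods above using in-memory streams. The
 * class name and sample values are illustrative only; the class is kept
 * package-private so the file still compiles with a single public class.
 */
class ChunkImplRoundTripDemo {
  public static void main(String[] args) throws java.io.IOException {
    ChunkImpl chunk = ChunkImpl.getBlankChunk();
    chunk.setData("line one\nline two\n".getBytes());
    chunk.setSeqID(42L);
    chunk.setStreamName("/var/log/demo.log");
    chunk.setDataType("DemoType");
    chunk.setLogSource(); // stamps the chunk with this host's name

    // Serialize into a buffer, then read an identical copy back out.
    java.io.ByteArrayOutputStream buf = new java.io.ByteArrayOutputStream();
    chunk.write(new java.io.DataOutputStream(buf));
    ChunkImpl copy = ChunkImpl.read(new java.io.DataInputStream(
        new java.io.ByteArrayInputStream(buf.toByteArray())));

    System.out.println("seqID matches: " + (copy.getSeqID() == chunk.getSeqID()));
    System.out.println("round-tripped chunk: " + copy);
  }
}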