This project has retired. For details please refer to its Attic page.
ChunkImpl xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.chukwa;
20  
21  
22  import java.io.DataInput;
23  import java.io.DataOutput;
24  import java.io.IOException;
25  import java.net.InetAddress;
26  import java.net.UnknownHostException;
27  import java.nio.charset.Charset;
28  import java.util.regex.Matcher;
29  import java.util.regex.Pattern;
30  
31  import org.apache.hadoop.chukwa.datacollection.DataFactory;
32  import org.apache.hadoop.chukwa.datacollection.adaptor.Adaptor;
33  
34  public class ChunkImpl implements org.apache.hadoop.io.Writable, Chunk {
35    public final static int PROTOCOL_VERSION = 1;
36  
37    protected DataFactory dataFactory = DataFactory.getInstance();
38    private String source = "";
39    private String streamName = "";
40    private String dataType = "";
41    private String tags = "";
42    private byte[] data = null;
43    private int[] recordEndOffsets;
44    private int protocolVersion = 1;
45    private String debuggingInfo = "";
46  
47    private transient Adaptor initiator;
48    long seqID;
49  
50    private static String localHostAddr;
51    static {
52      try {
53        setHostAddress(InetAddress.getLocalHost().getHostName().toLowerCase());
54      } catch (UnknownHostException e) {
55        setHostAddress("localhost");
56      }
57    }
58    
59    public static void setHostAddress(String host) {
60      ChunkImpl.localHostAddr = host;
61    }
62    
63    
64    public static ChunkImpl getBlankChunk() {
65      return new ChunkImpl();
66    }
67  
68    ChunkImpl() {
69    }
70  
71    public ChunkImpl(String dataType, String streamName, long seq, byte[] data,
72                     Adaptor source) {
73      this.seqID = seq;
74      this.source = localHostAddr;
75      this.tags = dataFactory.getDefaultTags();
76      this.streamName = streamName;
77      this.dataType = dataType;
78      this.data = (byte[]) data.clone();
79      this.initiator = source;
80    }
81  
82    /**
83     * @see org.apache.hadoop.chukwa.Chunk#getData()
84     */
85    public byte[] getData() {
86      return data.clone();
87    }
88  
89    /**
90     * @see org.apache.hadoop.chukwa.Chunk#setData(byte[])
91     */
92    public void setData(byte[] logEvent) {
93      this.data = (byte[]) logEvent.clone();
94    }
95  
96    /**
97     * @see org.apache.hadoop.chukwa.Chunk#getStreamName()
98     */
99    public String getStreamName() {
100     return streamName;
101   }
102 
103   public void setStreamName(String logApplication) {
104     this.streamName = logApplication;
105   }
106 
107   public String getSource() {
108     return source;
109   }
110 
111   public void setSource(String logSource) {
112     this.source = logSource;
113   }
114 
115   public String getDebugInfo() {
116     return debuggingInfo;
117   }
118 
119   public void setDebugInfo(String a) {
120     this.debuggingInfo = a;
121   }
122 
123   /**
124    * @see org.apache.hadoop.chukwa.Chunk#getSeqID()
125    */
126   public long getSeqID() {
127     return seqID;
128   }
129 
130   public void setSeqID(long l) {
131     seqID = l;
132   }
133 
134   public int getProtocolVersion() {
135     return protocolVersion;
136   }
137 
138   public void setProtocolVersion(int pv) {
139     this.protocolVersion = pv;
140   }
141 
142   public Adaptor getInitiator() {
143     return initiator;
144   }
145 
146   public void setInitiator(Adaptor a) {
147     initiator = a;
148   }
149 
150   public void setLogSource() {
151     source = localHostAddr;
152   }
153 
154   public int[] getRecordOffsets() {
155     if (recordEndOffsets == null)
156       recordEndOffsets = new int[] { data.length - 1 };
157     return recordEndOffsets.clone();
158   }
159 
160   public void setRecordOffsets(int[] offsets) {
161     recordEndOffsets = (int[]) offsets.clone();
162   }
163 
164   public String getDataType() {
165     return dataType;
166   }
167 
168   public void setDataType(String t) {
169     dataType = t;
170   }
171 
172   @Override
173   public void addTag(String tags) {
174     this.tags += " "+ tags;
175   }
176 
177   /**
178    * @see org.apache.hadoop.chukwa.Chunk#getTags()
179    */
180   public String getTags() {
181     return tags;
182   }
183   
184   /**
185    * @see org.apache.hadoop.chukwa.Chunk#getTag(java.lang.String)
186    */
187   public String getTag(String tagName) {
188     Pattern tagPattern = Pattern.compile("\\b"+tagName+"=\"([^\"]*)\"");
189     if (tags != null) {
190       Matcher matcher = tagPattern.matcher(tags);
191       if (matcher.find()) {
192         return matcher.group(1);
193       }
194     }
195     return null;
196   }
197   
198   /**
199    * @see org.apache.hadoop.io.Writable#readFields(java.io.DataInput)
200    */
201   public void readFields(DataInput in) throws IOException {
202     setProtocolVersion(in.readInt());
203     if (protocolVersion != PROTOCOL_VERSION) {
204       throw new IOException(
205           "Protocol version mismatched, drop data.  source version: "
206               + protocolVersion + ", collector version:" + PROTOCOL_VERSION);
207     }
208     setSeqID(in.readLong());
209     setSource(in.readUTF());
210     tags = in.readUTF(); // no public set method here
211     setStreamName(in.readUTF());
212     setDataType(in.readUTF());
213     setDebugInfo(in.readUTF());
214 
215     int numRecords = in.readInt();
216     recordEndOffsets = new int[numRecords];
217     for (int i = 0; i < numRecords; ++i)
218       recordEndOffsets[i] = in.readInt();
219     data = new byte[recordEndOffsets[recordEndOffsets.length - 1] + 1];
220     in.readFully(data);
221 
222   }
223 
224   /**
225    * @see org.apache.hadoop.io.Writable#write(java.io.DataOutput)
226    */
227   public void write(DataOutput out) throws IOException {
228     out.writeInt(PROTOCOL_VERSION);
229     out.writeLong(seqID);
230     out.writeUTF(source);
231     out.writeUTF(tags);
232     out.writeUTF(streamName);
233     out.writeUTF(dataType);
234     out.writeUTF(debuggingInfo);
235 
236     if (recordEndOffsets == null)
237       recordEndOffsets = new int[] { data.length - 1 };
238 
239     out.writeInt(recordEndOffsets.length);
240     for (int i = 0; i < recordEndOffsets.length; ++i)
241       out.writeInt(recordEndOffsets[i]);
242 
243     out.write(data, 0, recordEndOffsets[recordEndOffsets.length - 1] + 1); 
244     // byte at last offset is valid
245   }
246 
247   public static ChunkImpl read(DataInput in) throws IOException {
248     ChunkImpl w = new ChunkImpl();
249     w.readFields(in);
250     return w;
251   }
252 
253   public String toString() {
254     StringBuilder buffer = new StringBuilder();
255     buffer.append(source);
256     buffer.append(":");
257     buffer.append(streamName);
258     buffer.append(new String(data, Charset.forName("UTF-8")));
259     buffer.append("/");
260     buffer.append(seqID);
261     return buffer.toString();
262   }
263 
264 
265 
266   /**
267    * @see org.apache.hadoop.chukwa.Chunk#getSerializedSizeEstimate()
268    */
269   public int getSerializedSizeEstimate() {
270     int size = 2 * (source.length() + streamName.length() + dataType.length() 
271         + debuggingInfo.length()); // length of strings (pessimistic)
272     size += data.length + 4;
273     if (recordEndOffsets == null)
274       size += 8;
275     else
276       size += 4 * (recordEndOffsets.length + 1); // +1 for length of array
277     size += 8; // uuid
278     return size;
279   }
280 
281   public void setRecordOffsets(java.util.Collection<Integer> carriageReturns) {
282     recordEndOffsets = new int[carriageReturns.size()];
283     int i = 0;
284     for (Integer offset : carriageReturns)
285       recordEndOffsets[i++] = offset;
286   }
287   
288   public int getLength() {
289     return data.length;
290   }
291 
292 }