This project has retired. For details please refer to its
Attic page.
ChunkImpl xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.chukwa;
20
21
22 import java.io.DataInput;
23 import java.io.DataOutput;
24 import java.io.IOException;
25 import java.net.InetAddress;
26 import java.net.UnknownHostException;
27 import java.nio.charset.Charset;
28 import java.util.regex.Matcher;
29 import java.util.regex.Pattern;
30
31 import org.apache.hadoop.chukwa.datacollection.DataFactory;
32 import org.apache.hadoop.chukwa.datacollection.adaptor.Adaptor;
33
34 public class ChunkImpl implements org.apache.hadoop.io.Writable, Chunk {
35 public final static int PROTOCOL_VERSION = 1;
36
37 protected DataFactory dataFactory = DataFactory.getInstance();
38 private String source = "";
39 private String streamName = "";
40 private String dataType = "";
41 private String tags = "";
42 private byte[] data = null;
43 private int[] recordEndOffsets;
44 private int protocolVersion = 1;
45 private String debuggingInfo = "";
46
47 private transient Adaptor initiator;
48 long seqID;
49
50 private static String localHostAddr;
51 static {
52 try {
53 setHostAddress(InetAddress.getLocalHost().getHostName().toLowerCase());
54 } catch (UnknownHostException e) {
55 setHostAddress("localhost");
56 }
57 }
58
59 public static void setHostAddress(String host) {
60 ChunkImpl.localHostAddr = host;
61 }
62
63
64 public static ChunkImpl getBlankChunk() {
65 return new ChunkImpl();
66 }
67
68 ChunkImpl() {
69 }
70
71 public ChunkImpl(String dataType, String streamName, long seq, byte[] data,
72 Adaptor source) {
73 this.seqID = seq;
74 this.source = localHostAddr;
75 this.tags = dataFactory.getDefaultTags();
76 this.streamName = streamName;
77 this.dataType = dataType;
78 this.data = (byte[]) data.clone();
79 this.initiator = source;
80 }
81
82
83
84
85 public byte[] getData() {
86 return data.clone();
87 }
88
89
90
91
92 public void setData(byte[] logEvent) {
93 this.data = (byte[]) logEvent.clone();
94 }
95
96
97
98
99 public String getStreamName() {
100 return streamName;
101 }
102
103 public void setStreamName(String logApplication) {
104 this.streamName = logApplication;
105 }
106
107 public String getSource() {
108 return source;
109 }
110
111 public void setSource(String logSource) {
112 this.source = logSource;
113 }
114
115 public String getDebugInfo() {
116 return debuggingInfo;
117 }
118
119 public void setDebugInfo(String a) {
120 this.debuggingInfo = a;
121 }
122
123
124
125
126 public long getSeqID() {
127 return seqID;
128 }
129
130 public void setSeqID(long l) {
131 seqID = l;
132 }
133
134 public int getProtocolVersion() {
135 return protocolVersion;
136 }
137
138 public void setProtocolVersion(int pv) {
139 this.protocolVersion = pv;
140 }
141
142 public Adaptor getInitiator() {
143 return initiator;
144 }
145
146 public void setInitiator(Adaptor a) {
147 initiator = a;
148 }
149
150 public void setLogSource() {
151 source = localHostAddr;
152 }
153
154 public int[] getRecordOffsets() {
155 if (recordEndOffsets == null)
156 recordEndOffsets = new int[] { data.length - 1 };
157 return recordEndOffsets.clone();
158 }
159
160 public void setRecordOffsets(int[] offsets) {
161 recordEndOffsets = (int[]) offsets.clone();
162 }
163
164 public String getDataType() {
165 return dataType;
166 }
167
168 public void setDataType(String t) {
169 dataType = t;
170 }
171
172 @Override
173 public void addTag(String tags) {
174 this.tags += " "+ tags;
175 }
176
177
178
179
180 public String getTags() {
181 return tags;
182 }
183
184
185
186
187 public String getTag(String tagName) {
188 Pattern tagPattern = Pattern.compile("\\b"+tagName+"=\"([^\"]*)\"");
189 if (tags != null) {
190 Matcher matcher = tagPattern.matcher(tags);
191 if (matcher.find()) {
192 return matcher.group(1);
193 }
194 }
195 return null;
196 }
197
198
199
200
201 public void readFields(DataInput in) throws IOException {
202 setProtocolVersion(in.readInt());
203 if (protocolVersion != PROTOCOL_VERSION) {
204 throw new IOException(
205 "Protocol version mismatched, drop data. source version: "
206 + protocolVersion + ", collector version:" + PROTOCOL_VERSION);
207 }
208 setSeqID(in.readLong());
209 setSource(in.readUTF());
210 tags = in.readUTF();
211 setStreamName(in.readUTF());
212 setDataType(in.readUTF());
213 setDebugInfo(in.readUTF());
214
215 int numRecords = in.readInt();
216 recordEndOffsets = new int[numRecords];
217 for (int i = 0; i < numRecords; ++i)
218 recordEndOffsets[i] = in.readInt();
219 data = new byte[recordEndOffsets[recordEndOffsets.length - 1] + 1];
220 in.readFully(data);
221
222 }
223
224
225
226
227 public void write(DataOutput out) throws IOException {
228 out.writeInt(PROTOCOL_VERSION);
229 out.writeLong(seqID);
230 out.writeUTF(source);
231 out.writeUTF(tags);
232 out.writeUTF(streamName);
233 out.writeUTF(dataType);
234 out.writeUTF(debuggingInfo);
235
236 if (recordEndOffsets == null)
237 recordEndOffsets = new int[] { data.length - 1 };
238
239 out.writeInt(recordEndOffsets.length);
240 for (int i = 0; i < recordEndOffsets.length; ++i)
241 out.writeInt(recordEndOffsets[i]);
242
243 out.write(data, 0, recordEndOffsets[recordEndOffsets.length - 1] + 1);
244
245 }
246
247 public static ChunkImpl read(DataInput in) throws IOException {
248 ChunkImpl w = new ChunkImpl();
249 w.readFields(in);
250 return w;
251 }
252
253 public String toString() {
254 StringBuilder buffer = new StringBuilder();
255 buffer.append(source);
256 buffer.append(":");
257 buffer.append(streamName);
258 buffer.append(new String(data, Charset.forName("UTF-8")));
259 buffer.append("/");
260 buffer.append(seqID);
261 return buffer.toString();
262 }
263
264
265
266
267
268
269 public int getSerializedSizeEstimate() {
270 int size = 2 * (source.length() + streamName.length() + dataType.length()
271 + debuggingInfo.length());
272 size += data.length + 4;
273 if (recordEndOffsets == null)
274 size += 8;
275 else
276 size += 4 * (recordEndOffsets.length + 1);
277 size += 8;
278 return size;
279 }
280
281 public void setRecordOffsets(java.util.Collection<Integer> carriageReturns) {
282 recordEndOffsets = new int[carriageReturns.size()];
283 int i = 0;
284 for (Integer offset : carriageReturns)
285 recordEndOffsets[i++] = offset;
286 }
287
288 public int getLength() {
289 return data.length;
290 }
291
292 }