This project has retired. For details please refer to its
Attic page.
ChunkImpl xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.chukwa;
20
21
22 import java.io.DataInput;
23 import java.io.DataOutput;
24 import java.io.IOException;
25 import java.net.InetAddress;
26 import java.net.UnknownHostException;
27 import java.util.regex.Matcher;
28 import java.util.regex.Pattern;
29
30 import org.apache.hadoop.chukwa.datacollection.DataFactory;
31 import org.apache.hadoop.chukwa.datacollection.adaptor.Adaptor;
32
33 public class ChunkImpl implements org.apache.hadoop.io.Writable, Chunk {
34 public static int PROTOCOL_VERSION = 1;
35
36 protected DataFactory dataFactory = DataFactory.getInstance();
37 private String source = "";
38 private String streamName = "";
39 private String dataType = "";
40 private String tags = "";
41 private byte[] data = null;
42 private int[] recordEndOffsets;
43 private int protocolVersion = 1;
44 private String debuggingInfo = "";
45
46 private transient Adaptor initiator;
47 long seqID;
48
49 private static String localHostAddr;
50 static {
51 try {
52 setHostAddress(InetAddress.getLocalHost().getHostName());
53 } catch (UnknownHostException e) {
54 setHostAddress("localhost");
55 }
56 }
57
58 public static void setHostAddress(String host) {
59 ChunkImpl.localHostAddr = host;
60 }
61
62
63 public static ChunkImpl getBlankChunk() {
64 return new ChunkImpl();
65 }
66
67 ChunkImpl() {
68 }
69
70 public ChunkImpl(String dataType, String streamName, long seq, byte[] data,
71 Adaptor source) {
72 this.seqID = seq;
73 this.source = localHostAddr;
74 this.tags = dataFactory.getDefaultTags();
75 this.streamName = streamName;
76 this.dataType = dataType;
77 this.data = (byte[]) data.clone();
78 this.initiator = source;
79 }
80
81
82
83
84 public byte[] getData() {
85 return data.clone();
86 }
87
88
89
90
91 public void setData(byte[] logEvent) {
92 this.data = (byte[]) logEvent.clone();
93 }
94
95
96
97
98 public String getStreamName() {
99 return streamName;
100 }
101
102 public void setStreamName(String logApplication) {
103 this.streamName = logApplication;
104 }
105
106 public String getSource() {
107 return source;
108 }
109
110 public void setSource(String logSource) {
111 this.source = logSource;
112 }
113
114 public String getDebugInfo() {
115 return debuggingInfo;
116 }
117
118 public void setDebugInfo(String a) {
119 this.debuggingInfo = a;
120 }
121
122
123
124
125 public long getSeqID() {
126 return seqID;
127 }
128
129 public void setSeqID(long l) {
130 seqID = l;
131 }
132
133 public int getProtocolVersion() {
134 return protocolVersion;
135 }
136
137 public void setProtocolVersion(int pv) {
138 this.protocolVersion = pv;
139 }
140
141 public Adaptor getInitiator() {
142 return initiator;
143 }
144
145 public void setInitiator(Adaptor a) {
146 initiator = a;
147 }
148
149 public void setLogSource() {
150 source = localHostAddr;
151 }
152
153 public int[] getRecordOffsets() {
154 if (recordEndOffsets == null)
155 recordEndOffsets = new int[] { data.length - 1 };
156 return recordEndOffsets.clone();
157 }
158
159 public void setRecordOffsets(int[] offsets) {
160 recordEndOffsets = (int[]) offsets.clone();
161 }
162
163 public String getDataType() {
164 return dataType;
165 }
166
167 public void setDataType(String t) {
168 dataType = t;
169 }
170
171 @Override
172 public void addTag(String tags) {
173 this.tags += " "+ tags;
174 }
175
176
177
178
179 public String getTags() {
180 return tags;
181 }
182
183
184
185
186 public String getTag(String tagName) {
187 Pattern tagPattern = Pattern.compile("\\b"+tagName+"=\"([^\"]*)\"");
188 if (tags != null) {
189 Matcher matcher = tagPattern.matcher(tags);
190 if (matcher.find()) {
191 return matcher.group(1);
192 }
193 }
194 return null;
195 }
196
197
198
199
200 public void readFields(DataInput in) throws IOException {
201 setProtocolVersion(in.readInt());
202 if (protocolVersion != PROTOCOL_VERSION) {
203 throw new IOException(
204 "Protocol version mismatched, drop data. source version: "
205 + protocolVersion + ", collector version:" + PROTOCOL_VERSION);
206 }
207 setSeqID(in.readLong());
208 setSource(in.readUTF());
209 tags = in.readUTF();
210 setStreamName(in.readUTF());
211 setDataType(in.readUTF());
212 setDebugInfo(in.readUTF());
213
214 int numRecords = in.readInt();
215 recordEndOffsets = new int[numRecords];
216 for (int i = 0; i < numRecords; ++i)
217 recordEndOffsets[i] = in.readInt();
218 data = new byte[recordEndOffsets[recordEndOffsets.length - 1] + 1];
219 in.readFully(data);
220
221 }
222
223
224
225
226 public void write(DataOutput out) throws IOException {
227 out.writeInt(PROTOCOL_VERSION);
228 out.writeLong(seqID);
229 out.writeUTF(source);
230 out.writeUTF(tags);
231 out.writeUTF(streamName);
232 out.writeUTF(dataType);
233 out.writeUTF(debuggingInfo);
234
235 if (recordEndOffsets == null)
236 recordEndOffsets = new int[] { data.length - 1 };
237
238 out.writeInt(recordEndOffsets.length);
239 for (int i = 0; i < recordEndOffsets.length; ++i)
240 out.writeInt(recordEndOffsets[i]);
241
242 out.write(data, 0, recordEndOffsets[recordEndOffsets.length - 1] + 1);
243
244 }
245
246 public static ChunkImpl read(DataInput in) throws IOException {
247 ChunkImpl w = new ChunkImpl();
248 w.readFields(in);
249 return w;
250 }
251
252
253 public String toString() {
254 return source + ":" + streamName + ":" + new String(data) + "/" + seqID;
255 }
256
257
258
259
260
261
262 public int getSerializedSizeEstimate() {
263 int size = 2 * (source.length() + streamName.length() + dataType.length()
264 + debuggingInfo.length());
265 size += data.length + 4;
266 if (recordEndOffsets == null)
267 size += 8;
268 else
269 size += 4 * (recordEndOffsets.length + 1);
270 size += 8;
271 return size;
272 }
273
274 public void setRecordOffsets(java.util.Collection<Integer> carriageReturns) {
275 recordEndOffsets = new int[carriageReturns.size()];
276 int i = 0;
277 for (Integer offset : carriageReturns)
278 recordEndOffsets[i++] = offset;
279 }
280
281 public int getLength() {
282 return data.length;
283 }
284
285 }