This project has retired. For details please refer to its
Attic page.
ConstRateValidator xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.chukwa.util;
19
20 import java.io.DataInput;
21 import java.io.DataOutput;
22 import java.io.IOException;
23 import java.util.*;
24 import org.apache.hadoop.fs.Path;
25 import org.apache.hadoop.io.*;
26 import org.apache.hadoop.mapreduce.*;
27 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
28 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
29 import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
30 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
31 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
32 import org.apache.hadoop.util.*;
33 import org.apache.hadoop.chukwa.*;
34 import org.apache.hadoop.conf.Configuration;
35 import org.apache.hadoop.conf.Configured;
36
37 public class ConstRateValidator extends Configured implements Tool{
38
39 public static class ByteRange implements WritableComparable<ByteRange> {
40
41 String stream;
42 String split ="";
43 public long start;
44 public long len;
45
46 public ByteRange() {
47 start=len=0;
48 }
49
50 public ByteRange(ChunkImpl val) {
51
52 len = val.getLength();
53 start = val.getSeqID() - len;
54 this.stream = val.getSource()+":"+val.getStreamName() ;
55 }
56
57 @Override
58 public void readFields(DataInput in) throws IOException {
59 stream = in.readUTF();
60 split = in.readUTF();
61 start = in.readLong();
62 len = in.readLong();
63 }
64 @Override
65 public void write(DataOutput out) throws IOException {
66 out.writeUTF(stream);
67 out.writeUTF(split);
68 out.writeLong(start);
69 out.writeLong(len);
70 }
71
72 public static ByteRange read(DataInput in) throws IOException {
73 ByteRange b = new ByteRange();
74 b.readFields(in);
75 return b;
76 }
77
78 @Override
79 public int compareTo(ByteRange o) {
80 int c = stream.compareTo(o.stream);
81 if(c != 0)
82 return c;
83
84 if(start > o.start)
85 return 1;
86 else if (start < o.start)
87 return -1;
88 else {
89 if(len > o.len)
90 return 1;
91 else if(len < o.len)
92 return -1;
93 else
94 return split.compareTo(o.split);
95 }
96 }
97
98 public boolean equals(Object o) {
99 if(o instanceof ByteRange) {
100 ByteRange rhs = (ByteRange) o;
101 return stream.equals(rhs.stream) &&
102 split.equals(rhs.split)&& rhs.start == start && rhs.len == len;
103 } else
104 return false;
105 }
106
107 public int hashCode() {
108 return (int) (
109 stream.hashCode() ^ (len>>32) ^ (len & 0xFFFFFFFF) ^ (start >> 32)
110 ^ (start & 0xFFFFFFFF));
111 }
112 }
113
114
115
116
117 public static class ValidatorSM {
118 public long ok=0, missingBytes=0,dupBytes=0;
119 long consecDupchunks=0;
120 long nextExpectedStart = 0;
121 public long chunks;
122 public long dupChunks;
123 public Set<String> filesContaining = new LinkedHashSet<String>();
124
125 public String closeSM() {
126 if(consecDupchunks > 0)
127 return consecDupchunks + " consecutive duplicate chunks ending at " + consecDupchunks;
128 else
129 return null;
130 }
131
132 public String advanceSM(ByteRange b) {
133 if(!b.split.equals(""))
134 filesContaining.add(b.split);
135
136 chunks++;
137
138 if(b.start == nextExpectedStart) {
139 String msg = null;
140 if(consecDupchunks > 0)
141 msg = consecDupchunks + " consecutive duplicative chunks ending at " + b.start;
142 consecDupchunks = 0;
143 nextExpectedStart += b.len;
144 ok += b.len;
145 return msg;
146 } else{
147
148
149 String msg;
150 if(b.start < nextExpectedStart) {
151 consecDupchunks ++;
152 dupChunks++;
153 long duplicatedBytes;
154 if(b.start + b.len <= nextExpectedStart) {
155 duplicatedBytes = b.len;
156 msg =" dupchunk of length " + b.len + " at " + b.start;
157 } else {
158 duplicatedBytes = b.start + b.len - nextExpectedStart;
159 ok += b.len - duplicatedBytes;
160 msg = " overlap of " + duplicatedBytes+ " starting at " + b.start +
161 " (total chunk len ="+b.len+")";
162 }
163 dupBytes += duplicatedBytes;
164 nextExpectedStart = Math.max(b.start + b.len, nextExpectedStart);
165 } else {
166 consecDupchunks = 0;
167 long missing = (b.start - nextExpectedStart);
168 msg = "==Missing "+ missing+ " bytes starting from " + nextExpectedStart;
169 nextExpectedStart = b.start + b.len;
170
171 if(b.start < 0 || b.len < 0)
172 System.out.println("either len or start was negative; something is seriously wrong");
173
174 missingBytes += missing;
175 }
176 return msg;
177 }
178 }
179 }
180
181
182
183 public static class MapClass extends Mapper <ChukwaArchiveKey, ChunkImpl, ByteRange, NullWritable> {
184
185 @Override
186 protected void map(ChukwaArchiveKey key, ChunkImpl val,
187 Mapper<ChukwaArchiveKey, ChunkImpl,ByteRange, NullWritable>.Context context)
188 throws IOException, InterruptedException
189 {
190 boolean valid = ConstRateAdaptor.checkChunk(val);
191 String fname = "unknown";
192
193 ByteRange ret = new ByteRange(val);
194
195 InputSplit inSplit = context.getInputSplit();
196 if(inSplit instanceof FileSplit) {
197 FileSplit fs = (FileSplit) inSplit;
198 fname = fs.getPath().getName();
199 }
200 ret.split = fname;
201
202 if(!valid) {
203 context.getCounter("app", "badchunks").increment(1);
204 }
205 context.write(ret, NullWritable.get());
206 }
207 }
208
209 public static class ReduceClass extends Reducer<ByteRange, NullWritable, Text,Text> {
210
211 ValidatorSM sm;
212 String curStream = "";
213
214 public ReduceClass() {
215 sm = new ValidatorSM();
216 }
217
218
219
220
221 @Override
222 protected void reduce(ByteRange b, Iterable<NullWritable> vals,
223 Reducer<ByteRange, NullWritable, Text,Text>.Context context) {
224 try {
225
226 if(!curStream.equals(b.stream)) {
227 if(!curStream.equals("")) {
228 printEndOfStream(context);
229 }
230
231 System.out.println("rolling over to new stream " + b.stream);
232 curStream = b.stream;
233 sm = new ValidatorSM();
234 }
235
236 String msg = sm.advanceSM(b);
237 if(msg != null)
238 context.write(new Text(b.stream), new Text(msg));
239
240 } catch(InterruptedException e) {
241 } catch(IOException e) {
242 e.printStackTrace();
243 }
244 }
245
246 @Override
247 protected void cleanup(Reducer<ByteRange, NullWritable, Text,Text>.Context context)
248 throws IOException, InterruptedException{
249 printEndOfStream(context);
250 }
251
252 public void printEndOfStream(Reducer<ByteRange, NullWritable, Text,Text>.Context context)
253 throws IOException, InterruptedException {
254 Text cs = new Text(curStream);
255
256 String t = sm.closeSM();
257 if(t != null)
258 context.write(cs, new Text(t));
259 if(!sm.filesContaining.isEmpty()) {
260 StringBuilder sb = new StringBuilder();
261 sb.append("Data contained in");
262 for(String s: sm.filesContaining)
263 sb.append(" ").append(s);
264 context.write(cs, new Text(sb.toString()));
265 }
266 context.write(cs, new Text("total of " + sm.chunks + " chunks ("
267 + sm.dupChunks + " dups). " +" High byte =" + (sm.nextExpectedStart-1)));
268
269 context.getCounter("app", "missing bytes").increment(sm.missingBytes);
270 context.getCounter("app", "duplicate bytes").increment(sm.dupBytes);
271 context.getCounter("app", "OK Bytes").increment(sm.ok);
272 }
273 }
274
275
276 public static void main(String[] args) throws Exception {
277
278 int res = ToolRunner.run(new Configuration(),
279 new ConstRateValidator(), args);
280 return;
281 }
282
283 @Override
284 public int run(String[] real_args) throws Exception {
285 GenericOptionsParser gop = new GenericOptionsParser(getConf(), real_args);
286 Configuration conf = gop.getConfiguration();
287 String[] args = gop.getRemainingArgs();
288
289 Job validate = new Job(conf);
290
291 validate.setJobName("Chukwa Test pattern validator");
292 validate.setJarByClass(this.getClass());
293
294 validate.setInputFormatClass(SequenceFileInputFormat.class);
295
296 validate.setMapperClass(MapClass.class);
297 validate.setMapOutputKeyClass(ByteRange.class);
298 validate.setMapOutputValueClass(NullWritable.class);
299
300 validate.setReducerClass(ReduceClass.class);
301 validate.setOutputFormatClass(TextOutputFormat.class);
302
303
304 FileInputFormat.setInputPaths(validate, new Path(args[0]));
305 FileOutputFormat.setOutputPath(validate, new Path(args[1]));
306
307 validate.submit();
308 return 0;
309 }
310
311 }