This project has retired. For details please refer to its Attic page.
ConstRateValidator xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.chukwa.util;
19  
20  import java.io.DataInput;
21  import java.io.DataOutput;
22  import java.io.IOException;
23  import java.util.*;
24  import org.apache.hadoop.fs.Path;
25  import org.apache.hadoop.io.*;
26  import org.apache.hadoop.mapreduce.*;
27  import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
28  import org.apache.hadoop.mapreduce.lib.input.FileSplit;
29  import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
30  import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
31  import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
32  import org.apache.hadoop.util.*;
33  import org.apache.hadoop.chukwa.*;
34  import org.apache.hadoop.conf.Configuration;
35  import org.apache.hadoop.conf.Configured;
36  
37  public class ConstRateValidator extends Configured implements Tool{
38    
39    public static class ByteRange implements WritableComparable<ByteRange> {
40      
41      String stream;
42      String split ="";
43      public long start;
44      public long len;
45      
46      public ByteRange() {
47        start=len=0;
48      }
49      
50      public ByteRange(ChunkImpl val) {
51        
52        len = val.getLength();
53        start = val.getSeqID() - len;     
54        this.stream = val.getSource()+":"+val.getStreamName() ;
55      }
56      
57      @Override
58      public void readFields(DataInput in) throws IOException {
59        stream = in.readUTF();
60        split = in.readUTF();
61        start = in.readLong();
62        len = in.readLong();
63      }
64      @Override
65      public void write(DataOutput out) throws IOException {
66        out.writeUTF(stream);
67        out.writeUTF(split);
68        out.writeLong(start);
69        out.writeLong(len);
70      }
71  
72      public static ByteRange read(DataInput in) throws IOException {
73        ByteRange b = new ByteRange();
74        b.readFields(in);
75        return b;
76      }
77      
78      @Override
79      public int compareTo(ByteRange o) {
80        int c = stream.compareTo(o.stream);
81        if(c != 0)
82          return c;
83        
84        if(start > o.start)
85          return 1;
86        else if (start < o.start)
87          return -1;
88        else {
89          if(len > o.len)
90            return 1;
91          else if(len < o.len)
92            return -1;
93          else
94            return split.compareTo(o.split);
95        }
96      }
97      
98      public boolean equals(Object o) {
99        if(o instanceof ByteRange) {
100         ByteRange rhs = (ByteRange) o;
101         return stream.equals(rhs.stream) &&
102          split.equals(rhs.split)&& rhs.start == start && rhs.len == len;
103       } else
104         return false;
105     }
106     
107     public int hashCode() {
108       return (int) (
109           stream.hashCode() ^ (len>>32) ^ (len & 0xFFFFFFFF) ^ (start >> 32)
110           ^ (start & 0xFFFFFFFF));
111     } 
112   }
113   
114   
115   
116   ///////  State machine; expects chunks in order ////////
117   public static class ValidatorSM {
118     public long ok=0, missingBytes=0,dupBytes=0;
119     long consecDupchunks=0;
120     long nextExpectedStart = 0;
121     public long chunks;
122     public long dupChunks;
123     public Set<String> filesContaining = new LinkedHashSet<String>();
124 
125     public String closeSM() {
126       if(consecDupchunks > 0)
127         return consecDupchunks + " consecutive duplicate chunks ending at " + consecDupchunks;
128       else
129         return null;
130     }
131     
132     public String advanceSM(ByteRange b) {
133       if(!b.split.equals(""))
134         filesContaining.add(b.split);
135       
136       chunks++;
137       
138       if(b.start == nextExpectedStart) {
139         String msg = null;
140         if(consecDupchunks > 0)
141           msg = consecDupchunks + " consecutive duplicative chunks ending at " + b.start;
142         consecDupchunks = 0;
143         nextExpectedStart += b.len;
144         ok += b.len;
145         return msg;
146       } else{
147 //        Text msg = new Text(b.stream + " " + consecOKchunks + 
148 //            "consecutive OK chunks ending at " + nextExpectedStart);
149         String msg;
150         if(b.start < nextExpectedStart) {    //duplicate bytes
151           consecDupchunks ++;
152           dupChunks++;
153           long duplicatedBytes;
154           if(b.start + b.len <= nextExpectedStart) {
155             duplicatedBytes = b.len;
156             msg =" dupchunk of length " + b.len + " at " + b.start;
157           } else {
158             duplicatedBytes = b.start + b.len - nextExpectedStart;
159             ok += b.len - duplicatedBytes;
160             msg = "  overlap of " + duplicatedBytes+ " starting at " + b.start +
161             " (total chunk len ="+b.len+")";
162           }
163           dupBytes += duplicatedBytes;
164           nextExpectedStart = Math.max(b.start + b.len, nextExpectedStart);
165         } else {  //b.start > nextExpectedStart  ==>  missing bytes
166           consecDupchunks = 0;
167           long missing = (b.start - nextExpectedStart);
168           msg = "==Missing "+ missing+ " bytes starting from " + nextExpectedStart;
169           nextExpectedStart = b.start + b.len;
170           
171           if(b.start < 0 || b.len < 0)
172             System.out.println("either len or start was negative; something is seriously wrong");
173           
174           missingBytes += missing;
175         }
176         return msg;
177       } //end not-OK  
178     } //end advance
179   } //end class
180   
181   
182   ///////  Map Class /////////
183   public static class MapClass extends Mapper <ChukwaArchiveKey, ChunkImpl, ByteRange, NullWritable> {
184     
185     @Override
186     protected void map(ChukwaArchiveKey key, ChunkImpl val, 
187         Mapper<ChukwaArchiveKey, ChunkImpl,ByteRange, NullWritable>.Context context)
188         throws IOException, InterruptedException 
189     {
190       boolean valid = ConstRateAdaptor.checkChunk(val);
191       String fname = "unknown";
192       
193       ByteRange ret = new ByteRange(val);
194       
195       InputSplit inSplit = context.getInputSplit();
196       if(inSplit instanceof FileSplit) {
197         FileSplit fs = (FileSplit) inSplit;
198         fname = fs.getPath().getName();
199       }
200       ret.split = fname;
201       
202       if(!valid) {
203         context.getCounter("app", "badchunks").increment(1);
204       }
205       context.write(ret, NullWritable.get());
206     }
207   }
208     
209   public static class ReduceClass extends Reducer<ByteRange, NullWritable, Text,Text> {
210     
211     ValidatorSM sm;
212     String curStream = "";
213     
214     public ReduceClass() {
215       sm = new ValidatorSM();
216     }
217     
218 //    @Override
219 //    protected void setup(Reducer<ByteRange, NullWritable, Text,Text>.Context context) {       }
220     
221     @Override
222     protected void reduce(ByteRange b, Iterable<NullWritable> vals, 
223         Reducer<ByteRange, NullWritable, Text,Text>.Context context) {
224       try {
225 
226       if(!curStream.equals(b.stream)) {
227         if(!curStream.equals("")) {
228           printEndOfStream(context);
229         }
230         
231         System.out.println("rolling over to new stream " + b.stream);
232         curStream = b.stream;
233         sm = new ValidatorSM();
234       }
235       
236       String msg = sm.advanceSM(b);
237       if(msg != null)
238         context.write(new Text(b.stream), new Text(msg));
239 
240     } catch(InterruptedException e) {
241     } catch(IOException e) {
242       e.printStackTrace();
243     }
244   }
245     
246     @Override
247     protected void cleanup(Reducer<ByteRange, NullWritable, Text,Text>.Context context)
248     throws IOException, InterruptedException{
249       printEndOfStream(context);
250     }
251     
252     public void printEndOfStream(Reducer<ByteRange, NullWritable, Text,Text>.Context context) 
253     throws IOException, InterruptedException {
254       Text cs = new Text(curStream);
255 
256       String t = sm.closeSM();
257       if(t != null)
258         context.write(cs, new Text(t));
259       if(!sm.filesContaining.isEmpty()) {
260         StringBuilder sb = new StringBuilder();
261         sb.append("Data contained in");
262         for(String s: sm.filesContaining) 
263           sb.append(" ").append(s);
264         context.write(cs, new Text(sb.toString()));
265       }
266       context.write(cs, new Text("total of " + sm.chunks + " chunks ("
267          + sm.dupChunks + " dups). " +" High byte =" + (sm.nextExpectedStart-1)));
268       
269       context.getCounter("app", "missing bytes").increment(sm.missingBytes);
270       context.getCounter("app", "duplicate bytes").increment(sm.dupBytes);
271       context.getCounter("app", "OK Bytes").increment(sm.ok);
272     }
273   } //end reduce class
274 
275 
276   public static void main(String[] args) throws Exception {
277  //   System.out.println("specify -D textOutput=true for text output");
278     int res = ToolRunner.run(new Configuration(),
279         new ConstRateValidator(), args);
280     return;
281   }
282 
283   @Override
284   public int run(String[] real_args) throws Exception {
285     GenericOptionsParser gop = new GenericOptionsParser(getConf(), real_args);
286     Configuration conf = gop.getConfiguration();
287     String[] args = gop.getRemainingArgs();
288 
289     Job validate = new Job(conf);
290     
291     validate.setJobName("Chukwa Test pattern validator");
292     validate.setJarByClass(this.getClass());
293     
294     validate.setInputFormatClass(SequenceFileInputFormat.class);
295     
296     validate.setMapperClass(MapClass.class);
297     validate.setMapOutputKeyClass(ByteRange.class);
298     validate.setMapOutputValueClass(NullWritable.class);
299 
300     validate.setReducerClass(ReduceClass.class);
301     validate.setOutputFormatClass(TextOutputFormat.class);
302 
303     
304     FileInputFormat.setInputPaths(validate, new Path(args[0]));
305     FileOutputFormat.setOutputPath(validate, new Path(args[1]));
306 
307     validate.submit();
308     return 0;
309   }
310 
311 }