This project has retired. For details please refer to its Attic page.
DumpChunks xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.chukwa.util;
19  
20  
21  import java.net.URI;
22  import java.net.URISyntaxException;
23  import java.util.regex.*;
24  import java.util.*;
25  import java.io.*;
26  import org.apache.hadoop.chukwa.*;
27  import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
28  import org.apache.hadoop.chukwa.util.RegexUtil.CheckedPatternSyntaxException;
29  import org.apache.hadoop.fs.FileSystem;
30  import org.apache.hadoop.fs.Path;
31  import org.apache.hadoop.fs.FileUtil;
32  import org.apache.hadoop.io.SequenceFile;
33  import org.apache.hadoop.conf.Configuration;
34  
35  public class DumpChunks {
36  
37    
38    /**
39     * Tries to find chunks matching a given pattern.
40     * Takes as input a set of &-delimited patterns, followed
41     * by a list of file names.
42     * 
43     * E.g:  Dump datatype=Iostat&source=/my/log/.* *.done
44     */
45    public static void main(String[] args) throws IOException, URISyntaxException {
46      
47      if(args.length < 2) {
48        System.out.println("usage: Dump [-s] pattern1,pattern2,pattern3... file1 file2 file3...");
49        System.exit(-1);
50      }
51      
52      ChukwaConfiguration conf = new ChukwaConfiguration();
53  
54      dump(args, conf, System.out);
55    }
56    
57    static FileSystem getFS(Configuration conf, String uri) throws IOException, URISyntaxException {
58      FileSystem fs;
59      if(uri.contains("://")) {
60        fs = FileSystem.get(new URI(uri), conf);
61      } else {
62        String fsName = conf.get("writer.hdfs.filesystem");
63        if(fsName == null)
64          fs = FileSystem.getLocal(conf);
65        else
66          fs = FileSystem.get(conf);
67      }
68      System.err.println("filesystem is " + fs.getUri());
69      return fs;
70    }
71  
72    static void dump(String[] args, Configuration conf, PrintStream out) throws IOException, URISyntaxException {
73      
74      int filterArg = 0;
75      boolean summarize = false;
76      boolean nosort = false;
77      if(args[0].equals("-s")) {
78        filterArg++;
79        summarize = true;
80      } else if(args[0].equals("--nosort")) {
81        filterArg++;
82        nosort = true;
83      }
84      
85      Filter patterns = null;
86      if(args[filterArg].toLowerCase().equals("all"))
87        patterns = Filter.ALL;
88      else {
89        try {
90          patterns = new Filter(args[filterArg]);
91        } catch (CheckedPatternSyntaxException pse) {
92          System.err.println("Error parsing \"tags\" regular expression: " + pse.getMessage());
93          return;
94        }
95      }
96  
97      System.err.println("Patterns:" + patterns);
98      ArrayList<Path> filesToSearch = new ArrayList<Path>();
99  
100     FileSystem fs = getFS(conf, args[filterArg + 1]);
101     for(int i=filterArg + 1; i < args.length; ++i){
102       Path[] globbedPaths = FileUtil.stat2Paths(fs.globStatus(new Path(args[i])));
103       if(globbedPaths != null)
104         for(Path p: globbedPaths)
105           filesToSearch.add(p);
106     }
107     
108     System.err.println("expands to " + filesToSearch.size() + " actual files");
109 
110     DumpChunks dc;
111     if(summarize)
112       dc = new DumpAndSummarize();
113     else if(nosort)
114       dc = new DumpNoSort(out);
115     else
116       dc= new DumpChunks();
117     
118     try {
119       for(Path p: filesToSearch) {
120       
121         SequenceFile.Reader r = new SequenceFile.Reader(fs, p, conf);
122   
123         ChukwaArchiveKey key = new ChukwaArchiveKey();
124         ChunkImpl chunk = ChunkImpl.getBlankChunk();
125         while (r.next(key, chunk)) {
126           if(patterns.matches(chunk)) {
127             dc.updateMatchCatalog(key.getStreamName(), chunk);
128             chunk = ChunkImpl.getBlankChunk();
129           }
130         }
131       }
132       
133       dc.displayResults(out);
134       
135     } catch (Exception e) {
136       e.printStackTrace();
137     }
138   }
139 
140   public DumpChunks() {
141     matchCatalog = new HashMap<String, SortedMap<Long, ChunkImpl> >();
142   }
143 
144   Map<String, SortedMap<Long, ChunkImpl>> matchCatalog;
145   
146   protected void displayResults(PrintStream out) throws IOException{
147     for(Map.Entry<String,SortedMap<Long, ChunkImpl>> streamE: matchCatalog.entrySet()) {
148       String header = streamE.getKey();
149       SortedMap<Long, ChunkImpl> stream = streamE.getValue();
150       long nextToPrint = 0;
151       if(stream.firstKey() > 0)
152         System.err.println("---- map starts at "+ stream.firstKey());
153       for(Map.Entry<Long, ChunkImpl> e: stream.entrySet()) {
154         if(e.getKey() >= nextToPrint) {
155           if(e.getKey() > nextToPrint)
156             System.err.println("---- printing bytes starting at " + e.getKey());
157           
158           out.write(e.getValue().getData());
159           nextToPrint = e.getValue().getSeqID();
160         } else if(e.getValue().getSeqID() < nextToPrint) {
161           continue; //data already printed
162         } else {
163           //tricky case: chunk overlaps with already-printed data, but not completely
164           ChunkImpl c = e.getValue();
165           long chunkStartPos = e.getKey();
166           int numToPrint = (int) (c.getSeqID() - nextToPrint);
167           int printStartOffset = (int) ( nextToPrint -  chunkStartPos);
168           out.write(c.getData(), printStartOffset, numToPrint);
169           nextToPrint = c.getSeqID();
170         }
171       }
172       out.println("\n--------"+header + "--------");
173     }
174   }
175  
176   protected void updateMatchCatalog(String streamName,  ChunkImpl chunk) throws IOException {
177 
178     SortedMap<Long, ChunkImpl> chunksInStream = matchCatalog.get(streamName);
179     if(chunksInStream == null ) {
180       chunksInStream = new TreeMap<Long, ChunkImpl>();
181       matchCatalog.put(streamName, chunksInStream);
182     }
183     
184     long startPos = chunk.getSeqID() - chunk.getLength();
185     
186     ChunkImpl prevMatch = chunksInStream.get(startPos);
187     if(prevMatch == null)
188       chunksInStream.put(startPos, chunk);
189     else { //pick longest
190       if(chunk.getLength() > prevMatch.getLength())
191         chunksInStream.put (startPos, chunk);
192     }
193   }
194 
195   static class DumpAndSummarize extends DumpChunks {
196     Map<String, Integer> matchCounts = new LinkedHashMap<String, Integer>();
197     Map<String, Long> byteCounts = new LinkedHashMap<String, Long>();
198     
199 
200     protected void displayResults(PrintStream out) throws IOException{
201       for(Map.Entry<String, Integer> s: matchCounts.entrySet()) {
202         out.print(s.getKey());
203         out.print(" ");
204         out.print(s.getValue());
205         out.print(" chunks ");
206         out.print(byteCounts.get(s.getKey()));
207         out.println(" bytes");
208       }
209         
210     }
211     
212     protected void updateMatchCatalog(String streamName,  ChunkImpl chunk) {
213       Integer i = matchCounts.get(streamName);
214       if(i != null) {
215         matchCounts.put(streamName, i+1);
216         Long b = byteCounts.get(streamName);
217         byteCounts.put(streamName, b + chunk.getLength());
218       } else {
219         matchCounts.put(streamName, new Integer(1));
220         byteCounts.put(streamName, new Long(chunk.getLength()));
221       }
222     }
223     
224   }
225   
226   static class DumpNoSort extends DumpChunks {
227     
228     PrintStream out; 
229     public DumpNoSort(PrintStream out) {
230       this.out = out;
231     }
232     //Do some display
233     protected void updateMatchCatalog(String streamName,  ChunkImpl chunk) throws IOException {
234       out.write(chunk.getData());
235     }
236     
237     protected void displayResults(PrintStream out) throws IOException{
238       //did this in updateMatchCatalog
239     }
240     
241   }
242 
243 }