This project has retired. For details please refer to its Attic page.
DumpArchive xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.chukwa.util;
19  
20  
21  import java.io.IOException;
22  import java.net.URI;
23  import java.net.URISyntaxException;
24  import java.util.*;
25  import org.apache.hadoop.chukwa.ChukwaArchiveKey;
26  import org.apache.hadoop.chukwa.ChunkImpl;
27  import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
28  import org.apache.hadoop.fs.FileSystem;
29  import org.apache.hadoop.fs.FileUtil;
30  import org.apache.hadoop.fs.Path;
31  import org.apache.hadoop.io.SequenceFile;
32  import org.apache.hadoop.conf.Configuration;
33  
34  /**
35   * Tool for exploring the contents of the Chukwa data archive, or a collection
36   * of Chukwa sequence files.
37   * 
38   * Limitation: DumpArchive infers the filesystem to dump from based on the first
39   * path argument, and will behave strangely if you try to dump files
40   * from different filesystems in the same invocation.
41   *
42   */
43  public class DumpArchive {
44  
45    static boolean summarize = false;
46    
47    static HashMap<String, Integer> counts  = new LinkedHashMap<String, Integer>();
48    /**
49     * @param args
50     * @throws URISyntaxException
51     * @throws IOException
52     */
53    public static void main(String[] args) throws IOException, URISyntaxException {
54  
55      int firstArg = 0;
56      if(args.length == 0) {
57        System.out.println("Usage: DumpArchive [--summarize] <sequence files>");
58      }
59      if(args[0].equals("--summarize")) {
60        firstArg = 1;
61        summarize= true;
62      } 
63      ChukwaConfiguration conf = new ChukwaConfiguration();
64      FileSystem fs;
65      if(args[firstArg].contains("://")) {
66        fs = FileSystem.get(new URI(args[firstArg]), conf);
67      } else {
68        String fsName = conf.get("writer.hdfs.filesystem");
69        if(fsName != null)
70          fs = FileSystem.get(conf);
71        else
72          fs = FileSystem.getLocal(conf);
73      }
74      ArrayList<Path> filesToSearch = new ArrayList<Path>();
75      for(int i=firstArg; i < args.length; ++i){
76        Path[] globbedPaths = FileUtil.stat2Paths(fs.globStatus(new Path(args[i])));
77        for(Path p: globbedPaths)
78          filesToSearch.add(p);
79      }
80      int tot = filesToSearch.size();
81      int i=1;
82  
83      System.err.println("total of " + tot + " files to search");
84      for(Path p: filesToSearch) {
85        System.err.println("scanning " + p.toUri() + "("+ (i++) +"/"+tot+")");
86        dumpFile(p, conf, fs);
87      }
88  
89      if(summarize) {
90        for(Map.Entry<String, Integer> count: counts.entrySet()) {
91          System.out.println(count.getKey()+ ")   ===> " + count.getValue());
92        }
93      }
94    }
95  
96    private static void dumpFile(Path p, Configuration conf,
97        FileSystem fs) throws IOException {
98      SequenceFile.Reader r = new SequenceFile.Reader(fs, p, conf);
99  
100     ChukwaArchiveKey key = new ChukwaArchiveKey();
101     ChunkImpl chunk = ChunkImpl.getBlankChunk();
102     try {
103       while (r.next(key, chunk)) {
104         
105         String entryKey = chunk.getSource() +":"+chunk.getDataType() +":" +
106         chunk.getStreamName();
107         
108         Integer oldC = counts.get(entryKey);
109         if(oldC != null)
110           counts.put(entryKey, oldC + 1);
111         else
112           counts.put(entryKey, new Integer(1));
113         
114         if(!summarize) {
115           System.out.println("\nTimePartition: " + key.getTimePartition());
116           System.out.println("DataType: " + key.getDataType());
117           System.out.println("StreamName: " + key.getStreamName());
118           System.out.println("SeqId: " + key.getSeqId());
119           System.out.println("\t\t =============== ");
120   
121           System.out.println("Cluster : " + chunk.getTags());
122           System.out.println("DataType : " + chunk.getDataType());
123           System.out.println("Source : " + chunk.getSource());
124           System.out.println("Application : " + chunk.getStreamName());
125           System.out.println("SeqID : " + chunk.getSeqID());
126           System.out.println("Data : " + new String(chunk.getData()));
127         }
128       }
129     } catch (Exception e) {
130       e.printStackTrace();
131     }
132   }
133 
134 }