This project has retired. For details please refer to its
Attic page.
DumpChunks xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.chukwa.util;
19
20
21 import java.net.URI;
22 import java.net.URISyntaxException;
23 import java.util.regex.*;
24 import java.util.*;
25 import java.io.*;
26 import org.apache.hadoop.chukwa.*;
27 import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
28 import org.apache.hadoop.chukwa.util.RegexUtil.CheckedPatternSyntaxException;
29 import org.apache.hadoop.fs.FileSystem;
30 import org.apache.hadoop.fs.Path;
31 import org.apache.hadoop.fs.FileUtil;
32 import org.apache.hadoop.io.SequenceFile;
33 import org.apache.hadoop.conf.Configuration;
34
35 public class DumpChunks {
36
37
38
39
40
41
42
43
44
45 public static void main(String[] args) throws IOException, URISyntaxException {
46
47 if(args.length < 2) {
48 System.out.println("usage: Dump [-s] pattern1,pattern2,pattern3... file1 file2 file3...");
49 System.exit(-1);
50 }
51
52 ChukwaConfiguration conf = new ChukwaConfiguration();
53
54 dump(args, conf, System.out);
55 }
56
57 static FileSystem getFS(Configuration conf, String uri) throws IOException, URISyntaxException {
58 FileSystem fs;
59 if(uri.contains("://")) {
60 fs = FileSystem.get(new URI(uri), conf);
61 } else {
62 String fsName = conf.get("writer.hdfs.filesystem");
63 if(fsName == null)
64 fs = FileSystem.getLocal(conf);
65 else
66 fs = FileSystem.get(conf);
67 }
68 System.err.println("filesystem is " + fs.getUri());
69 return fs;
70 }
71
72 static void dump(String[] args, Configuration conf, PrintStream out) throws IOException, URISyntaxException {
73
74 int filterArg = 0;
75 boolean summarize = false;
76 boolean nosort = false;
77 if(args[0].equals("-s")) {
78 filterArg++;
79 summarize = true;
80 } else if(args[0].equals("--nosort")) {
81 filterArg++;
82 nosort = true;
83 }
84
85 Filter patterns = null;
86 if(args[filterArg].toLowerCase().equals("all"))
87 patterns = Filter.ALL;
88 else {
89 try {
90 patterns = new Filter(args[filterArg]);
91 } catch (CheckedPatternSyntaxException pse) {
92 System.err.println("Error parsing \"tags\" regular expression: " + pse.getMessage());
93 return;
94 }
95 }
96
97 System.err.println("Patterns:" + patterns);
98 ArrayList<Path> filesToSearch = new ArrayList<Path>();
99
100 FileSystem fs = getFS(conf, args[filterArg + 1]);
101 for(int i=filterArg + 1; i < args.length; ++i){
102 Path[] globbedPaths = FileUtil.stat2Paths(fs.globStatus(new Path(args[i])));
103 if(globbedPaths != null)
104 for(Path p: globbedPaths)
105 filesToSearch.add(p);
106 }
107
108 System.err.println("expands to " + filesToSearch.size() + " actual files");
109
110 DumpChunks dc;
111 if(summarize)
112 dc = new DumpAndSummarize();
113 else if(nosort)
114 dc = new DumpNoSort(out);
115 else
116 dc= new DumpChunks();
117
118 try {
119 for(Path p: filesToSearch) {
120
121 SequenceFile.Reader r = new SequenceFile.Reader(fs, p, conf);
122
123 ChukwaArchiveKey key = new ChukwaArchiveKey();
124 ChunkImpl chunk = ChunkImpl.getBlankChunk();
125 while (r.next(key, chunk)) {
126 if(patterns.matches(chunk)) {
127 dc.updateMatchCatalog(key.getStreamName(), chunk);
128 chunk = ChunkImpl.getBlankChunk();
129 }
130 }
131 }
132
133 dc.displayResults(out);
134
135 } catch (Exception e) {
136 e.printStackTrace();
137 }
138 }
139
140 public DumpChunks() {
141 matchCatalog = new HashMap<String, SortedMap<Long, ChunkImpl> >();
142 }
143
144 Map<String, SortedMap<Long, ChunkImpl>> matchCatalog;
145
146 protected void displayResults(PrintStream out) throws IOException{
147 for(Map.Entry<String,SortedMap<Long, ChunkImpl>> streamE: matchCatalog.entrySet()) {
148 String header = streamE.getKey();
149 SortedMap<Long, ChunkImpl> stream = streamE.getValue();
150 long nextToPrint = 0;
151 if(stream.firstKey() > 0)
152 System.err.println("---- map starts at "+ stream.firstKey());
153 for(Map.Entry<Long, ChunkImpl> e: stream.entrySet()) {
154 if(e.getKey() >= nextToPrint) {
155 if(e.getKey() > nextToPrint)
156 System.err.println("---- printing bytes starting at " + e.getKey());
157
158 out.write(e.getValue().getData());
159 nextToPrint = e.getValue().getSeqID();
160 } else if(e.getValue().getSeqID() < nextToPrint) {
161 continue;
162 } else {
163
164 ChunkImpl c = e.getValue();
165 long chunkStartPos = e.getKey();
166 int numToPrint = (int) (c.getSeqID() - nextToPrint);
167 int printStartOffset = (int) ( nextToPrint - chunkStartPos);
168 out.write(c.getData(), printStartOffset, numToPrint);
169 nextToPrint = c.getSeqID();
170 }
171 }
172 out.println("\n--------"+header + "--------");
173 }
174 }
175
176 protected void updateMatchCatalog(String streamName, ChunkImpl chunk) throws IOException {
177
178 SortedMap<Long, ChunkImpl> chunksInStream = matchCatalog.get(streamName);
179 if(chunksInStream == null ) {
180 chunksInStream = new TreeMap<Long, ChunkImpl>();
181 matchCatalog.put(streamName, chunksInStream);
182 }
183
184 long startPos = chunk.getSeqID() - chunk.getLength();
185
186 ChunkImpl prevMatch = chunksInStream.get(startPos);
187 if(prevMatch == null)
188 chunksInStream.put(startPos, chunk);
189 else {
190 if(chunk.getLength() > prevMatch.getLength())
191 chunksInStream.put (startPos, chunk);
192 }
193 }
194
195 static class DumpAndSummarize extends DumpChunks {
196 Map<String, Integer> matchCounts = new LinkedHashMap<String, Integer>();
197 Map<String, Long> byteCounts = new LinkedHashMap<String, Long>();
198
199
200 protected void displayResults(PrintStream out) throws IOException{
201 for(Map.Entry<String, Integer> s: matchCounts.entrySet()) {
202 out.print(s.getKey());
203 out.print(" ");
204 out.print(s.getValue());
205 out.print(" chunks ");
206 out.print(byteCounts.get(s.getKey()));
207 out.println(" bytes");
208 }
209
210 }
211
212 protected void updateMatchCatalog(String streamName, ChunkImpl chunk) {
213 Integer i = matchCounts.get(streamName);
214 if(i != null) {
215 matchCounts.put(streamName, i+1);
216 Long b = byteCounts.get(streamName);
217 byteCounts.put(streamName, b + chunk.getLength());
218 } else {
219 matchCounts.put(streamName, new Integer(1));
220 byteCounts.put(streamName, new Long(chunk.getLength()));
221 }
222 }
223
224 }
225
226 static class DumpNoSort extends DumpChunks {
227
228 PrintStream out;
229 public DumpNoSort(PrintStream out) {
230 this.out = out;
231 }
232
233 protected void updateMatchCatalog(String streamName, ChunkImpl chunk) throws IOException {
234 out.write(chunk.getData());
235 }
236
237 protected void displayResults(PrintStream out) throws IOException{
238
239 }
240
241 }
242
243 }