This project has retired. For details please refer to its
Attic page.
SinkArchiver xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.chukwa.extraction.archive;
19
20 import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
21 import org.apache.hadoop.conf.Configuration;
22 import org.apache.hadoop.fs.*;
23 import org.apache.hadoop.util.ToolRunner;
24 import org.apache.hadoop.chukwa.extraction.CHUKWA_CONSTANT;
25 import org.apache.log4j.Logger;
26 import java.io.IOException;
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42 public class SinkArchiver implements CHUKWA_CONSTANT {
43
44 final public static PathFilter DATA_SINK_FILTER = new PathFilter() {
45 public boolean accept(Path file) {
46 return file.getName().endsWith(".done");
47 }
48 };
49
50 static Logger log = Logger.getLogger(SinkArchiver.class);
51
52 public static void main(String[] args) {
53 try {
54 Configuration conf = new ChukwaConfiguration();
55 if(conf.get(ChukwaArchiveDataTypeOutputFormat.GROUP_BY_CLUSTER_OPTION_NAME) == null )
56 conf.set(ChukwaArchiveDataTypeOutputFormat.GROUP_BY_CLUSTER_OPTION_NAME, "true");
57 FileSystem fs = FileSystem.get(conf);
58 SinkArchiver archiver = new SinkArchiver();
59 archiver.exec(fs, conf);
60 } catch(IOException e) {
61 e.printStackTrace();
62 }
63 }
64
65
66
67
68
69
70 public void exec(FileSystem fs, Configuration conf) {
71 try {
72
73 String chukwaRootDir = conf.get(CHUKWA_ROOT_DIR_FIELD, DEFAULT_CHUKWA_ROOT_DIR_NAME);
74 if ( ! chukwaRootDir.endsWith("/") ) {
75 chukwaRootDir += "/";
76 }
77 String archiveSource = conf.get(CHUKWA_ARCHIVE_DIR_FIELD, chukwaRootDir +DEFAULT_CHUKWA_LOGS_DIR_NAME);
78 if ( ! archiveSource.endsWith("/") ) {
79 archiveSource += "/";
80 }
81 String archivesRootProcessingDir = chukwaRootDir + ARCHIVES_PROCESSING_DIR_NAME;
82
83
84 String archivesMRInputDir = archivesRootProcessingDir + ARCHIVES_MR_INPUT_DIR_NAME;
85 String archivesMROutputDir = archivesRootProcessingDir+ ARCHIVES_MR_OUTPUT_DIR_NAME;
86
87 Path pSource = new Path(archiveSource);
88
89 Path pMRInputDir = new Path(archivesMRInputDir);
90 if(!fs.exists(pMRInputDir))
91 fs.mkdirs(pMRInputDir);
92
93 Path pOutputDir = new Path(archivesMROutputDir);
94 if(!fs.exists(pOutputDir))
95 fs.mkdirs(pOutputDir);
96
97 if(fs.listStatus(pOutputDir).length == 0)
98 fs.delete(pOutputDir, true);
99 Path archive = new Path(chukwaRootDir + "archive");
100
101 selectInputs(fs, pSource, pMRInputDir);
102
103 int result = runMapRedJob(conf, archivesMRInputDir, archivesMROutputDir);
104 if(result == 0) {
105 fs.delete(pMRInputDir, true);
106 }
107
108 if(!fs.exists(archive)) {
109 fs.mkdirs(archive);
110 }
111 FileStatus[] files = fs.listStatus(pOutputDir);
112 for(FileStatus f: files) {
113 if(!f.getPath().getName().endsWith("_logs"))
114 promoteAndMerge(fs, f.getPath(), archive);
115 }
116
117 fs.delete(pOutputDir, true);
118
119 } catch (Exception e) {
120 e.printStackTrace();
121 }
122 }
123
124 private void selectInputs(FileSystem fs, Path pSource,
125 Path pMRInputDir) throws IOException {
126
127 FileStatus[] dataSinkFiles = fs.listStatus(pSource, DATA_SINK_FILTER);
128 for(FileStatus fstatus: dataSinkFiles) {
129 boolean rename = fs.rename(fstatus.getPath(),pMRInputDir);
130 log.info("Moving " + fstatus.getPath() + " to " + pMRInputDir
131 +", status is: " + rename);
132 }
133
134 }
135
136 public int runMapRedJob(Configuration conf, String in, String out)
137 throws Exception {
138 String grouper = conf.get("archive.grouper","DataType");
139 String[] args = new String[] {grouper, in, out};
140 int res = ToolRunner.run(conf, new ChukwaArchiveBuilder(),
141 args);
142 return res;
143 }
144
145
146
147
148
149
150
151
152
153 public void promoteAndMerge(FileSystem fs, Path src, Path dest)
154 throws IOException {
155 FileStatus stat = fs.getFileStatus(src);
156 String baseName = src.getName();
157 Path target = new Path(dest, baseName);
158 if(!fs.exists(target)) {
159 fs.rename(src, dest);
160 System.out.println("moving " + src + " to " + dest);
161 } else if(stat.isDir()) {
162 FileStatus[] files = fs.listStatus(src);
163 for(FileStatus f: files) {
164 promoteAndMerge(fs, f.getPath(), target);
165 }
166 } else {
167 int i=0;
168 do {
169
170 String newName;
171 if(baseName.endsWith(".arc")) {
172 newName = baseName.substring(0, baseName.length() - 4) + "-"+i+".arc";
173 }
174 else
175 newName = baseName+"-"+i;
176 target = new Path(dest, newName);
177 } while(fs.exists(target));
178 fs.rename(src, target);
179 System.out.println("promoting " + src + " to " + target);
180 }
181
182 }
183 }