This project has retired. For details please refer to its
Attic page.
MoveOrMergeRecordFile xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.chukwa.extraction.demux;
20
21
22 import java.io.IOException;
23 import java.net.URI;
24 import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
25 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
26 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
27 import org.apache.hadoop.conf.Configured;
28 import org.apache.hadoop.fs.FileStatus;
29 import org.apache.hadoop.fs.FileSystem;
30 import org.apache.hadoop.fs.FileUtil;
31 import org.apache.hadoop.fs.Path;
32 import org.apache.hadoop.fs.PathFilter;
33 import org.apache.hadoop.mapred.FileInputFormat;
34 import org.apache.hadoop.mapred.FileOutputFormat;
35 import org.apache.hadoop.mapred.JobClient;
36 import org.apache.hadoop.mapred.JobConf;
37 import org.apache.hadoop.mapred.SequenceFileInputFormat;
38 import org.apache.hadoop.mapred.SequenceFileOutputFormat;
39 import org.apache.hadoop.mapred.lib.IdentityMapper;
40 import org.apache.hadoop.mapred.lib.IdentityReducer;
41 import org.apache.hadoop.util.Tool;
42 import org.apache.hadoop.util.ToolRunner;
43
44 public class MoveOrMergeRecordFile extends Configured implements Tool {
45 static ChukwaConfiguration conf = null;
46 static FileSystem fs = null;
47 static final String HadoopLogDir = "_logs";
48 static final String hadoopTempDir = "_temporary";
49
50 public int run(String[] args) throws Exception {
51 JobConf conf = new JobConf(getConf(), MoveOrMergeRecordFile.class);
52
53 conf.setJobName("Chukwa-MoveOrMergeLogFile");
54 conf.setInputFormat(SequenceFileInputFormat.class);
55
56 conf.setMapperClass(IdentityMapper.class);
57 conf.setReducerClass(IdentityReducer.class);
58
59
60
61
62 conf.setOutputKeyClass(ChukwaRecordKey.class);
63 conf.setOutputValueClass(ChukwaRecord.class);
64 conf.setOutputFormat(SequenceFileOutputFormat.class);
65
66 FileInputFormat.setInputPaths(conf, args[0]);
67 FileOutputFormat.setOutputPath(conf, new Path(args[1]));
68
69 JobClient.runJob(conf);
70 return 0;
71 }
72
73 static void moveOrMergeOneCluster(Path srcDir, String destDir)
74 throws Exception {
75 System.out.println("moveOrMergeOneCluster (" + srcDir.getName() + ","
76 + destDir + ")");
77 FileStatus fstat = fs.getFileStatus(srcDir);
78
79 if (!fstat.isDir()) {
80 throw new IOException(srcDir + " is not a directory!");
81 } else {
82 FileStatus[] datasourceDirectories = fs.listStatus(srcDir);
83 for (FileStatus datasourceDirectory : datasourceDirectories) {
84 System.out.println(datasourceDirectory.getPath() + " isDir?"
85 + datasourceDirectory.isDir());
86 if (!datasourceDirectory.isDir()) {
87 throw new IOException("Top level should just contains directories :"
88 + datasourceDirectory.getPath());
89 }
90
91 String dirName = datasourceDirectory.getPath().getName();
92
93 Path destPath = new Path(destDir + "/" + dirName);
94 System.out.println("dest directory path: " + destPath);
95
96 if (!fs.exists(destPath)) {
97 System.out.println("create datasource directory [" + destDir + "/"
98 + dirName + "]");
99 fs.mkdirs(destPath);
100 }
101
102 FileStatus[] evts = fs.listStatus(datasourceDirectory.getPath(),
103 new EventFileFilter());
104 for (FileStatus eventFile : evts) {
105
106 Path eventFilePath = eventFile.getPath();
107 String filename = eventFilePath.getName();
108 System.out.println("src dir File: [" + filename + "]");
109 Path destFilePath = new Path(destDir + "/" + dirName + "/" + filename);
110 if (!fs.exists(destFilePath)) {
111 System.out.println("Moving File: [" + destFilePath + "]");
112
113 FileUtil.copy(fs, eventFilePath, fs, destFilePath, false, false,
114 conf);
115 } else {
116 System.out.println("Need to merge! : [" + destFilePath + "]");
117 String strMrPath = datasourceDirectory.getPath().toString() + "/"
118 + "MR_" + System.currentTimeMillis();
119 Path mrPath = new Path(strMrPath);
120 System.out.println("\t New MR directory : [" + mrPath + "]");
121
122 fs.mkdirs(mrPath);
123
124 FileUtil.copy(fs, eventFilePath, fs,
125 new Path(strMrPath + "/1.evt"), false, false, conf);
126 fs.rename(destFilePath, new Path(strMrPath + "/2.evt"));
127
128
129 String[] mergeArgs = new String[2];
130 mergeArgs[0] = strMrPath;
131 mergeArgs[1] = strMrPath + "/mrOutput";
132 DoMerge merge = new DoMerge(conf, fs, eventFilePath, destFilePath,
133 mergeArgs);
134 merge.start();
135 }
136 }
137 }
138 }
139
140 }
141
142
143
144
145
146 public static void main(String[] args) throws Exception {
147 conf = new ChukwaConfiguration();
148 String fsName = conf.get("writer.hdfs.filesystem");
149 fs = FileSystem.get(new URI(fsName), conf);
150
151 Path srcDir = new Path(args[0]);
152 String destDir = args[1];
153
154 FileStatus fstat = fs.getFileStatus(srcDir);
155
156 if (!fstat.isDir()) {
157 throw new IOException(srcDir + " is not a directory!");
158 } else {
159 FileStatus[] clusters = fs.listStatus(srcDir);
160
161 String name = null;
162 for (FileStatus cluster : clusters) {
163 name = cluster.getPath().getName();
164
165 if ((name.intern() == HadoopLogDir.intern())
166 || (name.intern() == hadoopTempDir.intern())) {
167 continue;
168 }
169 moveOrMergeOneCluster(cluster.getPath(), destDir + "/"
170 + cluster.getPath().getName());
171 }
172 }
173 System.out.println("Done with moveOrMerge main()");
174 }
175 }
176
177
178 class DoMerge extends Thread {
179 ChukwaConfiguration conf = null;
180 FileSystem fs = null;
181 String[] mergeArgs = new String[2];
182 Path destFilePath = null;
183 Path eventFilePath = null;
184
185 public DoMerge(ChukwaConfiguration conf, FileSystem fs, Path eventFilePath,
186 Path destFilePath, String[] mergeArgs) {
187 this.conf = conf;
188 this.fs = fs;
189 this.eventFilePath = eventFilePath;
190 this.destFilePath = destFilePath;
191 this.mergeArgs = mergeArgs;
192 }
193
194 @Override
195 public void run() {
196 System.out.println("\t Running Merge! : output [" + mergeArgs[1] + "]");
197 int res;
198 try {
199 res = ToolRunner.run(new ChukwaConfiguration(),
200 new MoveOrMergeRecordFile(), mergeArgs);
201 System.out.println("MR exit status: " + res);
202 if (res == 0) {
203 System.out.println("\t Moving output file : to [" + destFilePath + "]");
204 FileUtil.copy(fs, new Path(mergeArgs[1] + "/part-00000"), fs,
205 destFilePath, false, false, conf);
206 fs.rename(new Path(mergeArgs[1] + "/part-00000"), eventFilePath);
207 } else {
208 throw new RuntimeException("Error in M/R merge operation!");
209 }
210
211 } catch (Exception e) {
212 e.printStackTrace();
213 throw new RuntimeException("Error in M/R merge operation!", e);
214 }
215 }
216
217 }
218
219
220 class EventFileFilter implements PathFilter {
221 public boolean accept(Path path) {
222 return (path.toString().endsWith(".evt"));
223 }
224 }