This project has retired. For details please refer to its Attic page.
MoveOrMergeRecordFile xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.chukwa.extraction.demux;
20  
21  
22  import java.io.IOException;
23  import java.net.URI;
24  import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
25  import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
26  import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
27  import org.apache.hadoop.conf.Configured;
28  import org.apache.hadoop.fs.FileStatus;
29  import org.apache.hadoop.fs.FileSystem;
30  import org.apache.hadoop.fs.FileUtil;
31  import org.apache.hadoop.fs.Path;
32  import org.apache.hadoop.fs.PathFilter;
33  import org.apache.hadoop.mapred.FileInputFormat;
34  import org.apache.hadoop.mapred.FileOutputFormat;
35  import org.apache.hadoop.mapred.JobClient;
36  import org.apache.hadoop.mapred.JobConf;
37  import org.apache.hadoop.mapred.SequenceFileInputFormat;
38  import org.apache.hadoop.mapred.SequenceFileOutputFormat;
39  import org.apache.hadoop.mapred.lib.IdentityMapper;
40  import org.apache.hadoop.mapred.lib.IdentityReducer;
41  import org.apache.hadoop.util.Tool;
42  import org.apache.hadoop.util.ToolRunner;
43  
44  public class MoveOrMergeRecordFile extends Configured implements Tool {
45    static ChukwaConfiguration conf = null;
46    static FileSystem fs = null;
47    static final String HadoopLogDir = "_logs";
48    static final String hadoopTempDir = "_temporary";
49  
50    public int run(String[] args) throws Exception {
51      JobConf conf = new JobConf(getConf(), MoveOrMergeRecordFile.class);
52  
53      conf.setJobName("Chukwa-MoveOrMergeLogFile");
54      conf.setInputFormat(SequenceFileInputFormat.class);
55  
56      conf.setMapperClass(IdentityMapper.class);
57      conf.setReducerClass(IdentityReducer.class);
58  
59      // conf.setPartitionerClass(ChukwaPartitioner.class);
60      // conf.setOutputFormat(ChukwaOutputFormat.class);
61  
62      conf.setOutputKeyClass(ChukwaRecordKey.class);
63      conf.setOutputValueClass(ChukwaRecord.class);
64      conf.setOutputFormat(SequenceFileOutputFormat.class);
65  
66      FileInputFormat.setInputPaths(conf, args[0]);
67      FileOutputFormat.setOutputPath(conf, new Path(args[1]));
68  
69      JobClient.runJob(conf);
70      return 0;
71    }
72  
73    static void moveOrMergeOneCluster(Path srcDir, String destDir)
74        throws Exception {
75      System.out.println("moveOrMergeOneCluster (" + srcDir.getName() + ","
76          + destDir + ")");
77      FileStatus fstat = fs.getFileStatus(srcDir);
78  
79      if (!fstat.isDir()) {
80        throw new IOException(srcDir + " is not a directory!");
81      } else {
82        FileStatus[] datasourceDirectories = fs.listStatus(srcDir);
83        for (FileStatus datasourceDirectory : datasourceDirectories) {
84          System.out.println(datasourceDirectory.getPath() + " isDir?"
85              + datasourceDirectory.isDir());
86          if (!datasourceDirectory.isDir()) {
87            throw new IOException("Top level should just contains directories :"
88                + datasourceDirectory.getPath());
89          }
90  
91          String dirName = datasourceDirectory.getPath().getName();
92  
93          Path destPath = new Path(destDir + "/" + dirName);
94          System.out.println("dest directory path: " + destPath);
95  
96          if (!fs.exists(destPath)) {
97            System.out.println("create datasource directory [" + destDir + "/"
98                + dirName + "]");
99            fs.mkdirs(destPath);
100         }
101 
102         FileStatus[] evts = fs.listStatus(datasourceDirectory.getPath(),
103             new EventFileFilter());
104         for (FileStatus eventFile : evts) {
105 
106           Path eventFilePath = eventFile.getPath();
107           String filename = eventFilePath.getName();
108           System.out.println("src dir File: [" + filename + "]");
109           Path destFilePath = new Path(destDir + "/" + dirName + "/" + filename);
110           if (!fs.exists(destFilePath)) {
111             System.out.println("Moving File: [" + destFilePath + "]");
112             // Copy to final Location
113             FileUtil.copy(fs, eventFilePath, fs, destFilePath, false, false,
114                 conf);
115           } else {
116             System.out.println("Need to merge! : [" + destFilePath + "]");
117             String strMrPath = datasourceDirectory.getPath().toString() + "/"
118                 + "MR_" + System.currentTimeMillis();
119             Path mrPath = new Path(strMrPath);
120             System.out.println("\t New MR directory : [" + mrPath + "]");
121             // Create MR input Dir
122             fs.mkdirs(mrPath);
123             // Move Input files
124             FileUtil.copy(fs, eventFilePath, fs,
125                 new Path(strMrPath + "/1.evt"), false, false, conf);
126             fs.rename(destFilePath, new Path(strMrPath + "/2.evt"));
127 
128             // Merge
129             String[] mergeArgs = new String[2];
130             mergeArgs[0] = strMrPath;
131             mergeArgs[1] = strMrPath + "/mrOutput";
132             DoMerge merge = new DoMerge(conf, fs, eventFilePath, destFilePath,
133                 mergeArgs);
134             merge.start();
135           }
136         }
137       }
138     }
139 
140   }
141 
142   /**
143    * @param args is command line parameters
144    * @throws Exception if unable to process data
145    */
146   public static void main(String[] args) throws Exception {
147     conf = new ChukwaConfiguration();
148     String fsName = conf.get("writer.hdfs.filesystem");
149     fs = FileSystem.get(new URI(fsName), conf);
150 
151     Path srcDir = new Path(args[0]);
152     String destDir = args[1];
153 
154     FileStatus fstat = fs.getFileStatus(srcDir);
155 
156     if (!fstat.isDir()) {
157       throw new IOException(srcDir + " is not a directory!");
158     } else {
159       FileStatus[] clusters = fs.listStatus(srcDir);
160       // Run a moveOrMerge on all clusters
161       String name = null;
162       for (FileStatus cluster : clusters) {
163         name = cluster.getPath().getName();
164         // Skip hadoop outDir
165         if ((name.intern() == HadoopLogDir.intern())
166             || (name.intern() == hadoopTempDir.intern())) {
167           continue;
168         }
169         moveOrMergeOneCluster(cluster.getPath(), destDir + "/"
170             + cluster.getPath().getName());
171       }
172     }
173     System.out.println("Done with moveOrMerge main()");
174   }
175 }
176 
177 
178 class DoMerge extends Thread {
179   ChukwaConfiguration conf = null;
180   FileSystem fs = null;
181   String[] mergeArgs = new String[2];
182   Path destFilePath = null;
183   Path eventFilePath = null;
184 
185   public DoMerge(ChukwaConfiguration conf, FileSystem fs, Path eventFilePath,
186                  Path destFilePath, String[] mergeArgs) {
187     this.conf = conf;
188     this.fs = fs;
189     this.eventFilePath = eventFilePath;
190     this.destFilePath = destFilePath;
191     this.mergeArgs = mergeArgs;
192   }
193 
194   @Override
195   public void run() {
196     System.out.println("\t Running Merge! : output [" + mergeArgs[1] + "]");
197     int res;
198     try {
199       res = ToolRunner.run(new ChukwaConfiguration(),
200           new MoveOrMergeRecordFile(), mergeArgs);
201       System.out.println("MR exit status: " + res);
202       if (res == 0) {
203         System.out.println("\t Moving output file : to [" + destFilePath + "]");
204         FileUtil.copy(fs, new Path(mergeArgs[1] + "/part-00000"), fs,
205             destFilePath, false, false, conf);
206         fs.rename(new Path(mergeArgs[1] + "/part-00000"), eventFilePath);
207       } else {
208         throw new RuntimeException("Error in M/R merge operation!");
209       }
210 
211     } catch (Exception e) {
212       e.printStackTrace();
213       throw new RuntimeException("Error in M/R merge operation!", e);
214     }
215   }
216 
217 }
218 
219 
220 class EventFileFilter implements PathFilter {
221   public boolean accept(Path path) {
222     return (path.toString().endsWith(".evt"));
223   }
224 }