This project has retired. For details please refer to its Attic page.
RecordMerger xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.chukwa.extraction.demux;
20  
21  
22  import java.io.IOException;
23  import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
24  import org.apache.hadoop.fs.FileStatus;
25  import org.apache.hadoop.fs.FileSystem;
26  import org.apache.hadoop.fs.FileUtil;
27  import org.apache.hadoop.fs.Path;
28  import org.apache.hadoop.util.Tool;
29  import org.apache.hadoop.util.ToolRunner;
30  import org.apache.log4j.Logger;
31  import org.apache.hadoop.chukwa.util.ExceptionUtil;
32  
33  public class RecordMerger extends Thread {
34    static Logger log = Logger.getLogger(RecordMerger.class);
35    ChukwaConfiguration conf = null;
36    FileSystem fs = null;
37    String[] mergeArgs = null;
38    Tool tool = null;
39    boolean deleteRawData = false;
40  
41    public RecordMerger(ChukwaConfiguration conf, FileSystem fs, Tool tool,
42                        String[] mergeArgs, boolean deleteRawData) {
43      this.conf = conf;
44      this.fs = fs;
45      this.tool = tool;
46      this.mergeArgs = mergeArgs;
47      this.deleteRawData = deleteRawData;
48    }
49  
50    @Override
51    public void run() {
52      System.out.println("\t Running Merge! : output [" + mergeArgs[1] + "]");
53      int res;
54      try {
55        res = ToolRunner.run(conf, tool, mergeArgs);
56        System.out.println("MR exit status: " + res);
57        if (res == 0) {
58          writeRecordFile(mergeArgs[1] + "/part-00000", mergeArgs[2],
59              mergeArgs[3]);
60  
61          // delete input
62          if (deleteRawData) {
63            FileUtil.fullyDelete(fs, new Path(mergeArgs[0]));
64  
65            Path hours = new Path(mergeArgs[2]);
66            FileStatus[] hoursOrMinutesFS = fs.listStatus(hours);
67            for (FileStatus hourOrMinuteFS : hoursOrMinutesFS) {
68              String dirName = hourOrMinuteFS.getPath().getName();
69  
70              try {
71                Integer.parseInt(dirName);
72                FileUtil.fullyDelete(fs, new Path(mergeArgs[2] + "/" + dirName));
73                if (log.isDebugEnabled()) {
74                  log.debug("Deleting Hour directory: " + mergeArgs[2] + "/"
75                      + dirName);
76                }
77              } catch (NumberFormatException e) { /*
78                                                   * Not an Hour or Minutes
79                                                   * directory- Do nothing
80                                                   */
81                log.debug(ExceptionUtil.getStackTrace(e));
82              }
83            }
84          }
85  
86          // delete rolling tag
87          FileUtil.fullyDelete(fs, new Path(mergeArgs[3]));
88          // delete M/R temp directory
89          FileUtil.fullyDelete(fs, new Path(mergeArgs[1]));
90        } else {
91          throw new RuntimeException("Error in M/R merge operation!");
92        }
93  
94      } catch (Exception e) {
95        e.printStackTrace();
96        throw new RuntimeException("Error in M/R merge operation!", e);
97      }
98    }
99  
100   void writeRecordFile(String input, String outputDir, String fileName)
101       throws IOException {
102     boolean done = false;
103     int count = 1;
104     Path recordFile = new Path(input);
105     do {
106       Path destDirPath = new Path(outputDir);
107       Path destFilePath = new Path(outputDir + "/" + fileName + "." + count
108           + ".evt");
109 
110       if (!fs.exists(destDirPath)) {
111         fs.mkdirs(destDirPath);
112         log.info(">>>>>>>>>>>> create Dir" + destDirPath);
113       }
114 
115       if (!fs.exists(destFilePath)) {
116         boolean res = fs.rename(recordFile, destFilePath);
117 
118         if (res == false) {
119           log.info(">>>>>>>>>>>> Use standard copy rename failded");
120           FileUtil.copy(fs, recordFile, fs, destFilePath, false, false, conf);
121         }
122         done = true;
123       } else {
124         log.info("Start MoveToRepository main()");
125       }
126       count++;
127       // Just put a limit here
128       // TODO read from config
129       if (count > 1000) {
130         throw new IOException("too many files in this directory: "
131             + destDirPath);
132       }
133     } while (!done);
134   }
135 }