This project has retired. For details please refer to its
Attic page.
RecordMerger xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.chukwa.extraction.demux;
20
21
22 import java.io.IOException;
23 import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
24 import org.apache.hadoop.fs.FileStatus;
25 import org.apache.hadoop.fs.FileSystem;
26 import org.apache.hadoop.fs.FileUtil;
27 import org.apache.hadoop.fs.Path;
28 import org.apache.hadoop.util.Tool;
29 import org.apache.hadoop.util.ToolRunner;
30 import org.apache.log4j.Logger;
31 import org.apache.hadoop.chukwa.util.ExceptionUtil;
32
33 public class RecordMerger extends Thread {
34 static Logger log = Logger.getLogger(RecordMerger.class);
35 ChukwaConfiguration conf = null;
36 FileSystem fs = null;
37 String[] mergeArgs = null;
38 Tool tool = null;
39 boolean deleteRawData = false;
40
41 public RecordMerger(ChukwaConfiguration conf, FileSystem fs, Tool tool,
42 String[] mergeArgs, boolean deleteRawData) {
43 this.conf = conf;
44 this.fs = fs;
45 this.tool = tool;
46 this.mergeArgs = mergeArgs;
47 this.deleteRawData = deleteRawData;
48 }
49
50 @Override
51 public void run() {
52 System.out.println("\t Running Merge! : output [" + mergeArgs[1] + "]");
53 int res;
54 try {
55 res = ToolRunner.run(conf, tool, mergeArgs);
56 System.out.println("MR exit status: " + res);
57 if (res == 0) {
58 writeRecordFile(mergeArgs[1] + "/part-00000", mergeArgs[2],
59 mergeArgs[3]);
60
61
62 if (deleteRawData) {
63 FileUtil.fullyDelete(fs, new Path(mergeArgs[0]));
64
65 Path hours = new Path(mergeArgs[2]);
66 FileStatus[] hoursOrMinutesFS = fs.listStatus(hours);
67 for (FileStatus hourOrMinuteFS : hoursOrMinutesFS) {
68 String dirName = hourOrMinuteFS.getPath().getName();
69
70 try {
71 Integer.parseInt(dirName);
72 FileUtil.fullyDelete(fs, new Path(mergeArgs[2] + "/" + dirName));
73 if (log.isDebugEnabled()) {
74 log.debug("Deleting Hour directory: " + mergeArgs[2] + "/"
75 + dirName);
76 }
77 } catch (NumberFormatException e) {
78
79
80
81 log.debug(ExceptionUtil.getStackTrace(e));
82 }
83 }
84 }
85
86
87 FileUtil.fullyDelete(fs, new Path(mergeArgs[3]));
88
89 FileUtil.fullyDelete(fs, new Path(mergeArgs[1]));
90 } else {
91 throw new RuntimeException("Error in M/R merge operation!");
92 }
93
94 } catch (Exception e) {
95 e.printStackTrace();
96 throw new RuntimeException("Error in M/R merge operation!", e);
97 }
98 }
99
100 void writeRecordFile(String input, String outputDir, String fileName)
101 throws IOException {
102 boolean done = false;
103 int count = 1;
104 Path recordFile = new Path(input);
105 do {
106 Path destDirPath = new Path(outputDir);
107 Path destFilePath = new Path(outputDir + "/" + fileName + "." + count
108 + ".evt");
109
110 if (!fs.exists(destDirPath)) {
111 fs.mkdirs(destDirPath);
112 log.info(">>>>>>>>>>>> create Dir" + destDirPath);
113 }
114
115 if (!fs.exists(destFilePath)) {
116 boolean res = fs.rename(recordFile, destFilePath);
117
118 if (res == false) {
119 log.info(">>>>>>>>>>>> Use standard copy rename failded");
120 FileUtil.copy(fs, recordFile, fs, destFilePath, false, false, conf);
121 }
122 done = true;
123 } else {
124 log.info("Start MoveToRepository main()");
125 }
126 count++;
127
128
129 if (count > 1000) {
130 throw new IOException("too many files in this directory: "
131 + destDirPath);
132 }
133 } while (!done);
134 }
135 }