This project has retired. For details please refer to its
Attic page.
MoveToRepository xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.chukwa.extraction.demux;
20
21
22 import java.io.IOException;
23 import java.net.URI;
24 import java.text.SimpleDateFormat;
25 import java.util.Calendar;
26 import java.util.Collection;
27 import java.util.HashSet;
28 import java.util.List;
29
30 import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
31 import org.apache.hadoop.chukwa.util.HierarchyDataType;
32 import org.apache.hadoop.fs.PathFilter;
33 import org.apache.hadoop.fs.FileStatus;
34 import org.apache.hadoop.fs.FileSystem;
35 import org.apache.hadoop.fs.FileUtil;
36 import org.apache.hadoop.fs.Path;
37 import org.apache.log4j.Logger;
38
39
40
41
42
43 public class MoveToRepository {
44 static Logger log = Logger.getLogger(MoveToRepository.class);
45
46 static ChukwaConfiguration conf = null;
47 static FileSystem fs = null;
48 static SimpleDateFormat sdf = new java.text.SimpleDateFormat("yyyyMMdd");
49 static Calendar calendar = Calendar.getInstance();
50
51 static Collection<Path> processClusterDirectory(Path srcDir, String destDir)
52 throws Exception {
53 log.info("processClusterDirectory (" + srcDir.getName() + "," + destDir
54 + ")");
55 FileStatus fstat = fs.getFileStatus(srcDir);
56 Collection<Path> destFiles = new HashSet<Path>();
57
58 if (!fstat.isDir()) {
59 throw new IOException(srcDir + " is not a directory!");
60 } else {
61 FileStatus[] datasourceDirectories = fs.listStatus(srcDir);
62
63 for (FileStatus datasourceDirectory : datasourceDirectories) {
64 log.info(datasourceDirectory.getPath() + " isDir?"
65 + datasourceDirectory.isDir());
66 if (!datasourceDirectory.isDir()) {
67 throw new IOException(
68 "Top level datasource directory should be a directory :"
69 + datasourceDirectory.getPath());
70 }
71
72 PathFilter filter = new PathFilter()
73 {public boolean accept(Path file) {
74 return file.getName().endsWith(".evt");
75 } };
76
77
78 List<FileStatus> eventfiles = HierarchyDataType.globStatus(fs, datasourceDirectory.getPath(),filter,true);
79 for (FileStatus eventfile : eventfiles){
80 Path datatypeDir = eventfile.getPath().getParent();
81 String dirName = HierarchyDataType.getDataType(datatypeDir, srcDir);
82
83 Path destPath = new Path(destDir + "/" + dirName);
84 log.info("dest directory path: " + destPath);
85 log.info("processClusterDirectory processing Datasource: (" + dirName
86 + ")");
87 StringBuilder dtDir = new StringBuilder(srcDir.toString()).append("/").append(dirName);
88 log.debug("srcDir: " + dtDir.toString());
89 processDatasourceDirectory(srcDir.toString(), new Path(dtDir.toString()), destDir + "/" + dirName);
90 }
91 }
92 }
93 return destFiles;
94 }
95
96 static Collection<Path> processDatasourceDirectory(String clusterpath, Path srcDir,
97 String destDir) throws Exception {
98 Path cPath = new Path(clusterpath);
99 String cluster = cPath.getName();
100
101 Collection<Path> destFiles = new HashSet<Path>();
102 String fileName = null;
103 int fileDay = 0;
104 int fileHour = 0;
105 int fileMin = 0;
106
107 FileStatus[] recordFiles = fs.listStatus(srcDir);
108 for (FileStatus recordFile : recordFiles) {
109
110
111
112 fileName = recordFile.getPath().getName();
113 log.info("processDatasourceDirectory processing RecordFile: (" + fileName
114 + ")");
115 log.info("fileName: " + fileName);
116
117 int l = fileName.length();
118 String dataSource = HierarchyDataType.getDataType(srcDir, cPath);
119 log.info("Datasource: " + dataSource);
120
121 if (fileName.endsWith(".D.evt")) {
122
123
124 fileDay = Integer.parseInt(fileName.substring(l - 14, l - 6));
125 Path destFile = writeRecordFile(destDir + "/" + fileDay + "/",
126 recordFile.getPath(),
127 HierarchyDataType.getHierarchyDataTypeFileName(dataSource) + "_"
128 + fileDay);
129 if (destFile != null) {
130 destFiles.add(destFile);
131 }
132 } else if (fileName.endsWith(".H.evt")) {
133
134
135
136 String day = null;
137 String hour = null;
138 if (fileName.charAt(l - 8) == '_') {
139 day = fileName.substring(l - 16, l - 8);
140 log.info("day->" + day);
141 hour = "" + fileName.charAt(l - 7);
142 log.info("hour->" + hour);
143 } else {
144 day = fileName.substring(l - 17, l - 9);
145 log.info("day->" + day);
146 hour = fileName.substring(l - 8, l - 6);
147 log.info("hour->" + hour);
148 }
149 fileDay = Integer.parseInt(day);
150 fileHour = Integer.parseInt(hour);
151
152 Path destFile = writeRecordFile(destDir + "/" + fileDay + "/"
153 + fileHour + "/", recordFile.getPath(),
154 HierarchyDataType.getHierarchyDataTypeFileName(dataSource) + "_"
155 + fileDay + "_" + fileHour);
156 if (destFile != null) {
157 destFiles.add(destFile);
158 }
159
160 addDirectory4Rolling(true, fileDay, fileHour, cluster, dataSource);
161 } else if (fileName.endsWith(".R.evt")) {
162 if (fileName.charAt(l - 11) == '_') {
163 fileDay = Integer.parseInt(fileName.substring(l - 19, l - 11));
164 fileHour = Integer.parseInt("" + fileName.charAt(l - 10));
165 fileMin = Integer.parseInt(fileName.substring(l - 8, l - 6));
166 } else {
167 fileDay = Integer.parseInt(fileName.substring(l - 20, l - 12));
168 fileHour = Integer.parseInt(fileName.substring(l - 11, l - 9));
169 fileMin = Integer.parseInt(fileName.substring(l - 8, l - 6));
170 }
171
172 log.info("fileDay: " + fileDay);
173 log.info("fileHour: " + fileHour);
174 log.info("fileMin: " + fileMin);
175 Path destFile = writeRecordFile(
176 destDir + "/" + fileDay + "/" + fileHour + "/" + fileMin,
177 recordFile.getPath(),
178 HierarchyDataType.getHierarchyDataTypeFileName(HierarchyDataType.trimSlash(dataSource))
179 + "_" + fileDay + "_" + fileHour + "_" + fileMin);
180 if (destFile != null) {
181 destFiles.add(destFile);
182 }
183
184 addDirectory4Rolling(false, fileDay, fileHour, cluster, dataSource);
185 } else {
186 throw new RuntimeException("Wrong fileName format! [" + fileName + "]");
187 }
188 }
189
190 return destFiles;
191 }
192
193 static void addDirectory4Rolling(boolean isDailyOnly, int day, int hour,
194 String cluster, String dataSource) throws IOException {
195
196 String rollingDirectory = "/chukwa/rolling/";
197
198 Path path = new Path(rollingDirectory + "/daily/" + day + "/" + cluster
199 + "/" + dataSource);
200 if (!fs.exists(path)) {
201 fs.mkdirs(path);
202 }
203
204 if (!isDailyOnly) {
205 path = new Path(rollingDirectory + "/hourly/" + day + "/" + hour + "/"
206 + cluster + "/" + dataSource);
207 if (!fs.exists(path)) {
208 fs.mkdirs(path);
209 }
210 }
211 }
212
213 static Path writeRecordFile(String destDir, Path recordFile, String fileName)
214 throws IOException {
215 boolean done = false;
216 int count = 1;
217 do {
218 Path destDirPath = new Path(destDir);
219 Path destFilePath = new Path(destDir + "/" + fileName + "." + count
220 + ".evt");
221
222 if (!fs.exists(destDirPath)) {
223 fs.mkdirs(destDirPath);
224 log.info(">>>>>>>>>>>> create Dir" + destDirPath);
225 }
226
227 if (!fs.exists(destFilePath)) {
228 log.info(">>>>>>>>>>>> Before Rename" + recordFile + " -- "
229 + destFilePath);
230 boolean rename = fs.rename(recordFile,destFilePath);
231 done = true;
232 log.info(">>>>>>>>>>>> after Rename" + destFilePath + " , rename:"+rename);
233 return destFilePath;
234 }
235 count++;
236
237 if (count > 1000) {
238 log.warn("too many files in this directory: " + destDir);
239 }
240 } while (!done);
241
242 return null;
243 }
244
245 static boolean checkRotate(String directoryAsString,
246 boolean createDirectoryIfNotExist) throws IOException {
247 Path directory = new Path(directoryAsString);
248 boolean exist = fs.exists(directory);
249
250 if (!exist) {
251 if (createDirectoryIfNotExist == true) {
252 fs.mkdirs(directory);
253 }
254 return false;
255 } else {
256 return fs.exists(new Path(directoryAsString + "/rotateDone"));
257 }
258 }
259
260 public static Path[] doMove(Path srcDir, String destDir) throws Exception {
261 conf = new ChukwaConfiguration();
262 String fsName = conf.get("writer.hdfs.filesystem");
263 fs = FileSystem.get(new URI(fsName), conf);
264 log.info("Start MoveToRepository doMove()");
265
266 FileStatus fstat = fs.getFileStatus(srcDir);
267
268 Collection<Path> destinationFiles = new HashSet<Path>();
269 if (!fstat.isDir()) {
270 throw new IOException(srcDir + " is not a directory!");
271 } else {
272 FileStatus[] clusters = fs.listStatus(srcDir);
273
274 String name = null;
275 for (FileStatus cluster : clusters) {
276 name = cluster.getPath().getName();
277
278 if (name.startsWith("_")) {
279 continue;
280 }
281 log.info("main procesing Cluster (" + cluster.getPath().getName() + ")");
282 destinationFiles.addAll(processClusterDirectory(cluster.getPath(),
283 destDir + "/" + cluster.getPath().getName()));
284
285
286 FileUtil.fullyDelete(fs, cluster.getPath());
287 }
288 }
289
290 log.info("Done with MoveToRepository doMove()");
291 return destinationFiles.toArray(new Path[destinationFiles.size()]);
292 }
293
294
295
296
297
298 public static void main(String[] args) throws Exception {
299
300 Path srcDir = new Path(args[0]);
301 String destDir = args[1];
302 doMove(srcDir, destDir);
303 }
304
305 }