This project has retired. For details please refer to its Attic page.
MoveToRepository xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.chukwa.extraction.demux;
20  
21  
22  import java.io.IOException;
23  import java.net.URI;
24  import java.text.SimpleDateFormat;
25  import java.util.Calendar;
26  import java.util.Collection;
27  import java.util.HashSet;
28  import java.util.List;
29  
30  import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
31  import org.apache.hadoop.chukwa.util.HierarchyDataType;
32  import org.apache.hadoop.fs.PathFilter;
33  import org.apache.hadoop.fs.FileStatus;
34  import org.apache.hadoop.fs.FileSystem;
35  import org.apache.hadoop.fs.FileUtil;
36  import org.apache.hadoop.fs.Path;
37  import org.apache.log4j.Logger;
38  
// TODO: This is the first version of the spill logic; it still needs
// some polishing.
42  
43  public class MoveToRepository {
44    static Logger log = Logger.getLogger(MoveToRepository.class);
45  
46    static ChukwaConfiguration conf = null;
47    static FileSystem fs = null;
48    static SimpleDateFormat sdf = new java.text.SimpleDateFormat("yyyyMMdd");
49    static Calendar calendar = Calendar.getInstance();
50  
51    static Collection<Path> processClusterDirectory(Path srcDir, String destDir)
52        throws Exception {
53      log.info("processClusterDirectory (" + srcDir.getName() + "," + destDir
54          + ")");
55      FileStatus fstat = fs.getFileStatus(srcDir);
56      Collection<Path> destFiles = new HashSet<Path>();
57  
58      if (!fstat.isDir()) {
59        throw new IOException(srcDir + " is not a directory!");
60      } else {
61        FileStatus[] datasourceDirectories = fs.listStatus(srcDir);
62  
63        for (FileStatus datasourceDirectory : datasourceDirectories) {
64          log.info(datasourceDirectory.getPath() + " isDir?"
65              + datasourceDirectory.isDir());
66          if (!datasourceDirectory.isDir()) {
67            throw new IOException(
68                "Top level datasource directory should be a directory :"
69                    + datasourceDirectory.getPath());
70          }
71  
72          PathFilter filter = new PathFilter()
73          {public boolean accept(Path file) {
74            return file.getName().endsWith(".evt");
75          }  };
76          //CHUKWA-648:  Make Chukwa Reduce Type to support hierarchy format
77          // to processDataSourceDirectory according to hierarchy data type format 
78          List<FileStatus> eventfiles = HierarchyDataType.globStatus(fs, datasourceDirectory.getPath(),filter,true);
79          for (FileStatus eventfile : eventfiles){
80            Path datatypeDir = eventfile.getPath().getParent();
81            String dirName = HierarchyDataType.getDataType(datatypeDir, srcDir);
82          
83            Path destPath = new Path(destDir + "/" + dirName);
84            log.info("dest directory path: " + destPath);
85            log.info("processClusterDirectory processing Datasource: (" + dirName
86                + ")");
87            StringBuilder dtDir = new StringBuilder(srcDir.toString()).append("/").append(dirName);
88            log.debug("srcDir: " + dtDir.toString());
89            processDatasourceDirectory(srcDir.toString(), new Path(dtDir.toString()), destDir + "/" + dirName);
90          }
91        }
92      }
93      return destFiles;
94    }
95  
96    static Collection<Path> processDatasourceDirectory(String clusterpath, Path srcDir,
97        String destDir) throws Exception {
98      Path cPath = new Path(clusterpath);
99      String cluster = cPath.getName();
100     
101     Collection<Path> destFiles = new HashSet<Path>();
102     String fileName = null;
103     int fileDay = 0;
104     int fileHour = 0;
105     int fileMin = 0;
106 
107     FileStatus[] recordFiles = fs.listStatus(srcDir);
108     for (FileStatus recordFile : recordFiles) {
109       // dataSource_20080915_18_15.1.evt
110       // <datasource>_<yyyyMMdd_HH_mm>.1.evt
111 
112       fileName = recordFile.getPath().getName();
113       log.info("processDatasourceDirectory processing RecordFile: (" + fileName
114           + ")");
115       log.info("fileName: " + fileName);
116 
117       int l = fileName.length();
118       String dataSource = HierarchyDataType.getDataType(srcDir, cPath);
119       log.info("Datasource: " + dataSource);
120 
121       if (fileName.endsWith(".D.evt")) {
122         // Hadoop_dfs_datanode_20080919.D.evt
123 
124         fileDay = Integer.parseInt(fileName.substring(l - 14, l - 6));
125         Path destFile = writeRecordFile(destDir + "/" + fileDay + "/",
126             recordFile.getPath(),
127             HierarchyDataType.getHierarchyDataTypeFileName(dataSource) + "_"
128                 + fileDay);
129         if (destFile != null) {
130           destFiles.add(destFile);
131         }
132       } else if (fileName.endsWith(".H.evt")) {
133         // Hadoop_dfs_datanode_20080925_1.H.evt
134         // Hadoop_dfs_datanode_20080925_12.H.evt
135 
136         String day = null;
137         String hour = null;
138         if (fileName.charAt(l - 8) == '_') {
139           day = fileName.substring(l - 16, l - 8);
140           log.info("day->" + day);
141           hour = "" + fileName.charAt(l - 7);
142           log.info("hour->" + hour);
143         } else {
144           day = fileName.substring(l - 17, l - 9);
145           log.info("day->" + day);
146           hour = fileName.substring(l - 8, l - 6);
147           log.info("hour->" + hour);
148         }
149         fileDay = Integer.parseInt(day);
150         fileHour = Integer.parseInt(hour);
151         // rotate there so spill
152         Path destFile = writeRecordFile(destDir + "/" + fileDay + "/"
153             + fileHour + "/", recordFile.getPath(),
154             HierarchyDataType.getHierarchyDataTypeFileName(dataSource) + "_"
155                 + fileDay + "_" + fileHour);
156         if (destFile != null) {
157           destFiles.add(destFile);
158         }
159         // mark this directory for daily rotate
160         addDirectory4Rolling(true, fileDay, fileHour, cluster, dataSource);
161       } else if (fileName.endsWith(".R.evt")) {
162         if (fileName.charAt(l - 11) == '_') {
163           fileDay = Integer.parseInt(fileName.substring(l - 19, l - 11));
164           fileHour = Integer.parseInt("" + fileName.charAt(l - 10));
165           fileMin = Integer.parseInt(fileName.substring(l - 8, l - 6));
166         } else {
167           fileDay = Integer.parseInt(fileName.substring(l - 20, l - 12));
168           fileHour = Integer.parseInt(fileName.substring(l - 11, l - 9));
169           fileMin = Integer.parseInt(fileName.substring(l - 8, l - 6));
170         }
171 
172         log.info("fileDay: " + fileDay);
173         log.info("fileHour: " + fileHour);
174         log.info("fileMin: " + fileMin);
175         Path destFile = writeRecordFile(
176             destDir + "/" + fileDay + "/" + fileHour + "/" + fileMin,
177             recordFile.getPath(),
178             HierarchyDataType.getHierarchyDataTypeFileName(HierarchyDataType.trimSlash(dataSource))
179             + "_" + fileDay + "_" + fileHour + "_" + fileMin);
180         if (destFile != null) {
181           destFiles.add(destFile);
182         }
183         // mark this directory for hourly rotate
184         addDirectory4Rolling(false, fileDay, fileHour, cluster, dataSource);
185       } else {
186         throw new RuntimeException("Wrong fileName format! [" + fileName + "]");
187       }
188     }
189 
190     return destFiles;
191   }
192 
193   static void addDirectory4Rolling(boolean isDailyOnly, int day, int hour,
194       String cluster, String dataSource) throws IOException {
195     // TODO get root directory from config
196     String rollingDirectory = "/chukwa/rolling/";
197 
198     Path path = new Path(rollingDirectory + "/daily/" + day + "/" + cluster
199         + "/" + dataSource);
200     if (!fs.exists(path)) {
201       fs.mkdirs(path);
202     }
203 
204     if (!isDailyOnly) {
205       path = new Path(rollingDirectory + "/hourly/" + day + "/" + hour + "/"
206           + cluster + "/" + dataSource);
207       if (!fs.exists(path)) {
208         fs.mkdirs(path);
209       }
210     }
211   }
212 
213   static Path writeRecordFile(String destDir, Path recordFile, String fileName)
214       throws IOException {
215     boolean done = false;
216     int count = 1;
217     do {
218       Path destDirPath = new Path(destDir);
219       Path destFilePath = new Path(destDir + "/" + fileName + "." + count
220           + ".evt");
221 
222       if (!fs.exists(destDirPath)) {
223         fs.mkdirs(destDirPath);
224         log.info(">>>>>>>>>>>> create Dir" + destDirPath);
225       }
226 
227       if (!fs.exists(destFilePath)) {
228         log.info(">>>>>>>>>>>> Before Rename" + recordFile + " -- "
229             + destFilePath);
230         boolean rename = fs.rename(recordFile,destFilePath);
231         done = true;
232         log.info(">>>>>>>>>>>> after Rename" + destFilePath + " , rename:"+rename);
233         return destFilePath;
234       } 
235       count++;
236 
237       if (count > 1000) {
238         log.warn("too many files in this directory: " + destDir);
239       }
240     } while (!done);
241 
242     return null;
243   }
244 
245   static boolean checkRotate(String directoryAsString,
246       boolean createDirectoryIfNotExist) throws IOException {
247     Path directory = new Path(directoryAsString);
248     boolean exist = fs.exists(directory);
249 
250     if (!exist) {
251       if (createDirectoryIfNotExist == true) {
252         fs.mkdirs(directory);
253       }
254       return false;
255     } else {
256       return fs.exists(new Path(directoryAsString + "/rotateDone"));
257     }
258   }
259 
260   public static Path[] doMove(Path srcDir, String destDir) throws Exception {
261     conf = new ChukwaConfiguration();
262     String fsName = conf.get("writer.hdfs.filesystem");
263     fs = FileSystem.get(new URI(fsName), conf);
264     log.info("Start MoveToRepository doMove()");
265 
266     FileStatus fstat = fs.getFileStatus(srcDir);
267 
268     Collection<Path> destinationFiles = new HashSet<Path>();
269     if (!fstat.isDir()) {
270       throw new IOException(srcDir + " is not a directory!");
271     } else {
272       FileStatus[] clusters = fs.listStatus(srcDir);
273       // Run a moveOrMerge on all clusters
274       String name = null;
275       for (FileStatus cluster : clusters) {
276         name = cluster.getPath().getName();
277         // Skip hadoop M/R outputDir
278         if (name.startsWith("_")) {
279           continue;
280         }
281         log.info("main procesing Cluster (" + cluster.getPath().getName() + ")");
282         destinationFiles.addAll(processClusterDirectory(cluster.getPath(),
283             destDir + "/" + cluster.getPath().getName()));
284 
285         // Delete the demux's cluster dir
286         FileUtil.fullyDelete(fs, cluster.getPath());
287       }
288     }
289 
290     log.info("Done with MoveToRepository doMove()");
291     return destinationFiles.toArray(new Path[destinationFiles.size()]);
292   }
293 
294   /**
295    * @param args
296    * @throws Exception
297    */
298   public static void main(String[] args) throws Exception {
299 
300     Path srcDir = new Path(args[0]);
301     String destDir = args[1];
302     doMove(srcDir, destDir);
303   }
304 
305 }