This project has retired. For details please refer to its Attic page.
SinkArchiver xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.chukwa.extraction.archive;
19  
20  import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
21  import org.apache.hadoop.conf.Configuration;
22  import org.apache.hadoop.fs.*;
23  import org.apache.hadoop.util.ToolRunner;
24  import org.apache.hadoop.chukwa.extraction.CHUKWA_CONSTANT;
25  import org.apache.log4j.Logger;
26  import java.io.IOException;
27  
28  /**
29   * A lightweight tool for archiving, suitable for small-to-medium-size Chukwa
30   * deployments that don't use Demux.
31   * Grabs everything in the data sink, runs the Archiver MapReduce job,
32   * then promotes output to the archive dir.
33   * 
34   * Input is determined by conf option chukwaArchiveDir; defaults to
35   *   /chukwa/logs
36   *   
37   *   Uses /chukwa/archivesProcessing/mr[Input/Output] as tmp storage
38   *   
39   *   Outputs to /chukwa/archives
40   * 
41   */
42  public class SinkArchiver implements CHUKWA_CONSTANT {
43    
44    final public static PathFilter DATA_SINK_FILTER = new PathFilter() {
45      public boolean accept(Path file) {
46        return file.getName().endsWith(".done");
47      }     
48    };
49    
50    static Logger log = Logger.getLogger(SinkArchiver.class);
51    
52    public static void main(String[] args) {
53      try {
54        Configuration conf = new ChukwaConfiguration();
55        if(conf.get(ChukwaArchiveDataTypeOutputFormat.GROUP_BY_CLUSTER_OPTION_NAME) == null )
56          conf.set(ChukwaArchiveDataTypeOutputFormat.GROUP_BY_CLUSTER_OPTION_NAME, "true");
57        FileSystem fs = FileSystem.get(conf);
58        SinkArchiver archiver = new SinkArchiver();
59        archiver.exec(fs, conf);    
60      } catch(IOException e) {
61        e.printStackTrace();
62      }
63    }
64    
65  
66    /*
67     * Pull most of the logic into instance methods so that we can
68     * more easily unit-test, by altering passed-in configuration.
69     */
70    public void exec(FileSystem fs, Configuration conf) {
71      try {
72        
73        String chukwaRootDir = conf.get(CHUKWA_ROOT_DIR_FIELD, DEFAULT_CHUKWA_ROOT_DIR_NAME);
74        if ( ! chukwaRootDir.endsWith("/") ) {
75          chukwaRootDir += "/";
76        }
77        String archiveSource = conf.get(CHUKWA_ARCHIVE_DIR_FIELD, chukwaRootDir +DEFAULT_CHUKWA_LOGS_DIR_NAME);
78        if ( ! archiveSource.endsWith("/") ) {
79          archiveSource += "/";
80        }
81        String archivesRootProcessingDir = chukwaRootDir + ARCHIVES_PROCESSING_DIR_NAME;
82        
83        //String archivesErrorDir = archivesRootProcessingDir + ARCHIVES_IN_ERROR_DIR_NAME;
84        String archivesMRInputDir = archivesRootProcessingDir + ARCHIVES_MR_INPUT_DIR_NAME;
85        String archivesMROutputDir = archivesRootProcessingDir+ ARCHIVES_MR_OUTPUT_DIR_NAME;
86  
87        Path pSource = new Path(archiveSource);
88        
89        Path pMRInputDir = new Path(archivesMRInputDir);
90        if(!fs.exists(pMRInputDir))
91          fs.mkdirs(pMRInputDir);
92        
93        Path pOutputDir = new Path(archivesMROutputDir);
94        if(!fs.exists(pOutputDir))
95          fs.mkdirs(pOutputDir);
96        
97        if(fs.listStatus(pOutputDir).length == 0)
98          fs.delete(pOutputDir, true);
99        Path archive = new Path(chukwaRootDir + "archive");
100       
101       selectInputs(fs, pSource, pMRInputDir);
102 
103       int result = runMapRedJob(conf, archivesMRInputDir, archivesMROutputDir);
104       if(result == 0) { //success, so empty input dir
105         fs.delete(pMRInputDir, true);
106       }
107       
108       if(!fs.exists(archive)) {
109         fs.mkdirs(archive);
110       }
111       FileStatus[] files = fs.listStatus(pOutputDir);
112       for(FileStatus f: files) {
113         if(!f.getPath().getName().endsWith("_logs"))
114           promoteAndMerge(fs, f.getPath(), archive);
115       }
116 
117       fs.delete(pOutputDir, true);
118       
119     } catch (Exception e) {
120       e.printStackTrace();
121     }
122   }
123   
124   private void selectInputs(FileSystem fs, Path pSource,
125       Path pMRInputDir) throws IOException {
126     
127     FileStatus[] dataSinkFiles = fs.listStatus(pSource, DATA_SINK_FILTER);
128     for(FileStatus fstatus: dataSinkFiles) {
129       boolean rename = fs.rename(fstatus.getPath(),pMRInputDir);
130       log.info("Moving " + fstatus.getPath() + " to " + pMRInputDir 
131           +", status is: " + rename);
132     }
133     
134   }
135 
136   public int runMapRedJob(Configuration conf, String in, String out)
137     throws Exception {
138     String grouper = conf.get("archive.grouper","DataType");
139     String[] args = new String[] {grouper, in, out};
140     int res = ToolRunner.run(conf, new ChukwaArchiveBuilder(),
141         args);
142     return res;
143   }
144   /**
145    * Merges the contents of src into dest.
146    * If src is a file, moves it to dest.
147    * 
148    * @param fs the filesystem in question
149    * @param src a file or directory to merge into dest
150    * @param dest a directory to merge into
151    * @throws IOException
152    */
153   public void promoteAndMerge(FileSystem fs, Path src, Path dest) 
154   throws IOException {
155     FileStatus stat = fs.getFileStatus(src);
156     String baseName = src.getName();
157     Path target = new Path(dest, baseName);
158     if(!fs.exists(target)) {
159       fs.rename(src, dest);
160       System.out.println("moving " + src + " to " + dest);
161     } else if(stat.isDir()) {//recurse
162       FileStatus[] files = fs.listStatus(src);
163       for(FileStatus f: files) {
164         promoteAndMerge(fs, f.getPath(), target);
165       }
166     } else { //append a number to unique-ify filename
167       int i=0;
168       do {
169         //FIXME: can make this more generic
170         String newName;
171         if(baseName.endsWith(".arc")) {
172           newName = baseName.substring(0, baseName.length() - 4) + "-"+i+".arc";
173         }
174         else
175           newName = baseName+"-"+i;
176         target = new Path(dest, newName);
177       } while(fs.exists(target));
178       fs.rename(src, target);
179       System.out.println("promoting " + src + " to " + target);
180     }
181 
182   }
183 }