This project has retired. For details please refer to its Attic page.
CopySequenceFile xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  
20  package org.apache.hadoop.chukwa.util;
21  
22  import org.apache.hadoop.chukwa.ChukwaArchiveKey;
23  import org.apache.hadoop.chukwa.ChunkImpl;
24  import org.apache.hadoop.conf.Configuration;
25  import org.apache.hadoop.fs.ChecksumException;
26  import org.apache.hadoop.fs.FSDataOutputStream;
27  import org.apache.hadoop.fs.FileSystem;
28  import org.apache.hadoop.fs.Path;
29  import org.apache.hadoop.io.SequenceFile;
30  import org.apache.log4j.Logger;
31  
32  /**
33   * This class is used by LocalToRemoteHDFSMover to
34   * convert .chukwa files to .done before moving them. 
35   * By creating a new sequence file and copying all valid chunks to it,
36   * it makes sure that no corrupt sequence files get into HDFS.
37   */
38  public class CopySequenceFile {
39    static Logger log = Logger.getLogger(CopySequenceFile.class);
40    private static SequenceFile.Writer seqFileWriter = null;
41    private static SequenceFile.Reader seqFileReader = null; 
42    private static FSDataOutputStream newOutputStr = null;
43    
44    public static void createValidSequenceFile(Configuration conf,
45  		                                     String originalFileDir, 
46  		                                     String originalFileName,
47  		                                     FileSystem localFs) {
48      try {
49        if (!originalFileDir.endsWith("/")) {
50      	      originalFileDir += "/";
51        }	
52  	  String originalCompleteDir= originalFileDir + originalFileName;
53  	  Path originalPath= new Path(originalCompleteDir);
54  	  int extensionIndex= originalFileName.indexOf(".chukwa",0);
55        
56  	  String recoverFileName=originalFileName.substring(0, extensionIndex)+".recover";
57  	  String recoverDir= originalFileDir + recoverFileName;
58  	  Path recoverPath= new Path(recoverDir);
59  	  String recoverDoneFileName=originalFileName.substring(0, extensionIndex)+".recoverDone";
60   	  String recoverDoneDir= originalFileDir + recoverDoneFileName;
61   	  Path recoverDonePath= new Path(recoverDoneDir);
62  	  String doneFileName=originalFileName.substring(0, extensionIndex)+".done";
63  	  String doneDir= originalFileDir + doneFileName;
64  	  Path donePath= new Path(doneDir);
65  	  
66  	  ChukwaArchiveKey key = new ChukwaArchiveKey();
67        ChunkImpl evt = ChunkImpl.getBlankChunk();
68  
69  	  newOutputStr = localFs.create(recoverPath);
70        seqFileWriter = SequenceFile.createWriter(conf, newOutputStr,
71                                                  ChukwaArchiveKey.class, ChunkImpl.class,
72                                                  SequenceFile.CompressionType.NONE, null);
73        seqFileReader = new SequenceFile.Reader(localFs, originalPath, conf);
74          
75        System.out.println("key class name is " + seqFileReader.getKeyClassName());
76        System.out.println("value class name is " + seqFileReader.getValueClassName());
77        try { 
78          while (seqFileReader.next(key, evt)) {
79            seqFileWriter.append(key, evt);
80          }
81         } catch (ChecksumException e) { //The exception occurs when we read a bad chunk while copying
82             log.info("Encountered Bad Chunk while copying .chukwa file, continuing",e);	 
83         }
84         seqFileReader.close();
85  	   seqFileWriter.close();
86  	   newOutputStr.close();
87         try {
88  	     localFs.rename(recoverPath, recoverDonePath); //Rename the destination file from .recover to .recoverDone 
89     	     localFs.delete(originalPath,false); //Delete Original .chukwa file
90  	     localFs.rename(recoverDonePath, donePath); //rename .recoverDone to .done
91         } catch (Exception e) {
92             log.warn("Error occured while renaming .recoverDone to .recover or deleting .chukwa",e);		 
93      	   e.printStackTrace();
94         }
95  
96  	} catch(Exception e) {
97  	    log.warn("Error during .chukwa file recovery",e);	 
98  	    e.printStackTrace();
99  	}	
100   }
101 }