/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.chukwa.util;
2122import org.apache.hadoop.chukwa.ChukwaArchiveKey;
23import org.apache.hadoop.chukwa.ChunkImpl;
24import org.apache.hadoop.conf.Configuration;
25import org.apache.hadoop.fs.ChecksumException;
26import org.apache.hadoop.fs.FSDataOutputStream;
27import org.apache.hadoop.fs.FileSystem;
28import org.apache.hadoop.fs.Path;
29import org.apache.hadoop.io.SequenceFile;
30import org.apache.log4j.Logger;
3132/**33 * This class is used by LocalToRemoteHDFSMover to34 * convert .chukwa files to .done before moving them. 35 * By creating a new sequence file and copying all valid chunks to it,36 * it makes sure that no corrupt sequence files get into HDFS.37 */38publicclassCopySequenceFile {
39static Logger log = Logger.getLogger(CopySequenceFile.class);
40privatestatic SequenceFile.Writer seqFileWriter = null;
41privatestatic SequenceFile.Reader seqFileReader = null;
42privatestatic FSDataOutputStream newOutputStr = null;
4344publicstaticvoid createValidSequenceFile(Configuration conf,
45 String originalFileDir,
46 String originalFileName,
47 FileSystem localFs) {
48try {
49if (!originalFileDir.endsWith("/")) {
50 originalFileDir += "/";
51 }
52 String originalCompleteDir= originalFileDir + originalFileName;
53 Path originalPath= new Path(originalCompleteDir);
54int extensionIndex= originalFileName.indexOf(".chukwa",0);
5556 String recoverFileName=originalFileName.substring(0, extensionIndex)+".recover";
57 String recoverDir= originalFileDir + recoverFileName;
58 Path recoverPath= new Path(recoverDir);
59 String recoverDoneFileName=originalFileName.substring(0, extensionIndex)+".recoverDone";
60 String recoverDoneDir= originalFileDir + recoverDoneFileName;
61 Path recoverDonePath= new Path(recoverDoneDir);
62 String doneFileName=originalFileName.substring(0, extensionIndex)+".done";
63 String doneDir= originalFileDir + doneFileName;
64 Path donePath= new Path(doneDir);
6566 ChukwaArchiveKey key = new ChukwaArchiveKey();
67ChunkImpl evt = ChunkImpl.getBlankChunk();
6869 newOutputStr = localFs.create(recoverPath);
70 seqFileWriter = SequenceFile.createWriter(conf, newOutputStr,
71 ChukwaArchiveKey.class, ChunkImpl.class,
72 SequenceFile.CompressionType.NONE, null);
73 seqFileReader = new SequenceFile.Reader(localFs, originalPath, conf);
7475 System.out.println("key class name is " + seqFileReader.getKeyClassName());
76 System.out.println("value class name is " + seqFileReader.getValueClassName());
77try {
78while (seqFileReader.next(key, evt)) {
79 seqFileWriter.append(key, evt);
80 }
81 } catch (ChecksumException e) { //The exception occurs when we read a bad chunk while copying82 log.info("Encountered Bad Chunk while copying .chukwa file, continuing",e);
83 }
84 seqFileReader.close();
85 seqFileWriter.close();
86 newOutputStr.close();
87try {
88 localFs.rename(recoverPath, recoverDonePath); //Rename the destination file from .recover to .recoverDone 89 localFs.delete(originalPath,false); //Delete Original .chukwa file90 localFs.rename(recoverDonePath, donePath); //rename .recoverDone to .done91 } catch (Exception e) {
92 log.warn("Error occured while renaming .recoverDone to .recover or deleting .chukwa",e);
93 e.printStackTrace();
94 }
9596 } catch(Exception e) {
97 log.warn("Error during .chukwa file recovery",e);
98 e.printStackTrace();
99 }
100 }
101 }