This project has retired. For details please refer to its Attic page.
LocalToRemoteHdfsMover xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.chukwa.datacollection.writer.localfs;
20  
import java.io.FileNotFoundException;
import java.net.URI;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.chukwa.datacollection.writer.WriterException;
import org.apache.hadoop.chukwa.util.CopySequenceFile;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.Logger;
32  
33  
34  /**
35   * This class is used by LocalWriter.java.
36   * 
37   * The only role of this class is to move dataSink files
38   * from the local file system to the remote HDFS.
39   * 
40   * Those 2 classes are using a blockingQueue to exchange 
41   * information.
42   * 
43   * This class will also take care of moving all existing 
44   * done dataSink files (.done) and any dataSink file that
45   * has not been changed for at least (rotatePeriod+2min).
46   * 
47   */
48  public class LocalToRemoteHdfsMover extends Thread {
49    static Logger log = Logger.getLogger(LocalToRemoteHdfsMover.class);
50  
51    private FileSystem remoteFs = null;
52    private FileSystem localFs = null;
53    private Configuration conf = null;
54    private String fsname = null;
55    private String localOutputDir = null;
56    private String remoteOutputDir = null;
57    private boolean exitIfHDFSNotavailable = false;
58    private BlockingQueue<String> fileQueue = null;
59    private volatile boolean isRunning = true;
60    
61    public LocalToRemoteHdfsMover(BlockingQueue<String> fileQueue ,Configuration conf) {
62      this.fileQueue = fileQueue;
63      this.conf = conf;
64      this.setDaemon(true);
65      this.setName("LocalToRemoteHdfsMover");
66      this.start();
67    }
68  
69    protected void init() throws Throwable {
70  
71      // check if they've told us the file system to use
72      fsname = conf.get("writer.hdfs.filesystem");
73      if (fsname == null || fsname.equals("")) {
74        // otherwise try to get the filesystem from hadoop
75        fsname = conf.get("fs.defaultFS");
76      }
77  
78      if (fsname == null) {
79        log.error("no filesystem name");
80        throw new RuntimeException("no filesystem");
81      }
82  
83      log.info("remote fs name is " + fsname);
84      exitIfHDFSNotavailable = conf.getBoolean(
85          "localToRemoteHdfsMover.exitIfHDFSNotavailable", false);
86  
87      remoteFs = FileSystem.get(new URI(fsname), conf);
88      if (remoteFs == null && exitIfHDFSNotavailable) {
89        log.error("can't connect to HDFS.");
90        throw new WriterException("can't connect to HDFS.");
91      } 
92      
93      localFs = FileSystem.getLocal(conf);
94      
95      remoteOutputDir = conf.get("chukwaCollector.outputDir", "/chukwa/logs/");
96      if (!remoteOutputDir.endsWith("/")) {
97        remoteOutputDir += "/";
98      }
99      
100     localOutputDir = conf.get("chukwaCollector.localOutputDir",
101     "/chukwa/datasink/");
102     if (!localOutputDir.endsWith("/")) {
103       localOutputDir += "/";
104     }
105     
106   }
107 
108   protected void moveFile(String filePath) throws Exception{
109     String remoteFilePath = filePath.substring(filePath.lastIndexOf("/")+1,filePath.lastIndexOf("."));
110     remoteFilePath = remoteOutputDir + remoteFilePath;
111     try {
112       Path pLocalPath = new Path(filePath);
113       Path pRemoteFilePath = new Path(remoteFilePath + ".chukwa");
114       remoteFs.copyFromLocalFile(false, true, pLocalPath, pRemoteFilePath);
115       Path pFinalRemoteFilePath = new Path(remoteFilePath + ".done");
116       if ( remoteFs.rename(pRemoteFilePath, pFinalRemoteFilePath)) {
117         localFs.delete(pLocalPath,false);
118         log.info("move done deleting from local: " + pLocalPath);
119       } else {
120         throw new RuntimeException("Cannot rename remote file, " + pRemoteFilePath + " to " + pFinalRemoteFilePath);
121       }
122     }catch(FileNotFoundException ex) {
123       log.debug("File not found: " + remoteFilePath);
124       //do nothing since if the file is no longer there it's
125       // because it has already been moved over by the cleanup task.
126     }
127     catch (Exception e) {
128       log.warn("Cannot copy to the remote HDFS",e);
129       throw e;
130     }
131   }
132   
133   protected void cleanup() throws Exception{
134     try {
135       int rotateInterval = conf.getInt("chukwaCollector.rotateInterval",
136           1000 * 60 * 5);// defaults to 5 minutes
137       
138       Path pLocalOutputDir = new Path(localOutputDir);
139       FileStatus[] files = localFs.listStatus(pLocalOutputDir);
140       String fileName = null;
141       for (FileStatus file: files) {
142         fileName = file.getPath().getName();
143        
144         if (fileName.endsWith(".recover")) {
145           //.recover files indicate a previously failed copying attempt of .chukwa files
146         	
147           Path recoverPath= new Path(localOutputDir+fileName);
148           localFs.delete(recoverPath, false);
149           log.info("Deleted .recover file, " + localOutputDir + fileName);
150         } else if (fileName.endsWith(".recoverDone")) {
151             //.recoverDone files are valid sink files that have not been renamed to .done
152         	// First, check if there are still any .chukwa files with the same name
153          	
154             String chukwaFileName= fileName.replace(".recoverDone", ".chukwa");
155         	Boolean fileNotFound=true;
156         	int i=0;
157         	while (i<files.length && fileNotFound) {
158         	   String currentFileName = files[i].getPath().getName();
159         	   
160         	   if (currentFileName.equals(chukwaFileName)){
161         	      //Remove the .chukwa file found as it has already been recovered
162         	      
163         	     fileNotFound = false;
164         	     Path chukwaFilePath = new Path(localOutputDir+chukwaFileName);
165         	     localFs.delete(chukwaFilePath,false);	
166         	     log.info(".recoverDone file exists, deleted duplicate .chukwa file, "
167         	    		 + localOutputDir + fileName);
168         	   }
169         	   i++;
170         	}
171         	 //Finally, rename .recoverDone file to .done
172         	 
173         	String doneFileName= fileName.replace(".recoverDone", ".done");
174         	Path donePath= new Path(localOutputDir+doneFileName);
175         	Path recoverDonePath= new Path(localOutputDir+fileName);
176         	localFs.rename(recoverDonePath, donePath);
177         	log.info("Renamed .recoverDone file to .done, "+ localOutputDir + fileName);
178          }  else if (fileName.endsWith(".done")) {
179               moveFile(localOutputDir + fileName);
180             }  else if (fileName.endsWith(".chukwa")) {
181                  long lastPeriod = System.currentTimeMillis() - rotateInterval - (2*60*1000);
182                  if (file.getModificationTime() < lastPeriod) { 
183         	       //. chukwa file has not modified for some time, may indicate collector had previously crashed
184          
185                    log.info("Copying .chukwa file to valid sink file before moving, " + localOutputDir + fileName);
186                    CopySequenceFile.createValidSequenceFile(conf,localOutputDir,fileName,localFs);
187                   }
188                } 
189         }
190     } catch (Exception e) {
191         log.warn("Cannot copy to the remote HDFS",e);
192         throw e;
193       }
194   }
195   
196   @Override
197   public void run() {
198     boolean inError = true;
199     String filePath = null;
200     
201     while (isRunning) {
202       try {
203         if (inError) {
204           init();
205           cleanup();
206           inError = false;
207         }
208 
209         filePath = fileQueue.take();
210         
211         if (filePath == null) {
212           continue;
213         }
214         
215         moveFile(filePath);
216         cleanup();
217         filePath = null;
218         
219       } catch (Throwable e) {
220         log.warn("Error in LocalToHdfsMover", e);
221         inError = true;
222         try {
223           log.info("Got an exception going to sleep for 60 secs");
224           Thread.sleep(60000);
225         } catch (Throwable e2) {
226           log.warn("Exception while sleeping", e2);
227         }
228       }
229     }
230     log.info(Thread.currentThread().getName() + " is exiting.");
231   }
232 
233   public void shutdown() {
234     this.isRunning = false;
235   }
236 }