This project has retired. For details please refer to its
Attic page.
LocalToRemoteHdfsMover xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.chukwa.datacollection.writer.localfs;
20
21 import java.io.FileNotFoundException;
22 import java.net.URI;
23 import java.util.concurrent.BlockingQueue;
24
25 import org.apache.hadoop.chukwa.datacollection.writer.WriterException;
26 import org.apache.hadoop.chukwa.util.CopySequenceFile;
27 import org.apache.hadoop.conf.Configuration;
28 import org.apache.hadoop.fs.FileStatus;
29 import org.apache.hadoop.fs.FileSystem;
30 import org.apache.hadoop.fs.Path;
31 import org.apache.log4j.Logger;
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48 public class LocalToRemoteHdfsMover extends Thread {
49 static Logger log = Logger.getLogger(LocalToRemoteHdfsMover.class);
50
51 private FileSystem remoteFs = null;
52 private FileSystem localFs = null;
53 private Configuration conf = null;
54 private String fsname = null;
55 private String localOutputDir = null;
56 private String remoteOutputDir = null;
57 private boolean exitIfHDFSNotavailable = false;
58 private BlockingQueue<String> fileQueue = null;
59 private volatile boolean isRunning = true;
60
61 public LocalToRemoteHdfsMover(BlockingQueue<String> fileQueue ,Configuration conf) {
62 this.fileQueue = fileQueue;
63 this.conf = conf;
64 this.setDaemon(true);
65 this.setName("LocalToRemoteHdfsMover");
66 this.start();
67 }
68
69 protected void init() throws Throwable {
70
71
72 fsname = conf.get("writer.hdfs.filesystem");
73 if (fsname == null || fsname.equals("")) {
74
75 fsname = conf.get("fs.defaultFS");
76 }
77
78 if (fsname == null) {
79 log.error("no filesystem name");
80 throw new RuntimeException("no filesystem");
81 }
82
83 log.info("remote fs name is " + fsname);
84 exitIfHDFSNotavailable = conf.getBoolean(
85 "localToRemoteHdfsMover.exitIfHDFSNotavailable", false);
86
87 remoteFs = FileSystem.get(new URI(fsname), conf);
88 if (remoteFs == null && exitIfHDFSNotavailable) {
89 log.error("can't connect to HDFS.");
90 throw new WriterException("can't connect to HDFS.");
91 }
92
93 localFs = FileSystem.getLocal(conf);
94
95 remoteOutputDir = conf.get("chukwaCollector.outputDir", "/chukwa/logs/");
96 if (!remoteOutputDir.endsWith("/")) {
97 remoteOutputDir += "/";
98 }
99
100 localOutputDir = conf.get("chukwaCollector.localOutputDir",
101 "/chukwa/datasink/");
102 if (!localOutputDir.endsWith("/")) {
103 localOutputDir += "/";
104 }
105
106 }
107
108 protected void moveFile(String filePath) throws Exception{
109 String remoteFilePath = filePath.substring(filePath.lastIndexOf("/")+1,filePath.lastIndexOf("."));
110 remoteFilePath = remoteOutputDir + remoteFilePath;
111 try {
112 Path pLocalPath = new Path(filePath);
113 Path pRemoteFilePath = new Path(remoteFilePath + ".chukwa");
114 remoteFs.copyFromLocalFile(false, true, pLocalPath, pRemoteFilePath);
115 Path pFinalRemoteFilePath = new Path(remoteFilePath + ".done");
116 if ( remoteFs.rename(pRemoteFilePath, pFinalRemoteFilePath)) {
117 localFs.delete(pLocalPath,false);
118 log.info("move done deleting from local: " + pLocalPath);
119 } else {
120 throw new RuntimeException("Cannot rename remote file, " + pRemoteFilePath + " to " + pFinalRemoteFilePath);
121 }
122 }catch(FileNotFoundException ex) {
123 log.debug("File not found: " + remoteFilePath);
124
125
126 }
127 catch (Exception e) {
128 log.warn("Cannot copy to the remote HDFS",e);
129 throw e;
130 }
131 }
132
133 protected void cleanup() throws Exception{
134 try {
135 int rotateInterval = conf.getInt("chukwaCollector.rotateInterval",
136 1000 * 60 * 5);
137
138 Path pLocalOutputDir = new Path(localOutputDir);
139 FileStatus[] files = localFs.listStatus(pLocalOutputDir);
140 String fileName = null;
141 for (FileStatus file: files) {
142 fileName = file.getPath().getName();
143
144 if (fileName.endsWith(".recover")) {
145
146
147 Path recoverPath= new Path(localOutputDir+fileName);
148 localFs.delete(recoverPath, false);
149 log.info("Deleted .recover file, " + localOutputDir + fileName);
150 } else if (fileName.endsWith(".recoverDone")) {
151
152
153
154 String chukwaFileName= fileName.replace(".recoverDone", ".chukwa");
155 Boolean fileNotFound=true;
156 int i=0;
157 while (i<files.length && fileNotFound) {
158 String currentFileName = files[i].getPath().getName();
159
160 if (currentFileName.equals(chukwaFileName)){
161
162
163 fileNotFound = false;
164 Path chukwaFilePath = new Path(localOutputDir+chukwaFileName);
165 localFs.delete(chukwaFilePath,false);
166 log.info(".recoverDone file exists, deleted duplicate .chukwa file, "
167 + localOutputDir + fileName);
168 }
169 i++;
170 }
171
172
173 String doneFileName= fileName.replace(".recoverDone", ".done");
174 Path donePath= new Path(localOutputDir+doneFileName);
175 Path recoverDonePath= new Path(localOutputDir+fileName);
176 localFs.rename(recoverDonePath, donePath);
177 log.info("Renamed .recoverDone file to .done, "+ localOutputDir + fileName);
178 } else if (fileName.endsWith(".done")) {
179 moveFile(localOutputDir + fileName);
180 } else if (fileName.endsWith(".chukwa")) {
181 long lastPeriod = System.currentTimeMillis() - rotateInterval - (2*60*1000);
182 if (file.getModificationTime() < lastPeriod) {
183
184
185 log.info("Copying .chukwa file to valid sink file before moving, " + localOutputDir + fileName);
186 CopySequenceFile.createValidSequenceFile(conf,localOutputDir,fileName,localFs);
187 }
188 }
189 }
190 } catch (Exception e) {
191 log.warn("Cannot copy to the remote HDFS",e);
192 throw e;
193 }
194 }
195
196 @Override
197 public void run() {
198 boolean inError = true;
199 String filePath = null;
200
201 while (isRunning) {
202 try {
203 if (inError) {
204 init();
205 cleanup();
206 inError = false;
207 }
208
209 filePath = fileQueue.take();
210
211 if (filePath == null) {
212 continue;
213 }
214
215 moveFile(filePath);
216 cleanup();
217 filePath = null;
218
219 } catch (Throwable e) {
220 log.warn("Error in LocalToHdfsMover", e);
221 inError = true;
222 try {
223 log.info("Got an exception going to sleep for 60 secs");
224 Thread.sleep(60000);
225 } catch (Throwable e2) {
226 log.warn("Exception while sleeping", e2);
227 }
228 }
229 }
230 log.info(Thread.currentThread().getName() + " is exiting.");
231 }
232
233 public void shutdown() {
234 this.isRunning = false;
235 }
236 }