This project has retired. For details please refer to its Attic page.
ChukwaArchiveManager xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.chukwa.extraction.archive;
20  
21  import java.io.IOException;
22  import java.net.URI;
23  import java.net.URISyntaxException;
24  import java.text.SimpleDateFormat;
25  
26  import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
27  import org.apache.hadoop.chukwa.extraction.CHUKWA_CONSTANT;
28  import org.apache.hadoop.fs.FileStatus;
29  import org.apache.hadoop.fs.FileSystem;
30  import org.apache.hadoop.fs.Path;
31  import org.apache.hadoop.util.ToolRunner;
32  import org.apache.log4j.Logger;
33  
34  public class ChukwaArchiveManager implements CHUKWA_CONSTANT {
35    static Logger log = Logger.getLogger(ChukwaArchiveManager.class);
36    SimpleDateFormat day = new java.text.SimpleDateFormat("yyyyMMdd");
37    
38    static final  int ONE_HOUR = 60 * 60 * 1000;
39    static final int ONE_DAY = 24*ONE_HOUR;
40    static final int MAX_FILES = 500;
41  
42    private static final int DEFAULT_MAX_ERROR_COUNT = 4;
43  
44    protected ChukwaConfiguration conf = null;
45    protected FileSystem fs = null;
46    protected boolean isRunning = true;
47    
48    public ChukwaArchiveManager() throws Exception { 
49      conf = new ChukwaConfiguration();
50      init();
51    }
52  
53    protected void init() throws IOException, URISyntaxException {
54      String fsName = conf.get(HDFS_DEFAULT_NAME_FIELD);
55      fs = FileSystem.get(new URI(fsName), conf);
56    }
57  
58    public static void main(String[] args) throws Exception {
59      
60      ChukwaArchiveManager manager = new ChukwaArchiveManager();
61      manager.start();
62    }
63  
64    public void shutdown() {
65      this.isRunning = false;
66    }
67    
68    public void start() throws Exception {
69      
70      String chukwaRootDir = conf.get(CHUKWA_ROOT_DIR_FIELD, DEFAULT_CHUKWA_ROOT_DIR_NAME);
71      if ( ! chukwaRootDir.endsWith("/") ) {
72        chukwaRootDir += "/";
73      }
74      log.info("chukwaRootDir:" + chukwaRootDir);
75      
76      String archiveRootDir = conf.get(CHUKWA_ARCHIVE_DIR_FIELD, chukwaRootDir +DEFAULT_CHUKWA_DATASINK_DIR_NAME);
77      if ( ! archiveRootDir.endsWith("/") ) {
78        archiveRootDir += "/";
79      }
80      log.info("archiveDir:" + archiveRootDir);
81      Path pArchiveRootDir = new Path(archiveRootDir);
82      setup(pArchiveRootDir);
83      
84      String archivesRootProcessingDir = chukwaRootDir + ARCHIVES_PROCESSING_DIR_NAME;
85      // String archivesErrorDir = archivesRootProcessingDir + DEFAULT_ARCHIVES_IN_ERROR_DIR_NAME;
86      String archivesMRInputDir = archivesRootProcessingDir + ARCHIVES_MR_INPUT_DIR_NAME;
87      String archivesMROutputDir = archivesRootProcessingDir+ ARCHIVES_MR_OUTPUT_DIR_NAME;
88      String finalArchiveOutput = chukwaRootDir + DEFAULT_FINAL_ARCHIVES;
89  
90      int maxPermittedErrorCount = conf.getInt(CHUKWA_ARCHIVE_MAX_ERROR_COUNT_FIELD,
91                                               DEFAULT_MAX_ERROR_COUNT);
92      
93      Path pDailyRawArchivesInput = new Path(archiveRootDir);
94      Path pArchivesMRInputDir = new Path(archivesMRInputDir);
95      Path pArchivesRootProcessingDir = new Path(archivesRootProcessingDir);
96      Path pFinalArchiveOutput =  new Path(finalArchiveOutput);
97      
98      
99      if (!archivesMRInputDir.endsWith("/")) {
100       archivesMRInputDir +="/";
101     }
102     setup( pArchivesRootProcessingDir );
103     setup( pDailyRawArchivesInput );
104     setup( pFinalArchiveOutput );
105     
106     int errorCount = 0;
107     
108     long lastRun = 0l;
109     
110     while (isRunning) {
111       try {
112         
113         if (maxPermittedErrorCount != -1 && errorCount >= maxPermittedErrorCount) {
114           log.warn("==================\nToo many errors (" + errorCount +
115                    "), Bail out!\n==================");
116           break;
117         }
118         // /chukwa/archives/<YYYYMMDD>/dataSinkDirXXX
119         //  to
120         // /chukwa/archives/final/<YYYYMMDD>_<TS>
121         
122         if (fs.exists(pArchivesMRInputDir)) {
123           FileStatus[] days = fs.listStatus(pArchivesMRInputDir);
124           if (days.length > 0) {
125             log.info("reprocessing current Archive input" +  days[0].getPath());
126             
127             runArchive(archivesMRInputDir + days[0].getPath().getName() + "/",archivesMROutputDir,finalArchiveOutput);  
128             errorCount = 0;
129             continue;
130           }
131         }
132         
133         
134         log.info("Raw Archive dir:" + pDailyRawArchivesInput);
135         long now = System.currentTimeMillis();
136         int currentDay = Integer.parseInt(day.format(System.currentTimeMillis()));
137         FileStatus[] daysInRawArchiveDir = fs.listStatus(pDailyRawArchivesInput);
138         
139         if (daysInRawArchiveDir.length == 0 ) { 
140           log.debug( pDailyRawArchivesInput + " is empty, going to sleep for 1 minute"); 
141           Thread.sleep(1 * 60 * 1000); 
142           continue; 
143         } 
144         // We don't want to process DataSink file more than once every 2 hours
145         // for current day
146         if (daysInRawArchiveDir.length == 1 ) {
147           int workingDay = Integer.parseInt(daysInRawArchiveDir[0].getPath().getName());
148           long nextRun = lastRun + (2*ONE_HOUR) - (1*60*1000);// 2h -1min
149           if (workingDay == currentDay && now < nextRun) {
150             log.info("lastRun < 2 hours so skip archive for now, going to sleep for 30 minutes, currentDate is:" + new java.util.Date());
151             Thread.sleep(30 * 60 * 1000);
152             continue;
153           }
154         }
155         
156         String dayArchivesMRInputDir = null;
157         for (FileStatus fsDay : daysInRawArchiveDir) {
158           dayArchivesMRInputDir = archivesMRInputDir + fsDay.getPath().getName() + "/";
159           processDay(fsDay, dayArchivesMRInputDir,archivesMROutputDir, finalArchiveOutput);
160           lastRun = now;
161         }
162         
163       }catch (Throwable e) {
164         errorCount ++;
165         e.printStackTrace();
166         log.warn(e);
167       }
168       
169     }
170     
171   }
172   
173   public void processDay(FileStatus fsDay, String archivesMRInputDir,
174       String archivesMROutputDir,String finalArchiveOutput) throws Exception {
175     FileStatus[] dataSinkDirsInRawArchiveDir = fs.listStatus(fsDay.getPath());
176     long now = System.currentTimeMillis();
177     
178     int currentDay = Integer.parseInt(day.format(System.currentTimeMillis()));
179     int workingDay = Integer.parseInt(fsDay.getPath().getName());
180     
181     long oneHourAgo = now -  ONE_HOUR;
182     if (dataSinkDirsInRawArchiveDir.length == 0 && workingDay < currentDay) {
183       fs.delete(fsDay.getPath(),false);
184       log.info("deleting raw dataSink dir for day:" + fsDay.getPath().getName());
185       return;
186     }
187     
188     int fileCount = 0;
189     for (FileStatus fsDataSinkDir : dataSinkDirsInRawArchiveDir) {
190       long modificationDate = fsDataSinkDir.getModificationTime();
191       if (modificationDate < oneHourAgo || workingDay < currentDay) {
192         log.info("processDay,modificationDate:" + modificationDate +", adding: " + fsDataSinkDir.getPath() );
193         fileCount += fs.listStatus(fsDataSinkDir.getPath()).length;
194         moveDataSinkFilesToArchiveMrInput(fsDataSinkDir,archivesMRInputDir);
195         // process no more than MAX_FILES directories
196         if (fileCount >= MAX_FILES) {
197           log.info("processDay, reach capacity");
198           runArchive(archivesMRInputDir,archivesMROutputDir,finalArchiveOutput);  
199           fileCount = 0;
200         } else {
201           log.info("processDay,modificationDate:" + modificationDate +", skipping: " + fsDataSinkDir.getPath() );
202         }
203       }
204     }    
205   }
206   
207   public void runArchive(String archivesMRInputDir,String archivesMROutputDir,
208       String finalArchiveOutput) throws Exception {
209     String[] args = new String[3];
210     
211     
212     args[0] = conf.get("archive.grouper","Stream");
213     args[1] = archivesMRInputDir + "*/*.done" ;
214     args[2] = archivesMROutputDir;
215     
216     Path pArchivesMRInputDir = new Path(archivesMRInputDir);
217     Path pArchivesMROutputDir = new Path(archivesMROutputDir);
218 
219     
220     if (fs.exists(pArchivesMROutputDir)) {
221       log.warn("Deleteing mroutput dir for archive ...");
222       fs.delete(pArchivesMROutputDir, true);
223     }
224     
225     log.info("ChukwaArchiveManager processing :" + args[1] + " going to output to " + args[2] );
226     int res = ToolRunner.run(this.conf, new ChukwaArchiveBuilder(),args);
227     log.info("Archive result: " + res);
228     if (res != 0) {
229       throw new Exception("Archive result != 0");
230     }
231    
232     if (!finalArchiveOutput.endsWith("/")) {
233       finalArchiveOutput +="/";
234     }
235     String day = pArchivesMRInputDir.getName();
236     finalArchiveOutput += day;
237     Path pDay = new Path(finalArchiveOutput);
238     setup(pDay);
239     
240     finalArchiveOutput += "/archive_" + System.currentTimeMillis();
241     Path pFinalArchiveOutput = new Path(finalArchiveOutput);
242     
243     log.info("Final move: moving " + pArchivesMROutputDir + " to " + pFinalArchiveOutput);
244     
245     if (fs.rename(pArchivesMROutputDir, pFinalArchiveOutput ) ) {
246       log.info("deleting " + pArchivesMRInputDir);
247       fs.delete(pArchivesMRInputDir, true);
248     } else {
249       log.warn("move to final archive folder failed!");
250     }
251     
252 
253     
254   }
255   
256   public void moveDataSinkFilesToArchiveMrInput(FileStatus fsDataSinkDir,
257       String archivesMRInputDir) throws IOException {
258     
259     if (!archivesMRInputDir.endsWith("/")) {
260       archivesMRInputDir +="/";
261     }
262     
263     Path pArchivesMRInputDir = new Path(archivesMRInputDir);
264     setup(pArchivesMRInputDir);
265     fs.rename(fsDataSinkDir.getPath(), pArchivesMRInputDir);
266     log.info("moving " + fsDataSinkDir.getPath() + " to " + pArchivesMRInputDir);
267   }
268   
269   /**
270    * Create directory if !exists
271    * @param directory
272    * @throws IOException
273    */
274   protected void setup(Path directory) throws IOException {
275      if ( ! fs.exists(directory)) {
276        fs.mkdirs(directory);
277      }
278   }
279  
280 }