This project has retired. For details please refer to its Attic page.
PostProcessorManager xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.chukwa.extraction.demux;
20  
21  import java.io.IOException;
22  import java.net.URI;
23  import java.net.URISyntaxException;
24  import java.util.ArrayList;
25  import java.util.Collections;
26  import java.util.HashMap;
27  import java.util.List;
28  
29  import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
30  import org.apache.hadoop.chukwa.dataloader.DataLoaderFactory;
31  import static org.apache.hadoop.chukwa.extraction.CHUKWA_CONSTANT.HDFS_DEFAULT_NAME_FIELD;
32  import static org.apache.hadoop.chukwa.extraction.CHUKWA_CONSTANT.CHUKWA_ROOT_DIR_FIELD;
33  import static org.apache.hadoop.chukwa.extraction.CHUKWA_CONSTANT.CHUKWA_POST_PROCESS_DIR_FIELD;
34  import static org.apache.hadoop.chukwa.extraction.CHUKWA_CONSTANT.DEFAULT_CHUKWA_POSTPROCESS_DIR_NAME;
35  import static org.apache.hadoop.chukwa.extraction.CHUKWA_CONSTANT.CHUKWA_ROOT_REPOS_DIR_FIELD;
36  import static org.apache.hadoop.chukwa.extraction.CHUKWA_CONSTANT.DEFAULT_REPOS_DIR_NAME;
37  import static org.apache.hadoop.chukwa.extraction.CHUKWA_CONSTANT.CHUKWA_POSTPROCESS_IN_ERROR_DIR_FIELD;
38  import static org.apache.hadoop.chukwa.extraction.CHUKWA_CONSTANT.DEFAULT_POSTPROCESS_IN_ERROR_DIR_NAME;
39  import static org.apache.hadoop.chukwa.extraction.CHUKWA_CONSTANT.CHUKWA_POSTPROCESS_MAX_ERROR_COUNT_FIELD;
40  import static org.apache.hadoop.chukwa.extraction.CHUKWA_CONSTANT.POST_DEMUX_DATA_LOADER;
41  import static org.apache.hadoop.chukwa.extraction.CHUKWA_CONSTANT.POST_DEMUX_SUCCESS_ACTION;
42  import org.apache.hadoop.chukwa.util.ExceptionUtil;
43  import org.apache.hadoop.chukwa.util.HierarchyDataType;
44  import org.apache.hadoop.chukwa.datatrigger.TriggerAction;
45  import org.apache.hadoop.chukwa.datatrigger.TriggerEvent;
46  import org.apache.hadoop.fs.FileStatus;
47  import org.apache.hadoop.fs.FileSystem;
48  import org.apache.hadoop.fs.Path;
49  import org.apache.hadoop.fs.PathFilter;
50  import org.apache.log4j.Logger;
51  
52  public class PostProcessorManager {
53    static Logger log = Logger.getLogger(PostProcessorManager.class);
54    
55    protected HashMap<String, String> dataSources = new HashMap<String, String>();
56    protected int errorCount = 0;
57    
58    protected int ERROR_SLEEP_TIME = 60;
59    protected ChukwaConfiguration conf = null;
60    protected FileSystem fs = null;
61    protected volatile boolean isRunning = true;
62  
63    private static final int DEFAULT_MAX_ERROR_COUNT = 4;
64    
65    final private static PathFilter POST_PROCESS_DEMUX_DIR_FILTER = new PathFilter() {
66      public boolean accept(Path file) {
67        return ( file.getName().startsWith("demuxOutputDir") || file.getName().startsWith("pigOutputDir"));
68      }     
69    };
70  
71    
72    public PostProcessorManager() throws Exception {
73      this.conf = new ChukwaConfiguration();
74      init();
75    }
76    
77    public PostProcessorManager(ChukwaConfiguration conf) throws Exception {
78      this.conf = conf;
79      init();
80    }
81    
82    protected void init() throws IOException, URISyntaxException {
83      String fsName = conf.get(HDFS_DEFAULT_NAME_FIELD);
84      fs = FileSystem.get(new URI(fsName), conf);
85    }
86    
87    public static void main(String[] args) throws Exception {
88      PostProcessorManager postProcessorManager = new PostProcessorManager();
89      postProcessorManager.start();
90    }
91    
92    public void shutdown() {
93      this.isRunning = false;
94    }
95    
96    public void start() {
97      
98      String chukwaRootDir = conf.get(CHUKWA_ROOT_DIR_FIELD, "/chukwa/");
99      if ( ! chukwaRootDir.endsWith("/") ) {
100       chukwaRootDir += "/";
101     }
102     log.info("chukwaRootDir:" + chukwaRootDir);
103     
104     String postProcessDir = conf.get(CHUKWA_POST_PROCESS_DIR_FIELD, chukwaRootDir +DEFAULT_CHUKWA_POSTPROCESS_DIR_NAME);
105     if ( ! postProcessDir.endsWith("/") ) {
106       postProcessDir += "/";
107     }
108     
109     String chukwaRootReposDir = conf.get(CHUKWA_ROOT_REPOS_DIR_FIELD, chukwaRootDir +DEFAULT_REPOS_DIR_NAME);
110     if ( ! chukwaRootReposDir.endsWith("/") ) {
111       chukwaRootReposDir += "/";
112     }
113  
114     String chukwaPostProcessInErrorDir = conf.get(CHUKWA_POSTPROCESS_IN_ERROR_DIR_FIELD, chukwaRootDir +DEFAULT_POSTPROCESS_IN_ERROR_DIR_NAME);
115     if ( ! chukwaPostProcessInErrorDir.endsWith("/") ) {
116       chukwaPostProcessInErrorDir += "/";
117     }
118 
119     int maxPermittedErrorCount = conf.getInt(CHUKWA_POSTPROCESS_MAX_ERROR_COUNT_FIELD,
120                                              DEFAULT_MAX_ERROR_COUNT);
121 
122     
123     dataSources = new HashMap<String, String>();
124     Path postProcessDirectory = new Path(postProcessDir);
125     while (isRunning) {
126       
127       if (maxPermittedErrorCount != -1 && errorCount >= maxPermittedErrorCount) {
128         log.warn("==================\nToo many errors (" + errorCount +
129                  "), Bail out!\n==================");
130         throw new RuntimeException("Bail out!");
131       }
132 
133       try {
134         FileStatus[] demuxOutputDirs = fs.listStatus(postProcessDirectory,POST_PROCESS_DEMUX_DIR_FILTER);
135         List<String> directories = new ArrayList<String>();
136         for (FileStatus fstatus : demuxOutputDirs) {
137           directories.add(fstatus.getPath().getName());
138         }
139         
140         if (demuxOutputDirs.length == 0) {
141           try { Thread.sleep(10*1000);} catch(InterruptedException e) { /* do nothing*/}
142           continue;
143         }
144         
145         Collections.sort(directories);
146         
147         String directoryToBeProcessed = null;
148         long start = 0;
149         
150         for(String directory : directories) {
151           directoryToBeProcessed = postProcessDirectory + "/"+ directory;
152           
153           log.info("PostProcess Start, directory:" + directoryToBeProcessed);
154           start = System.currentTimeMillis();
155          
156           try {
157             if ( processDataLoaders(directoryToBeProcessed) == true) {
158               Path[] destFiles = movetoMainRepository(
159                 directoryToBeProcessed,chukwaRootReposDir);
160               if (destFiles != null && destFiles.length > 0) {
161                 deleteDirectory(directoryToBeProcessed);
162                 log.info("PostProcess Stop, directory:" + directoryToBeProcessed);
163                 log.info("processDemuxOutput Duration:" + (System.currentTimeMillis() - start));
164                 processPostMoveTriggers(destFiles);
165                 continue;
166               }
167               } else {
168                   log.warn("Error in processDemuxOutput for :" + directoryToBeProcessed + ". Will try again.");
169                   if (errorCount > 3)
170                       moveToInErrorDirectory(directoryToBeProcessed,directory,chukwaPostProcessInErrorDir); 
171                   else 
172                       errorCount++;
173                   continue;                
174               
175             }
176             
177             // if we are here it's because something bad happen during processing
178             log.warn("Error in processDemuxOutput for :" + directoryToBeProcessed);
179             moveToInErrorDirectory(directoryToBeProcessed,directory,chukwaPostProcessInErrorDir); 
180           } catch (Throwable e) {
181             log.warn("Error in processDemuxOutput:" ,e);
182           } 
183         }
184        
185       } catch (Throwable e) {
186         errorCount ++;
187         log.warn(e);
188         try { Thread.sleep(ERROR_SLEEP_TIME * 1000); } 
189         catch (InterruptedException e1) {/*do nothing*/ }
190       }
191     }
192   }
193   
194   public boolean processDataLoaders(String directory) throws IOException {
195     long start = System.currentTimeMillis();
196     try {
197       String[] classes = conf.get(POST_DEMUX_DATA_LOADER,"org.apache.hadoop.chukwa.dataloader.MetricDataLoaderPool,org.apache.hadoop.chukwa.dataloader.FSMDataLoader").split(",");
198       for(String dataLoaderName : classes) {
199         Class<? extends DataLoaderFactory> dl = (Class<? extends DataLoaderFactory>) Class.forName(dataLoaderName);
200         java.lang.reflect.Constructor<? extends DataLoaderFactory> c =
201             dl.getConstructor();
202         DataLoaderFactory dataloader = c.newInstance();
203         
204           //DataLoaderFactory dataLoader = (DataLoaderFactory) Class.
205           //    forName(dataLoaderName).getConstructor().newInstance();
206         log.info(dataLoaderName+" processing: "+directory);
207         StringBuilder dirSearch = new StringBuilder();
208         dirSearch.append(directory);
209         dirSearch.append("/*/*");
210         log.debug("dirSearch: " + dirSearch);
211         Path demuxDir = new Path(dirSearch.toString());
212         // CHUKWA-648:  Make Chukwa Reduce Type to support hierarchy format
213         // List all event files under the hierarchy data-type directory
214         PathFilter filter = new PathFilter()
215         {public boolean accept(Path file) {
216           return file.getName().endsWith(".evt");
217         }  };
218         List<FileStatus> eventfiles = HierarchyDataType.globStatus(fs, demuxDir,filter,true);
219         FileStatus[] events = eventfiles.toArray(new FileStatus[eventfiles.size()]);
220         dataloader.load(conf, fs, events);
221       }
222     } catch(Exception e) {
223       log.error(ExceptionUtil.getStackTrace(e));
224       return false;
225     }
226     log.info("loadData Duration:" + (System.currentTimeMillis() - start));
227     return true;
228   }
229 
230   public boolean processPostMoveTriggers(Path[] files) throws IOException {
231     long start = System.currentTimeMillis();
232     try {
233       String actions = conf.get(POST_DEMUX_SUCCESS_ACTION, null);
234       if (actions == null || actions.trim().length() == 0) {
235         return true;
236       }
237       log.info("PostProcess firing postMoveTriggers");
238 
239       String[] classes = actions.trim().split(",");
240       for(String actionName : classes) {
241         Class<? extends TriggerAction> actionClass =
242             (Class<? extends TriggerAction>) Class.forName(actionName);
243         java.lang.reflect.Constructor<? extends TriggerAction> c =
244             actionClass.getConstructor();
245         TriggerAction action = c.newInstance();
246 
247         log.info(actionName + " handling " + files.length + " events");
248 
249         //send the files that were just added benieth the repos/ dir.
250         FileStatus[] events = fs.listStatus(files);
251         action.execute(conf, fs, events, TriggerEvent.POST_DEMUX_SUCCESS);
252       }
253     } catch(Exception e) {
254       log.error(ExceptionUtil.getStackTrace(e));
255       return false;
256     }
257     log.info("postMoveTriggers Duration:" + (System.currentTimeMillis() - start));
258     return true;
259   }
260 
261   public Path[] movetoMainRepository(String sourceDirectory,String repoRootDirectory) throws Exception {
262     long start = System.currentTimeMillis();
263     Path[] destFiles = MoveToRepository.doMove(new Path(sourceDirectory),repoRootDirectory);
264     log.info("movetoMainRepository Duration:" + (System.currentTimeMillis() - start));
265     return destFiles;
266   }
267   
268   public boolean moveToInErrorDirectory(String sourceDirectory,String dirName,String inErrorDirectory) throws Exception {
269     Path inErrorDir = new Path(inErrorDirectory);
270     if (!fs.exists(inErrorDir)) {
271       fs.mkdirs(inErrorDir);
272     }
273     
274     if (inErrorDirectory.endsWith("/")) {
275       inErrorDirectory += "/";
276     }
277     String finalInErrorDirectory = inErrorDirectory + dirName + "_" + System.currentTimeMillis();
278     fs.rename(new Path(sourceDirectory), new Path(finalInErrorDirectory));
279     log.warn("Error in postProcess  :" + sourceDirectory + " has been moved to:" + finalInErrorDirectory);
280     return true;
281   }
282   
283   public boolean deleteDirectory(String directory) throws IOException {
284    return fs.delete(new Path(directory), true);
285   }
286   
287 }