This project has retired. For details please refer to its Attic page.
PostProcessorManager xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.chukwa.extraction.demux;
20  
21  import java.io.IOException;
22  import java.net.URI;
23  import java.net.URISyntaxException;
24  import java.util.ArrayList;
25  import java.util.Collections;
26  import java.util.HashMap;
27  import java.util.List;
28  
29  import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
30  import org.apache.hadoop.chukwa.dataloader.DataLoaderFactory;
31  import org.apache.hadoop.chukwa.extraction.CHUKWA_CONSTANT;
32  import org.apache.hadoop.chukwa.util.DaemonWatcher;
33  import org.apache.hadoop.chukwa.util.ExceptionUtil;
34  import org.apache.hadoop.chukwa.util.HierarchyDataType;
35  import org.apache.hadoop.chukwa.datatrigger.TriggerAction;
36  import org.apache.hadoop.chukwa.datatrigger.TriggerEvent;
37  import org.apache.hadoop.fs.FileStatus;
38  import org.apache.hadoop.fs.FileSystem;
39  import org.apache.hadoop.fs.Path;
40  import org.apache.hadoop.fs.PathFilter;
41  import org.apache.log4j.Logger;
42  
43  public class PostProcessorManager implements CHUKWA_CONSTANT{
44    static Logger log = Logger.getLogger(PostProcessorManager.class);
45    
46    protected static HashMap<String, String> dataSources = new HashMap<String, String>();
47    public static int errorCount = 0;
48    
49    protected int ERROR_SLEEP_TIME = 60;
50    protected ChukwaConfiguration conf = null;
51    protected FileSystem fs = null;
52    protected volatile boolean isRunning = true;
53  
54    private static final int DEFAULT_MAX_ERROR_COUNT = 4;
55    
56    final private static PathFilter POST_PROCESS_DEMUX_DIR_FILTER = new PathFilter() {
57      public boolean accept(Path file) {
58        return ( file.getName().startsWith("demuxOutputDir") || file.getName().startsWith("pigOutputDir"));
59      }     
60    };
61  
62    
63    public PostProcessorManager() throws Exception {
64      this.conf = new ChukwaConfiguration();
65      init();
66    }
67    
68    public PostProcessorManager(ChukwaConfiguration conf) throws Exception {
69      this.conf = conf;
70      init();
71    }
72    
73    protected void init() throws IOException, URISyntaxException {
74      String fsName = conf.get(HDFS_DEFAULT_NAME_FIELD);
75      fs = FileSystem.get(new URI(fsName), conf);
76    }
77    
78    public static void main(String[] args) throws Exception {
79   
80      DaemonWatcher.createInstance("PostProcessorManager");
81      
82  
83      
84      PostProcessorManager postProcessorManager = new PostProcessorManager();
85      postProcessorManager.start();
86    }
87    
88    public void shutdown() {
89      this.isRunning = false;
90    }
91    
92    public void start() {
93      
94      String chukwaRootDir = conf.get(CHUKWA_ROOT_DIR_FIELD, "/chukwa/");
95      if ( ! chukwaRootDir.endsWith("/") ) {
96        chukwaRootDir += "/";
97      }
98      log.info("chukwaRootDir:" + chukwaRootDir);
99      
100     String postProcessDir = conf.get(CHUKWA_POST_PROCESS_DIR_FIELD, chukwaRootDir +DEFAULT_CHUKWA_POSTPROCESS_DIR_NAME);
101     if ( ! postProcessDir.endsWith("/") ) {
102       postProcessDir += "/";
103     }
104     
105     String chukwaRootReposDir = conf.get(CHUKWA_ROOT_REPOS_DIR_FIELD, chukwaRootDir +DEFAULT_REPOS_DIR_NAME);
106     if ( ! chukwaRootReposDir.endsWith("/") ) {
107       chukwaRootReposDir += "/";
108     }
109  
110     String chukwaPostProcessInErrorDir = conf.get(CHUKWA_POSTPROCESS_IN_ERROR_DIR_FIELD, chukwaRootDir +DEFAULT_POSTPROCESS_IN_ERROR_DIR_NAME);
111     if ( ! chukwaPostProcessInErrorDir.endsWith("/") ) {
112       chukwaPostProcessInErrorDir += "/";
113     }
114 
115     int maxPermittedErrorCount = conf.getInt(CHUKWA_POSTPROCESS_MAX_ERROR_COUNT_FIELD,
116                                              DEFAULT_MAX_ERROR_COUNT);
117 
118     
119     dataSources = new HashMap<String, String>();
120     Path postProcessDirectory = new Path(postProcessDir);
121     while (isRunning) {
122       
123       if (maxPermittedErrorCount != -1 && errorCount >= maxPermittedErrorCount) {
124         log.warn("==================\nToo many errors (" + errorCount +
125                  "), Bail out!\n==================");
126         DaemonWatcher.bailout(-1);
127       }
128 
129       try {
130         FileStatus[] demuxOutputDirs = fs.listStatus(postProcessDirectory,POST_PROCESS_DEMUX_DIR_FILTER);
131         List<String> directories = new ArrayList<String>();
132         for (FileStatus fstatus : demuxOutputDirs) {
133           directories.add(fstatus.getPath().getName());
134         }
135         
136         if (demuxOutputDirs.length == 0) {
137           try { Thread.sleep(10*1000);} catch(InterruptedException e) { /* do nothing*/}
138           continue;
139         }
140         
141         Collections.sort(directories);
142         
143         String directoryToBeProcessed = null;
144         long start = 0;
145         
146         for(String directory : directories) {
147           directoryToBeProcessed = postProcessDirectory + "/"+ directory;
148           
149           log.info("PostProcess Start, directory:" + directoryToBeProcessed);
150           start = System.currentTimeMillis();
151          
152           try {
153             if ( processDataLoaders(directoryToBeProcessed) == true) {
154               Path[] destFiles = movetoMainRepository(
155                 directoryToBeProcessed,chukwaRootReposDir);
156               if (destFiles != null && destFiles.length > 0) {
157                 deleteDirectory(directoryToBeProcessed);
158                 log.info("PostProcess Stop, directory:" + directoryToBeProcessed);
159                 log.info("processDemuxOutput Duration:" + (System.currentTimeMillis() - start));
160                 processPostMoveTriggers(destFiles);
161                 continue;
162               }
163               } else {
164                   log.warn("Error in processDemuxOutput for :" + directoryToBeProcessed + ". Will try again.");
165                   if (errorCount > 3)
166                       moveToInErrorDirectory(directoryToBeProcessed,directory,chukwaPostProcessInErrorDir); 
167                   else 
168                       errorCount++;
169                   continue;                
170               
171             }
172             
173             // if we are here it's because something bad happen during processing
174             log.warn("Error in processDemuxOutput for :" + directoryToBeProcessed);
175             moveToInErrorDirectory(directoryToBeProcessed,directory,chukwaPostProcessInErrorDir); 
176           } catch (Throwable e) {
177             log.warn("Error in processDemuxOutput:" ,e);
178           } 
179         }
180        
181       } catch (Throwable e) {
182         errorCount ++;
183         log.warn(e);
184         try { Thread.sleep(ERROR_SLEEP_TIME * 1000); } 
185         catch (InterruptedException e1) {/*do nothing*/ }
186       }
187     }
188   }
189   
190   public boolean processDataLoaders(String directory) throws IOException {
191     long start = System.currentTimeMillis();
192     try {
193       String[] classes = conf.get(POST_DEMUX_DATA_LOADER,"org.apache.hadoop.chukwa.dataloader.MetricDataLoaderPool,org.apache.hadoop.chukwa.dataloader.FSMDataLoader").split(",");
194       for(String dataLoaderName : classes) {
195         Class<? extends DataLoaderFactory> dl = (Class<? extends DataLoaderFactory>) Class.forName(dataLoaderName);
196         java.lang.reflect.Constructor<? extends DataLoaderFactory> c =
197             dl.getConstructor();
198         DataLoaderFactory dataloader = c.newInstance();
199         
200           //DataLoaderFactory dataLoader = (DataLoaderFactory) Class.
201           //    forName(dataLoaderName).getConstructor().newInstance();
202         log.info(dataLoaderName+" processing: "+directory);
203         StringBuilder dirSearch = new StringBuilder();
204         dirSearch.append(directory);
205         dirSearch.append("/*/*");
206         log.debug("dirSearch: " + dirSearch);
207         Path demuxDir = new Path(dirSearch.toString());
208         // CHUKWA-648:  Make Chukwa Reduce Type to support hierarchy format
209         // List all event files under the hierarchy data-type directory
210         PathFilter filter = new PathFilter()
211         {public boolean accept(Path file) {
212           return file.getName().endsWith(".evt");
213         }  };
214         List<FileStatus> eventfiles = HierarchyDataType.globStatus(fs, demuxDir,filter,true);
215         FileStatus[] events = eventfiles.toArray(new FileStatus[eventfiles.size()]);
216         dataloader.load(conf, fs, events);
217       }
218     } catch(Exception e) {
219       log.error(ExceptionUtil.getStackTrace(e));
220       return false;
221     }
222     log.info("loadData Duration:" + (System.currentTimeMillis() - start));
223     return true;
224   }
225 
226   public boolean processPostMoveTriggers(Path[] files) throws IOException {
227     long start = System.currentTimeMillis();
228     try {
229       String actions = conf.get(POST_DEMUX_SUCCESS_ACTION, null);
230       if (actions == null || actions.trim().length() == 0) {
231         return true;
232       }
233       log.info("PostProcess firing postMoveTriggers");
234 
235       String[] classes = actions.trim().split(",");
236       for(String actionName : classes) {
237         Class<? extends TriggerAction> actionClass =
238             (Class<? extends TriggerAction>) Class.forName(actionName);
239         java.lang.reflect.Constructor<? extends TriggerAction> c =
240             actionClass.getConstructor();
241         TriggerAction action = c.newInstance();
242 
243         log.info(actionName + " handling " + files.length + " events");
244 
245         //send the files that were just added benieth the repos/ dir.
246         FileStatus[] events = fs.listStatus(files);
247         action.execute(conf, fs, events, TriggerEvent.POST_DEMUX_SUCCESS);
248       }
249     } catch(Exception e) {
250       log.error(ExceptionUtil.getStackTrace(e));
251       return false;
252     }
253     log.info("postMoveTriggers Duration:" + (System.currentTimeMillis() - start));
254     return true;
255   }
256 
257   public Path[] movetoMainRepository(String sourceDirectory,String repoRootDirectory) throws Exception {
258     long start = System.currentTimeMillis();
259     Path[] destFiles = MoveToRepository.doMove(new Path(sourceDirectory),repoRootDirectory);
260     log.info("movetoMainRepository Duration:" + (System.currentTimeMillis() - start));
261     return destFiles;
262   }
263   
264   public boolean moveToInErrorDirectory(String sourceDirectory,String dirName,String inErrorDirectory) throws Exception {
265     Path inErrorDir = new Path(inErrorDirectory);
266     if (!fs.exists(inErrorDir)) {
267       fs.mkdirs(inErrorDir);
268     }
269     
270     if (inErrorDirectory.endsWith("/")) {
271       inErrorDirectory += "/";
272     }
273     String finalInErrorDirectory = inErrorDirectory + dirName + "_" + System.currentTimeMillis();
274     fs.rename(new Path(sourceDirectory), new Path(finalInErrorDirectory));
275     log.warn("Error in postProcess  :" + sourceDirectory + " has been moved to:" + finalInErrorDirectory);
276     return true;
277   }
278   
279   public boolean deleteDirectory(String directory) throws IOException {
280    return fs.delete(new Path(directory), true);
281   }
282   
283 }