This project has retired. For details please refer to its
Attic page.
PostProcessorManager xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.chukwa.extraction.demux;
20
21 import java.io.IOException;
22 import java.net.URI;
23 import java.net.URISyntaxException;
24 import java.util.ArrayList;
25 import java.util.Collections;
26 import java.util.HashMap;
27 import java.util.List;
28
29 import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
30 import org.apache.hadoop.chukwa.dataloader.DataLoaderFactory;
31 import static org.apache.hadoop.chukwa.extraction.CHUKWA_CONSTANT.HDFS_DEFAULT_NAME_FIELD;
32 import static org.apache.hadoop.chukwa.extraction.CHUKWA_CONSTANT.CHUKWA_ROOT_DIR_FIELD;
33 import static org.apache.hadoop.chukwa.extraction.CHUKWA_CONSTANT.CHUKWA_POST_PROCESS_DIR_FIELD;
34 import static org.apache.hadoop.chukwa.extraction.CHUKWA_CONSTANT.DEFAULT_CHUKWA_POSTPROCESS_DIR_NAME;
35 import static org.apache.hadoop.chukwa.extraction.CHUKWA_CONSTANT.CHUKWA_ROOT_REPOS_DIR_FIELD;
36 import static org.apache.hadoop.chukwa.extraction.CHUKWA_CONSTANT.DEFAULT_REPOS_DIR_NAME;
37 import static org.apache.hadoop.chukwa.extraction.CHUKWA_CONSTANT.CHUKWA_POSTPROCESS_IN_ERROR_DIR_FIELD;
38 import static org.apache.hadoop.chukwa.extraction.CHUKWA_CONSTANT.DEFAULT_POSTPROCESS_IN_ERROR_DIR_NAME;
39 import static org.apache.hadoop.chukwa.extraction.CHUKWA_CONSTANT.CHUKWA_POSTPROCESS_MAX_ERROR_COUNT_FIELD;
40 import static org.apache.hadoop.chukwa.extraction.CHUKWA_CONSTANT.POST_DEMUX_DATA_LOADER;
41 import static org.apache.hadoop.chukwa.extraction.CHUKWA_CONSTANT.POST_DEMUX_SUCCESS_ACTION;
42 import org.apache.hadoop.chukwa.util.ExceptionUtil;
43 import org.apache.hadoop.chukwa.util.HierarchyDataType;
44 import org.apache.hadoop.chukwa.datatrigger.TriggerAction;
45 import org.apache.hadoop.chukwa.datatrigger.TriggerEvent;
46 import org.apache.hadoop.fs.FileStatus;
47 import org.apache.hadoop.fs.FileSystem;
48 import org.apache.hadoop.fs.Path;
49 import org.apache.hadoop.fs.PathFilter;
50 import org.apache.log4j.Logger;
51
52 public class PostProcessorManager {
53 static Logger log = Logger.getLogger(PostProcessorManager.class);
54
55 protected HashMap<String, String> dataSources = new HashMap<String, String>();
56 protected int errorCount = 0;
57
58 protected int ERROR_SLEEP_TIME = 60;
59 protected ChukwaConfiguration conf = null;
60 protected FileSystem fs = null;
61 protected volatile boolean isRunning = true;
62
63 private static final int DEFAULT_MAX_ERROR_COUNT = 4;
64
65 final private static PathFilter POST_PROCESS_DEMUX_DIR_FILTER = new PathFilter() {
66 public boolean accept(Path file) {
67 return ( file.getName().startsWith("demuxOutputDir") || file.getName().startsWith("pigOutputDir"));
68 }
69 };
70
71
72 public PostProcessorManager() throws Exception {
73 this.conf = new ChukwaConfiguration();
74 init();
75 }
76
77 public PostProcessorManager(ChukwaConfiguration conf) throws Exception {
78 this.conf = conf;
79 init();
80 }
81
82 protected void init() throws IOException, URISyntaxException {
83 String fsName = conf.get(HDFS_DEFAULT_NAME_FIELD);
84 fs = FileSystem.get(new URI(fsName), conf);
85 }
86
87 public static void main(String[] args) throws Exception {
88 PostProcessorManager postProcessorManager = new PostProcessorManager();
89 postProcessorManager.start();
90 }
91
92 public void shutdown() {
93 this.isRunning = false;
94 }
95
96 public void start() {
97
98 String chukwaRootDir = conf.get(CHUKWA_ROOT_DIR_FIELD, "/chukwa/");
99 if ( ! chukwaRootDir.endsWith("/") ) {
100 chukwaRootDir += "/";
101 }
102 log.info("chukwaRootDir:" + chukwaRootDir);
103
104 String postProcessDir = conf.get(CHUKWA_POST_PROCESS_DIR_FIELD, chukwaRootDir +DEFAULT_CHUKWA_POSTPROCESS_DIR_NAME);
105 if ( ! postProcessDir.endsWith("/") ) {
106 postProcessDir += "/";
107 }
108
109 String chukwaRootReposDir = conf.get(CHUKWA_ROOT_REPOS_DIR_FIELD, chukwaRootDir +DEFAULT_REPOS_DIR_NAME);
110 if ( ! chukwaRootReposDir.endsWith("/") ) {
111 chukwaRootReposDir += "/";
112 }
113
114 String chukwaPostProcessInErrorDir = conf.get(CHUKWA_POSTPROCESS_IN_ERROR_DIR_FIELD, chukwaRootDir +DEFAULT_POSTPROCESS_IN_ERROR_DIR_NAME);
115 if ( ! chukwaPostProcessInErrorDir.endsWith("/") ) {
116 chukwaPostProcessInErrorDir += "/";
117 }
118
119 int maxPermittedErrorCount = conf.getInt(CHUKWA_POSTPROCESS_MAX_ERROR_COUNT_FIELD,
120 DEFAULT_MAX_ERROR_COUNT);
121
122
123 dataSources = new HashMap<String, String>();
124 Path postProcessDirectory = new Path(postProcessDir);
125 while (isRunning) {
126
127 if (maxPermittedErrorCount != -1 && errorCount >= maxPermittedErrorCount) {
128 log.warn("==================\nToo many errors (" + errorCount +
129 "), Bail out!\n==================");
130 throw new RuntimeException("Bail out!");
131 }
132
133 try {
134 FileStatus[] demuxOutputDirs = fs.listStatus(postProcessDirectory,POST_PROCESS_DEMUX_DIR_FILTER);
135 List<String> directories = new ArrayList<String>();
136 for (FileStatus fstatus : demuxOutputDirs) {
137 directories.add(fstatus.getPath().getName());
138 }
139
140 if (demuxOutputDirs.length == 0) {
141 try { Thread.sleep(10*1000);} catch(InterruptedException e) {
142 continue;
143 }
144
145 Collections.sort(directories);
146
147 String directoryToBeProcessed = null;
148 long start = 0;
149
150 for(String directory : directories) {
151 directoryToBeProcessed = postProcessDirectory + "/"+ directory;
152
153 log.info("PostProcess Start, directory:" + directoryToBeProcessed);
154 start = System.currentTimeMillis();
155
156 try {
157 if ( processDataLoaders(directoryToBeProcessed) == true) {
158 Path[] destFiles = movetoMainRepository(
159 directoryToBeProcessed,chukwaRootReposDir);
160 if (destFiles != null && destFiles.length > 0) {
161 deleteDirectory(directoryToBeProcessed);
162 log.info("PostProcess Stop, directory:" + directoryToBeProcessed);
163 log.info("processDemuxOutput Duration:" + (System.currentTimeMillis() - start));
164 processPostMoveTriggers(destFiles);
165 continue;
166 }
167 } else {
168 log.warn("Error in processDemuxOutput for :" + directoryToBeProcessed + ". Will try again.");
169 if (errorCount > 3)
170 moveToInErrorDirectory(directoryToBeProcessed,directory,chukwaPostProcessInErrorDir);
171 else
172 errorCount++;
173 continue;
174
175 }
176
177
178 log.warn("Error in processDemuxOutput for :" + directoryToBeProcessed);
179 moveToInErrorDirectory(directoryToBeProcessed,directory,chukwaPostProcessInErrorDir);
180 } catch (Throwable e) {
181 log.warn("Error in processDemuxOutput:" ,e);
182 }
183 }
184
185 } catch (Throwable e) {
186 errorCount ++;
187 log.warn(e);
188 try { Thread.sleep(ERROR_SLEEP_TIME * 1000); }
189 catch (InterruptedException e1) {
190 }
191 }
192 }
193
194 public boolean processDataLoaders(String directory) throws IOException {
195 long start = System.currentTimeMillis();
196 try {
197 String[] classes = conf.get(POST_DEMUX_DATA_LOADER,"org.apache.hadoop.chukwa.dataloader.MetricDataLoaderPool,org.apache.hadoop.chukwa.dataloader.FSMDataLoader").split(",");
198 for(String dataLoaderName : classes) {
199 Class<? extends DataLoaderFactory> dl = (Class<? extends DataLoaderFactory>) Class.forName(dataLoaderName);
200 java.lang.reflect.Constructor<? extends DataLoaderFactory> c =
201 dl.getConstructor();
202 DataLoaderFactory dataloader = c.newInstance();
203
204
205
206 log.info(dataLoaderName+" processing: "+directory);
207 StringBuilder dirSearch = new StringBuilder();
208 dirSearch.append(directory);
209 dirSearch.append("/*/*");
210 log.debug("dirSearch: " + dirSearch);
211 Path demuxDir = new Path(dirSearch.toString());
212
213
214 PathFilter filter = new PathFilter()
215 {public boolean accept(Path file) {
216 return file.getName().endsWith(".evt");
217 } };
218 List<FileStatus> eventfiles = HierarchyDataType.globStatus(fs, demuxDir,filter,true);
219 FileStatus[] events = eventfiles.toArray(new FileStatus[eventfiles.size()]);
220 dataloader.load(conf, fs, events);
221 }
222 } catch(Exception e) {
223 log.error(ExceptionUtil.getStackTrace(e));
224 return false;
225 }
226 log.info("loadData Duration:" + (System.currentTimeMillis() - start));
227 return true;
228 }
229
230 public boolean processPostMoveTriggers(Path[] files) throws IOException {
231 long start = System.currentTimeMillis();
232 try {
233 String actions = conf.get(POST_DEMUX_SUCCESS_ACTION, null);
234 if (actions == null || actions.trim().length() == 0) {
235 return true;
236 }
237 log.info("PostProcess firing postMoveTriggers");
238
239 String[] classes = actions.trim().split(",");
240 for(String actionName : classes) {
241 Class<? extends TriggerAction> actionClass =
242 (Class<? extends TriggerAction>) Class.forName(actionName);
243 java.lang.reflect.Constructor<? extends TriggerAction> c =
244 actionClass.getConstructor();
245 TriggerAction action = c.newInstance();
246
247 log.info(actionName + " handling " + files.length + " events");
248
249
250 FileStatus[] events = fs.listStatus(files);
251 action.execute(conf, fs, events, TriggerEvent.POST_DEMUX_SUCCESS);
252 }
253 } catch(Exception e) {
254 log.error(ExceptionUtil.getStackTrace(e));
255 return false;
256 }
257 log.info("postMoveTriggers Duration:" + (System.currentTimeMillis() - start));
258 return true;
259 }
260
261 public Path[] movetoMainRepository(String sourceDirectory,String repoRootDirectory) throws Exception {
262 long start = System.currentTimeMillis();
263 Path[] destFiles = MoveToRepository.doMove(new Path(sourceDirectory),repoRootDirectory);
264 log.info("movetoMainRepository Duration:" + (System.currentTimeMillis() - start));
265 return destFiles;
266 }
267
268 public boolean moveToInErrorDirectory(String sourceDirectory,String dirName,String inErrorDirectory) throws Exception {
269 Path inErrorDir = new Path(inErrorDirectory);
270 if (!fs.exists(inErrorDir)) {
271 fs.mkdirs(inErrorDir);
272 }
273
274 if (inErrorDirectory.endsWith("/")) {
275 inErrorDirectory += "/";
276 }
277 String finalInErrorDirectory = inErrorDirectory + dirName + "_" + System.currentTimeMillis();
278 fs.rename(new Path(sourceDirectory), new Path(finalInErrorDirectory));
279 log.warn("Error in postProcess :" + sourceDirectory + " has been moved to:" + finalInErrorDirectory);
280 return true;
281 }
282
283 public boolean deleteDirectory(String directory) throws IOException {
284 return fs.delete(new Path(directory), true);
285 }
286
287 }