This project has retired. For details please refer to its Attic page.
DailyChukwaRecordRolling xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.chukwa.extraction.demux;
20  
21  
22  import java.io.IOException;
23  import java.net.URI;
24  import java.text.SimpleDateFormat;
25  import java.util.ArrayList;
26  import java.util.Calendar;
27  import java.util.List;
28  import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
29  import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
30  import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
31  import org.apache.hadoop.chukwa.util.ExceptionUtil;
32  import org.apache.hadoop.chukwa.util.HierarchyDataType;
33  import org.apache.hadoop.conf.Configured;
34  import org.apache.hadoop.fs.FileStatus;
35  import org.apache.hadoop.fs.FileSystem;
36  import org.apache.hadoop.fs.FileUtil;
37  import org.apache.hadoop.fs.Path;
38  import org.apache.hadoop.mapred.FileInputFormat;
39  import org.apache.hadoop.mapred.FileOutputFormat;
40  import org.apache.hadoop.mapred.JobClient;
41  import org.apache.hadoop.mapred.JobConf;
42  import org.apache.hadoop.mapred.JobPriority;
43  import org.apache.hadoop.mapred.SequenceFileInputFormat;
44  import org.apache.hadoop.mapred.SequenceFileOutputFormat;
45  import org.apache.hadoop.mapred.lib.IdentityMapper;
46  import org.apache.hadoop.mapred.lib.IdentityReducer;
47  import org.apache.hadoop.util.Tool;
48  import org.apache.log4j.Logger;
49  
50  // TODO do an abstract class for all rolling 
51  public class DailyChukwaRecordRolling extends Configured implements Tool {
52    static Logger log = Logger.getLogger(DailyChukwaRecordRolling.class);
53  
54    static SimpleDateFormat sdf = new java.text.SimpleDateFormat("yyyyMMdd");
55    static ChukwaConfiguration conf = null;
56    static FileSystem fs = null;
57    static final String HadoopLogDir = "_logs";
58    static final String hadoopTempDir = "_temporary";
59  
60    static boolean rollInSequence = true;
61    static boolean deleteRawdata = false;
62  
63    public static void usage() {
64      System.err
65          .println("usage: java org.apache.hadoop.chukwa.extraction.demux.DailyChukwaRecordRolling rollInSequence <True/False> deleteRawdata <True/False>");
66    }
67  
68    public static boolean hourlyRolling(String dailyStreamDirectory) {
69     
70      Path pHour = null;
71      try {
72        log.info("Checking for HourlyRolling in " + dailyStreamDirectory);
73        
74        for (int i=0;i<24;i++) {
75          pHour = new Path(dailyStreamDirectory + "/" + i);
76          if (! fs.exists(pHour)) {
77            log.info("HourlyData is missing for:" + pHour);
78            continue;
79          } else {
80            FileStatus[] files = fs.listStatus(pHour);
81            boolean containsHourly = false;
82            for(FileStatus file: files) {
83              log.info("Debug checking" + file.getPath());
84              if (file.getPath().getName().indexOf("_HourlyDone_") > 0) {
85                containsHourly = true;
86                break;
87              }
88            }
89            if (containsHourly == false) {
90              log.info("HourlyDone is missing for : " + pHour);
91              return false;
92            }
93          }
94        }
95        return true;
96      }catch(Exception e) {
97        e.printStackTrace();
98        return false;
99      }
100   }
101   public static void buildDailyFiles(String chukwaMainRepository,
102       String tempDir, String rollingFolder, int workingDay) throws IOException {
103     // process
104     
105     boolean alldone = true;
106     
107     Path dayPath = new Path(rollingFolder + "/daily/" + workingDay);
108     FileStatus[] clustersFS = fs.listStatus(dayPath);
109     for (FileStatus clusterFs : clustersFS) {
110       String cluster = clusterFs.getPath().getName();
111 
112       Path dataSourceClusterHourPaths = new Path(rollingFolder + "/daily/"
113           + workingDay + "/" + cluster);
114       FileStatus[] dataSourcesFS = fs.listStatus(dataSourceClusterHourPaths);
115       for (FileStatus dataSourceFS : dataSourcesFS) {
116         //CHUKWA-648:  Make Chukwa Reduce Type to support hierarchy format  
117         for (FileStatus dataSourcePath : HierarchyDataType.globStatus(fs,
118             dataSourceFS.getPath(), true)) {
119           String dataSource = HierarchyDataType.getDataType(
120               dataSourcePath.getPath(),
121               fs.getFileStatus(dataSourceClusterHourPaths).getPath());
122           // Repo path = reposRootDirectory/<cluster>/<day>/*/*.evt
123 
124 
125           // put the rotate flag
126           fs.mkdirs(new Path(chukwaMainRepository + "/" + cluster + "/"
127               + dataSource + "/" + workingDay + "/rotateDone"));
128 
129           if (hourlyRolling(chukwaMainRepository + "/" + cluster + "/" + dataSource + "/" + workingDay) == false) {
130             log.warn("Skipping this directory, hourly not done. " + chukwaMainRepository + "/" + cluster + "/"
131                 + dataSource + "/" + workingDay );
132             alldone = false;
133             continue;
134           } 
135 
136           log.info("Running Daily rolling for " + chukwaMainRepository + "/" + cluster + "/"
137               + dataSource + "/" + workingDay + "/rotateDone");
138 
139           // rotate
140           // Merge
141           String[] mergeArgs = new String[5];
142           // input
143           mergeArgs[0] = chukwaMainRepository + "/" + cluster + "/" + dataSource
144               + "/" + workingDay + "/[0-9]*/*.evt";
145           // temp dir
146           mergeArgs[1] = tempDir + "/" + cluster + "/" + dataSource + "/"
147               + workingDay + "_" + System.currentTimeMillis();
148           // final output dir
149           mergeArgs[2] = chukwaMainRepository + "/" + cluster + "/" + dataSource
150               + "/" + workingDay;
151           // final output fileName
152           mergeArgs[3] = dataSource + "_DailyDone_"  + workingDay;
153           // delete rolling directory
154           mergeArgs[4] = rollingFolder + "/daily/" + workingDay + "/" + cluster
155               + "/" + dataSource;
156 
157           log.info("DailyChukwaRecordRolling 0: " + mergeArgs[0]);
158           log.info("DailyChukwaRecordRolling 1: " + mergeArgs[1]);
159           log.info("DailyChukwaRecordRolling 2: " + mergeArgs[2]);
160           log.info("DailyChukwaRecordRolling 3: " + mergeArgs[3]);
161           log.info("DailyChukwaRecordRolling 4: " + mergeArgs[4]);
162 
163           RecordMerger merge = new RecordMerger(conf, fs,
164               new DailyChukwaRecordRolling(), mergeArgs, deleteRawdata);
165           List<RecordMerger> allMerge = new ArrayList<RecordMerger>();
166           if (rollInSequence) {
167             merge.mergeRecords();
168           } else {
169             allMerge.add(merge);
170             merge.start();
171           }
172 
173           // join all Threads
174           if (!rollInSequence) {
175             while (allMerge.size() > 0) {
176               RecordMerger m = allMerge.remove(0);
177               try {
178                 m.join();
179               } catch (InterruptedException e) {
180               }
181             }
182           } // End if (!rollInSequence)
183 
184           // Delete the processed dataSourceFS
185           FileUtil.fullyDelete(fs, dataSourceFS.getPath());
186 
187         } // End for(FileStatus dataSourceFS : dataSourcesFS)
188 
189         // Delete the processed clusterFs
190         if (alldone == true) {
191           FileUtil.fullyDelete(fs, clusterFs.getPath());
192         }
193 
194 
195       } // End for(FileStatus clusterFs : clustersFS)
196     }
197     // Delete the processed dayPath
198     if (alldone == true) {
199       FileUtil.fullyDelete(fs, dayPath);
200     }
201     
202   }
203 
204   /**
205    * @param args
206    * @throws Exception
207    */
208   public static void main(String[] args) throws Exception {
209     
210     
211     conf = new ChukwaConfiguration();
212     String fsName = conf.get("writer.hdfs.filesystem");
213     fs = FileSystem.get(new URI(fsName), conf);
214 
215     // TODO read from config
216     String rollingFolder = "/chukwa/rolling/";
217     String chukwaMainRepository = "/chukwa/repos/";
218     String tempDir = "/chukwa/temp/dailyRolling/";
219 
220     // TODO do a real parameter parsing
221     if (args.length != 4) {
222       usage();
223       return;
224     }
225 
226     if (!args[0].equalsIgnoreCase("rollInSequence")) {
227       usage();
228       return;
229     }
230 
231     if (!args[2].equalsIgnoreCase("deleteRawdata")) {
232       usage();
233       return;
234     }
235 
236     if (args[1].equalsIgnoreCase("true")) {
237       rollInSequence = true;
238     } else {
239       rollInSequence = false;
240     }
241 
242     if (args[3].equalsIgnoreCase("true")) {
243       deleteRawdata = true;
244     } else {
245       deleteRawdata = false;
246     }
247 
248     log.info("rollInSequence: " + rollInSequence);
249     log.info("deleteRawdata: " + deleteRawdata);
250 
251     Calendar calendar = Calendar.getInstance();
252     int currentDay = Integer.parseInt(sdf.format(calendar.getTime()));
253     int currentHour = calendar.get(Calendar.HOUR_OF_DAY);
254     log.info("CurrentDay: " + currentDay);
255     log.info("currentHour" + currentHour);
256 
257     Path rootFolder = new Path(rollingFolder + "/daily/");
258 
259     FileStatus[] daysFS = fs.listStatus(rootFolder);
260     for (FileStatus dayFS : daysFS) {
261       try {
262         int workingDay = Integer.parseInt(dayFS.getPath().getName());
263         log.info("Daily working on :" + workingDay);
264         if (workingDay < currentDay) {
265           
266           try {
267             buildDailyFiles(chukwaMainRepository, tempDir, rollingFolder,
268                 workingDay);
269           } catch(Throwable e) {
270             e.printStackTrace();
271             log.warn("Daily rolling failed on :" + rollingFolder +"/" + workingDay  ) ;
272           }
273           
274         } // End if ( workingDay < currentDay)
275       } // End Try workingDay =
276         // Integer.parseInt(sdf.format(dayFS.getPath().getName()));
277       catch (NumberFormatException e) { /* Not a standard Day directory skip */
278         log.debug(ExceptionUtil.getStackTrace(e));
279       }
280 
281     } // for(FileStatus dayFS : daysFS)
282   }
283 
284   public int run(String[] args) throws Exception {
285     JobConf conf = new JobConf(new ChukwaConfiguration(), DailyChukwaRecordRolling.class);
286 
287     conf.setJobName("DailyChukwa-Rolling");
288     conf.setInputFormat(SequenceFileInputFormat.class);
289 
290     conf.setMapperClass(IdentityMapper.class);
291     conf.setReducerClass(IdentityReducer.class);
292 
293     conf.setOutputKeyClass(ChukwaRecordKey.class);
294     conf.setOutputValueClass(ChukwaRecord.class);
295     conf.setOutputFormat(SequenceFileOutputFormat.class);
296 
297     log.info("DailyChukwaRecordRolling input: " + args[0]);
298     log.info("DailyChukwaRecordRolling output: " + args[1]);
299 
300     FileInputFormat.setInputPaths(conf, args[0]);
301     FileOutputFormat.setOutputPath(conf, new Path(args[1]));
302     conf.setJobPriority(JobPriority.LOW);
303     conf.setNumReduceTasks(1);
304     JobClient.runJob(conf);
305     return 0;
306   }
307 
308 }