This project has retired. For details please refer to its Attic page.
DailyChukwaRecordRolling xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.chukwa.extraction.demux;
20  
21  
22  import java.io.IOException;
23  import java.net.URI;
24  import java.text.SimpleDateFormat;
25  import java.util.ArrayList;
26  import java.util.Calendar;
27  import java.util.List;
28  import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
29  import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
30  import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
31  import org.apache.hadoop.chukwa.util.DaemonWatcher;
32  import org.apache.hadoop.chukwa.util.ExceptionUtil;
33  import org.apache.hadoop.chukwa.util.HierarchyDataType;
34  import org.apache.hadoop.conf.Configured;
35  import org.apache.hadoop.fs.FileStatus;
36  import org.apache.hadoop.fs.FileSystem;
37  import org.apache.hadoop.fs.FileUtil;
38  import org.apache.hadoop.fs.Path;
39  import org.apache.hadoop.mapred.FileInputFormat;
40  import org.apache.hadoop.mapred.FileOutputFormat;
41  import org.apache.hadoop.mapred.JobClient;
42  import org.apache.hadoop.mapred.JobConf;
43  import org.apache.hadoop.mapred.JobPriority;
44  import org.apache.hadoop.mapred.SequenceFileInputFormat;
45  import org.apache.hadoop.mapred.SequenceFileOutputFormat;
46  import org.apache.hadoop.mapred.lib.IdentityMapper;
47  import org.apache.hadoop.mapred.lib.IdentityReducer;
48  import org.apache.hadoop.util.Tool;
49  import org.apache.log4j.Logger;
50  
51  // TODO do an abstract class for all rolling 
52  public class DailyChukwaRecordRolling extends Configured implements Tool {
53    static Logger log = Logger.getLogger(DailyChukwaRecordRolling.class);
54  
55    static SimpleDateFormat sdf = new java.text.SimpleDateFormat("yyyyMMdd");
56    static ChukwaConfiguration conf = null;
57    static FileSystem fs = null;
58    static final String HadoopLogDir = "_logs";
59    static final String hadoopTempDir = "_temporary";
60  
61    static boolean rollInSequence = true;
62    static boolean deleteRawdata = false;
63  
64    public static void usage() {
65      System.err
66          .println("usage: java org.apache.hadoop.chukwa.extraction.demux.DailyChukwaRecordRolling rollInSequence <True/False> deleteRawdata <True/False>");
67      System.exit(-1);
68    }
69  
70    public static boolean hourlyRolling(String dailyStreamDirectory) {
71     
72      Path pHour = null;
73      try {
74        log.info("Checking for HourlyRolling in " + dailyStreamDirectory);
75        
76        for (int i=0;i<24;i++) {
77          pHour = new Path(dailyStreamDirectory + "/" + i);
78          if (! fs.exists(pHour)) {
79            log.info("HourlyData is missing for:" + pHour);
80            continue;
81          } else {
82            FileStatus[] files = fs.listStatus(pHour);
83            boolean containsHourly = false;
84            for(FileStatus file: files) {
85              log.info("Debug checking" + file.getPath());
86              if (file.getPath().getName().indexOf("_HourlyDone_") > 0) {
87                containsHourly = true;
88                break;
89              }
90            }
91            if (containsHourly == false) {
92              log.info("HourlyDone is missing for : " + pHour);
93              return false;
94            }
95          }
96        }
97        return true;
98      }catch(Exception e) {
99        e.printStackTrace();
100       return false;
101     }
102   }
103   public static void buildDailyFiles(String chukwaMainRepository,
104       String tempDir, String rollingFolder, int workingDay) throws IOException {
105     // process
106     
107     boolean alldone = true;
108     
109     Path dayPath = new Path(rollingFolder + "/daily/" + workingDay);
110     FileStatus[] clustersFS = fs.listStatus(dayPath);
111     for (FileStatus clusterFs : clustersFS) {
112       String cluster = clusterFs.getPath().getName();
113 
114       Path dataSourceClusterHourPaths = new Path(rollingFolder + "/daily/"
115           + workingDay + "/" + cluster);
116       FileStatus[] dataSourcesFS = fs.listStatus(dataSourceClusterHourPaths);
117       for (FileStatus dataSourceFS : dataSourcesFS) {
118         //CHUKWA-648:  Make Chukwa Reduce Type to support hierarchy format  
119         for (FileStatus dataSourcePath : HierarchyDataType.globStatus(fs,
120             dataSourceFS.getPath(), true)) {
121           String dataSource = HierarchyDataType.getDataType(
122               dataSourcePath.getPath(),
123               fs.getFileStatus(dataSourceClusterHourPaths).getPath());
124           // Repo path = reposRootDirectory/<cluster>/<day>/*/*.evt
125 
126 
127           // put the rotate flag
128           fs.mkdirs(new Path(chukwaMainRepository + "/" + cluster + "/"
129               + dataSource + "/" + workingDay + "/rotateDone"));
130 
131           if (hourlyRolling(chukwaMainRepository + "/" + cluster + "/" + dataSource + "/" + workingDay) == false) {
132             log.warn("Skipping this directory, hourly not done. " + chukwaMainRepository + "/" + cluster + "/"
133                 + dataSource + "/" + workingDay );
134             alldone = false;
135             continue;
136           } 
137 
138           log.info("Running Daily rolling for " + chukwaMainRepository + "/" + cluster + "/"
139               + dataSource + "/" + workingDay + "/rotateDone");
140 
141           // rotate
142           // Merge
143           String[] mergeArgs = new String[5];
144           // input
145           mergeArgs[0] = chukwaMainRepository + "/" + cluster + "/" + dataSource
146               + "/" + workingDay + "/[0-9]*/*.evt";
147           // temp dir
148           mergeArgs[1] = tempDir + "/" + cluster + "/" + dataSource + "/"
149               + workingDay + "_" + System.currentTimeMillis();
150           // final output dir
151           mergeArgs[2] = chukwaMainRepository + "/" + cluster + "/" + dataSource
152               + "/" + workingDay;
153           // final output fileName
154           mergeArgs[3] = dataSource + "_DailyDone_"  + workingDay;
155           // delete rolling directory
156           mergeArgs[4] = rollingFolder + "/daily/" + workingDay + "/" + cluster
157               + "/" + dataSource;
158 
159           log.info("DailyChukwaRecordRolling 0: " + mergeArgs[0]);
160           log.info("DailyChukwaRecordRolling 1: " + mergeArgs[1]);
161           log.info("DailyChukwaRecordRolling 2: " + mergeArgs[2]);
162           log.info("DailyChukwaRecordRolling 3: " + mergeArgs[3]);
163           log.info("DailyChukwaRecordRolling 4: " + mergeArgs[4]);
164 
165           RecordMerger merge = new RecordMerger(conf, fs,
166               new DailyChukwaRecordRolling(), mergeArgs, deleteRawdata);
167           List<RecordMerger> allMerge = new ArrayList<RecordMerger>();
168           if (rollInSequence) {
169             merge.run();
170           } else {
171             allMerge.add(merge);
172             merge.start();
173           }
174 
175           // join all Threads
176           if (!rollInSequence) {
177             while (allMerge.size() > 0) {
178               RecordMerger m = allMerge.remove(0);
179               try {
180                 m.join();
181               } catch (InterruptedException e) {
182               }
183             }
184           } // End if (!rollInSequence)
185 
186           // Delete the processed dataSourceFS
187           FileUtil.fullyDelete(fs, dataSourceFS.getPath());
188 
189         } // End for(FileStatus dataSourceFS : dataSourcesFS)
190 
191         // Delete the processed clusterFs
192         if (alldone == true) {
193           FileUtil.fullyDelete(fs, clusterFs.getPath());
194         }
195 
196 
197       } // End for(FileStatus clusterFs : clustersFS)
198     }
199     // Delete the processed dayPath
200     if (alldone == true) {
201       FileUtil.fullyDelete(fs, dayPath);
202     }
203     
204   }
205 
206   /**
207    * @param args
208    * @throws Exception
209    */
210   public static void main(String[] args) throws Exception {
211     
212     DaemonWatcher.createInstance("DailyChukwaRecordRolling");
213     
214     conf = new ChukwaConfiguration();
215     String fsName = conf.get("writer.hdfs.filesystem");
216     fs = FileSystem.get(new URI(fsName), conf);
217 
218     // TODO read from config
219     String rollingFolder = "/chukwa/rolling/";
220     String chukwaMainRepository = "/chukwa/repos/";
221     String tempDir = "/chukwa/temp/dailyRolling/";
222 
223     // TODO do a real parameter parsing
224     if (args.length != 4) {
225       usage();
226     }
227 
228     if (!args[0].equalsIgnoreCase("rollInSequence")) {
229       usage();
230     }
231 
232     if (!args[2].equalsIgnoreCase("deleteRawdata")) {
233       usage();
234     }
235 
236     if (args[1].equalsIgnoreCase("true")) {
237       rollInSequence = true;
238     } else {
239       rollInSequence = false;
240     }
241 
242     if (args[3].equalsIgnoreCase("true")) {
243       deleteRawdata = true;
244     } else {
245       deleteRawdata = false;
246     }
247 
248     log.info("rollInSequence: " + rollInSequence);
249     log.info("deleteRawdata: " + deleteRawdata);
250 
251     Calendar calendar = Calendar.getInstance();
252     int currentDay = Integer.parseInt(sdf.format(calendar.getTime()));
253     int currentHour = calendar.get(Calendar.HOUR_OF_DAY);
254     log.info("CurrentDay: " + currentDay);
255     log.info("currentHour" + currentHour);
256 
257     Path rootFolder = new Path(rollingFolder + "/daily/");
258 
259     FileStatus[] daysFS = fs.listStatus(rootFolder);
260     for (FileStatus dayFS : daysFS) {
261       try {
262         int workingDay = Integer.parseInt(dayFS.getPath().getName());
263         log.info("Daily working on :" + workingDay);
264         if (workingDay < currentDay) {
265           
266           try {
267             buildDailyFiles(chukwaMainRepository, tempDir, rollingFolder,
268                 workingDay);
269           } catch(Throwable e) {
270             e.printStackTrace();
271             log.warn("Daily rolling failed on :" + rollingFolder +"/" + workingDay  ) ;
272           }
273           
274         } // End if ( workingDay < currentDay)
275       } // End Try workingDay =
276         // Integer.parseInt(sdf.format(dayFS.getPath().getName()));
277       catch (NumberFormatException e) { /* Not a standard Day directory skip */
278         log.debug(ExceptionUtil.getStackTrace(e));
279       }
280 
281     } // for(FileStatus dayFS : daysFS)
282   }
283 
284   public int run(String[] args) throws Exception {
285     JobConf conf = new JobConf(new ChukwaConfiguration(), DailyChukwaRecordRolling.class);
286 
287     conf.setJobName("DailyChukwa-Rolling");
288     conf.setInputFormat(SequenceFileInputFormat.class);
289 
290     conf.setMapperClass(IdentityMapper.class);
291     conf.setReducerClass(IdentityReducer.class);
292 
293     conf.setOutputKeyClass(ChukwaRecordKey.class);
294     conf.setOutputValueClass(ChukwaRecord.class);
295     conf.setOutputFormat(SequenceFileOutputFormat.class);
296 
297     log.info("DailyChukwaRecordRolling input: " + args[0]);
298     log.info("DailyChukwaRecordRolling output: " + args[1]);
299 
300     FileInputFormat.setInputPaths(conf, args[0]);
301     FileOutputFormat.setOutputPath(conf, new Path(args[1]));
302     conf.setJobPriority(JobPriority.LOW);
303     conf.setNumReduceTasks(1);
304     JobClient.runJob(conf);
305     return 0;
306   }
307 
308 }