This project has retired. For details please refer to its Attic page.
HourlyChukwaRecordRolling xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.chukwa.extraction.demux;
20  
21  
22  import java.io.IOException;
23  import java.net.URI;
24  import java.text.SimpleDateFormat;
25  import java.util.ArrayList;
26  import java.util.Calendar;
27  import java.util.List;
28  import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
29  import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
30  import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
31  import org.apache.hadoop.chukwa.util.HierarchyDataType;
32  import org.apache.hadoop.chukwa.util.DaemonWatcher;
33  import org.apache.hadoop.conf.Configured;
34  import org.apache.hadoop.fs.FileStatus;
35  import org.apache.hadoop.fs.FileSystem;
36  import org.apache.hadoop.fs.FileUtil;
37  import org.apache.hadoop.fs.Path;
38  import org.apache.hadoop.mapred.FileInputFormat;
39  import org.apache.hadoop.mapred.FileOutputFormat;
40  import org.apache.hadoop.mapred.JobClient;
41  import org.apache.hadoop.mapred.JobConf;
42  import org.apache.hadoop.mapred.JobPriority;
43  import org.apache.hadoop.mapred.SequenceFileInputFormat;
44  import org.apache.hadoop.mapred.SequenceFileOutputFormat;
45  import org.apache.hadoop.mapred.lib.IdentityMapper;
46  import org.apache.hadoop.mapred.lib.IdentityReducer;
47  import org.apache.hadoop.util.Tool;
48  import org.apache.log4j.Logger;
49  
50  // TODO: extract an abstract base class shared by all rolling jobs
51  public class HourlyChukwaRecordRolling extends Configured implements Tool {
52    static Logger log = Logger.getLogger(HourlyChukwaRecordRolling.class);
53  
54    static SimpleDateFormat sdf = new java.text.SimpleDateFormat("yyyyMMdd");
55    static ChukwaConfiguration conf = null;
56    static FileSystem fs = null;
57    static final String HadoopLogDir = "_logs";
58    static final String hadoopTempDir = "_temporary";
59  
60    static boolean rollInSequence = true;
61    static boolean deleteRawdata = false;
62  
63    public static void usage() {
64      System.err
65          .println("usage: java org.apache.hadoop.chukwa.extraction.demux.HourlyChukwaRecordRolling rollInSequence <True/False> deleteRawdata <True/False>");
66      System.exit(-1);
67    }
68  
69    public static void buildHourlyFiles(String chukwaMainRepository,
70        String tempDir, String rollingFolder, int workingDay, int workingHour)
71        throws IOException {
72      // process
73      Path hourPath = new Path(rollingFolder + "/hourly/" + workingDay + "/"
74          + workingHour);
75      FileStatus[] clustersFS = fs.listStatus(hourPath);
76      for (FileStatus clusterFs : clustersFS) {
77        String cluster = clusterFs.getPath().getName();
78  
79        Path dataSourceClusterHourPaths = new Path(rollingFolder + "/hourly/"
80            + workingDay + "/" + workingHour + "/" + cluster);
81        FileStatus[] dataSourcesFS = fs.listStatus(dataSourceClusterHourPaths);
82        
83        for (FileStatus dataSourceFS : dataSourcesFS) {
84          //CHUKWA-648:  Make Chukwa Reduce Type to support hierarchy format  
85          for (FileStatus dataSourcePath : HierarchyDataType.globStatus(fs,
86              dataSourceFS.getPath(), true)) {
87            String dataSource = HierarchyDataType.getDataType(
88                dataSourcePath.getPath(),
89                fs.getFileStatus(dataSourceClusterHourPaths).getPath());
90            // Repo path = reposRootDirectory/<cluster>/<datasource>/<day>/<hour>/*/*.evt
91  
92            // put the rotate flag
93            fs.mkdirs(new Path(chukwaMainRepository + "/" + cluster + "/"
94                + dataSource + "/" + workingDay + "/" + workingHour
95                + "/rotateDone"));
96  
97            // rotate
98            // Merge
99            String[] mergeArgs = new String[5];
100           // input
101           mergeArgs[0] = chukwaMainRepository + "/" + cluster + "/" + dataSource
102               + "/" + workingDay + "/" + workingHour + "/[0-5]*/*.evt";
103           // temp dir
104           mergeArgs[1] = tempDir + "/" + cluster + "/" + dataSource + "/"
105               + workingDay + "/" + workingHour + "_" + System.currentTimeMillis();
106           // final output dir
107           mergeArgs[2] = chukwaMainRepository + "/" + cluster + "/" + dataSource
108               + "/" + workingDay + "/" + workingHour;
109           // final output fileName
110           mergeArgs[3] = dataSource + "_HourlyDone_" + workingDay + "_" + workingHour;
111           // delete rolling directory
112           mergeArgs[4] = rollingFolder + "/hourly/" + workingDay + "/"
113               + workingHour + "/" + cluster + "/" + dataSource;
114 
115           log.info("HourlyChukwaRecordRolling 0: " + mergeArgs[0]);
116           log.info("HourlyChukwaRecordRolling 1: " + mergeArgs[1]);
117           log.info("HourlyChukwaRecordRolling 2: " + mergeArgs[2]);
118           log.info("HourlyChukwaRecordRolling 3: " + mergeArgs[3]);
119           log.info("HourlyChukwaRecordRolling 4: " + mergeArgs[4]);
120 
121           RecordMerger merge = new RecordMerger(conf, fs,
122               new HourlyChukwaRecordRolling(), mergeArgs, deleteRawdata);
123           List<RecordMerger> allMerge = new ArrayList<RecordMerger>();
124           if (rollInSequence) {
125             merge.run();
126           } else {
127             allMerge.add(merge);
128             merge.start();
129           }
130 
131           // join all Threads
132           if (!rollInSequence) {
133             while (allMerge.size() > 0) {
134               RecordMerger m = allMerge.remove(0);
135               try {
136                 m.join();
137               } catch (InterruptedException e) {
138               }
139             }
140           } // End if (!rollInSequence)
141         }
142         // Delete the processed dataSourceFS
143         FileUtil.fullyDelete(fs, dataSourceFS.getPath());
144 
145       } // End for(FileStatus dataSourceFS : dataSourcesFS)
146 
147       // Delete the processed clusterFs
148       FileUtil.fullyDelete(fs, clusterFs.getPath());
149 
150     } // End for(FileStatus clusterFs : clustersFS)
151 
152     // Delete the processed hour
153     FileUtil.fullyDelete(fs, hourPath);
154   }
155 
156   /**
157    * @param args
158    * @throws Exception
159    */
160   public static void main(String[] args) throws Exception {
161     DaemonWatcher.createInstance("HourlyChukwaRecordRolling");
162     
163     conf = new ChukwaConfiguration();
164     String fsName = conf.get("writer.hdfs.filesystem");
165     fs = FileSystem.get(new URI(fsName), conf);
166 
167     // TODO read from config
168     String rollingFolder = "/chukwa/rolling/";
169     String chukwaMainRepository = "/chukwa/repos/";
170     String tempDir = "/chukwa/temp/hourlyRolling/";
171 
172     // TODO do a real parameter parsing
173     if (args.length != 4) {
174       usage();
175     }
176 
177     if (!args[0].equalsIgnoreCase("rollInSequence")) {
178       usage();
179     }
180 
181     if (!args[2].equalsIgnoreCase("deleteRawdata")) {
182       usage();
183     }
184 
185     if (args[1].equalsIgnoreCase("true")) {
186       rollInSequence = true;
187     } else {
188       rollInSequence = false;
189     }
190 
191     if (args[3].equalsIgnoreCase("true")) {
192       deleteRawdata = true;
193     } else {
194       deleteRawdata = false;
195     }
196 
197     Calendar calendar = Calendar.getInstance();
198     int currentDay = Integer.parseInt(sdf.format(calendar.getTime()));
199     int currentHour = calendar.get(Calendar.HOUR_OF_DAY);
200     log.info("CurrentDay: " + currentDay);
201     log.info("currentHour" + currentHour);
202 
203     Path rootFolder = new Path(rollingFolder + "/hourly/");
204 
205     FileStatus[] daysFS = fs.listStatus(rootFolder);
206     for (FileStatus dayFS : daysFS) {
207       try {
208         log.info("dayFs:" + dayFS.getPath().getName());
209         int workingDay = Integer.parseInt(dayFS.getPath().getName());
210 
211         Path hourlySrc = new Path(rollingFolder + "/hourly/" + workingDay);
212         FileStatus[] hoursFS = fs.listStatus(hourlySrc);
213         for (FileStatus hourFS : hoursFS) {
214           String workinhHourStr = hourFS.getPath().getName();
215           int workingHour = Integer.parseInt(workinhHourStr);
216           if ((workingDay < currentDay) || // all previous days
217               ((workingDay == currentDay) && (workingHour < currentHour)) // Up
218                                                                           // to
219                                                                           // the
220                                                                           // last
221                                                                           // hour
222           ) {
223 
224             try {
225               buildHourlyFiles(chukwaMainRepository, tempDir, rollingFolder,
226                   workingDay, workingHour);
227             } catch(Throwable e) {
228               e.printStackTrace();
229               log.warn("Hourly rolling failed on :" + rollingFolder +"/" + workingDay +"/" + workingHour ) ;
230             }
231 
232           } // End if ( (workingDay < currentDay) || ( (workingDay ==
233             // currentDay) && (intHour < currentHour) ) )
234         } // End for(FileStatus hourFS : hoursFS)
235       } // End Try workingDay =
236         // Integer.parseInt(sdf.format(dayFS.getPath().getName()));
237       catch (NumberFormatException e) { /* Not a standard Day directory skip */
238         log.warn("Exception in hourlyRolling:", e);
239       }
240 
241     } // for(FileStatus dayFS : daysFS)
242   }
243 
244   public int run(String[] args) throws Exception {
245     JobConf conf = new JobConf(new ChukwaConfiguration(), HourlyChukwaRecordRolling.class);
246 
247     conf.setJobName("HourlyChukwa-Rolling");
248     conf.setInputFormat(SequenceFileInputFormat.class);
249 
250     conf.setMapperClass(IdentityMapper.class);
251     conf.setReducerClass(IdentityReducer.class);
252 
253     conf.setOutputKeyClass(ChukwaRecordKey.class);
254     conf.setOutputValueClass(ChukwaRecord.class);
255     conf.setOutputFormat(SequenceFileOutputFormat.class);
256 
257     log.info("HourlyChukwaRecordRolling input: " + args[0]);
258     log.info("HourlyChukwaRecordRolling output: " + args[1]);
259 
260     FileInputFormat.setInputPaths(conf, args[0]);
261     FileOutputFormat.setOutputPath(conf, new Path(args[1]));
262     conf.setJobPriority(JobPriority.LOW);
263     conf.setNumReduceTasks(1);
264     
265     JobClient.runJob(conf);
266     return 0;
267   }
268 
269 }