This project has retired. For details please refer to its Attic page.
HourlyChukwaRecordRolling xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.chukwa.extraction.demux;
20  
21  
22  import java.io.IOException;
23  import java.net.URI;
24  import java.text.SimpleDateFormat;
25  import java.util.ArrayList;
26  import java.util.Calendar;
27  import java.util.List;
28  import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
29  import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
30  import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
31  import org.apache.hadoop.chukwa.util.HierarchyDataType;
32  import org.apache.hadoop.conf.Configured;
33  import org.apache.hadoop.fs.FileStatus;
34  import org.apache.hadoop.fs.FileSystem;
35  import org.apache.hadoop.fs.FileUtil;
36  import org.apache.hadoop.fs.Path;
37  import org.apache.hadoop.mapred.FileInputFormat;
38  import org.apache.hadoop.mapred.FileOutputFormat;
39  import org.apache.hadoop.mapred.JobClient;
40  import org.apache.hadoop.mapred.JobConf;
41  import org.apache.hadoop.mapred.JobPriority;
42  import org.apache.hadoop.mapred.SequenceFileInputFormat;
43  import org.apache.hadoop.mapred.SequenceFileOutputFormat;
44  import org.apache.hadoop.mapred.lib.IdentityMapper;
45  import org.apache.hadoop.mapred.lib.IdentityReducer;
46  import org.apache.hadoop.util.Tool;
47  import org.apache.log4j.Logger;
48  
49  // TODO: extract an abstract base class shared by all rolling jobs
50  public class HourlyChukwaRecordRolling extends Configured implements Tool {
51    static Logger log = Logger.getLogger(HourlyChukwaRecordRolling.class);
52  
53    static SimpleDateFormat sdf = new java.text.SimpleDateFormat("yyyyMMdd");
54    static ChukwaConfiguration conf = null;
55    static FileSystem fs = null;
56    static final String HadoopLogDir = "_logs";
57    static final String hadoopTempDir = "_temporary";
58  
59    static boolean rollInSequence = true;
60    static boolean deleteRawdata = false;
61  
62    public static void usage() {
63      System.err
64          .println("usage: java org.apache.hadoop.chukwa.extraction.demux.HourlyChukwaRecordRolling rollInSequence <True/False> deleteRawdata <True/False>");
65    }
66  
67    public static void buildHourlyFiles(String chukwaMainRepository,
68        String tempDir, String rollingFolder, int workingDay, int workingHour)
69        throws IOException {
70      // process
71      Path hourPath = new Path(rollingFolder + "/hourly/" + workingDay + "/"
72          + workingHour);
73      FileStatus[] clustersFS = fs.listStatus(hourPath);
74      for (FileStatus clusterFs : clustersFS) {
75        String cluster = clusterFs.getPath().getName();
76  
77        Path dataSourceClusterHourPaths = new Path(rollingFolder + "/hourly/"
78            + workingDay + "/" + workingHour + "/" + cluster);
79        FileStatus[] dataSourcesFS = fs.listStatus(dataSourceClusterHourPaths);
80        
81        for (FileStatus dataSourceFS : dataSourcesFS) {
82          //CHUKWA-648:  Make Chukwa Reduce Type to support hierarchy format  
83          for (FileStatus dataSourcePath : HierarchyDataType.globStatus(fs,
84              dataSourceFS.getPath(), true)) {
85            String dataSource = HierarchyDataType.getDataType(
86                dataSourcePath.getPath(),
87                fs.getFileStatus(dataSourceClusterHourPaths).getPath());
88            // Repo path = reposRootDirectory/<cluster>/<datasource>/<day>/<hour>/*/*.evt
89  
90            // put the rotate flag
91            fs.mkdirs(new Path(chukwaMainRepository + "/" + cluster + "/"
92                + dataSource + "/" + workingDay + "/" + workingHour
93                + "/rotateDone"));
94  
95            // rotate
96            // Merge
97            String[] mergeArgs = new String[5];
98            // input
99            mergeArgs[0] = chukwaMainRepository + "/" + cluster + "/" + dataSource
100               + "/" + workingDay + "/" + workingHour + "/[0-5]*/*.evt";
101           // temp dir
102           mergeArgs[1] = tempDir + "/" + cluster + "/" + dataSource + "/"
103               + workingDay + "/" + workingHour + "_" + System.currentTimeMillis();
104           // final output dir
105           mergeArgs[2] = chukwaMainRepository + "/" + cluster + "/" + dataSource
106               + "/" + workingDay + "/" + workingHour;
107           // final output fileName
108           mergeArgs[3] = dataSource + "_HourlyDone_" + workingDay + "_" + workingHour;
109           // delete rolling directory
110           mergeArgs[4] = rollingFolder + "/hourly/" + workingDay + "/"
111               + workingHour + "/" + cluster + "/" + dataSource;
112 
113           log.info("HourlyChukwaRecordRolling 0: " + mergeArgs[0]);
114           log.info("HourlyChukwaRecordRolling 1: " + mergeArgs[1]);
115           log.info("HourlyChukwaRecordRolling 2: " + mergeArgs[2]);
116           log.info("HourlyChukwaRecordRolling 3: " + mergeArgs[3]);
117           log.info("HourlyChukwaRecordRolling 4: " + mergeArgs[4]);
118 
119           RecordMerger merge = new RecordMerger(conf, fs,
120               new HourlyChukwaRecordRolling(), mergeArgs, deleteRawdata);
121           List<RecordMerger> allMerge = new ArrayList<RecordMerger>();
122           if (rollInSequence) {
123             merge.mergeRecords();
124           } else {
125             allMerge.add(merge);
126             merge.start();
127           }
128 
129           // join all Threads
130           if (!rollInSequence) {
131             while (allMerge.size() > 0) {
132               RecordMerger m = allMerge.remove(0);
133               try {
134                 m.join();
135               } catch (InterruptedException e) {
136               }
137             }
138           } // End if (!rollInSequence)
139         }
140         // Delete the processed dataSourceFS
141         FileUtil.fullyDelete(fs, dataSourceFS.getPath());
142 
143       } // End for(FileStatus dataSourceFS : dataSourcesFS)
144 
145       // Delete the processed clusterFs
146       FileUtil.fullyDelete(fs, clusterFs.getPath());
147 
148     } // End for(FileStatus clusterFs : clustersFS)
149 
150     // Delete the processed hour
151     FileUtil.fullyDelete(fs, hourPath);
152   }
153 
154   /**
155    * @param args
156    * @throws Exception
157    */
158   public static void main(String[] args) throws Exception {
159     
160     conf = new ChukwaConfiguration();
161     String fsName = conf.get("writer.hdfs.filesystem");
162     fs = FileSystem.get(new URI(fsName), conf);
163 
164     // TODO read from config
165     String rollingFolder = "/chukwa/rolling/";
166     String chukwaMainRepository = "/chukwa/repos/";
167     String tempDir = "/chukwa/temp/hourlyRolling/";
168 
169     // TODO do a real parameter parsing
170     if (args.length != 4) {
171       usage();
172       return;
173     }
174 
175     if (!args[0].equalsIgnoreCase("rollInSequence")) {
176       usage();
177       return;
178     }
179 
180     if (!args[2].equalsIgnoreCase("deleteRawdata")) {
181       usage();
182       return;
183     }
184 
185     if (args[1].equalsIgnoreCase("true")) {
186       rollInSequence = true;
187     } else {
188       rollInSequence = false;
189     }
190 
191     if (args[3].equalsIgnoreCase("true")) {
192       deleteRawdata = true;
193     } else {
194       deleteRawdata = false;
195     }
196 
197     Calendar calendar = Calendar.getInstance();
198     int currentDay = Integer.parseInt(sdf.format(calendar.getTime()));
199     int currentHour = calendar.get(Calendar.HOUR_OF_DAY);
200     log.info("CurrentDay: " + currentDay);
201     log.info("currentHour" + currentHour);
202 
203     Path rootFolder = new Path(rollingFolder + "/hourly/");
204 
205     FileStatus[] daysFS = fs.listStatus(rootFolder);
206     for (FileStatus dayFS : daysFS) {
207       try {
208         log.info("dayFs:" + dayFS.getPath().getName());
209         int workingDay = Integer.parseInt(dayFS.getPath().getName());
210 
211         Path hourlySrc = new Path(rollingFolder + "/hourly/" + workingDay);
212         FileStatus[] hoursFS = fs.listStatus(hourlySrc);
213         for (FileStatus hourFS : hoursFS) {
214           String workinhHourStr = hourFS.getPath().getName();
215           int workingHour = Integer.parseInt(workinhHourStr);
216           if ((workingDay < currentDay) || // all previous days
217               ((workingDay == currentDay) && (workingHour < currentHour)) // Up
218                                                                           // to
219                                                                           // the
220                                                                           // last
221                                                                           // hour
222           ) {
223 
224             try {
225               buildHourlyFiles(chukwaMainRepository, tempDir, rollingFolder,
226                   workingDay, workingHour);
227             } catch(Throwable e) {
228               e.printStackTrace();
229               log.warn("Hourly rolling failed on :" + rollingFolder +"/" + workingDay +"/" + workingHour ) ;
230             }
231 
232           } // End if ( (workingDay < currentDay) || ( (workingDay ==
233             // currentDay) && (intHour < currentHour) ) )
234         } // End for(FileStatus hourFS : hoursFS)
235       } // End Try workingDay =
236         // Integer.parseInt(sdf.format(dayFS.getPath().getName()));
237       catch (NumberFormatException e) { /* Not a standard Day directory skip */
238         log.warn("Exception in hourlyRolling:", e);
239       }
240 
241     } // for(FileStatus dayFS : daysFS)
242   }
243 
244   public int run(String[] args) throws Exception {
245     JobConf conf = new JobConf(new ChukwaConfiguration(), HourlyChukwaRecordRolling.class);
246 
247     conf.setJobName("HourlyChukwa-Rolling");
248     conf.setInputFormat(SequenceFileInputFormat.class);
249 
250     conf.setMapperClass(IdentityMapper.class);
251     conf.setReducerClass(IdentityReducer.class);
252 
253     conf.setOutputKeyClass(ChukwaRecordKey.class);
254     conf.setOutputValueClass(ChukwaRecord.class);
255     conf.setOutputFormat(SequenceFileOutputFormat.class);
256 
257     log.info("HourlyChukwaRecordRolling input: " + args[0]);
258     log.info("HourlyChukwaRecordRolling output: " + args[1]);
259 
260     FileInputFormat.setInputPaths(conf, args[0]);
261     FileOutputFormat.setOutputPath(conf, new Path(args[1]));
262     conf.setJobPriority(JobPriority.LOW);
263     conf.setNumReduceTasks(1);
264     
265     JobClient.runJob(conf);
266     return 0;
267   }
268 
269 }