This project has been retired. For details, please refer to its Attic page.
      
1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  
16  
17  
18  
19  package org.apache.hadoop.chukwa.extraction.demux;
20  
21  
22  import java.io.IOException;
23  import java.net.URI;
24  import java.text.SimpleDateFormat;
25  import java.util.ArrayList;
26  import java.util.Calendar;
27  import java.util.List;
28  import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
29  import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
30  import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
31  import org.apache.hadoop.chukwa.util.HierarchyDataType;
32  import org.apache.hadoop.conf.Configured;
33  import org.apache.hadoop.fs.FileStatus;
34  import org.apache.hadoop.fs.FileSystem;
35  import org.apache.hadoop.fs.FileUtil;
36  import org.apache.hadoop.fs.Path;
37  import org.apache.hadoop.mapred.FileInputFormat;
38  import org.apache.hadoop.mapred.FileOutputFormat;
39  import org.apache.hadoop.mapred.JobClient;
40  import org.apache.hadoop.mapred.JobConf;
41  import org.apache.hadoop.mapred.JobPriority;
42  import org.apache.hadoop.mapred.SequenceFileInputFormat;
43  import org.apache.hadoop.mapred.SequenceFileOutputFormat;
44  import org.apache.hadoop.mapred.lib.IdentityMapper;
45  import org.apache.hadoop.mapred.lib.IdentityReducer;
46  import org.apache.hadoop.util.Tool;
47  import org.apache.log4j.Logger;
48  
49  
50  public class HourlyChukwaRecordRolling extends Configured implements Tool {
51    static Logger log = Logger.getLogger(HourlyChukwaRecordRolling.class); // log4j logger for this class; used by main() and run()
52  
53    static SimpleDateFormat sdf = new java.text.SimpleDateFormat("yyyyMMdd"); // formats a date as yyyyMMdd; main() parses that text into the int "current day". NOTE: SimpleDateFormat is not thread-safe
54    static ChukwaConfiguration conf = null; // assigned in main()
55    static FileSystem fs = null; // assigned in main() from the "writer.hdfs.filesystem" URI; used for the directory listings in main() and buildHourlyFiles()
56    static final String HadoopLogDir = "_logs"; // not referenced in the visible part of this file; presumably the log directory Hadoop leaves in job output - TODO confirm
57    static final String hadoopTempDir = "_temporary"; // not referenced in the visible part of this file; presumably Hadoop's in-progress output directory - TODO confirm
58  
59    static boolean rollInSequence = true; // overwritten by main() from args[1]; where it is consumed is not visible here
60    static boolean deleteRawdata = false; // overwritten by main() from args[3]; where it is consumed is not visible here
61  
62    public static void usage() {
63      System.err
64          .println("usage: java org.apache.hadoop.chukwa.extraction.demux.HourlyChukwaRecordRolling rollInSequence <True/False> deleteRawdata <True/False>");
65    }
66  
67    public static void buildHourlyFiles(String chukwaMainRepository,
68        String tempDir, String rollingFolder, int workingDay, int workingHour)
69        throws IOException {
70      
71      Path hourPath = new Path(rollingFolder + "/hourly/" + workingDay + "/"
72          + workingHour);
73      FileStatus[] clustersFS = fs.listStatus(hourPath);
74      for (FileStatus clusterFs : clustersFS) {
75        String cluster = clusterFs.getPath().getName();
76  
77        Path dataSourceClusterHourPaths = new Path(rollingFolder + "/hourly/"
78            + workingDay + "/" + workingHour + "/" + cluster);
79        FileStatus[] dataSourcesFS = fs.listStatus(dataSourceClusterHourPaths);
80        
81        for (FileStatus dataSourceFS : dataSourcesFS) {
82          
83          for (FileStatus dataSourcePath : HierarchyDataType.globStatus(fs,
84              dataSourceFS.getPath(), true)) {
85            String dataSource = HierarchyDataType.getDataType(
86                dataSourcePath.getPath(),
87                fs.getFileStatus(dataSourceClusterHourPaths).getPath());
88            
89  
90  
91  
92  
93  
94  
95  
96  
97  
98  
99  
100 
101 
102 
103 
104 
105 
106 
107 
108 
109 
110 
111 
112 
113 
114 
115 
116 
117 
118 
119 
120 
121 
122 
123 
124 
125 
126 
127 
128 
129 
130 
131 
132 
133 
134 
135 
136 
137 
138 
139 
140 
141 
142 
143 
144 
145 
146 
147 
148 
149 
150 
151 
152 
153 
154 
155 
156 
157 
158   public static void main(String[] args) throws Exception {
159     
160     conf = new ChukwaConfiguration();
161     String fsName = conf.get("writer.hdfs.filesystem");
162     fs = FileSystem.get(new URI(fsName), conf);
163 
164     
165     String rollingFolder = "/chukwa/rolling/";
166     String chukwaMainRepository = "/chukwa/repos/";
167     String tempDir = "/chukwa/temp/hourlyRolling/";
168 
169     
170     if (args.length != 4) {
171       usage();
172       return;
173     }
174 
175     if (!args[0].equalsIgnoreCase("rollInSequence")) {
176       usage();
177       return;
178     }
179 
180     if (!args[2].equalsIgnoreCase("deleteRawdata")) {
181       usage();
182       return;
183     }
184 
185     if (args[1].equalsIgnoreCase("true")) {
186       rollInSequence = true;
187     } else {
188       rollInSequence = false;
189     }
190 
191     if (args[3].equalsIgnoreCase("true")) {
192       deleteRawdata = true;
193     } else {
194       deleteRawdata = false;
195     }
196 
197     Calendar calendar = Calendar.getInstance();
198     int currentDay = Integer.parseInt(sdf.format(calendar.getTime()));
199     int currentHour = calendar.get(Calendar.HOUR_OF_DAY);
200     log.info("CurrentDay: " + currentDay);
201     log.info("currentHour" + currentHour);
202 
203     Path rootFolder = new Path(rollingFolder + "/hourly/");
204 
205     FileStatus[] daysFS = fs.listStatus(rootFolder);
206     for (FileStatus dayFS : daysFS) {
207       try {
208         log.info("dayFs:" + dayFS.getPath().getName());
209         int workingDay = Integer.parseInt(dayFS.getPath().getName());
210 
211         Path hourlySrc = new Path(rollingFolder + "/hourly/" + workingDay);
212         FileStatus[] hoursFS = fs.listStatus(hourlySrc);
213         for (FileStatus hourFS : hoursFS) {
214           String workinhHourStr = hourFS.getPath().getName();
215           int workingHour = Integer.parseInt(workinhHourStr);
216           if ((workingDay < currentDay) || 
217               ((workingDay == currentDay) && (workingHour < currentHour)) 
218                                                                           
219                                                                           
220                                                                           
221                                                                           
222           ) {
223 
224             try {
225               buildHourlyFiles(chukwaMainRepository, tempDir, rollingFolder,
226                   workingDay, workingHour);
227             } catch(Throwable e) {
228               e.printStackTrace();
229               log.warn("Hourly rolling failed on :" + rollingFolder +"/" + workingDay +"/" + workingHour ) ;
230             }
231 
232           } 
233             
234         } 
235       } 
236         
237       catch (NumberFormatException e) { 
238         log.warn("Exception in hourlyRolling:", e);
239       }
240 
241     } 
242   }
243 
244   public int run(String[] args) throws Exception {
245     JobConf conf = new JobConf(new ChukwaConfiguration(), HourlyChukwaRecordRolling.class);
246 
247     conf.setJobName("HourlyChukwa-Rolling");
248     conf.setInputFormat(SequenceFileInputFormat.class);
249 
250     conf.setMapperClass(IdentityMapper.class);
251     conf.setReducerClass(IdentityReducer.class);
252 
253     conf.setOutputKeyClass(ChukwaRecordKey.class);
254     conf.setOutputValueClass(ChukwaRecord.class);
255     conf.setOutputFormat(SequenceFileOutputFormat.class);
256 
257     log.info("HourlyChukwaRecordRolling input: " + args[0]);
258     log.info("HourlyChukwaRecordRolling output: " + args[1]);
259 
260     FileInputFormat.setInputPaths(conf, args[0]);
261     FileOutputFormat.setOutputPath(conf, new Path(args[1]));
262     conf.setJobPriority(JobPriority.LOW);
263     conf.setNumReduceTasks(1);
264     
265     JobClient.runJob(conf);
266     return 0;
267   }
268 
269 }