This project has retired. For details please refer to its Attic page.
ChukwaRecordDataSource xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.chukwa.extraction.engine.datasource.record;
19  
20  
21  import java.io.IOException;
22  import java.text.SimpleDateFormat;
23  import java.util.ArrayList;
24  import java.util.Calendar;
25  import java.util.Date;
26  import java.util.Iterator;
27  import java.util.LinkedList;
28  import java.util.List;
29  import java.util.Map;
30  import java.util.Map.Entry;
31  import java.util.TreeMap;
32  
33  import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
34  import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
35  import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
36  import org.apache.hadoop.chukwa.extraction.engine.ChukwaSearchResult;
37  import org.apache.hadoop.chukwa.extraction.engine.Record;
38  import org.apache.hadoop.chukwa.extraction.engine.SearchResult;
39  import org.apache.hadoop.chukwa.extraction.engine.Token;
40  import org.apache.hadoop.chukwa.extraction.engine.datasource.DataSource;
41  import org.apache.hadoop.chukwa.extraction.engine.datasource.DataSourceException;
42  import org.apache.hadoop.chukwa.inputtools.mdl.DataConfig;
43  import org.apache.hadoop.fs.FileSystem;
44  import org.apache.hadoop.fs.Path;
45  import org.apache.hadoop.io.SequenceFile;
46  import org.apache.log4j.Logger;
47  
48  public class ChukwaRecordDataSource implements DataSource {
49    // TODO need some cleanup after 1st production
50    // First implementation to get it working with the new directory structure
51  
52    static Logger log = Logger.getLogger(ChukwaRecordDataSource.class);
53  
54    private static final int dayFolder = 100;
55    private static final int hourFolder = 200;
56    private static final int rawFolder = 300;
57  
58    static final String[] raws = { "0", "5", "10", "15", "20", "25", "30", "35",
59        "40", "45", "50", "55" };
60  
61    private static FileSystem fs = null;
62    private static ChukwaConfiguration conf = null;
63  
64    private static String rootDsFolder = null;
65    private static DataConfig dataConfig = null;
66  
67    static {
68      dataConfig = new DataConfig();
69      rootDsFolder = dataConfig.get("chukwa.engine.dsDirectory.rootFolder");
70      conf = new ChukwaConfiguration();
71      try {
72        fs = FileSystem.get(conf);
73      } catch (IOException e) {
74        e.printStackTrace();
75      }
76    }
77  
78    @Override
79    public boolean isThreadSafe() {
80      return true;
81    }
82  
83    @Override
84    public SearchResult search(SearchResult result, String cluster,
85        String dataSource, long t0, long t1, String filter, Token token)
86        throws DataSourceException {
87      String filePath = rootDsFolder + "/" + cluster + "/";
88  
89      log.debug("filePath [" + filePath + "]");
90      Calendar calendar = Calendar.getInstance();
91      calendar.setTimeInMillis(t0);
92      SimpleDateFormat sdf = new java.text.SimpleDateFormat("yyyyMMdd");
93      int maxCount = 200;
94  
95      List<Record> records = new ArrayList<Record>();
96  
97      ChukwaDSInternalResult res = new ChukwaDSInternalResult();
98  
99      if (token != null) {
100       // token.key = day + "|" + hour + "|" + raw + "|" + spill + "|" +
101       // res.currentTs + "|"+ res.position + "|"+ res.fileName;
102       try {
103         String[] vars = token.key.split("\\|");
104         res.day = vars[0];
105         res.hour = Integer.parseInt(vars[1]);
106         res.rawIndex = Integer.parseInt(vars[2]);
107         res.spill = Integer.parseInt(vars[3]);
108         res.currentTs = Long.parseLong(vars[4]);
109         res.position = Long.parseLong(vars[5]);
110         res.fileName = vars[5];
111         log.info("Token is not null! :" + token.key);
112       } catch (Exception e) {
113         log.error("Incalid Key: [" + token.key + "] exception: ", e);
114       }
115     } else {
116       log.debug("Token is  null!");
117     }
118 
119     try {
120       do {
121         log.debug("start Date [" + calendar.getTime() + "]");
122         String workingDay = sdf.format(calendar.getTime());
123         int workingHour = calendar.get(Calendar.HOUR_OF_DAY);
124         int startRawIndex = 0;
125         if (token != null) {
126           workingDay = res.day;
127           workingHour = res.hour;
128           startRawIndex = res.rawIndex;
129         } else {
130           token = new Token();
131         }
132 
133         log.debug("workingDay " + workingDay);
134         log.debug("workingHour " + workingHour);
135 
136         if (exist(dayFolder, filePath, dataSource, workingDay, null, null)) {
137           // Extract Data for Day
138           if (containsRotateFlag(dayFolder, filePath, dataSource, workingDay,
139               null)) {
140             // read data from day
141             // SystemMetrics/20080922/SystemMetrics_20080922.1.evt
142             log.debug("fs.exists(workingDayRotatePath) ");
143             extractRecords(res, ChukwaRecordDataSource.dayFolder, filePath,
144                 dataSource, workingDay, null, -1, token, records, maxCount, t0,
145                 t1, filter);
146             maxCount = maxCount - records.size();
147             if ((maxCount <= 0) || (res.currentTs > t1)) {
148               break;
149             }
150 
151           } // End process Day File
152           else // check for hours
153           {
154             log.debug("check for hours");
155             for (int hour = 0; hour < 24; hour++) {
156               if (workingDay.equals(res.day) && hour < workingHour) {
157                 continue;
158               }
159               log.debug(" Hour?  -->" + filePath + dataSource + "/"
160                   + workingDay + "/" + hour);
161               if (exist(dayFolder, filePath, dataSource, workingDay, "" + hour,
162                   null)) {
163                 if (containsRotateFlag(dayFolder, filePath, dataSource,
164                     workingDay, "" + hour)) {
165                   // read data from Hour
166                   // SystemMetrics/20080922/12/SystemMetrics_20080922_12.1.evt
167                   extractRecords(res, ChukwaRecordDataSource.hourFolder,
168                       filePath, dataSource, workingDay, "" + hour, -1, token,
169                       records, maxCount, t0, t1, filter);
170                 } else // check for raw
171                 {
172                   log.debug("Working on Raw");
173 
174                   for (int rawIndex = startRawIndex; rawIndex < 12; rawIndex++) {
175                     // read data from Raw
176                     //SystemMetrics/20080922/0/25/SystemMetrics_20080922_0_25.1.
177                     // evt
178                     if (exist(dayFolder, filePath, dataSource, workingDay, ""
179                         + hour, raws[rawIndex])) {
180                       extractRecords(res, ChukwaRecordDataSource.rawFolder,
181                           filePath, dataSource, workingDay, "" + hour,
182                           rawIndex, token, records, maxCount, t0, t1, filter);
183                       maxCount = maxCount - records.size();
184                       if ((maxCount <= 0) || (res.currentTs > t1)) {
185                         break;
186                       }
187                     } else {
188                       log.debug("<<<<<<<<<Working on Raw Not exist--> "
189                           + filePath + dataSource + "/" + workingDay + "/"
190                           + workingHour + "/" + raws[rawIndex]);
191                     }
192                     res.spill = 1;
193                   }
194                 }
195               } // End if (fs.exists(new Path(filePath + workingDay+ "/" +
196                 // hour)))
197 
198               maxCount = maxCount - records.size();
199               if ((maxCount <= 0) || (res.currentTs > t1)) {
200                 break;
201               }
202 
203             } // End process all Hourly/raw files
204           }
205         }
206 
207         maxCount = maxCount - records.size();
208         if ((maxCount <= 0) || (res.currentTs > t1)) {
209           break;
210         }
211 
212         // move to the next day
213         calendar.add(Calendar.DAY_OF_MONTH, +1);
214         calendar.set(Calendar.HOUR_OF_DAY, 0);
215         calendar.set(Calendar.MINUTE, 0);
216         calendar.set(Calendar.SECOND, 0);
217 
218       } while (calendar.getTimeInMillis() < t1);
219 
220     } catch (Exception e) {
221       e.printStackTrace();
222       throw new DataSourceException(e);
223     }
224 
225     TreeMap<Long, List<Record>> recordsInResult = result.getRecords();
226     for (Record record : records) {
227       long timestamp = record.getTime();
228       if (recordsInResult.containsKey(timestamp)) {
229         recordsInResult.get(timestamp).add(record);
230       } else {
231         List<Record> list = new LinkedList<Record>();
232         list.add(record);
233         recordsInResult.put(timestamp, list);
234       }
235     }
236     result.setToken(token);
237     return result;
238 
239   }
240 
241   public void extractRecords(ChukwaDSInternalResult res, int directoryType,
242       String rootFolder, String dataSource, String day, String hour,
243       int rawIndex, Token token, List<Record> records, int maxRows, long t0,
244       long t1, String filter) throws Exception {
245     // for each spill file
246     // extract records
247     int spill = res.spill;
248 
249     boolean workdone = false;
250     do {
251       String fileName = buildFileName(directoryType, rootFolder, dataSource,
252           spill, day, hour, rawIndex);
253       log.debug("extractRecords : " + fileName);
254 
255       if (fs.exists(new Path(fileName))) {
256         readData(res, token, fileName, maxRows, t0, t1, filter);
257         res.spill = spill;
258         List<Record> localRecords = res.records;
259         log.debug("localRecords size : " + localRecords.size());
260         maxRows = maxRows - localRecords.size();
261         if (maxRows <= 0) {
262           workdone = true;
263         }
264         records.addAll(localRecords);
265         log.debug("AFTER fileName  [" + fileName + "] count="
266             + localRecords.size() + " maxCount=" + maxRows);
267         spill++;
268       } else {
269         // no more spill
270         workdone = true;
271       }
272     } while (!workdone);
273     token.key = day + "|" + hour + "|" + rawIndex + "|" + spill + "|"
274         + res.currentTs + "|" + res.position + "|" + res.fileName;
275   }
276 
277   public void readData(ChukwaDSInternalResult res, Token token,
278       String fileName, int maxRows, long t0, long t1, String filter)
279       throws Exception {
280     List<Record> records = new LinkedList<Record>();
281     res.records = records;
282     SequenceFile.Reader r = null;
283     if (filter != null) {
284       filter = filter.toLowerCase();
285     }
286 
287     try {
288 
289       if (!fs.exists(new Path(fileName))) {
290         log.debug("fileName not there!");
291         return;
292       }
293       log.debug("Parser Open [" + fileName + "]");
294 
295       long timestamp = 0;
296       int listSize = 0;
297       ChukwaRecordKey key = new ChukwaRecordKey();
298       ChukwaRecord record = new ChukwaRecord();
299 
300       r = new SequenceFile.Reader(fs, new Path(fileName), conf);
301 
302       log.debug("readData Open2 [" + fileName + "]");
303       if ((fileName.equals(res.fileName)) && (res.position != -1)) {
304         r.seek(res.position);
305       }
306       res.fileName = fileName;
307 
308       while (r.next(key, record)) {
309         if (record != null) {
310           res.position = r.getPosition();
311 
312           timestamp = record.getTime();
313           res.currentTs = timestamp;
314           log.debug("\nSearch for startDate: " + new Date(t0) + " is :"
315               + new Date(timestamp));
316 
317           if (timestamp < t0) {
318             // log.debug("Line not in range. Skipping: " +record);
319             continue;
320           } else if (timestamp < t1) {
321             log.debug("In Range: " + record.toString());
322             boolean valid = false;
323 
324             if ((filter == null || filter.equals(""))) {
325               valid = true;
326             } else if (isValid(record, filter)) {
327               valid = true;
328             }
329 
330             if (valid) {
331               records.add(record);
332               record = new ChukwaRecord();
333               listSize = records.size();
334               if (listSize >= maxRows) {
335                 // maxRow so stop here
336                 // Update token
337                 token.key = key.getKey();
338                 token.hasMore = true;
339                 break;
340               }
341             } else {
342               log.debug("In Range ==================>>>>>>>>> OUT Regex: "
343                   + record);
344             }
345           } else {
346             log.debug("Line out of range. Stopping now: " + record);
347             // Update Token
348             token.key = key.getKey();
349             token.hasMore = false;
350             break;
351           }
352         }
353       }
354 
355     } catch (IOException e) {
356       e.printStackTrace();
357     } finally {
358       try {
359         r.close();
360       } catch (Exception e) {
361       }
362     }
363   }
364 
365   public boolean containsRotateFlag(int directoryType, String rootFolder,
366       String dataSource, String workingDay, String workingHour)
367       throws Exception {
368     boolean contains = false;
369     switch (directoryType) {
370     case ChukwaRecordDataSource.dayFolder:
371       // SystemMetrics/20080922/rotateDone
372       contains = fs.exists(new Path(rootFolder + dataSource + "/" + workingDay
373           + "/rotateDone"));
374       break;
375 
376     case ChukwaRecordDataSource.hourFolder:
377       // SystemMetrics/20080922/12/rotateDone
378       contains = fs.exists(new Path(rootFolder + dataSource + "/" + workingDay
379           + "/" + workingHour + "/rotateDone"));
380       break;
381     default:
382       contains = fs.exists(new Path(rootFolder + dataSource + "/" + workingDay
383           + "/rotateDone"));
384       break;
385     }
386     return contains;
387   }
388 
389   public boolean exist(int directoryType, String rootFolder, String dataSource,
390       String workingDay, String workingHour, String raw) throws Exception {
391     boolean contains = false;
392     switch (directoryType) {
393     case ChukwaRecordDataSource.dayFolder:
394       // SystemMetrics/20080922/rotateDone
395       contains = fs
396           .exists(new Path(rootFolder + dataSource + "/" + workingDay));
397       break;
398 
399     case ChukwaRecordDataSource.hourFolder:
400       // SystemMetrics/20080922/12/rotateDone
401       contains = fs.exists(new Path(rootFolder + dataSource + "/" + workingDay
402           + "/" + workingHour));
403       break;
404     case ChukwaRecordDataSource.rawFolder:
405       // SystemMetrics/20080922/12/rotateDone
406       contains = fs.exists(new Path(rootFolder + dataSource + "/" + workingDay
407           + "/" + workingHour + "/" + raw));
408       break;
409     default:
410       contains = fs
411           .exists(new Path(rootFolder + dataSource + "/" + workingDay));
412       break;
413     }
414     return contains;
415   }
416 
417   protected boolean isValid(ChukwaRecord record, String filter) {
418     String[] fields = record.getFields();
419     for (String field : fields) {
420       if (record.getValue(field).toLowerCase().indexOf(filter) >= 0) {
421         return true;
422       }
423     }
424     return false;
425   }
426 
427   public String buildFileName(int directoryType, String rootFolder,
428       String dataSource, int spill, String day, String hour, int rawIndex) {
429     String fileName = null;
430     // TODO use StringBuilder
431     // TODO revisit the way we're building fileName
432 
433     switch (directoryType) {
434     case ChukwaRecordDataSource.dayFolder:
435       // SystemMetrics/20080922/SystemMetrics_20080922.1.evt
436       fileName = rootFolder + "/" + dataSource + "/" + day + "/" + dataSource
437           + "_" + day + "." + spill + ".evt";
438       break;
439 
440     case ChukwaRecordDataSource.hourFolder:
441       // SystemMetrics/20080922/12/SystemMetrics_20080922_12.1.evt
442       fileName = rootFolder + "/" + dataSource + "/" + day + "/" + hour + "/"
443           + dataSource + "_" + day + "_" + hour + "." + spill + ".evt";
444       break;
445 
446     case ChukwaRecordDataSource.rawFolder:
447       // SystemMetrics/20080922/0/25/SystemMetrics_20080922_0_25.1.evt
448       fileName = rootFolder + "/" + dataSource + "/" + day + "/" + hour + "/"
449           + raws[rawIndex] + "/" + dataSource + "_" + day + "_" + hour + "_"
450           + raws[rawIndex] + "." + spill + ".evt";
451       break;
452     default:
453       fileName = rootFolder + "/" + dataSource + "/" + day + "/" + dataSource
454           + "_" + day + "." + spill + ".evt";
455       break;
456     }
457     log.debug("buildFileName :" + fileName);
458     return fileName;
459   }
460 
461   public static void main(String[] args) throws DataSourceException {
462     ChukwaRecordDataSource ds = new ChukwaRecordDataSource();
463     SearchResult result = new ChukwaSearchResult();
464     result.setRecords(new TreeMap<Long, List<Record>>());
465     String cluster = args[0];
466     String dataSource = args[1];
467     long t0 = Long.parseLong(args[2]);
468     long t1 = Long.parseLong(args[3]);
469     String filter = null;
470     Token token = null;
471 
472     if (args.length >= 5 && !args[4].equalsIgnoreCase("null")) {
473       filter = args[4];
474     }
475     if (args.length == 6) {
476       token = new Token();
477       token.key = args[5];
478       System.out.println("token :" + token.key);
479     }
480 
481     System.out.println("cluster :" + cluster);
482     System.out.println("dataSource :" + dataSource);
483     System.out.println("t0 :" + t0);
484     System.out.println("t1 :" + t1);
485     System.out.println("filter :" + filter);
486 
487     ds.search(result, cluster, dataSource, t0, t1, filter, token);
488     TreeMap<Long, List<Record>> records = result.getRecords();
489     for(Entry<Long, List<Record>> entry : records.entrySet()) {      
490       long ts = entry.getKey();
491       System.out.println("\n\nTimestamp: " + new Date(ts));
492       List<Record> list = entry.getValue();
493       for (int i = 0; i < list.size(); i++) {
494         System.out.println(list.get(i));
495       }
496     }
497 
498     if (result.getToken() != null) {
499       System.out.println("Key -->" + result.getToken().key);
500     }
501   }
502 }