This project has retired. For details please refer to its Attic page.
ChukwaRecordDataSource xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.chukwa.extraction.engine.datasource.record;
19  
20  
21  import java.io.IOException;
22  import java.text.SimpleDateFormat;
23  import java.util.ArrayList;
24  import java.util.Calendar;
25  import java.util.Date;
26  import java.util.Iterator;
27  import java.util.LinkedList;
28  import java.util.List;
29  import java.util.TreeMap;
30  import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
31  import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
32  import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
33  import org.apache.hadoop.chukwa.extraction.engine.ChukwaSearchResult;
34  import org.apache.hadoop.chukwa.extraction.engine.Record;
35  import org.apache.hadoop.chukwa.extraction.engine.SearchResult;
36  import org.apache.hadoop.chukwa.extraction.engine.Token;
37  import org.apache.hadoop.chukwa.extraction.engine.datasource.DataSource;
38  import org.apache.hadoop.chukwa.extraction.engine.datasource.DataSourceException;
39  import org.apache.hadoop.chukwa.inputtools.mdl.DataConfig;
40  import org.apache.hadoop.fs.FileSystem;
41  import org.apache.hadoop.fs.Path;
42  import org.apache.hadoop.io.SequenceFile;
43  import org.apache.log4j.Logger;
44  
45  public class ChukwaRecordDataSource implements DataSource {
46    // TODO need some cleanup after 1st production
47    // First implementation to get it working with the new directory structure
48  
49    static Logger log = Logger.getLogger(ChukwaRecordDataSource.class);
50  
51    private static final int dayFolder = 100;
52    private static final int hourFolder = 200;
53    private static final int rawFolder = 300;
54  
55    static final String[] raws = { "0", "5", "10", "15", "20", "25", "30", "35",
56        "40", "45", "50", "55" };
57  
58    private static FileSystem fs = null;
59    private static ChukwaConfiguration conf = null;
60  
61    private static String rootDsFolder = null;
62    private static DataConfig dataConfig = null;
63  
64    static {
65      dataConfig = new DataConfig();
66      rootDsFolder = dataConfig.get("chukwa.engine.dsDirectory.rootFolder");
67      conf = new ChukwaConfiguration();
68      try {
69        fs = FileSystem.get(conf);
70      } catch (IOException e) {
71        e.printStackTrace();
72      }
73    }
74  
75    @Override
76    public boolean isThreadSafe() {
77      return true;
78    }
79  
80    @Override
81    public SearchResult search(SearchResult result, String cluster,
82        String dataSource, long t0, long t1, String filter, Token token)
83        throws DataSourceException {
84      String filePath = rootDsFolder + "/" + cluster + "/";
85  
86      log.debug("filePath [" + filePath + "]");
87      Calendar calendar = Calendar.getInstance();
88      calendar.setTimeInMillis(t0);
89      SimpleDateFormat sdf = new java.text.SimpleDateFormat("yyyyMMdd");
90      int maxCount = 200;
91  
92      List<Record> records = new ArrayList<Record>();
93  
94      ChukwaDSInternalResult res = new ChukwaDSInternalResult();
95  
96      if (token != null) {
97        // token.key = day + "|" + hour + "|" + raw + "|" + spill + "|" +
98        // res.currentTs + "|"+ res.position + "|"+ res.fileName;
99        try {
100         String[] vars = token.key.split("\\|");
101         res.day = vars[0];
102         res.hour = Integer.parseInt(vars[1]);
103         res.rawIndex = Integer.parseInt(vars[2]);
104         res.spill = Integer.parseInt(vars[3]);
105         res.currentTs = Long.parseLong(vars[4]);
106         res.position = Long.parseLong(vars[5]);
107         res.fileName = vars[5];
108         log.info("Token is not null! :" + token.key);
109       } catch (Exception e) {
110         log.error("Incalid Key: [" + token.key + "] exception: ", e);
111       }
112     } else {
113       log.debug("Token is  null!");
114     }
115 
116     try {
117       do {
118         log.debug("start Date [" + calendar.getTime() + "]");
119         String workingDay = sdf.format(calendar.getTime());
120         int workingHour = calendar.get(Calendar.HOUR_OF_DAY);
121         int startRawIndex = 0;
122         if (token != null) {
123           workingDay = res.day;
124           workingHour = res.hour;
125           startRawIndex = res.rawIndex;
126         } else {
127           token = new Token();
128         }
129 
130         log.debug("workingDay " + workingDay);
131         log.debug("workingHour " + workingHour);
132 
133         if (exist(dayFolder, filePath, dataSource, workingDay, null, null)) {
134           // Extract Data for Day
135           if (containsRotateFlag(dayFolder, filePath, dataSource, workingDay,
136               null)) {
137             // read data from day
138             // SystemMetrics/20080922/SystemMetrics_20080922.1.evt
139             log.debug("fs.exists(workingDayRotatePath) ");
140             extractRecords(res, ChukwaRecordDataSource.dayFolder, filePath,
141                 dataSource, workingDay, null, -1, token, records, maxCount, t0,
142                 t1, filter);
143             maxCount = maxCount - records.size();
144             if ((maxCount <= 0) || (res.currentTs > t1)) {
145               break;
146             }
147 
148           } // End process Day File
149           else // check for hours
150           {
151             log.debug("check for hours");
152             for (int hour = 0; hour < 24; hour++) {
153               if (workingDay == res.day && hour < workingHour) {
154                 continue;
155               }
156               log.debug(" Hour?  -->" + filePath + dataSource + "/"
157                   + workingDay + "/" + hour);
158               if (exist(dayFolder, filePath, dataSource, workingDay, "" + hour,
159                   null)) {
160                 if (containsRotateFlag(dayFolder, filePath, dataSource,
161                     workingDay, "" + hour)) {
162                   // read data from Hour
163                   // SystemMetrics/20080922/12/SystemMetrics_20080922_12.1.evt
164                   extractRecords(res, ChukwaRecordDataSource.hourFolder,
165                       filePath, dataSource, workingDay, "" + hour, -1, token,
166                       records, maxCount, t0, t1, filter);
167                 } else // check for raw
168                 {
169                   log.debug("Working on Raw");
170 
171                   for (int rawIndex = startRawIndex; rawIndex < 12; rawIndex++) {
172                     // read data from Raw
173                     //SystemMetrics/20080922/0/25/SystemMetrics_20080922_0_25.1.
174                     // evt
175                     if (exist(dayFolder, filePath, dataSource, workingDay, ""
176                         + hour, raws[rawIndex])) {
177                       extractRecords(res, ChukwaRecordDataSource.rawFolder,
178                           filePath, dataSource, workingDay, "" + hour,
179                           rawIndex, token, records, maxCount, t0, t1, filter);
180                       maxCount = maxCount - records.size();
181                       if ((maxCount <= 0) || (res.currentTs > t1)) {
182                         break;
183                       }
184                     } else {
185                       log.debug("<<<<<<<<<Working on Raw Not exist--> "
186                           + filePath + dataSource + "/" + workingDay + "/"
187                           + workingHour + "/" + raws[rawIndex]);
188                     }
189                     res.spill = 1;
190                   }
191                 }
192               } // End if (fs.exists(new Path(filePath + workingDay+ "/" +
193                 // hour)))
194 
195               maxCount = maxCount - records.size();
196               if ((maxCount <= 0) || (res.currentTs > t1)) {
197                 break;
198               }
199 
200             } // End process all Hourly/raw files
201           }
202         }
203 
204         maxCount = maxCount - records.size();
205         if ((maxCount <= 0) || (res.currentTs > t1)) {
206           break;
207         }
208 
209         // move to the next day
210         calendar.add(Calendar.DAY_OF_MONTH, +1);
211         calendar.set(Calendar.HOUR_OF_DAY, 0);
212         calendar.set(Calendar.MINUTE, 0);
213         calendar.set(Calendar.SECOND, 0);
214 
215       } while (calendar.getTimeInMillis() < t1);
216 
217     } catch (Exception e) {
218       e.printStackTrace();
219       throw new DataSourceException(e);
220     }
221 
222     TreeMap<Long, List<Record>> recordsInResult = result.getRecords();
223     for (Record record : records) {
224       long timestamp = record.getTime();
225       if (recordsInResult.containsKey(timestamp)) {
226         recordsInResult.get(timestamp).add(record);
227       } else {
228         List<Record> list = new LinkedList<Record>();
229         list.add(record);
230         recordsInResult.put(timestamp, list);
231       }
232     }
233     result.setToken(token);
234     return result;
235 
236   }
237 
238   public void extractRecords(ChukwaDSInternalResult res, int directoryType,
239       String rootFolder, String dataSource, String day, String hour,
240       int rawIndex, Token token, List<Record> records, int maxRows, long t0,
241       long t1, String filter) throws Exception {
242     // for each spill file
243     // extract records
244     int spill = res.spill;
245 
246     boolean workdone = false;
247     do {
248       String fileName = buildFileName(directoryType, rootFolder, dataSource,
249           spill, day, hour, rawIndex);
250       log.debug("extractRecords : " + fileName);
251 
252       if (fs.exists(new Path(fileName))) {
253         readData(res, token, fileName, maxRows, t0, t1, filter);
254         res.spill = spill;
255         List<Record> localRecords = res.records;
256         log.debug("localRecords size : " + localRecords.size());
257         maxRows = maxRows - localRecords.size();
258         if (maxRows <= 0) {
259           workdone = true;
260         }
261         records.addAll(localRecords);
262         log.debug("AFTER fileName  [" + fileName + "] count="
263             + localRecords.size() + " maxCount=" + maxRows);
264         spill++;
265       } else {
266         // no more spill
267         workdone = true;
268       }
269     } while (!workdone);
270     token.key = day + "|" + hour + "|" + rawIndex + "|" + spill + "|"
271         + res.currentTs + "|" + res.position + "|" + res.fileName;
272   }
273 
274   public void readData(ChukwaDSInternalResult res, Token token,
275       String fileName, int maxRows, long t0, long t1, String filter)
276       throws Exception {
277     List<Record> records = new LinkedList<Record>();
278     res.records = records;
279     SequenceFile.Reader r = null;
280     if (filter != null) {
281       filter = filter.toLowerCase();
282     }
283 
284     try {
285 
286       if (!fs.exists(new Path(fileName))) {
287         log.debug("fileName not there!");
288         return;
289       }
290       log.debug("Parser Open [" + fileName + "]");
291 
292       long timestamp = 0;
293       int listSize = 0;
294       ChukwaRecordKey key = new ChukwaRecordKey();
295       ChukwaRecord record = new ChukwaRecord();
296 
297       r = new SequenceFile.Reader(fs, new Path(fileName), conf);
298 
299       log.debug("readData Open2 [" + fileName + "]");
300       if ((fileName.equals(res.fileName)) && (res.position != -1)) {
301         r.seek(res.position);
302       }
303       res.fileName = fileName;
304 
305       while (r.next(key, record)) {
306         if (record != null) {
307           res.position = r.getPosition();
308 
309           timestamp = record.getTime();
310           res.currentTs = timestamp;
311           log.debug("\nSearch for startDate: " + new Date(t0) + " is :"
312               + new Date(timestamp));
313 
314           if (timestamp < t0) {
315             // log.debug("Line not in range. Skipping: " +record);
316             continue;
317           } else if (timestamp < t1) {
318             log.debug("In Range: " + record.toString());
319             boolean valid = false;
320 
321             if ((filter == null || filter.equals(""))) {
322               valid = true;
323             } else if (isValid(record, filter)) {
324               valid = true;
325             }
326 
327             if (valid) {
328               records.add(record);
329               record = new ChukwaRecord();
330               listSize = records.size();
331               if (listSize >= maxRows) {
332                 // maxRow so stop here
333                 // Update token
334                 token.key = key.getKey();
335                 token.hasMore = true;
336                 break;
337               }
338             } else {
339               log.debug("In Range ==================>>>>>>>>> OUT Regex: "
340                   + record);
341             }
342           } else {
343             log.debug("Line out of range. Stopping now: " + record);
344             // Update Token
345             token.key = key.getKey();
346             token.hasMore = false;
347             break;
348           }
349         }
350       }
351 
352     } catch (Exception e) {
353       e.printStackTrace();
354     } finally {
355       try {
356         r.close();
357       } catch (Exception e) {
358       }
359     }
360   }
361 
362   public boolean containsRotateFlag(int directoryType, String rootFolder,
363       String dataSource, String workingDay, String workingHour)
364       throws Exception {
365     boolean contains = false;
366     switch (directoryType) {
367     case ChukwaRecordDataSource.dayFolder:
368       // SystemMetrics/20080922/rotateDone
369       contains = fs.exists(new Path(rootFolder + dataSource + "/" + workingDay
370           + "/rotateDone"));
371       break;
372 
373     case ChukwaRecordDataSource.hourFolder:
374       // SystemMetrics/20080922/12/rotateDone
375       contains = fs.exists(new Path(rootFolder + dataSource + "/" + workingDay
376           + "/" + workingHour + "/rotateDone"));
377       break;
378 
379     }
380     return contains;
381   }
382 
383   public boolean exist(int directoryType, String rootFolder, String dataSource,
384       String workingDay, String workingHour, String raw) throws Exception {
385     boolean contains = false;
386     switch (directoryType) {
387     case ChukwaRecordDataSource.dayFolder:
388       // SystemMetrics/20080922/rotateDone
389       contains = fs
390           .exists(new Path(rootFolder + dataSource + "/" + workingDay));
391       break;
392 
393     case ChukwaRecordDataSource.hourFolder:
394       // SystemMetrics/20080922/12/rotateDone
395       contains = fs.exists(new Path(rootFolder + dataSource + "/" + workingDay
396           + "/" + workingHour));
397       break;
398     case ChukwaRecordDataSource.rawFolder:
399       // SystemMetrics/20080922/12/rotateDone
400       contains = fs.exists(new Path(rootFolder + dataSource + "/" + workingDay
401           + "/" + workingHour + "/" + raw));
402       break;
403 
404     }
405     return contains;
406   }
407 
408   protected boolean isValid(ChukwaRecord record, String filter) {
409     String[] fields = record.getFields();
410     for (String field : fields) {
411       if (record.getValue(field).toLowerCase().indexOf(filter) >= 0) {
412         return true;
413       }
414     }
415     return false;
416   }
417 
418   public String buildFileName(int directoryType, String rootFolder,
419       String dataSource, int spill, String day, String hour, int rawIndex) {
420     String fileName = null;
421     // TODO use StringBuilder
422     // TODO revisit the way we're building fileName
423 
424     switch (directoryType) {
425     case ChukwaRecordDataSource.dayFolder:
426       // SystemMetrics/20080922/SystemMetrics_20080922.1.evt
427       fileName = rootFolder + "/" + dataSource + "/" + day + "/" + dataSource
428           + "_" + day + "." + spill + ".evt";
429       break;
430 
431     case ChukwaRecordDataSource.hourFolder:
432       // SystemMetrics/20080922/12/SystemMetrics_20080922_12.1.evt
433       fileName = rootFolder + "/" + dataSource + "/" + day + "/" + hour + "/"
434           + dataSource + "_" + day + "_" + hour + "." + spill + ".evt";
435       break;
436 
437     case ChukwaRecordDataSource.rawFolder:
438       // SystemMetrics/20080922/0/25/SystemMetrics_20080922_0_25.1.evt
439       fileName = rootFolder + "/" + dataSource + "/" + day + "/" + hour + "/"
440           + raws[rawIndex] + "/" + dataSource + "_" + day + "_" + hour + "_"
441           + raws[rawIndex] + "." + spill + ".evt";
442       break;
443     }
444     log.debug("buildFileName :" + fileName);
445     return fileName;
446   }
447 
448   public static void main(String[] args) throws DataSourceException {
449     ChukwaRecordDataSource ds = new ChukwaRecordDataSource();
450     SearchResult result = new ChukwaSearchResult();
451     result.setRecords(new TreeMap<Long, List<Record>>());
452     String cluster = args[0];
453     String dataSource = args[1];
454     long t0 = Long.parseLong(args[2]);
455     long t1 = Long.parseLong(args[3]);
456     String filter = null;
457     Token token = null;
458 
459     if (args.length >= 5 && !args[4].equalsIgnoreCase("null")) {
460       filter = args[4];
461     }
462     if (args.length == 6) {
463       token = new Token();
464       token.key = args[5];
465       System.out.println("token :" + token.key);
466     }
467 
468     System.out.println("cluster :" + cluster);
469     System.out.println("dataSource :" + dataSource);
470     System.out.println("t0 :" + t0);
471     System.out.println("t1 :" + t1);
472     System.out.println("filter :" + filter);
473 
474     ds.search(result, cluster, dataSource, t0, t1, filter, token);
475     TreeMap<Long, List<Record>> records = result.getRecords();
476     Iterator<Long> it = records.keySet().iterator();
477 
478     while (it.hasNext()) {
479       long ts = it.next();
480       System.out.println("\n\nTimestamp: " + new Date(ts));
481       List<Record> list = records.get(ts);
482       for (int i = 0; i < list.size(); i++) {
483         System.out.println(list.get(i));
484       }
485     }
486 
487     if (result.getToken() != null) {
488       System.out.println("Key -->" + result.getToken().key);
489     }
490   }
491 }