This project has retired. For details please refer to its Attic page.
TsProcessor xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
19  
20  
21  import java.io.IOException;
22  import java.text.ParseException;
23  import java.text.SimpleDateFormat;
24  import java.util.Date;
25  import java.util.Map;
26  import java.util.HashMap;
27  import java.util.regex.Pattern;
28  import java.util.regex.Matcher;
29  
30  import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Table;
31  import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
32  import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
33  import org.apache.hadoop.chukwa.extraction.demux.Demux;
34  import org.apache.hadoop.chukwa.util.RegexUtil;
35  import org.apache.hadoop.mapred.OutputCollector;
36  import org.apache.hadoop.mapred.Reporter;
37  import org.apache.hadoop.conf.Configuration;
38  import org.apache.log4j.Logger;
39  
40  /**
41   * TsProcessor is a generic processor that can be configured to find the timestamp
42   * in the text of a record. By default, this class expects that a record
43   * starts with a date in this format: <code>yyyy-MM-dd HH:mm:ss,SSS</code>
44   * <P>
45   * This format can be changed with the following configurations.
46   * <UL>
47   * <LI><code>TsProcessor.default.time.format</code> - Changes the default time
48   * format used by all data types.</LI>
49   * <LI><code>TsProcessor.time.format.[some_data_type]</code> - Overrides the default
50   * format for a specific data type.</LI>
51   * </UL>
52   * If the time string is not at the beginning of the record you can configure a
53   * regular expression to locate the timestamp text with either of the following
54   * configurations. The text found in group 1 of the regular expression match
55   * will be used with the configured date format.
56   * <UL>
57   * <LI><code>TsProcessor.default.time.regex</code> - Changes the default time
58   * location regex of the time text for all data types.</LI>
59   * <LI><code>TsProcessor.time.regex.[some_data_type]</code> - Overrides the
60   * default time location regex for a specific data type.</LI>
61   * </UL>
62   *
63   */
64  @Table(name="TsProcessor",columnFamily="log")
65  public class TsProcessor extends AbstractProcessor {
66    static Logger log = Logger.getLogger(TsProcessor.class);
67  
68    public static final String DEFAULT_DATE_FORMAT = "yyyy-MM-dd HH:mm:ss,SSS";
69    public static final String DEFAULT_TIME_REGEX = "TsProcessor.default.time.regex";
70    public static final String TIME_REGEX = "TsProcessor.time.regex.";
71  
72    private Map<String, Pattern> datePatternMap;
73    private Map<String, SimpleDateFormat> dateFormatMap;
74  
75    public TsProcessor() {
76      datePatternMap = new HashMap<String, Pattern>();
77      dateFormatMap = new HashMap<String, SimpleDateFormat>();
78    }
79  
80    @Override
81    protected void parse(String recordEntry,
82        OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
83        throws Throwable {
84      String dStr = null;
85      try {
86        SimpleDateFormat sdf = fetchDateFormat(chunk.getDataType());
87        Pattern datePattern = fetchDateLocationPattern(chunk.getDataType());
88  
89        // fetch the part of the record that contains the date.
90        if(datePattern != null) {
91          Matcher m = datePattern.matcher(recordEntry);
92          if (!m.matches() || m.groupCount() < 1) {
93            throw new ParseException("Regex " + datePattern +
94                    " couldn't extract date string from record: " + recordEntry, 0);
95          }
96          else {
97            dStr = m.group(1);
98          }
99        }
100       else {
101         dStr = recordEntry;
102       }
103 
104       Date d = sdf.parse(dStr);
105       ChukwaRecord record = new ChukwaRecord();
106       this.buildGenericRecord(record, recordEntry, d.getTime(), chunk
107           .getDataType());
108       output.collect(key, record);
109     } catch (ParseException e) {
110       log.warn("Unable to parse the date in DefaultProcessor [" + recordEntry
111           + "], date string='" + dStr + "'", e);
112       e.printStackTrace();
113       throw e;
114     } catch (IOException e) {
115       log.warn("Unable to collect output in DefaultProcessor [" + recordEntry
116           + "]", e);
117       e.printStackTrace();
118       throw e;
119     }
120 
121   }
122   
123   /**
124    * For a given dataType, returns the SimpeDateFormat to use.
125    * @param dataType
126    * @return
127    */
128   private SimpleDateFormat fetchDateFormat(String dataType) {
129     if (dateFormatMap.get(dataType) != null) {
130       return dateFormatMap.get(dataType);
131     }
132 
133     Configuration jobConf = Demux.jobConf;
134     String dateFormat = DEFAULT_DATE_FORMAT;
135 
136     if (jobConf != null) {
137       dateFormat = jobConf.get("TsProcessor.default.time.format", dateFormat);
138       dateFormat = jobConf.get("TsProcessor.time.format." + chunk.getDataType(),
139                                dateFormat);
140     }
141 
142     log.info("dataType: " + chunk.getDataType() + ", dateFormat="+ dateFormat);
143     SimpleDateFormat sdf = new SimpleDateFormat(dateFormat);
144     dateFormatMap.put(dataType, sdf);
145 
146     return sdf;
147   }
148 
149   /**
150    * For a given dataType, returns a Pattern that will produce the date portion
151    * of the string.
152    * @param dataType
153    * @return
154    */
155   private Pattern fetchDateLocationPattern(String dataType) {
156     if (datePatternMap.containsKey(dataType)) {
157       return datePatternMap.get(dataType);
158     }
159 
160     Configuration jobConf = Demux.jobConf;
161     String datePattern = null;
162     Pattern pattern = null;
163 
164     if (jobConf != null) {
165       String timeRegexProperty = TIME_REGEX + chunk.getDataType();
166       datePattern = jobConf.get(DEFAULT_TIME_REGEX, null);
167       datePattern = jobConf.get(timeRegexProperty, datePattern);
168       if (datePattern != null) {
169         if (!RegexUtil.isRegex(datePattern, 1)) {
170           log.warn("Error parsing '" + DEFAULT_TIME_REGEX + "' or '"
171               + timeRegexProperty + "' properties as a regex: "
172               + RegexUtil.regexError(datePattern, 1)
173               + ". This date pattern will be skipped.");
174           return null;
175         }
176         pattern = Pattern.compile(datePattern);
177       }
178     }
179 
180     datePatternMap.put(dataType, pattern);
181 
182     return pattern;
183   }
184 
185 }