/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
192021import java.io.IOException;
22import java.text.ParseException;
23import java.text.SimpleDateFormat;
24import java.util.Date;
25import java.util.Map;
26import java.util.HashMap;
27import java.util.regex.Pattern;
28import java.util.regex.Matcher;
2930import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Table;
31import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
32import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
33import org.apache.hadoop.chukwa.extraction.demux.Demux;
34import org.apache.hadoop.chukwa.util.RegexUtil;
35import org.apache.hadoop.mapred.OutputCollector;
36import org.apache.hadoop.mapred.Reporter;
37import org.apache.hadoop.conf.Configuration;
38import org.apache.log4j.Logger;
3940/**41 * TsProcessor is a generic processor that can be configured to find the timestamp42 * in the text of a record. By default, this class expects that a record43 * starts with a date in this format: <code>yyyy-MM-dd HH:mm:ss,SSS</code>44 * <P>45 * This format can be changed with the following configurations.46 * <UL>47 * <LI><code>TsProcessor.default.time.format</code> - Changes the default time48 * format used by all data types.</LI>49 * <LI><code>TsProcessor.time.format.[some_data_type]</code> - Overrides the default50 * format for a specific data type.</LI>51 * </UL>52 * If the time string is not at the beginning of the record you can configure a53 * regular expression to locate the timestamp text with either of the following54 * configurations. The text found in group 1 of the regular expression match55 * will be used with the configured date format.56 * <UL>57 * <LI><code>TsProcessor.default.time.regex</code> - Changes the default time58 * location regex of the time text for all data types.</LI>59 * <LI><code>TsProcessor.time.regex.[some_data_type]</code> - Overrides the60 * default time location regex for a specific data type.</LI>61 * </UL>62 *63 */64 @Table(name="TsProcessor",columnFamily="log")
65publicclassTsProcessorextendsAbstractProcessor {
66static Logger log = Logger.getLogger(TsProcessor.class);
6768publicstaticfinal String DEFAULT_DATE_FORMAT = "yyyy-MM-dd HH:mm:ss,SSS";
69publicstaticfinal String DEFAULT_TIME_REGEX = "TsProcessor.default.time.regex";
70publicstaticfinal String TIME_REGEX = "TsProcessor.time.regex.";
7172private Map<String, Pattern> datePatternMap;
73private Map<String, SimpleDateFormat> dateFormatMap;
7475publicTsProcessor() {
76 datePatternMap = new HashMap<String, Pattern>();
77 dateFormatMap = new HashMap<String, SimpleDateFormat>();
78 }
7980 @Override
81protectedvoid parse(String recordEntry,
82 OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
83throws Throwable {
84 String dStr = null;
85try {
86 SimpleDateFormat sdf = fetchDateFormat(chunk.getDataType());
87 Pattern datePattern = fetchDateLocationPattern(chunk.getDataType());
8889// fetch the part of the record that contains the date.90if(datePattern != null) {
91 Matcher m = datePattern.matcher(recordEntry);
92if (!m.matches() || m.groupCount() < 1) {
93thrownew ParseException("Regex " + datePattern +
94" couldn't extract date string from record: " + recordEntry, 0);
95 }
96else {
97 dStr = m.group(1);
98 }
99 }
100else {
101 dStr = recordEntry;
102 }
103104 Date d = sdf.parse(dStr);
105ChukwaRecord record = newChukwaRecord();
106this.buildGenericRecord(record, recordEntry, d.getTime(), chunk
107 .getDataType());
108 output.collect(key, record);
109 } catch (ParseException e) {
110 log.warn("Unable to parse the date in DefaultProcessor [" + recordEntry
111 + "], date string='" + dStr + "'", e);
112 e.printStackTrace();
113throw e;
114 } catch (IOException e) {
115 log.warn("Unable to collect output in DefaultProcessor [" + recordEntry
116 + "]", e);
117 e.printStackTrace();
118throw e;
119 }
120121 }
122123/**124 * For a given dataType, returns the SimpeDateFormat to use.125 * @param dataType126 * @return127 */128private SimpleDateFormat fetchDateFormat(String dataType) {
129if (dateFormatMap.get(dataType) != null) {
130return dateFormatMap.get(dataType);
131 }
132133 Configuration jobConf = Demux.jobConf;
134 String dateFormat = DEFAULT_DATE_FORMAT;
135136if (jobConf != null) {
137 dateFormat = jobConf.get("TsProcessor.default.time.format", dateFormat);
138 dateFormat = jobConf.get("TsProcessor.time.format." + chunk.getDataType(),
139 dateFormat);
140 }
141142 log.info("dataType: " + chunk.getDataType() + ", dateFormat="+ dateFormat);
143 SimpleDateFormat sdf = new SimpleDateFormat(dateFormat);
144 dateFormatMap.put(dataType, sdf);
145146return sdf;
147 }
148149/**150 * For a given dataType, returns a Pattern that will produce the date portion151 * of the string.152 * @param dataType153 * @return154 */155private Pattern fetchDateLocationPattern(String dataType) {
156if (datePatternMap.containsKey(dataType)) {
157return datePatternMap.get(dataType);
158 }
159160 Configuration jobConf = Demux.jobConf;
161 String datePattern = null;
162 Pattern pattern = null;
163164if (jobConf != null) {
165 String timeRegexProperty = TIME_REGEX + chunk.getDataType();
166 datePattern = jobConf.get(DEFAULT_TIME_REGEX, null);
167 datePattern = jobConf.get(timeRegexProperty, datePattern);
168if (datePattern != null) {
169if (!RegexUtil.isRegex(datePattern, 1)) {
170 log.warn("Error parsing '" + DEFAULT_TIME_REGEX + "' or '"171 + timeRegexProperty + "' properties as a regex: "172 + RegexUtil.regexError(datePattern, 1)
173 + ". This date pattern will be skipped.");
174returnnull;
175 }
176 pattern = Pattern.compile(datePattern);
177 }
178 }
179180 datePatternMap.put(dataType, pattern);
181182return pattern;
183 }
184185 }