This project has retired. For details please refer to its Attic page.
AbstractProcessor xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
20  
21  
22  import java.util.Calendar;
23  import org.apache.hadoop.chukwa.ChukwaArchiveKey;
24  import org.apache.hadoop.chukwa.Chunk;
25  import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
26  import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
27  import org.apache.hadoop.chukwa.extraction.engine.Record;
28  import org.apache.hadoop.chukwa.util.RecordConstants;
29  import org.apache.hadoop.mapred.OutputCollector;
30  import org.apache.hadoop.mapred.Reporter;
31  import org.apache.log4j.Logger;
32  
33  public abstract class AbstractProcessor implements MapProcessor {
34    static Logger log = Logger.getLogger(AbstractProcessor.class);
35  
36    Calendar calendar = Calendar.getInstance();
37    byte[] bytes;
38    int[] recordOffsets;
39    protected int currentPos = 0;
40    protected int startOffset = 0;
41  
42    protected ChukwaArchiveKey archiveKey = null;
43    protected ChukwaRecordKey key = new ChukwaRecordKey();
44    protected Chunk chunk = null;
45  
46    boolean chunkInErrorSaved = false;
47    OutputCollector<ChukwaRecordKey, ChukwaRecord> output = null;
48    Reporter reporter = null;
49  
50    public AbstractProcessor() {
51    }
52  
53    protected abstract void parse(String recordEntry,
54        OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
55        throws Throwable;
56  
57    protected void saveChunkInError(Throwable throwable) {
58      if (chunkInErrorSaved == false) {
59        try {
60          ChunkSaver.saveChunk(chunk, throwable, output, reporter);
61          chunkInErrorSaved = true;
62        } catch (Exception e) {
63          e.printStackTrace();
64        }
65      }
66  
67    }
68  
69    public void process(ChukwaArchiveKey archiveKey, Chunk chunk,
70        OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter) {
71      chunkInErrorSaved = false;
72  
73      this.archiveKey = archiveKey;
74      this.output = output;
75      this.reporter = reporter;
76  
77      reset(chunk);
78  
79      while (hasNext()) {
80        try {
81          parse(nextLine(), output, reporter);
82        } catch (Throwable e) {
83          saveChunkInError(e);
84        }
85      }
86    }
87  
88    protected void buildGenericRecord(ChukwaRecord record, String body,
89        long timestamp, String dataSource) {
90      calendar.setTimeInMillis(timestamp);
91      calendar.set(Calendar.MINUTE, 0);
92      calendar.set(Calendar.SECOND, 0);
93      calendar.set(Calendar.MILLISECOND, 0);
94  
95      key.setKey("" + calendar.getTimeInMillis() + "/" + chunk.getSource() + "/"
96          + timestamp);
97      key.setReduceType(dataSource);
98  
99      if (body != null) {
100       record.add(Record.bodyField, body);
101     }
102     record.setTime(timestamp);
103 
104     record.add(Record.tagsField, chunk.getTags());
105     record.add(Record.sourceField, chunk.getSource());
106     record.add(Record.applicationField, chunk.getStreamName());
107 
108   }
109 
110   protected void reset(Chunk chunk) {
111     this.chunk = chunk;
112     this.bytes = chunk.getData();
113     this.recordOffsets = chunk.getRecordOffsets();
114     currentPos = 0;
115     startOffset = 0;
116   }
117 
118   protected boolean hasNext() {
119     return (currentPos < recordOffsets.length);
120   }
121 
122   protected String nextLine() {
123     String log = new String(bytes, startOffset, (recordOffsets[currentPos]
124         - startOffset + 1));
125     startOffset = recordOffsets[currentPos] + 1;
126     currentPos++;
127     return RecordConstants.recoverRecordSeparators("\n", log);
128   }
129 
130 public int getCurrentPos() {
131 	return currentPos;
132 }
133 
134 public void setCurrentPos(int currentPos) {
135 	this.currentPos = currentPos;
136 }
137 
138 public int getStartOffset() {
139 	return startOffset;
140 }
141 
142 public void setStartOffset(int startOffset) {
143 	this.startOffset = startOffset;
144 }  
145 }