This project has retired. For details please refer to its Attic page.
AbstractProcessor xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.chukwa.extraction.hbase;
20  
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;

import org.apache.hadoop.chukwa.Chunk;
import org.apache.hadoop.chukwa.datacollection.writer.hbase.Reporter;
import org.apache.hadoop.chukwa.util.HBaseUtil;
import org.apache.hadoop.hbase.client.Put;
import org.apache.log4j.Logger;
32  
33  public abstract class AbstractProcessor {
34    static Logger LOG = Logger.getLogger(AbstractProcessor.class);
35  
36    protected int entryCount = 0;
37    protected String primaryKeyHelper;
38    protected String sourceHelper;
39  
40    protected byte[] key = null;
41    byte[] CF = "t".getBytes(Charset.forName("UTF-8"));
42  
43    boolean chunkInErrorSaved = false;
44    ArrayList<Put> output = null;
45    ArrayList<Put> meta = null;
46    Reporter reporter = null;
47    long time = System.currentTimeMillis();
48    Chunk chunk = null;
49    MessageDigest md5 = null;
50  
51    public AbstractProcessor() throws NoSuchAlgorithmException {
52      md5 = MessageDigest.getInstance("md5");
53    }
54  
55    protected abstract void parse(byte[] recordEntry) throws Throwable;
56  
57    /**
58     * Generic metric function to add a metric to HBase with full primary key and
59     * source computed.
60     * 
61     * @param time is timestamp in epoch
62     * @param metric is metric name
63     * @param source is data source name
64     * @param value is metric value in bytes
65     * @param output is an array list of Put operations
66     */
67    public void addRecord(long time, String metric, String source, byte[] value,
68        ArrayList<Put> output) {
69      String primaryKey = new StringBuilder(primaryKeyHelper).append(".")
70          .append(metric).toString();
71      byte[] key = HBaseUtil.buildKey(time, primaryKey, source);
72      Put put = new Put(key);
73      byte[] timeInBytes = ByteBuffer.allocate(8).putLong(time).array();
74      put.addColumn(CF, timeInBytes, time, value);
75      output.add(put);
76      reporter.putMetric(chunk.getDataType(), primaryKey);
77      reporter.putSource(chunk.getDataType(), source);
78    }
79  
80    public void addRecord(String primaryKey, String value) {
81      addRecord(primaryKey, value.getBytes(Charset.forName("UTF-8")));
82    }
83  
84    /**
85     * Generic function to add a metric to HBase metric table, this function
86     * assumes "time" and "source" have been defined and will construct primaryKey
87     * only, without recompute time and source md5.
88     * 
89     * @param metric is metric name
90     * @param value is metric value in bytes
91     */
92    public void addRecord(String metric, byte[] value) {
93      String primaryKey = new StringBuilder(primaryKeyHelper).append(".")
94          .append(metric).toString();
95      byte[] key = HBaseUtil.buildKey(time, primaryKey, sourceHelper);
96      Put put = new Put(key);
97      byte[] timeInBytes = ByteBuffer.allocate(8).putLong(time).array();
98      put.addColumn(CF, timeInBytes, time, value);
99      output.add(put);
100     reporter.putMetric(chunk.getDataType(), primaryKey);
101   }
102 
103   /**
104    * Process a chunk to store in HBase.
105    * 
106    * @param chunk is a Chukwa chunk
107    * @param output is an array of Put operations
108    * @param reporter is a reporter to track progress
109    * @throws Throwable if there is problem parsing data
110    */
111   public void process(Chunk chunk, ArrayList<Put> output, Reporter reporter)
112       throws Throwable {
113     this.output = output;
114     this.reporter = reporter;
115     this.chunk = chunk;
116     this.primaryKeyHelper = chunk.getDataType();
117     this.sourceHelper = chunk.getSource();
118     reporter.putSource(primaryKeyHelper, sourceHelper);
119     parse(chunk.getData());
120     addMeta();
121   }
122 
123   protected void addMeta() {
124     byte[] key = HBaseUtil.buildKey(time, chunk.getDataType(), sourceHelper);
125     Put put = new Put(key);
126     String family = "a";
127     byte[] timeInBytes = ByteBuffer.allocate(8).putLong(time).array();
128     put.addColumn(family.getBytes(Charset.forName("UTF-8")), timeInBytes, time, chunk.getTags().getBytes(Charset.forName("UTF-8")));
129     output.add(put);
130   }
131 
132 }