/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.chukwa.extraction.hbase;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;

import org.apache.hadoop.chukwa.Chunk;
import org.apache.hadoop.chukwa.datacollection.writer.hbase.Reporter;
import org.apache.hadoop.chukwa.util.HBaseUtil;
import org.apache.hadoop.hbase.client.Put;
import org.apache.log4j.Logger;
3233publicabstractclassAbstractProcessor {
34static Logger LOG = Logger.getLogger(AbstractProcessor.class);
3536protectedint entryCount = 0;
37protected String primaryKeyHelper;
38protected String sourceHelper;
3940protected byte[] key = null;
41 byte[] CF = "t".getBytes(Charset.forName("UTF-8"));
4243boolean chunkInErrorSaved = false;
44 ArrayList<Put> output = null;
45 ArrayList<Put> meta = null;
46Reporter reporter = null;
47long time = System.currentTimeMillis();
48Chunk chunk = null;
49 MessageDigest md5 = null;
5051publicAbstractProcessor() throws NoSuchAlgorithmException {
52 md5 = MessageDigest.getInstance("md5");
53 }
5455protectedabstractvoid parse(byte[] recordEntry) throws Throwable;
5657/**58 * Generic metric function to add a metric to HBase with full primary key and59 * source computed.60 * 61 * @param time is timestamp in epoch62 * @param metric is metric name63 * @param source is data source name64 * @param value is metric value in bytes65 * @param output is an array list of Put operations66 */67publicvoid addRecord(long time, String metric, String source, byte[] value,
68 ArrayList<Put> output) {
69 String primaryKey = new StringBuilder(primaryKeyHelper).append(".")
70 .append(metric).toString();
71 byte[] key = HBaseUtil.buildKey(time, primaryKey, source);
72 Put put = new Put(key);
73 byte[] timeInBytes = ByteBuffer.allocate(8).putLong(time).array();
74 put.addColumn(CF, timeInBytes, time, value);
75 output.add(put);
76 reporter.putMetric(chunk.getDataType(), primaryKey);
77 reporter.putSource(chunk.getDataType(), source);
78 }
7980publicvoid addRecord(String primaryKey, String value) {
81 addRecord(primaryKey, value.getBytes(Charset.forName("UTF-8")));
82 }
8384/**85 * Generic function to add a metric to HBase metric table, this function86 * assumes "time" and "source" have been defined and will construct primaryKey87 * only, without recompute time and source md5.88 * 89 * @param metric is metric name90 * @param value is metric value in bytes91 */92publicvoid addRecord(String metric, byte[] value) {
93 String primaryKey = new StringBuilder(primaryKeyHelper).append(".")
94 .append(metric).toString();
95 byte[] key = HBaseUtil.buildKey(time, primaryKey, sourceHelper);
96 Put put = new Put(key);
97 byte[] timeInBytes = ByteBuffer.allocate(8).putLong(time).array();
98 put.addColumn(CF, timeInBytes, time, value);
99 output.add(put);
100 reporter.putMetric(chunk.getDataType(), primaryKey);
101 }
102103/**104 * Process a chunk to store in HBase.105 * 106 * @param chunk is a Chukwa chunk107 * @param output is an array of Put operations108 * @param reporter is a reporter to track progress109 * @throws Throwable if there is problem parsing data110 */111publicvoid process(Chunk chunk, ArrayList<Put> output, Reporter reporter)
112throws Throwable {
113this.output = output;
114this.reporter = reporter;
115this.chunk = chunk;
116this.primaryKeyHelper = chunk.getDataType();
117this.sourceHelper = chunk.getSource();
118 reporter.putSource(primaryKeyHelper, sourceHelper);
119 parse(chunk.getData());
120 addMeta();
121 }
122123protectedvoid addMeta() {
124 byte[] key = HBaseUtil.buildKey(time, chunk.getDataType(), sourceHelper);
125 Put put = new Put(key);
126 String family = "a";
127 byte[] timeInBytes = ByteBuffer.allocate(8).putLong(time).array();
128 put.addColumn(family.getBytes(Charset.forName("UTF-8")), timeInBytes, time, chunk.getTags().getBytes(Charset.forName("UTF-8")));
129 output.add(put);
130 }
131132 }