This project has retired. For details please refer to its
Attic page.
AbstractProcessor xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.chukwa.extraction.hbase;
20
21 import java.nio.ByteBuffer;
22 import java.nio.charset.Charset;
23 import java.security.MessageDigest;
24 import java.security.NoSuchAlgorithmException;
25 import java.util.ArrayList;
26
27 import org.apache.hadoop.chukwa.Chunk;
28 import org.apache.hadoop.chukwa.datacollection.writer.hbase.Reporter;
29 import org.apache.hadoop.chukwa.util.HBaseUtil;
30 import org.apache.hadoop.hbase.client.Put;
31 import org.apache.log4j.Logger;
32
33 public abstract class AbstractProcessor {
34 static Logger LOG = Logger.getLogger(AbstractProcessor.class);
35
36 protected int entryCount = 0;
37 protected String primaryKeyHelper;
38 protected String sourceHelper;
39
40 protected byte[] key = null;
41 byte[] CF = "t".getBytes(Charset.forName("UTF-8"));
42
43 boolean chunkInErrorSaved = false;
44 ArrayList<Put> output = null;
45 ArrayList<Put> meta = null;
46 Reporter reporter = null;
47 long time = System.currentTimeMillis();
48 Chunk chunk = null;
49 MessageDigest md5 = null;
50
51 public AbstractProcessor() throws NoSuchAlgorithmException {
52 md5 = MessageDigest.getInstance("md5");
53 }
54
55 protected abstract void parse(byte[] recordEntry) throws Throwable;
56
57
58
59
60
61
62
63
64
65
66
67 public void addRecord(long time, String metric, String source, byte[] value,
68 ArrayList<Put> output) {
69 String primaryKey = new StringBuilder(primaryKeyHelper).append(".")
70 .append(metric).toString();
71 byte[] key = HBaseUtil.buildKey(time, primaryKey, source);
72 Put put = new Put(key);
73 byte[] timeInBytes = ByteBuffer.allocate(8).putLong(time).array();
74 put.addColumn(CF, timeInBytes, time, value);
75 output.add(put);
76 reporter.putMetric(chunk.getDataType(), primaryKey);
77 reporter.putSource(chunk.getDataType(), source);
78 }
79
80 public void addRecord(String primaryKey, String value) {
81 addRecord(primaryKey, value.getBytes(Charset.forName("UTF-8")));
82 }
83
84
85
86
87
88
89
90
91
92 public void addRecord(String metric, byte[] value) {
93 String primaryKey = new StringBuilder(primaryKeyHelper).append(".")
94 .append(metric).toString();
95 byte[] key = HBaseUtil.buildKey(time, primaryKey, sourceHelper);
96 Put put = new Put(key);
97 byte[] timeInBytes = ByteBuffer.allocate(8).putLong(time).array();
98 put.addColumn(CF, timeInBytes, time, value);
99 output.add(put);
100 reporter.putMetric(chunk.getDataType(), primaryKey);
101 }
102
103
104
105
106
107
108
109
110
111 public void process(Chunk chunk, ArrayList<Put> output, Reporter reporter)
112 throws Throwable {
113 this.output = output;
114 this.reporter = reporter;
115 this.chunk = chunk;
116 this.primaryKeyHelper = chunk.getDataType();
117 this.sourceHelper = chunk.getSource();
118 reporter.putSource(primaryKeyHelper, sourceHelper);
119 parse(chunk.getData());
120 addMeta();
121 }
122
123 protected void addMeta() {
124 byte[] key = HBaseUtil.buildKey(time, chunk.getDataType(), sourceHelper);
125 Put put = new Put(key);
126 String family = "a";
127 byte[] timeInBytes = ByteBuffer.allocate(8).putLong(time).array();
128 put.addColumn(family.getBytes(Charset.forName("UTF-8")), timeInBytes, time, chunk.getTags().getBytes(Charset.forName("UTF-8")));
129 output.add(put);
130 }
131
132 }