This project has retired. For details please refer to its
Attic page.
HBaseWriter xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.chukwa.datacollection.writer.hbase;
20
21 import java.io.IOException;
22 import java.util.List;
23 import java.util.Timer;
24 import java.util.TimerTask;
25
26 import org.apache.hadoop.conf.Configuration;
27 import org.apache.hadoop.chukwa.ChukwaArchiveKey;
28 import org.apache.hadoop.chukwa.Chunk;
29 import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
30 import org.apache.hadoop.chukwa.datacollection.writer.ChukwaWriter;
31 import org.apache.hadoop.chukwa.datacollection.writer.PipelineableWriter;
32 import org.apache.hadoop.chukwa.datacollection.writer.WriterException;
33 import org.apache.hadoop.chukwa.extraction.demux.processor.mapper.MapProcessor;
34 import org.apache.hadoop.chukwa.extraction.demux.processor.mapper.MapProcessorFactory;
35 import org.apache.hadoop.chukwa.extraction.demux.processor.mapper.UnknownRecordTypeException;
36 import org.apache.hadoop.chukwa.extraction.demux.Demux;
37 import org.apache.hadoop.chukwa.util.ClassUtils;
38 import org.apache.hadoop.chukwa.util.DaemonWatcher;
39 import org.apache.hadoop.chukwa.util.ExceptionUtil;
40 import org.apache.hadoop.hbase.HBaseConfiguration;
41 import org.apache.hadoop.hbase.HColumnDescriptor;
42 import org.apache.hadoop.hbase.HConstants;
43 import org.apache.hadoop.hbase.HTableDescriptor;
44 import org.apache.hadoop.hbase.ZooKeeperConnectionException;
45 import org.apache.hadoop.hbase.client.HBaseAdmin;
46 import org.apache.hadoop.hbase.client.HConnection;
47 import org.apache.hadoop.hbase.client.HConnectionManager;
48 import org.apache.hadoop.hbase.client.HTableInterface;
49 import org.apache.hadoop.hbase.client.HTablePool;
50 import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Table;
51 import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Tables;
52 import org.apache.log4j.Logger;
53
54 public class HBaseWriter extends PipelineableWriter {
55 static Logger log = Logger.getLogger(HBaseWriter.class);
56 boolean reportStats;
57 volatile long dataSize = 0;
58 final Timer statTimer;
59 private OutputCollector output;
60 private Reporter reporter;
61 private ChukwaConfiguration conf;
62 String defaultProcessor;
63 private HConnection connection;
64 private Configuration hconf;
65
66 private class StatReportingTask extends TimerTask {
67 private long lastTs = System.currentTimeMillis();
68 private long lastDataSize = 0;
69
70 public void run() {
71 long time = System.currentTimeMillis();
72 long interval = time - lastTs;
73 lastTs = time;
74
75 long ds = dataSize;
76 long dataRate = 1000 * (ds - lastDataSize) / interval;
77
78 lastDataSize = ds;
79
80 log.info("stat=HBaseWriter|dataRate="
81 + dataRate);
82 }
83 };
84
/**
 * Creates a writer with statistics reporting switched on; equivalent to
 * {@code new HBaseWriter(true)}.
 *
 * @throws IOException if the delegated constructor fails
 */
public HBaseWriter() throws IOException {
  this(true);
}
88
/**
 * Creates a writer backed by a freshly constructed
 * {@link ChukwaConfiguration} and the configuration returned by
 * {@link HBaseConfiguration#create()}.
 *
 * @param reportStats whether the periodic throughput report is scheduled
 * @throws IOException if the delegated constructor fails
 */
public HBaseWriter(boolean reportStats) throws IOException {
  this(reportStats, new ChukwaConfiguration(), HBaseConfiguration.create());
}
93
/**
 * Creates a writer from caller-supplied configurations, with statistics
 * reporting switched on.
 *
 * @param conf  Chukwa configuration used for demux processor lookup
 * @param hconf HBase configuration used to open the connection
 * @throws IOException if the delegated constructor fails
 */
public HBaseWriter(ChukwaConfiguration conf, Configuration hconf) throws IOException {
  this(true, conf, hconf);
}
97
98 private HBaseWriter(boolean reportStats, ChukwaConfiguration conf, Configuration hconf) throws IOException {
99 this.reportStats = reportStats;
100 this.conf = conf;
101 this.hconf = hconf;
102 this.statTimer = new Timer();
103 this.defaultProcessor = conf.get(
104 "chukwa.demux.mapper.default.processor",
105 "org.apache.hadoop.chukwa.extraction.demux.processor.mapper.DefaultProcessor");
106 Demux.jobConf = conf;
107 log.info("hbase.zookeeper.quorum: " + hconf.get(HConstants.ZOOKEEPER_QUORUM) + ":" + hconf.get(HConstants.ZOOKEEPER_CLIENT_PORT));
108 if (reportStats) {
109 statTimer.schedule(new StatReportingTask(), 1000, 10 * 1000);
110 }
111 output = new OutputCollector();
112 reporter = new Reporter();
113 if(conf.getBoolean("hbase.writer.verify.schema", false)) {
114 verifyHbaseSchema();
115 }
116 connection = HConnectionManager.createConnection(hconf);
117 }
118
119 public void close() {
120 if (reportStats) {
121 statTimer.cancel();
122 }
123 }
124
125 public void init(Configuration conf) throws WriterException {
126 }
127
128 private boolean verifyHbaseTable(HBaseAdmin admin, Table table) {
129 boolean status = false;
130 try {
131 if(admin.tableExists(table.name())) {
132 HTableDescriptor descriptor = admin.getTableDescriptor(table.name().getBytes());
133 HColumnDescriptor[] columnDescriptors = descriptor.getColumnFamilies();
134 for(HColumnDescriptor cd : columnDescriptors) {
135 if(cd.getNameAsString().equals(table.columnFamily())) {
136 log.info("Verified schema - table: "+table.name()+" column family: "+table.columnFamily());
137 status = true;
138 }
139 }
140 } else {
141 throw new Exception("HBase table: "+table.name()+ " does not exist.");
142 }
143 } catch(Exception e) {
144 log.error(ExceptionUtil.getStackTrace(e));
145 status = false;
146 }
147 return status;
148 }
149
150 private void verifyHbaseSchema() {
151 log.debug("Verify Demux parser with HBase schema");
152 boolean schemaVerified = true;
153 try {
154 HBaseAdmin admin = new HBaseAdmin(hconf);
155 List<Class> demuxParsers = ClassUtils.getClassesForPackage(conf.get("hbase.demux.package"));
156 for(Class<?> x : demuxParsers) {
157 if(x.isAnnotationPresent(Tables.class)) {
158 Tables list = x.getAnnotation(Tables.class);
159 for(Table table : list.annotations()) {
160 if(!verifyHbaseTable(admin, table)) {
161 schemaVerified = false;
162 log.warn("Validation failed - table: "+table.name()+" column family: "+table.columnFamily()+" does not exist.");
163 }
164 }
165 } else if(x.isAnnotationPresent(Table.class)) {
166 Table table = x.getAnnotation(Table.class);
167 if(!verifyHbaseTable(admin, table)) {
168 schemaVerified = false;
169 log.warn("Validation failed - table: "+table.name()+" column family: "+table.columnFamily()+" does not exist.");
170 }
171 }
172 }
173 } catch (Exception e) {
174 schemaVerified = false;
175 log.error(ExceptionUtil.getStackTrace(e));
176 }
177 if(!schemaVerified) {
178 log.error("Hbase schema mismatch with demux parser.");
179 if(conf.getBoolean("hbase.writer.halt.on.schema.mismatch", true)) {
180 log.error("Exiting...");
181 DaemonWatcher.bailout(-1);
182 }
183 }
184 }
185
186 @Override
187 public CommitStatus add(List<Chunk> chunks) throws WriterException {
188 CommitStatus rv = ChukwaWriter.COMMIT_OK;
189 try {
190 for(Chunk chunk : chunks) {
191 synchronized (this) {
192 try {
193 Table table = findHBaseTable(chunk.getDataType());
194
195 if(table!=null) {
196 HTableInterface hbase = connection.getTable(table.name());
197 MapProcessor processor = getProcessor(chunk.getDataType());
198 processor.process(new ChukwaArchiveKey(), chunk, output, reporter);
199 hbase.put(output.getKeyValues());
200 } else {
201 log.warn("Error finding HBase table for data type:"+chunk.getDataType());
202 }
203 } catch (Exception e) {
204 log.warn(output.getKeyValues());
205 log.warn(ExceptionUtil.getStackTrace(e));
206 }
207 dataSize += chunk.getData().length;
208 output.clear();
209 reporter.clear();
210 }
211 }
212 } catch (Exception e) {
213 log.error(ExceptionUtil.getStackTrace(e));
214 throw new WriterException("Failed to store data to HBase.");
215 }
216 if (next != null) {
217 rv = next.add(chunks);
218 }
219 return rv;
220 }
221
222 public Table findHBaseTable(String dataType) throws UnknownRecordTypeException {
223 MapProcessor processor = getProcessor(dataType);
224
225 Table table = null;
226 if(processor.getClass().isAnnotationPresent(Table.class)) {
227 return processor.getClass().getAnnotation(Table.class);
228 } else if(processor.getClass().isAnnotationPresent(Tables.class)) {
229 Tables tables = processor.getClass().getAnnotation(Tables.class);
230 for(Table t : tables.annotations()) {
231 table = t;
232 }
233 }
234
235 return table;
236 }
237
238 public String findHBaseColumnFamilyName(String dataType)
239 throws UnknownRecordTypeException {
240 Table table = findHBaseTable(dataType);
241 return table.columnFamily();
242 }
243
244 private MapProcessor getProcessor(String dataType) throws UnknownRecordTypeException {
245 String processorClass = findProcessor(conf.get(dataType, defaultProcessor), defaultProcessor);
246 return MapProcessorFactory.getProcessor(processorClass);
247 }
248
249
250
251
252
253
254
255
256
257 private String findProcessor(String processors, String defaultProcessor) {
258 if(processors.startsWith(",")) {
259
260 return defaultProcessor;
261 } else if(processors.contains(",")) {
262
263 String[] parsers = processors.split(",");
264 return parsers[0];
265 }
266
267 return processors;
268 }
269 }