HBaseWriter xref
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.chukwa.datacollection.writer.hbase;

import java.io.IOException;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.chukwa.Chunk;
import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
import org.apache.hadoop.chukwa.datacollection.writer.ChukwaWriter;
import org.apache.hadoop.chukwa.datacollection.writer.PipelineableWriter;
import org.apache.hadoop.chukwa.datacollection.writer.WriterException;
import org.apache.hadoop.chukwa.extraction.hbase.AbstractProcessor;
import org.apache.hadoop.chukwa.extraction.hbase.ProcessorFactory;
import org.apache.hadoop.chukwa.extraction.hbase.UnknownRecordTypeException;
import org.apache.hadoop.chukwa.util.ExceptionUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.log4j.Logger;

/**
 * ChukwaWriter that persists chunks to HBase. Each chunk is converted into
 * HBase Puts by a per-data-type processor and written to the "chukwa" table,
 * with type metadata recorded in "chukwa_meta".
 */
public class HBaseWriter extends PipelineableWriter {
  static Logger log = Logger.getLogger(HBaseWriter.class);
  private static final String CHUKWA_TABLE = "chukwa";
  private static final String CHUKWA_META_TABLE = "chukwa_meta";
  boolean reportStats;
  volatile long dataSize = 0;
  final Timer statTimer;
  private ArrayList<Put> output;
  private Reporter reporter;
  private ChukwaConfiguration conf;
  private Configuration hconf;
  String defaultProcessor;
  // Shared across all HBaseWriter instances; HBase connections are heavyweight.
  private static Connection connection;

  // Periodically logs write throughput since the previous run.
  private class StatReportingTask extends TimerTask {
    private long lastTs = System.currentTimeMillis();
    private long lastDataSize = 0;

    public void run() {
      long time = System.currentTimeMillis();
      long interval = time - lastTs;
      lastTs = time;

      long ds = dataSize;
      // Bytes written per second over the last interval.
      long dataRate = 1000 * (ds - lastDataSize) / interval;

      lastDataSize = ds;

      log.info("stat=HBaseWriter|dataRate=" + dataRate);
    }
  }

  public HBaseWriter() throws IOException {
    this(true);
  }

  public HBaseWriter(boolean reportStats) throws IOException {
    this(reportStats, new ChukwaConfiguration(), HBaseConfiguration.create());
  }

  public HBaseWriter(ChukwaConfiguration conf, Configuration hconf) throws IOException {
    this(true, conf, hconf);
  }

  private HBaseWriter(boolean reportStats, ChukwaConfiguration conf, Configuration hconf) throws IOException {
    this.reportStats = reportStats;
    this.conf = conf;
    this.hconf = hconf;
    this.statTimer = new Timer();
    // Fallback processor used when no mapping exists for a chunk's data type.
    this.defaultProcessor = conf.get(
        "chukwa.demux.mapper.default.processor",
        "org.apache.hadoop.chukwa.extraction.hbase.DefaultProcessor");
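    // Illustrative only: per-type processors are looked up from the Chukwa
    // configuration by data type name (see getProcessor/findProcessor below).
    // A hypothetical chukwa-demux-conf.xml entry mapping the "SysLog" data
    // type to a specific processor class might look like:
    //
    //   <property>
    //     <name>SysLog</name>
    //     <value>org.apache.hadoop.chukwa.extraction.hbase.SystemMetrics</value>
    //   </property>
    //
    // The property name and value above are assumptions for illustration;
    // types without such an entry fall back to defaultProcessor.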
    log.info("hbase.zookeeper.quorum: " + hconf.get(HConstants.ZOOKEEPER_QUORUM)
        + ":" + hconf.get(HConstants.ZOOKEEPER_CLIENT_PORT));
    if (reportStats) {
      statTimer.schedule(new StatReportingTask(), 1000, 10 * 1000);
    }
    output = new ArrayList<Put>();
    try {
      reporter = new Reporter();
    } catch (NoSuchAlgorithmException e) {
      throw new IOException("Cannot register hashing algorithm.", e);
    }
    if (connection == null || connection.isClosed()) {
      connection = ConnectionFactory.createConnection(hconf);
    }
  }

  public void close() {
    if (reportStats) {
      statTimer.cancel();
    }
    // The shared HBase connection is intentionally left open for other writer instances.
  }

  public void init(Configuration conf) throws WriterException {
    // Re-establish the shared connection if it was never opened or has been closed.
    if (connection == null || connection.isClosed()) {
      try {
        connection = ConnectionFactory.createConnection(hconf);
      } catch (IOException e) {
        throw new WriterException("HBase is offline, retry later...");
      }
    }
  }

  @Override
  public CommitStatus add(List<Chunk> chunks) throws WriterException {
    CommitStatus rv = ChukwaWriter.COMMIT_OK;
    // try-with-resources ensures both tables are closed even if a chunk fails.
    try (Table hbase = connection.getTable(TableName.valueOf(CHUKWA_TABLE));
         Table meta = connection.getTable(TableName.valueOf(CHUKWA_META_TABLE))) {
      for (Chunk chunk : chunks) {
        synchronized (this) {
          try {
            // Convert the chunk into Puts with the processor registered for
            // its data type, then write data and metadata in one pass.
            AbstractProcessor processor = getProcessor(chunk.getDataType());
            processor.process(chunk, output, reporter);
            hbase.put(output);
            meta.put(reporter.getInfo());
          } catch (Throwable e) {
            // A bad chunk should not fail the whole batch; log and move on.
            log.warn(output);
            log.warn(ExceptionUtil.getStackTrace(e));
          }
          dataSize += chunk.getData().length;
          output.clear();
          reporter.clear();
        }
      }
    } catch (Exception e) {
      log.error(ExceptionUtil.getStackTrace(e));
      throw new WriterException("Failed to store data to HBase.");
    }
    // Forward the chunks to the next writer in the pipeline, if any.
    if (next != null) {
      rv = next.add(chunks);
    }
    return rv;
  }

  private AbstractProcessor getProcessor(String dataType) throws UnknownRecordTypeException {
    String processorClass = findProcessor(conf.get(dataType, defaultProcessor), defaultProcessor);
    return ProcessorFactory.getProcessor(processorClass);
  }

  /**
   * Resolves which processor class to use for a data type. The configured
   * value may be a comma-separated list of processor class names; only the
   * first entry is used.
   *
   * @param processors configured processor class name(s) for the data type
   * @param defaultProcessor fallback processor class name
   * @return the processor class name to instantiate
   */
  private String findProcessor(String processors, String defaultProcessor) {
    if (processors.startsWith(",")) {
      // A leading comma means no specific processor; fall back to the default.
      return defaultProcessor;
    } else if (processors.contains(",")) {
      // For a comma-separated list, use the first processor.
      String[] parsers = processors.split(",");
      return parsers[0];
    }
    // Otherwise the value names the processor for this data type directly.
    return processors;
  }
}
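
For orientation, a minimal sketch of driving this writer directly. The demo class and the ChunkImpl constructor signature are assumptions for illustration, not part of this file; in a deployed collector, HBaseWriter is normally named in the writer pipeline configuration (e.g. the chukwaCollector.pipeline property) rather than constructed by hand. Running this also requires a reachable HBase cluster (located via hbase-site.xml on the classpath) with the chukwa and chukwa_meta tables created.

import java.nio.charset.StandardCharsets;
import java.util.Collections;

import org.apache.hadoop.chukwa.Chunk;
import org.apache.hadoop.chukwa.ChunkImpl;
import org.apache.hadoop.chukwa.datacollection.writer.ChukwaWriter.CommitStatus;
import org.apache.hadoop.chukwa.datacollection.writer.hbase.HBaseWriter;
import org.apache.hadoop.conf.Configuration;

public class HBaseWriterDemo {
  public static void main(String[] args) throws Exception {
    // Opens the shared HBase connection; disable stat reporting for the demo.
    HBaseWriter writer = new HBaseWriter(false);
    writer.init(new Configuration());

    // ChunkImpl(dataType, streamName, seqId, data, adaptor) is assumed here;
    // check the Chukwa version on the classpath for the exact signature.
    byte[] data = "sample log line".getBytes(StandardCharsets.UTF_8);
    Chunk chunk = new ChunkImpl("SysLog", "demo-stream", data.length, data, null);

    // add() converts the chunk to Puts via its data-type processor and writes
    // them to the chukwa table, recording metadata in chukwa_meta.
    CommitStatus status = writer.add(Collections.singletonList(chunk));
    System.out.println("commit status: " + status);
    writer.close();
  }
}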