HBaseWriter xref

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.chukwa.datacollection.writer.hbase;

import java.io.IOException;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.chukwa.ChukwaArchiveKey;
import org.apache.hadoop.chukwa.Chunk;
import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
import org.apache.hadoop.chukwa.datacollection.writer.ChukwaWriter;
import org.apache.hadoop.chukwa.datacollection.writer.PipelineableWriter;
import org.apache.hadoop.chukwa.datacollection.writer.WriterException;
import org.apache.hadoop.chukwa.extraction.demux.processor.mapper.MapProcessor;
import org.apache.hadoop.chukwa.extraction.demux.processor.mapper.MapProcessorFactory;
import org.apache.hadoop.chukwa.extraction.demux.processor.mapper.UnknownRecordTypeException;
import org.apache.hadoop.chukwa.extraction.demux.Demux;
import org.apache.hadoop.chukwa.util.ClassUtils;
import org.apache.hadoop.chukwa.util.DaemonWatcher;
import org.apache.hadoop.chukwa.util.ExceptionUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Table;
import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Tables;
import org.apache.log4j.Logger;

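/**
 * HBaseWriter is a {@link PipelineableWriter} that runs each incoming chunk
 * through the Demux mapper processor registered for its data type and writes
 * the emitted key-values directly into the HBase table declared by the
 * processor's {@link Table}/{@link Tables} annotations. It is typically
 * placed in the collector's writer pipeline. When stats reporting is
 * enabled, a timer task periodically logs the observed write rate.
 */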
public class HBaseWriter extends PipelineableWriter {
  static Logger log = Logger.getLogger(HBaseWriter.class);
  boolean reportStats;
  volatile long dataSize = 0;
  final Timer statTimer;
  private OutputCollector output;
  private Reporter reporter;
  private ChukwaConfiguration conf;
  String defaultProcessor;
  private HConnection connection;
  private Configuration hconf;

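  /**
   * Periodically logs the write throughput in bytes per second. The rate
   * covers only the chunk data field, not HTTP or Chukwa headers.
   */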
  private class StatReportingTask extends TimerTask {
    private long lastTs = System.currentTimeMillis();
    private long lastDataSize = 0;

    public void run() {
      long time = System.currentTimeMillis();
      long interval = time - lastTs;
      lastTs = time;

      long ds = dataSize;
      long dataRate = 1000 * (ds - lastDataSize) / interval; // bytes/sec
      // refers only to the data field, not including HTTP or Chukwa headers
      lastDataSize = ds;

      log.info("stat=HBaseWriter|dataRate=" + dataRate);
    }
  }

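  /**
   * The public constructors delegate to the private constructor below;
   * stats reporting is enabled by default.
   */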
  public HBaseWriter() throws IOException {
    this(true);
  }

  public HBaseWriter(boolean reportStats) throws IOException {
    /* HBase version >= 0.89.x */
    this(reportStats, new ChukwaConfiguration(), HBaseConfiguration.create());
  }

  public HBaseWriter(ChukwaConfiguration conf, Configuration hconf) throws IOException {
    this(true, conf, hconf);
  }

  private HBaseWriter(boolean reportStats, ChukwaConfiguration conf, Configuration hconf) throws IOException {
    this.reportStats = reportStats;
    this.conf = conf;
    this.hconf = hconf;
    this.statTimer = new Timer();
    this.defaultProcessor = conf.get(
      "chukwa.demux.mapper.default.processor",
      "org.apache.hadoop.chukwa.extraction.demux.processor.mapper.DefaultProcessor");
    // Hand the Chukwa configuration to Demux so the mapper processors see it.
    Demux.jobConf = conf;
    log.info("hbase.zookeeper.quorum: " + hconf.get(HConstants.ZOOKEEPER_QUORUM)
        + ":" + hconf.get(HConstants.ZOOKEEPER_CLIENT_PORT));
    if (reportStats) {
      statTimer.schedule(new StatReportingTask(), 1000, 10 * 1000);
    }
    output = new OutputCollector();
    reporter = new Reporter();
    if (conf.getBoolean("hbase.writer.verify.schema", false)) {
      verifyHbaseSchema();
    }
    connection = HConnectionManager.createConnection(hconf);
  }

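  /**
   * Stops the stats reporting timer and releases the shared HBase connection.
   */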
  public void close() {
    if (reportStats) {
      statTimer.cancel();
    }
    try {
      connection.close(); // release the shared HBase connection
    } catch (IOException e) {
      log.warn(ExceptionUtil.getStackTrace(e));
    }
  }

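  /**
   * No-op: all initialization happens in the constructor.
   */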
  public void init(Configuration conf) throws WriterException {
  }

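  /**
   * Checks that the given table exists in HBase and declares the column
   * family named by the annotation; returns false (and logs the failure)
   * otherwise.
   */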
  private boolean verifyHbaseTable(HBaseAdmin admin, Table table) {
    boolean status = false;
    try {
      if (admin.tableExists(table.name())) {
        HTableDescriptor descriptor = admin.getTableDescriptor(table.name().getBytes());
        HColumnDescriptor[] columnDescriptors = descriptor.getColumnFamilies();
        for (HColumnDescriptor cd : columnDescriptors) {
          if (cd.getNameAsString().equals(table.columnFamily())) {
            log.info("Verified schema - table: " + table.name() + " column family: " + table.columnFamily());
            status = true;
          }
        }
      } else {
        throw new Exception("HBase table: " + table.name() + " does not exist.");
      }
    } catch (Exception e) {
      log.error(ExceptionUtil.getStackTrace(e));
      status = false;
    }
    return status;
  }

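  /**
   * Scans the configured demux parser package (hbase.demux.package) for
   * classes annotated with Table or Tables and verifies each declared
   * table/column family against the live HBase schema. On a mismatch the
   * process exits if hbase.writer.halt.on.schema.mismatch is true (the
   * default).
   */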
  private void verifyHbaseSchema() {
    log.debug("Verify Demux parser with HBase schema");
    boolean schemaVerified = true;
    try {
      HBaseAdmin admin = new HBaseAdmin(hconf);
      List<Class> demuxParsers = ClassUtils.getClassesForPackage(conf.get("hbase.demux.package"));
      for (Class<?> x : demuxParsers) {
        if (x.isAnnotationPresent(Tables.class)) {
          Tables list = x.getAnnotation(Tables.class);
          for (Table table : list.annotations()) {
            if (!verifyHbaseTable(admin, table)) {
              schemaVerified = false;
              log.warn("Validation failed - table: " + table.name() + " column family: " + table.columnFamily() + " does not exist.");
            }
          }
        } else if (x.isAnnotationPresent(Table.class)) {
          Table table = x.getAnnotation(Table.class);
          if (!verifyHbaseTable(admin, table)) {
            schemaVerified = false;
            log.warn("Validation failed - table: " + table.name() + " column family: " + table.columnFamily() + " does not exist.");
          }
        }
      }
      admin.close();
    } catch (Exception e) {
      schemaVerified = false;
      log.error(ExceptionUtil.getStackTrace(e));
    }
    if (!schemaVerified) {
      log.error("HBase schema mismatch with demux parser.");
      if (conf.getBoolean("hbase.writer.halt.on.schema.mismatch", true)) {
        log.error("Exiting...");
        DaemonWatcher.bailout(-1);
      }
    }
  }

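  /**
   * Processes each chunk with the demux mapper for its data type and puts
   * the emitted key-values into the processor's annotated HBase table.
   * Per-chunk failures are logged and skipped; chunks are then passed on to
   * the next writer in the pipeline, if any.
   */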
  @Override
  public CommitStatus add(List<Chunk> chunks) throws WriterException {
    CommitStatus rv = ChukwaWriter.COMMIT_OK;
    try {
      for (Chunk chunk : chunks) {
        synchronized (this) {
          try {
            Table table = findHBaseTable(chunk.getDataType());

            if (table != null) {
              HTableInterface hbase = connection.getTable(table.name());
              MapProcessor processor = getProcessor(chunk.getDataType());
              processor.process(new ChukwaArchiveKey(), chunk, output, reporter);
              hbase.put(output.getKeyValues());
              hbase.close(); // release the table back to the connection
            } else {
              log.warn("Error finding HBase table for data type: " + chunk.getDataType());
            }
          } catch (Exception e) {
            log.warn(output.getKeyValues());
            log.warn(ExceptionUtil.getStackTrace(e));
          }
          dataSize += chunk.getData().length;
          output.clear();
          reporter.clear();
        }
      }
    } catch (Exception e) {
      log.error(ExceptionUtil.getStackTrace(e));
      throw new WriterException("Failed to store data to HBase.");
    }
    if (next != null) {
      rv = next.add(chunks); // pass data through
    }
    return rv;
  }

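  /**
   * Resolves the Table annotation for the processor mapped to the given
   * data type, or null if the processor carries no table annotation.
   */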
  public Table findHBaseTable(String dataType) throws UnknownRecordTypeException {
    MapProcessor processor = getProcessor(dataType);

    Table table = null;
    if (processor.getClass().isAnnotationPresent(Table.class)) {
      return processor.getClass().getAnnotation(Table.class);
    } else if (processor.getClass().isAnnotationPresent(Tables.class)) {
      Tables tables = processor.getClass().getAnnotation(Tables.class);
      for (Table t : tables.annotations()) {
        table = t; // when multiple tables are declared, the last one wins
      }
    }

    return table;
  }

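  /**
   * Convenience lookup of the column family for a data type. Note that this
   * throws a NullPointerException if no table annotation is found for the
   * type.
   */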
  public String findHBaseColumnFamilyName(String dataType)
          throws UnknownRecordTypeException {
    Table table = findHBaseTable(dataType);
    return table.columnFamily();
  }

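  /**
   * Returns the mapper processor configured for the data type, falling back
   * to the default processor when no mapping exists.
   */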
  private MapProcessor getProcessor(String dataType) throws UnknownRecordTypeException {
    String processorClass = findProcessor(conf.get(dataType, defaultProcessor), defaultProcessor);
    return MapProcessorFactory.getProcessor(processorClass);
  }

  /**
   * Look up the mapper parser class in the demux configuration.
   * Since CHUKWA-581 the demux configuration can map a data type to both a
   * mapper and a reducer class; this utility detects and returns only the
   * mapper class.
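   * <p>
   * Hypothetical example: a demux mapping such as
   * {@code MyType=com.example.MyMapper,com.example.MyReducer} yields
   * {@code com.example.MyMapper}; {@code ,com.example.MyReducer} yields the
   * default processor; {@code com.example.MyMapper} is returned unchanged.
   */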
  private String findProcessor(String processors, String defaultProcessor) {
    if (processors.startsWith(",")) {
      // No mapper class defined.
      return defaultProcessor;
    } else if (processors.contains(",")) {
      // Both mapper and reducer defined; keep the mapper.
      String[] parsers = processors.split(",");
      return parsers[0];
    }
    // No reducer defined.
    return processors;
  }
}