HBaseWriter xref

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.chukwa.datacollection.writer.hbase;

import java.io.IOException;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.chukwa.Chunk;
import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
import org.apache.hadoop.chukwa.datacollection.writer.ChukwaWriter;
import org.apache.hadoop.chukwa.datacollection.writer.PipelineableWriter;
import org.apache.hadoop.chukwa.datacollection.writer.WriterException;
import org.apache.hadoop.chukwa.extraction.hbase.AbstractProcessor;
import org.apache.hadoop.chukwa.extraction.hbase.ProcessorFactory;
import org.apache.hadoop.chukwa.extraction.hbase.UnknownRecordTypeException;
import org.apache.hadoop.chukwa.util.ExceptionUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.log4j.Logger;

public class HBaseWriter extends PipelineableWriter {
  static Logger log = Logger.getLogger(HBaseWriter.class);
  private static final String CHUKWA_TABLE = "chukwa";
  private static final String CHUKWA_META_TABLE = "chukwa_meta";
  boolean reportStats;
  volatile long dataSize = 0;
  final Timer statTimer;
  private ArrayList<Put> output;
  private Reporter reporter;
  private ChukwaConfiguration conf;
  private Configuration hconf;
  String defaultProcessor;
  private static Connection connection;

  /** Periodically logs the write throughput of this writer. */
  private class StatReportingTask extends TimerTask {
    private long lastTs = System.currentTimeMillis();
    private long lastDataSize = 0;

    public void run() {
      long time = System.currentTimeMillis();
      long interval = time - lastTs;
      lastTs = time;

      long ds = dataSize;
      // Counts only the chunk data field, excluding HTTP and Chukwa headers.
      long dataRate = 1000 * (ds - lastDataSize) / interval; // bytes/sec
      lastDataSize = ds;

      log.info("stat=HBaseWriter|dataRate=" + dataRate);
    }
  }
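
  // Worked example for the rate computation above (illustrative numbers):
  // if 5,242,880 bytes of chunk data accumulate over a 10,000 ms reporting
  // interval, dataRate = 1000 * 5,242,880 / 10,000 = 524,288 bytes/sec
  // (512 KB/sec).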

  public HBaseWriter() throws IOException {
    this(true);
  }

  public HBaseWriter(boolean reportStats) throws IOException {
    /* HBase Version >= 0.89.x */
    this(reportStats, new ChukwaConfiguration(), HBaseConfiguration.create());
  }

  public HBaseWriter(ChukwaConfiguration conf, Configuration hconf) throws IOException {
    this(true, conf, hconf);
  }

  private HBaseWriter(boolean reportStats, ChukwaConfiguration conf, Configuration hconf) throws IOException {
    this.reportStats = reportStats;
    this.conf = conf;
    this.hconf = hconf;
    this.statTimer = new Timer();
    this.defaultProcessor = conf.get(
      "chukwa.demux.mapper.default.processor",
      "org.apache.hadoop.chukwa.extraction.hbase.DefaultProcessor");
    log.info("hbase.zookeeper.quorum: " + hconf.get(HConstants.ZOOKEEPER_QUORUM) + ":" + hconf.get(HConstants.ZOOKEEPER_CLIENT_PORT));
    if (reportStats) {
      statTimer.schedule(new StatReportingTask(), 1000, 10 * 1000);
    }
    output = new ArrayList<Put>();
    try {
      reporter = new Reporter();
    } catch (NoSuchAlgorithmException e) {
      // Chain the cause so the failing hash algorithm is visible upstream.
      throw new IOException("Cannot register hashing algorithm.", e);
    }
    // The HBase connection is shared across writer instances; create it only
    // if it does not exist yet or has been closed.
    if (connection == null || connection.isClosed()) {
      connection = ConnectionFactory.createConnection(hconf);
    }
  }
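
  // Example (hypothetical values) of the demux configuration consulted above
  // and in getProcessor() below: the default parser comes from
  // chukwa.demux.mapper.default.processor, and per-datatype parsers are keyed
  // by the data type name (e.g. in chukwa-demux-conf.xml). SysLogProcessor is
  // an invented class name used only for illustration.
  //
  //   <property>
  //     <name>chukwa.demux.mapper.default.processor</name>
  //     <value>org.apache.hadoop.chukwa.extraction.hbase.DefaultProcessor</value>
  //   </property>
  //   <property>
  //     <name>SysLog</name>
  //     <value>org.apache.hadoop.chukwa.extraction.hbase.SysLogProcessor</value>
  //   </property>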

  public void close() {
    if (reportStats) {
      statTimer.cancel();
    }
    // Note: the shared static HBase connection is left open so that other
    // writer instances can keep using it.
  }

  public void init(Configuration conf) throws WriterException {
    // The passed-in configuration is ignored; the HBase configuration
    // supplied at construction time (hconf) is used instead.
    if (connection == null || connection.isClosed()) {
      try {
        connection = ConnectionFactory.createConnection(hconf);
      } catch (IOException e) {
        throw new WriterException("HBase is offline, retry later...");
      }
    }
  }

  @Override
  public CommitStatus add(List<Chunk> chunks) throws WriterException {
    CommitStatus rv = ChukwaWriter.COMMIT_OK;
    // try-with-resources ensures both tables are closed even if a put fails.
    try (Table hbase = connection.getTable(TableName.valueOf(CHUKWA_TABLE));
         Table meta = connection.getTable(TableName.valueOf(CHUKWA_META_TABLE))) {
      for (Chunk chunk : chunks) {
        synchronized (this) {
          try {
            // Convert the chunk into HBase Puts using the parser mapped to
            // its data type, then flush data and metadata.
            AbstractProcessor processor = getProcessor(chunk.getDataType());
            processor.process(chunk, output, reporter);
            hbase.put(output);
            meta.put(reporter.getInfo());
          } catch (Throwable e) {
            // A bad chunk should not fail the whole batch; log and move on.
            log.warn(output);
            log.warn(ExceptionUtil.getStackTrace(e));
          }
          dataSize += chunk.getData().length;
          output.clear();
          reporter.clear();
        }
      }
    } catch (Exception e) {
      log.error(ExceptionUtil.getStackTrace(e));
      throw new WriterException("Failed to store data to HBase.");
    }
    if (next != null) {
      rv = next.add(chunks); // pass data through to the next writer in the pipeline
    }
    return rv;
  }
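
  // Pipeline sketch (illustrative; ConsoleWriter stands in for any chained
  // ChukwaWriter, and setNextStage is inherited from PipelineableWriter):
  //
  //   HBaseWriter hbaseWriter = new HBaseWriter(false);
  //   hbaseWriter.setNextStage(new ConsoleWriter());
  //   hbaseWriter.init(conf);
  //   hbaseWriter.add(chunks); // stores to HBase, then forwards the chunks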

  private AbstractProcessor getProcessor(String dataType) throws UnknownRecordTypeException {
    String processorClass = findProcessor(conf.get(dataType, defaultProcessor), defaultProcessor);
    return ProcessorFactory.getProcessor(processorClass);
  }

  /**
   * Looks up the mapper parser class in the demux configuration.
   * Since CHUKWA-581 the demux configuration can map a data type to both a
   * mapper and a reducer class (comma separated), so this utility extracts
   * and returns only the mapper class.
   *
   * @param processors comma-separated mapper,reducer entry from the configuration
   * @param defaultProcessor fallback parser class when no mapper is defined
   * @return the mapper parser class name
   */
  private String findProcessor(String processors, String defaultProcessor) {
    if (processors.startsWith(",")) {
      // No mapper class defined.
      return defaultProcessor;
    } else if (processors.contains(",")) {
      // Both mapper and reducer defined; keep the mapper.
      String[] parsers = processors.split(",");
      return parsers[0];
    }
    // No reducer defined.
    return processors;
  }
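
  // Illustrative mappings (class names shortened for readability):
  //   "TsProcessor,ReduceProcessor" -> "TsProcessor"     (mapper,reducer pair)
  //   ",ReduceProcessor"            -> defaultProcessor  (no mapper defined)
  //   "TsProcessor"                 -> "TsProcessor"     (mapper only)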
}
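
A minimal caller sketch (not part of this file): how the writer might be driven directly. The ChunkImpl constructor signature and the chukwaCollector.pipeline property below are assumptions to verify against your Chukwa release.

import java.util.Collections;
import org.apache.hadoop.chukwa.Chunk;
import org.apache.hadoop.chukwa.ChunkImpl;
import org.apache.hadoop.chukwa.datacollection.writer.hbase.HBaseWriter;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class HBaseWriterDemo {
  public static void main(String[] args) throws Exception {
    HBaseWriter writer = new HBaseWriter(false);   // no stat reporting
    writer.init(HBaseConfiguration.create());      // opens or reuses the shared connection
    byte[] data = "2015-01-01 00:00:00 host1 hello".getBytes();
    // Assumed ChunkImpl(dataType, streamName, seqId, data, adaptor) signature.
    Chunk chunk = new ChunkImpl("SysLog", "demo-stream", data.length, data, null);
    writer.add(Collections.singletonList(chunk));  // Puts land in the "chukwa" table
    writer.close();
  }
}

In a deployment the writer is typically enabled through the collector pipeline property instead, e.g. chukwaCollector.pipeline=org.apache.hadoop.chukwa.datacollection.writer.hbase.HBaseWriter.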