/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.chukwa.datacollection.writer.gora;
1920import java.nio.ByteBuffer;
21import java.util.List;
2223import org.apache.gora.store.DataStore;
24import org.apache.gora.store.DataStoreFactory;
25import org.apache.gora.util.GoraException;
26import org.apache.hadoop.chukwa.Chunk;
27import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
28import org.apache.hadoop.chukwa.datacollection.writer.ChukwaWriter;
29import org.apache.hadoop.chukwa.datacollection.writer.PipelineableWriter;
30import org.apache.hadoop.chukwa.datacollection.writer.WriterException;
31import org.apache.hadoop.chukwa.datacollection.writer.solr.SolrWriter;
32import org.apache.hadoop.chukwa.util.ExceptionUtil;
33import org.apache.hadoop.conf.Configuration;
34import org.apache.log4j.Logger;
3536/**37 * This class leverages <a href="http://gora.apache.org">Apache Gora</a>38 * as a pipeline writer implementation for mapping Chukwa data chunks and 39 * metadata as {@link org.apache.hadoop.chukwa.datacollection.writer.gora.ChukwaChunk}'s. 40 *41 */42publicclassGoraWriterextendsPipelineableWriter {
4344privatestatic Logger log = Logger.getLogger(SolrWriter.class);
4546 DataStore<String, ChukwaChunk> chunkStore;
4748/**49 * Default constructor for this class.50 * @throws WriterException if error writing51 */52publicGoraWriter() throws WriterException {
53 log.debug("Initializing configuration for GoraWriter pipeline...");
54 init(ChukwaAgent.getStaticConfiguration());
55 }
5657/**58 * {@link org.apache.gora.store.DataStore} objects are created from a factory. It is necessary to 59 * provide the key and value class. The datastore class parameters is optional, 60 * and if not specified it will be read from the <code>gora.properties</code> file.61 * @throws WriterException if error occurs62 * @see org.apache.hadoop.chukwa.datacollection.writer.ChukwaWriter#init(org.apache.hadoop.conf.Configuration)63 */64 @Override
65publicvoid init(Configuration c) throws WriterException {
66try {
67 chunkStore = DataStoreFactory.getDataStore(String.class, ChukwaChunk.class, c);
68 } catch (GoraException e) {
69 log.error(ExceptionUtil.getStackTrace(e));
70 e.printStackTrace();
71 }
72 }
7374/**75 * <p>76 * If the {@link org.apache.gora.store.DataStore} instance is not null, we77 * execute a {@link org.apache.gora.store.DataStore#flush()}. This forces 78 * the write caches to be flushed. DataStore implementations may optimize 79 * their writing by deferring the actual put / delete operations until 80 * this moment.81 * </p>82 * <p>Otherwise, we utilize {@link org.apache.gora.store.DataStore#close()}83 * which closes the DataStore. This should release any resources held by 84 * the implementation, so that the instance is ready for GC. All other 85 * DataStore methods cannot be used after this method was called. 86 * Subsequent calls of this method are ignored.87 * </p> 88 * @see org.apache.hadoop.chukwa.datacollection.writer.ChukwaWriter#close()89 */90 @Override
91publicvoid close() throws WriterException {
92if (chunkStore != null) {
93 chunkStore.flush();
94 } else {
95 chunkStore.close();
96 }
97 log.debug("Gora datastore successfully closed.");
98 }
99100 @Override
101publicCommitStatus add(List<Chunk> chunks) throws WriterException {
102CommitStatus cStatus = ChukwaWriter.COMMIT_OK;
103for(Chunk chunk : chunks) {
104try {
105ChukwaChunk chukwaChunk = ChukwaChunk.newBuilder().build();
106 chukwaChunk.setSource(chunk.getSource());
107 chukwaChunk.setDatatype(chunk.getDataType());
108 chukwaChunk.setSequenceID(chunk.getSeqID());
109 chukwaChunk.setName(chunk.getStreamName());
110 chukwaChunk.setTags(chunk.getTags());
111 chukwaChunk.setData(ByteBuffer.wrap(chunk.getData()));
112 } catch (Exception e) {
113 log.error(ExceptionUtil.getStackTrace(e));
114thrownewWriterException("Failed to store data to Solr Cloud.");
115 }
116 }
117if (next != null) {
118 cStatus = next.add(chunks); //pass data through119 }
120return cStatus;
121 }
122 }