This project has retired. For details please refer to its Attic page.
GoraWriter xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.chukwa.datacollection.writer.gora;
19  
20  import java.nio.ByteBuffer;
21  import java.util.List;
22  
23  import org.apache.gora.store.DataStore;
24  import org.apache.gora.store.DataStoreFactory;
25  import org.apache.gora.util.GoraException;
26  import org.apache.hadoop.chukwa.Chunk;
27  import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
28  import org.apache.hadoop.chukwa.datacollection.writer.ChukwaWriter;
29  import org.apache.hadoop.chukwa.datacollection.writer.PipelineableWriter;
30  import org.apache.hadoop.chukwa.datacollection.writer.WriterException;
31  import org.apache.hadoop.chukwa.datacollection.writer.solr.SolrWriter;
32  import org.apache.hadoop.chukwa.util.ExceptionUtil;
33  import org.apache.hadoop.conf.Configuration;
34  import org.apache.log4j.Logger;
35  
36  /**
37   * This class leverages <a href="http://gora.apache.org">Apache Gora</a>
38   * as a pipeline writer implementation for mapping Chukwa data chunks and 
39   * metadata as {@link org.apache.hadoop.chukwa.datacollection.writer.gora.ChukwaChunk}'s. 
40   *
41   */
42  public class GoraWriter extends PipelineableWriter {
43    
44    private static Logger log = Logger.getLogger(SolrWriter.class);
45    
46    DataStore<String, ChukwaChunk> chunkStore;
47  
48    /**
49     * Default constructor for this class.
50     * @throws WriterException if error writing
51     */
52    public GoraWriter() throws WriterException {
53      log.debug("Initializing configuration for GoraWriter pipeline...");
54      init(ChukwaAgent.getStaticConfiguration());
55    }
56  
57    /**
58     * {@link org.apache.gora.store.DataStore} objects are created from a factory. It is necessary to 
59     * provide the key and value class. The datastore class parameters is optional, 
60     * and if not specified it will be read from the <code>gora.properties</code> file.
61     * @throws WriterException if error occurs
62     * @see org.apache.hadoop.chukwa.datacollection.writer.ChukwaWriter#init(org.apache.hadoop.conf.Configuration)
63     */
64    @Override
65    public void init(Configuration c) throws WriterException {
66      try {
67        chunkStore = DataStoreFactory.getDataStore(String.class, ChukwaChunk.class, c);
68      } catch (GoraException e) {
69        log.error(ExceptionUtil.getStackTrace(e));
70        e.printStackTrace();
71      } 
72    }
73  
74    /**
75     * <p>
76     * If the {@link org.apache.gora.store.DataStore} instance is not null, we
77     * execute a {@link org.apache.gora.store.DataStore#flush()}. This forces 
78     * the write caches to be flushed. DataStore implementations may optimize 
79     * their writing by deferring the actual put / delete operations until 
80     * this moment.
81     * </p>
82     * <p>Otherwise, we utilize {@link org.apache.gora.store.DataStore#close()}
83     * which closes the DataStore. This should release any resources held by 
84     * the implementation, so that the instance is ready for GC. All other 
85     * DataStore methods cannot be used after this method was called. 
86     * Subsequent calls of this method are ignored.
87     * </p>  
88     * @see org.apache.hadoop.chukwa.datacollection.writer.ChukwaWriter#close()
89     */
90    @Override
91    public void close() throws WriterException {
92      if (chunkStore != null) {
93        chunkStore.flush();
94      } else {
95        chunkStore.close();
96      }
97      log.debug("Gora datastore successfully closed.");
98    }
99  
100   @Override
101   public CommitStatus add(List<Chunk> chunks) throws WriterException {
102     CommitStatus cStatus = ChukwaWriter.COMMIT_OK;
103     for(Chunk chunk : chunks) {
104       try {
105         ChukwaChunk chukwaChunk = ChukwaChunk.newBuilder().build();
106         chukwaChunk.setSource(chunk.getSource());
107         chukwaChunk.setDatatype(chunk.getDataType());
108         chukwaChunk.setSequenceID(chunk.getSeqID());
109         chukwaChunk.setName(chunk.getStreamName());
110         chukwaChunk.setTags(chunk.getTags());
111         chukwaChunk.setData(ByteBuffer.wrap(chunk.getData()));
112       } catch (Exception e) {
113         log.error(ExceptionUtil.getStackTrace(e));
114         throw new WriterException("Failed to store data to Solr Cloud.");
115       }
116     }
117     if (next != null) {
118       cStatus = next.add(chunks); //pass data through
119     }
120     return cStatus;
121   }
122 }