1/*2 * Licensed to the Apache Software Foundation (ASF) under one3 * or more contributor license agreements. See the NOTICE file4 * distributed with this work for additional information5 * regarding copyright ownership. The ASF licenses this file6 * to you under the Apache License, Version 2.0 (the7 * "License"); you may not use this file except in compliance8 * with the License. You may obtain a copy of the License at9 *10 * http://www.apache.org/licenses/LICENSE-2.011 *12 * Unless required by applicable law or agreed to in writing, software13 * distributed under the License is distributed on an "AS IS" BASIS,14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.15 * See the License for the specific language governing permissions and16 * limitations under the License.17 */18package org.apache.hadoop.chukwa.datacollection.writer.solr;
1920import java.util.List;
2122import org.apache.hadoop.chukwa.Chunk;
23import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
24import org.apache.hadoop.chukwa.datacollection.writer.ChukwaWriter;
25import org.apache.hadoop.chukwa.datacollection.writer.PipelineableWriter;
26import org.apache.hadoop.chukwa.datacollection.writer.WriterException;
27import org.apache.hadoop.chukwa.util.ExceptionUtil;
28import org.apache.hadoop.conf.Configuration;
29import org.apache.log4j.Logger;
30import org.apache.solr.client.solrj.impl.CloudSolrServer;
31import org.apache.solr.common.SolrInputDocument;
3233publicclassSolrWriterextendsPipelineableWriter {
34privatestatic Logger log = Logger.getLogger(SolrWriter.class);
35privatestatic CloudSolrServer server;
36privatestatic String ID = "id";
37privatestatic String SEQ_ID = "seqId";
38privatestatic String DATA_TYPE = "type";
39privatestatic String STREAM_NAME = "stream";
40privatestatic String TAGS = "tags";
41privatestatic String SOURCE = "source";
42privatestatic String DATA = "data";
4344publicSolrWriter() throws WriterException {
45 init(ChukwaAgent.getStaticConfiguration());
46 }
4748 @Override
49publicvoid init(Configuration c) throws WriterException {
50 String serverName = c.get("solr.cloud.address");
51if (serverName == null) {
52thrownewWriterException("Solr server address is not defined.");
53 }
54 String collection = c.get("solr.collection", "logs");
55 server = new CloudSolrServer(serverName);
56 server.setDefaultCollection(collection);
57 }
5859 @Override
60publicvoid close() throws WriterException {
61 }
6263 @Override
64publicCommitStatus add(List<Chunk> chunks) throws WriterException {
65CommitStatus rv = ChukwaWriter.COMMIT_OK;
66for(Chunk chunk : chunks) {
67try {
68 SolrInputDocument doc = new SolrInputDocument();
69 doc.addField(ID, chunk.getSource() + "_" + chunk.getSeqID());
70 doc.addField(TAGS, chunk.getTags());
71 doc.addField(STREAM_NAME, chunk.getStreamName());
72 doc.addField(SOURCE, chunk.getSource());
73 doc.addField(SEQ_ID, chunk.getSeqID());
74 doc.addField(DATA_TYPE, chunk.getDataType());
75 doc.addField(DATA, new String(chunk.getData()));
76 server.add(doc);
77 server.commit();
78 } catch (Exception e) {
79 log.error(ExceptionUtil.getStackTrace(e));
80thrownewWriterException("Failed to store data to Solr Cloud.");
81 }
82 }
83if (next != null) {
84 rv = next.add(chunks); //pass data through85 }
86return rv;
87 }
88 }