1/*2 * Licensed to the Apache Software Foundation (ASF) under one3 * or more contributor license agreements. See the NOTICE file4 * distributed with this work for additional information5 * regarding copyright ownership. The ASF licenses this file6 * to you under the Apache License, Version 2.0 (the7 * "License"); you may not use this file except in compliance8 * with the License. You may obtain a copy of the License at9 *10 * http://www.apache.org/licenses/LICENSE-2.011 *12 * Unless required by applicable law or agreed to in writing, software13 * distributed under the License is distributed on an "AS IS" BASIS,14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.15 * See the License for the specific language governing permissions and16 * limitations under the License.17 */1819package org.apache.hadoop.chukwa.datacollection.writer;
202122import java.io.IOException;
23import java.util.List;
2425import org.apache.hadoop.chukwa.Chunk;
26import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
27import org.apache.hadoop.conf.Configuration;
28import org.apache.hadoop.security.SecurityUtil;
29import org.apache.log4j.Logger;
3031/**32 * A pipeline of Pipelineable writers33 * Controlled by option 'chukwaCollector.pipeline', which should be a comma-34 * separated list of classnames. 35 * 36 */37publicclassPipelineStageWriterimplementsChukwaWriter {
38 Logger log = Logger.getLogger(PipelineStageWriter.class);
3940ChukwaWriter writer; // head of pipeline4142publicPipelineStageWriter() throws WriterException {
43 Configuration conf = newChukwaConfiguration();
44 init(conf);
45 }
4647publicPipelineStageWriter(Configuration conf) throws WriterException {
48 init(conf);
49 }
5051 @Override
52publicCommitStatus add(List<Chunk> chunks) throws WriterException {
53return writer.add(chunks);
54 }
5556 @Override
57publicvoid close() throws WriterException {
58 writer.close();
59 }
6061 @Override
62publicvoid init(Configuration conf) throws WriterException {
63if (conf.get("chukwa.pipeline") != null) {
64 String pipeline = conf.get("chukwa.pipeline");
65try {
66 String[] classes = pipeline.split(",");
67 log.info("using pipelined writers, pipe length is " + classes.length);
68PipelineableWriter lastWriter = null;
69if (classes.length > 1) {
70 lastWriter = (PipelineableWriter) conf.getClassByName(classes[0])
71 .newInstance();
72 lastWriter.init(conf);
73 writer = lastWriter;
74 }
7576for (int i = 1; i < classes.length - 1; ++i) {
77 Class<?> stageClass = conf.getClassByName(classes[i]);
78 Object st = stageClass.newInstance();
79if (!(st instanceof PipelineableWriter))
80 log.error("class " + classes[i]
81 + " in processing pipeline isn't a PipelineableWriter.");
8283PipelineableWriter stage = (PipelineableWriter) stageClass
84 .newInstance();
85 stage.init(conf);
86// throws exception if types don't match or class not found; this is87// OK.8889 lastWriter.setNextStage(stage);
90 lastWriter = stage;
91 }
92// if authentication type is kerberos; login using the specified kerberos principal and keytab file93for(int i=0; i<classes.length; i++) {
94if(classes[i].contains("HBaseWriter")) {
95 loginToKerberos (conf);
96 }
97 }
9899 Class<?> stageClass = conf.getClassByName(classes[classes.length - 1]);
100 Object st = stageClass.newInstance();
101102if (!(st instanceof ChukwaWriter)) {
103 log.error("class " + classes[classes.length - 1]
104 + " at end of processing pipeline isn't a ChukwaWriter");
105thrownewWriterException("bad pipeline");
106 } else {
107 ((ChukwaWriter)st).init(conf);
108if (lastWriter != null)
109 lastWriter.setNextStage((ChukwaWriter) st);
110else111 writer = (ChukwaWriter) st; // one stage pipeline112 }
113return;
114 } catch (Exception e) {
115// if anything went wrong (missing class, etc) we wind up here.116 log.error("failed to set up pipeline, defaulting to SeqFileWriter", e);
117// fall through to default case118thrownewWriterException("bad pipeline");
119 }
120 } else {
121thrownewWriterException("must set chukwa.pipeline");
122 }
123 }
124125/**126 * If authentication type is "kerberos", this method authenticates the Chukwa agent with Kerberized HBase, using the127 * Kerberos principal and keytab file specified in chukwa-agent-conf.xml config file.<br>128 * Does nothing for other authentication type.129 * 130 * @throws IOException in event of login failure131 */132privatestaticvoid loginToKerberos (Configuration config) throws IOException {
133 String agentAuthType = config.get ("chukwaAgent.hadoop.authentication.type");
134if (null != agentAuthType && "kerberos".equalsIgnoreCase (agentAuthType)) {
135 SecurityUtil.login (config, "chukwaAgent.hadoop.authentication.kerberos.keytab",
136"chukwaAgent.hadoop.authentication.kerberos.principal");
137 }
138 }
139140 }