PipelineStageWriter xref

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.chukwa.datacollection.writer;


import java.io.IOException;
import java.util.List;

import org.apache.hadoop.chukwa.Chunk;
import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.log4j.Logger;

/**
 * A pipeline of PipelineableWriters, controlled by the option
 * 'chukwa.pipeline', which should be a comma-separated list of class names.
 * Every stage except the last must be a PipelineableWriter; the last stage
 * only needs to be a ChukwaWriter. (A configuration sketch follows this
 * listing.)
 */
public class PipelineStageWriter implements ChukwaWriter {
  Logger log = Logger.getLogger(PipelineStageWriter.class);

  ChukwaWriter writer; // head of pipeline

  public PipelineStageWriter() throws WriterException {
    Configuration conf = new ChukwaConfiguration();
    init(conf);
  }

  public PipelineStageWriter(Configuration conf) throws WriterException {
    init(conf);
  }

  @Override
  public CommitStatus add(List<Chunk> chunks) throws WriterException {
    return writer.add(chunks);
  }

  @Override
  public void close() throws WriterException {
    writer.close();
  }

  @Override
  public void init(Configuration conf) throws WriterException {
    if (conf.get("chukwa.pipeline") != null) {
      String pipeline = conf.get("chukwa.pipeline");
      try {
        String[] classes = pipeline.split(",");
        log.info("using pipelined writers, pipe length is " + classes.length);
        PipelineableWriter lastWriter = null;
        if (classes.length > 1) {
          lastWriter = (PipelineableWriter) conf.getClassByName(classes[0])
              .newInstance();
          lastWriter.init(conf);
          writer = lastWriter;
        }

        // Intermediate stages must be PipelineableWriters so they can be
        // chained to the next stage.
        for (int i = 1; i < classes.length - 1; ++i) {
          Class<?> stageClass = conf.getClassByName(classes[i]);
          Object st = stageClass.newInstance();
          if (!(st instanceof PipelineableWriter)) {
            log.error("class " + classes[i]
                + " in processing pipeline isn't a PipelineableWriter.");
            throw new WriterException("bad pipeline");
          }

          PipelineableWriter stage = (PipelineableWriter) st;
          stage.init(conf);

          lastWriter.setNextStage(stage);
          lastWriter = stage;
        }
        // If the pipeline contains an HBaseWriter and the authentication type is
        // kerberos, log in using the configured Kerberos principal and keytab file.
        for (int i = 0; i < classes.length; i++) {
          if (classes[i].contains("HBaseWriter")) {
            loginToKerberos(conf);
          }
        }

        Class<?> stageClass = conf.getClassByName(classes[classes.length - 1]);
        Object st = stageClass.newInstance();

        if (!(st instanceof ChukwaWriter)) {
          log.error("class " + classes[classes.length - 1]
              + " at end of processing pipeline isn't a ChukwaWriter");
          throw new WriterException("bad pipeline");
        } else {
          ((ChukwaWriter) st).init(conf);
          if (lastWriter != null)
            lastWriter.setNextStage((ChukwaWriter) st);
          else
            writer = (ChukwaWriter) st; // one stage pipeline
        }
        return;
      } catch (IOException |
          WriterException |
          ClassNotFoundException |
          IllegalAccessException |
          InstantiationException e) {
        // If anything went wrong (missing class, wrong type, etc.), we wind up
        // here and surface the failure to the caller.
        log.error("failed to set up pipeline", e);
        throw new WriterException("bad pipeline");
      }
    } else {
      throw new WriterException("must set chukwa.pipeline");
    }
  }

  /**
   * If the authentication type is "kerberos", authenticates the Chukwa agent with
   * Kerberized HBase, using the Kerberos principal and keytab file specified in
   * the chukwa-agent-conf.xml config file.<br>
   * Does nothing for other authentication types.
   *
   * @throws IOException in the event of login failure
   */
  private static void loginToKerberos(Configuration config) throws IOException {
    String agentAuthType = config.get("chukwaAgent.hadoop.authentication.type");
    if (null != agentAuthType && "kerberos".equalsIgnoreCase(agentAuthType)) {
      SecurityUtil.login(config, "chukwaAgent.hadoop.authentication.kerberos.keytab",
          "chukwaAgent.hadoop.authentication.kerberos.principal");
    }
  }
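
  // A hedged configuration sketch, not taken from the Chukwa documentation:
  // the three agent properties read by loginToKerberos() would typically be set
  // in chukwa-agent-conf.xml, along these lines (the keytab path and principal
  // are placeholders):
  //
  //   chukwaAgent.hadoop.authentication.type                 kerberos
  //   chukwaAgent.hadoop.authentication.kerberos.keytab      /etc/chukwa/conf/chukwa.keytab
  //   chukwaAgent.hadoop.authentication.kerberos.principal   chukwa/_HOST@EXAMPLE.COM
  //
  // SecurityUtil.login() then resolves the keytab file and principal from these
  // keys; with any other authentication type the method is a no-op.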

}
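
A minimal, hypothetical sketch of how this writer might be configured and driven follows. It is not part of the source above: the driver class name is invented, the two pipeline entries (SocketTeeWriter as an intermediate stage and SeqFileWriter as the terminal ChukwaWriter) are only illustrative, and a real agent would set chukwa.pipeline in its configuration files and feed chunks from its adaptors rather than an empty list.

import java.util.Collections;
import java.util.List;

import org.apache.hadoop.chukwa.Chunk;
import org.apache.hadoop.chukwa.datacollection.writer.PipelineStageWriter;
import org.apache.hadoop.conf.Configuration;

public class ExamplePipelineDriver {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Comma-separated pipeline: every stage but the last must be a
    // PipelineableWriter; the last stage only needs to be a ChukwaWriter.
    conf.set("chukwa.pipeline",
        "org.apache.hadoop.chukwa.datacollection.writer.SocketTeeWriter,"
            + "org.apache.hadoop.chukwa.datacollection.writer.SeqFileWriter");

    // The Configuration-based constructor calls init(conf), which instantiates
    // and chains the stages, or throws WriterException on a bad pipeline.
    PipelineStageWriter writer = new PipelineStageWriter(conf);
    try {
      // Placeholder input; real chunks are produced by the agent's adaptors.
      List<Chunk> chunks = Collections.emptyList();
      writer.add(chunks);
    } finally {
      writer.close();
    }
  }
}

Running this verbatim would also require the stage writers' own configuration (for example, a writable filesystem for SeqFileWriter), so treat it as a wiring sketch rather than a standalone program.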