This project has retired. For details please refer to its Attic page.
OozieAdaptor xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.chukwa.datacollection.adaptor;
20  
21  import java.io.IOException;
22  import java.security.PrivilegedExceptionAction;
23  import java.util.Calendar;
24  import java.util.TimeZone;
25  import java.util.concurrent.Executors;
26  import java.util.concurrent.ScheduledExecutorService;
27  import java.util.concurrent.ScheduledFuture;
28  import java.util.concurrent.TimeUnit;
29  
30  import org.apache.hadoop.chukwa.ChunkImpl;
31  import org.apache.hadoop.chukwa.util.ChukwaUtil;
32  import org.apache.hadoop.chukwa.util.ExceptionUtil;
33  import org.apache.hadoop.chukwa.util.RestUtil;
34  import org.apache.hadoop.conf.Configuration;
35  import org.apache.hadoop.security.SecurityUtil;
36  import org.apache.hadoop.security.UserGroupInformation;
37  import org.apache.log4j.Logger;
38  
39  public class OozieAdaptor extends AbstractAdaptor {
40  
41    private static Logger log = Logger.getLogger(OozieAdaptor.class);
42    private String uri;
43  
44    private long sendOffset;
45    private Configuration chukwaConfiguration = null;
46    private static UserGroupInformation UGI = null;
47    private boolean isKerberosEnabled = false;
48    private int length = 0;
49    private final ScheduledExecutorService scheduler = Executors
50        .newScheduledThreadPool(1);
51    private static final long initialDelay = 60; // seconds
52    private static long periodicity = 60; // seconds
53    private ScheduledFuture<?> scheduledCollectorThread;
54  
55    @Override
56    public String parseArgs(String s) {
57      String[] tokens = s.split(" ");
58      if (tokens.length == 2) {
59        uri = tokens[0];
60        try {
61          periodicity = Integer.parseInt(tokens[1]);
62        } catch (NumberFormatException e) {
63          log.warn("OozieAdaptor: incorrect argument for period. Expecting number");
64          return null;
65        }
66      } else {
67        log.warn("bad syntax in OozieAdaptor args");
68        return null;
69      }
70      return s;
71    }
72  
73    @Override
74    public void start(long offset) throws AdaptorException {
75      sendOffset = offset;
76      init(); // initialize the configuration
77      log.info("Starting Oozie Adaptor with [ " + sendOffset + " ] offset");
78      scheduledCollectorThread = scheduler.scheduleAtFixedRate(
79          new OozieMetricsCollector(), initialDelay, periodicity,
80          TimeUnit.SECONDS);
81      log.info("scheduled");
82    }
83  
84    @Override
85    public String getCurrentStatus() {
86      StringBuilder buffer = new StringBuilder();
87      buffer.append(type);
88      buffer.append(" ");
89      buffer.append(uri);
90      buffer.append(" ");
91      buffer.append(periodicity);
92      return buffer.toString();
93    }
94  
95    @Override
96    public long shutdown(AdaptorShutdownPolicy shutdownPolicy)
97        throws AdaptorException {
98      scheduledCollectorThread.cancel(true);
99      scheduler.shutdown();
100     return sendOffset;
101   }
102 
103   private class OozieMetricsCollector implements Runnable {
104     @Override
105     public void run() {
106       try {
107         if (isKerberosEnabled) {
108           if (UGI == null) {
109             throw new IllegalStateException("UGI Login context is null");
110           }
111 
112           UGI.checkTGTAndReloginFromKeytab();
113           length = UGI.doAs(new PrivilegedExceptionAction<Integer>() {
114             @Override
115             public Integer run() throws Exception {
116               return processMetrics();
117             }
118           });
119 
120         } else {
121           length = processMetrics();
122         }
123 
124         if (length <= 0) {
125           log.warn("Oozie is either not responding or sending zero payload");
126         } else {
127           log.info("Processing a oozie instrumentation payload of [" + length
128               + "] bytes");
129         }
130       } catch (Exception e) {
131         log.error(ExceptionUtil.getStackTrace(e));
132         log.error("Exception occured while getting oozie metrics " + e);
133       }
134     }
135   }
136 
137   private void init() {
138     if (getChukwaConfiguration() == null) {
139       setChukwaConfiguration(ChukwaUtil.readConfiguration());
140     }
141     String authType = getChukwaConfiguration().get(
142         "chukwaAgent.hadoop.authentication.type");
143     if (authType != null && authType.equalsIgnoreCase("kerberos")) {
144       login(); // get the UGI context
145       isKerberosEnabled = true;
146     }
147   }
148 
149   private void login() {
150     try {
151       String principalConfig = getChukwaConfiguration().get(
152           "chukwaAgent.hadoop.authentication.kerberos.principal",
153           System.getProperty("user.name"));
154       String hostname = null;
155       String principalName = SecurityUtil.getServerPrincipal(principalConfig,
156           hostname);
157       UGI = UserGroupInformation.loginUserFromKeytabAndReturnUGI(
158           principalName,
159           getChukwaConfiguration().get(
160               "chukwaAgent.hadoop.authentication.kerberos.keytab"));
161     } catch (IOException e) {
162       log.error(ExceptionUtil.getStackTrace(e));
163     }
164   }
165 
166   private int processMetrics() {
167     return addChunkToReceiver(getOozieMetrics().getBytes());
168   }
169 
170   private String getOozieMetrics() {
171     return RestUtil.getResponseAsString(uri);
172   }
173 
174   public int addChunkToReceiver(byte[] data) {
175     try {
176       sendOffset += data.length;
177       ChunkImpl c = new ChunkImpl(type, "REST", sendOffset, data, this);
178       long rightNow = Calendar.getInstance(TimeZone.getTimeZone("UTC"))
179           .getTimeInMillis();
180       c.addTag("timeStamp=\"" + rightNow + "\"");
181       dest.add(c);
182     } catch (Exception e) {
183       log.error(ExceptionUtil.getStackTrace(e));
184     }
185     return data.length;
186   }
187 
188   public Configuration getChukwaConfiguration() {
189     return chukwaConfiguration;
190   }
191 
192   public void setChukwaConfiguration(Configuration chukwaConfiguration) {
193     this.chukwaConfiguration = chukwaConfiguration;
194   }
195 }