This project has retired. For details please refer to its
Attic page.
OozieAdaptor xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.chukwa.datacollection.adaptor;
20
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.security.PrivilegedExceptionAction;
import java.util.Calendar;
import java.util.TimeZone;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.chukwa.ChunkImpl;
import org.apache.hadoop.chukwa.util.ChukwaUtil;
import org.apache.hadoop.chukwa.util.ExceptionUtil;
import org.apache.hadoop.chukwa.util.RestUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.log4j.Logger;
38
39 public class OozieAdaptor extends AbstractAdaptor {
40
41 private static Logger log = Logger.getLogger(OozieAdaptor.class);
42 private String uri;
43
44 private long sendOffset;
45 private Configuration chukwaConfiguration = null;
46 private static UserGroupInformation UGI = null;
47 private boolean isKerberosEnabled = false;
48 private int length = 0;
49 private final ScheduledExecutorService scheduler = Executors
50 .newScheduledThreadPool(1);
51 private static final long initialDelay = 60;
52 private static long periodicity = 60;
53 private ScheduledFuture<?> scheduledCollectorThread;
54
55 @Override
56 public String parseArgs(String s) {
57 String[] tokens = s.split(" ");
58 if (tokens.length == 2) {
59 uri = tokens[0];
60 try {
61 periodicity = Integer.parseInt(tokens[1]);
62 } catch (NumberFormatException e) {
63 log.warn("OozieAdaptor: incorrect argument for period. Expecting number");
64 return null;
65 }
66 } else {
67 log.warn("bad syntax in OozieAdaptor args");
68 return null;
69 }
70 return s;
71 }
72
73 @Override
74 public void start(long offset) throws AdaptorException {
75 sendOffset = offset;
76 init();
77 log.info("Starting Oozie Adaptor with [ " + sendOffset + " ] offset");
78 scheduledCollectorThread = scheduler.scheduleAtFixedRate(
79 new OozieMetricsCollector(), initialDelay, periodicity,
80 TimeUnit.SECONDS);
81 log.info("scheduled");
82 }
83
84 @Override
85 public String getCurrentStatus() {
86 StringBuilder buffer = new StringBuilder();
87 buffer.append(type);
88 buffer.append(" ");
89 buffer.append(uri);
90 buffer.append(" ");
91 buffer.append(periodicity);
92 return buffer.toString();
93 }
94
95 @Override
96 public long shutdown(AdaptorShutdownPolicy shutdownPolicy)
97 throws AdaptorException {
98 scheduledCollectorThread.cancel(true);
99 scheduler.shutdown();
100 return sendOffset;
101 }
102
103 private class OozieMetricsCollector implements Runnable {
104 @Override
105 public void run() {
106 try {
107 if (isKerberosEnabled) {
108 if (UGI == null) {
109 throw new IllegalStateException("UGI Login context is null");
110 }
111
112 UGI.checkTGTAndReloginFromKeytab();
113 length = UGI.doAs(new PrivilegedExceptionAction<Integer>() {
114 @Override
115 public Integer run() throws Exception {
116 return processMetrics();
117 }
118 });
119
120 } else {
121 length = processMetrics();
122 }
123
124 if (length <= 0) {
125 log.warn("Oozie is either not responding or sending zero payload");
126 } else {
127 log.info("Processing a oozie instrumentation payload of [" + length
128 + "] bytes");
129 }
130 } catch (Exception e) {
131 log.error(ExceptionUtil.getStackTrace(e));
132 log.error("Exception occured while getting oozie metrics " + e);
133 }
134 }
135 }
136
137 private void init() {
138 if (getChukwaConfiguration() == null) {
139 setChukwaConfiguration(ChukwaUtil.readConfiguration());
140 }
141 String authType = getChukwaConfiguration().get(
142 "chukwaAgent.hadoop.authentication.type");
143 if (authType != null && authType.equalsIgnoreCase("kerberos")) {
144 login();
145 isKerberosEnabled = true;
146 }
147 }
148
149 private void login() {
150 try {
151 String principalConfig = getChukwaConfiguration().get(
152 "chukwaAgent.hadoop.authentication.kerberos.principal",
153 System.getProperty("user.name"));
154 String hostname = null;
155 String principalName = SecurityUtil.getServerPrincipal(principalConfig,
156 hostname);
157 UGI = UserGroupInformation.loginUserFromKeytabAndReturnUGI(
158 principalName,
159 getChukwaConfiguration().get(
160 "chukwaAgent.hadoop.authentication.kerberos.keytab"));
161 } catch (IOException e) {
162 log.error(ExceptionUtil.getStackTrace(e));
163 }
164 }
165
166 private int processMetrics() {
167 return addChunkToReceiver(getOozieMetrics().getBytes());
168 }
169
170 private String getOozieMetrics() {
171 return RestUtil.getResponseAsString(uri);
172 }
173
174 public int addChunkToReceiver(byte[] data) {
175 try {
176 sendOffset += data.length;
177 ChunkImpl c = new ChunkImpl(type, "REST", sendOffset, data, this);
178 long rightNow = Calendar.getInstance(TimeZone.getTimeZone("UTC"))
179 .getTimeInMillis();
180 c.addTag("timeStamp=\"" + rightNow + "\"");
181 dest.add(c);
182 } catch (Exception e) {
183 log.error(ExceptionUtil.getStackTrace(e));
184 }
185 return data.length;
186 }
187
188 public Configuration getChukwaConfiguration() {
189 return chukwaConfiguration;
190 }
191
192 public void setChukwaConfiguration(Configuration chukwaConfiguration) {
193 this.chukwaConfiguration = chukwaConfiguration;
194 }
195 }