/*
 * AbstractMetricsContext.java
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.metrics.spi;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.TreeMap;

import org.apache.hadoop.metrics.ContextFactory;
import org.apache.hadoop.metrics.MetricsContext;
import org.apache.hadoop.metrics.MetricsException;
import org.apache.hadoop.metrics.MetricsRecord;
import org.apache.hadoop.metrics.Updater;
4142/**43 * The main class of the Service Provider Interface. This class should be44 * extended in order to integrate the Metrics API with a specific metrics45 * client library. 46 *47 * This class implements the internal table of metric data, and the timer48 * on which data is to be sent to the metrics system. Subclasses must49 * override the abstract <code>emitRecord</code> method in order to transmit50 * the data.51 */52publicabstractclassAbstractMetricsContextimplements MetricsContext {
5354privateint period = MetricsContext.DEFAULT_PERIOD;
55private Timer timer = null;
56privateboolean computeRate = true;
57private Set<Updater> updaters = new HashSet<Updater>(1);
58privatevolatileboolean isMonitoring = false;
5960private ContextFactory factory = null;
61private String contextName = null;
6263staticclassTagMapextends TreeMap<String,Object> {
64privatestaticfinallong serialVersionUID = 3546309335061952993L;
65TagMap() {
66super();
67 }
68TagMap(TagMap orig) {
69super(orig);
70 }
71/**72 * Returns true if this tagmap contains every tag in other.73 * @param other 74 * @return 75 */76publicboolean containsAll(TagMap other) {
77for (Map.Entry<String,Object> entry : other.entrySet()) {
78 Object value = get(entry.getKey());
79if (value == null || !value.equals(entry.getValue())) {
80// either key does not exist here, or the value is different81return false;
82 }
83 }
84returntrue;
85 }
86 }
8788staticclassMetricMapextends TreeMap<String,Number> {
89privatestaticfinallong serialVersionUID = -7495051861141631609L;
90 }
9192staticclassRecordMapextends HashMap<TagMap,MetricMap> {
93privatestaticfinallong serialVersionUID = 259835619700264611L;
94 }
9596private Map<String,RecordMap> bufferedData = new HashMap<String,RecordMap>();
979899/**100 * Creates a new instance of AbstractMetricsContext101 */102protectedAbstractMetricsContext() {
103 }
104105/**106 * Initializes the context.107 */108publicvoid init(String contextName, ContextFactory factory)
109 {
110this.contextName = contextName;
111this.factory = factory;
112 }
113114/**115 * Convenience method for subclasses to access factory attributes.116 */117protected String getAttribute(String attributeName) {
118 String factoryAttribute = contextName + "." + attributeName;
119return (String) factory.getAttribute(factoryAttribute);
120 }
121122/**123 * Returns an attribute-value map derived from the factory attributes124 * by finding all factory attributes that begin with 125 * <i>contextName</i>.<i>tableName</i>. The returned map consists of126 * those attributes with the contextName and tableName stripped off.127 */128protected Map<String,String> getAttributeTable(String tableName) {
129 String prefix = contextName + "." + tableName + ".";
130 Map<String,String> result = new HashMap<String,String>();
131for (String attributeName : factory.getAttributeNames()) {
132if (attributeName.startsWith(prefix)) {
133 String name = attributeName.substring(prefix.length());
134 String value = (String) factory.getAttribute(attributeName);
135 result.put(name, value);
136 }
137 }
138return result;
139 }
140141/**142 * Returns the context name.143 */144public String getContextName() {
145return contextName;
146 }
147148/**149 * Returns the factory by which this context was created.150 * @return the factory by which the context was created151 */152public ContextFactory getContextFactory() {
153return factory;
154 }
155156/**157 * Starts or restarts monitoring, the emitting of metrics records.158 */159publicsynchronizedvoid startMonitoring()
160throws IOException {
161if (!isMonitoring) {
162 startTimer();
163 isMonitoring = true;
164 }
165 }
166167/**168 * Stops monitoring. This does not free buffered data. 169 * @see #close()170 */171publicsynchronizedvoid stopMonitoring() {
172if (isMonitoring) {
173 stopTimer();
174 isMonitoring = false;
175 }
176 }
177178/**179 * Returns true if monitoring is currently in progress.180 */181publicboolean isMonitoring() {
182return isMonitoring;
183 }
184185/**186 * Stops monitoring and frees buffered data, returning this187 * object to its initial state. 188 */189publicsynchronizedvoid close() {
190 stopMonitoring();
191 clearUpdaters();
192 }
193194/**195 * Creates a new AbstractMetricsRecord instance with the given <code>recordName</code>.196 * Throws an exception if the metrics implementation is configured with a fixed197 * set of record names and <code>recordName</code> is not in that set.198 * 199 * @param recordName the name of the record200 * @throws MetricsException if recordName conflicts with configuration data201 */202publicfinalsynchronized MetricsRecord createRecord(String recordName) {
203if (bufferedData.get(recordName) == null) {
204 bufferedData.put(recordName, newRecordMap());
205 }
206return newRecord(recordName);
207 }
208209/**210 * Subclasses should override this if they subclass MetricsRecordImpl.211 * @param recordName the name of the record212 * @return newly created instance of MetricsRecordImpl or subclass213 */214protected MetricsRecord newRecord(String recordName) {
215returnnew MetricsRecordImpl(recordName, this);
216 }
217218/**219 * Registers a callback to be called at time intervals determined by220 * the configuration.221 *222 * @param updater object to be run periodically; it should update223 * some metrics records 224 */225publicsynchronizedvoid registerUpdater(final Updater updater) {
226if (!updaters.contains(updater)) {
227 updaters.add(updater);
228 }
229 }
230231/**232 * Removes a callback, if it exists.233 *234 * @param updater object to be removed from the callback list235 */236publicsynchronizedvoid unregisterUpdater(Updater updater) {
237 updaters.remove(updater);
238 }
239240privatesynchronizedvoid clearUpdaters() {
241 updaters.clear();
242 }
243244/**245 * Starts timer if it is not already started246 */247privatesynchronizedvoid startTimer() {
248if (timer == null) {
249 timer = new Timer("Timer thread for monitoring " + getContextName(),
250true);
251 TimerTask task = new TimerTask() {
252publicvoid run() {
253try {
254 timerEvent();
255 }
256catch (IOException ioe) {
257 ioe.printStackTrace();
258 }
259 }
260 };
261long millis = period * 1000;
262 timer.scheduleAtFixedRate(task, millis, millis);
263 }
264 }
265266/**267 * Stops timer if it is running268 */269privatesynchronizedvoid stopTimer() {
270if (timer != null) {
271 timer.cancel();
272 timer = null;
273 }
274 }
275276/**277 * Timer callback.278 */279privatevoid timerEvent() throws IOException {
280if (isMonitoring) {
281 Collection<Updater> myUpdaters;
282synchronized (this) {
283 myUpdaters = new ArrayList<Updater>(updaters);
284 }
285// Run all the registered updates without holding a lock286// on this context287for (Updater updater : myUpdaters) {
288try {
289 updater.doUpdates(this);
290 }
291catch (Throwable throwable) {
292 throwable.printStackTrace();
293 }
294 }
295 emitRecords();
296 }
297 }
298299/**300 * Emits the records.301 */302privatesynchronizedvoid emitRecords() throws IOException {
303for (Entry<String, RecordMap> record : bufferedData.entrySet()) {
304 String recordName = record.getKey();
305RecordMap recordMap = record.getValue();
306for (Entry<TagMap, MetricMap> entry : record.getValue().entrySet()) {
307 OutputRecord outRec = new OutputRecord(entry.getKey(), entry.getValue());
308 emitRecord(contextName, recordName, outRec);
309 }
310 }
311 flush();
312 }
313314/**315 * Sends a record to the metrics system.316 */317protectedabstractvoid emitRecord(String contextName, String recordName,
318 OutputRecord outRec) throws IOException;
319320/**321 * Called each period after all records have been emitted, this method does nothing.322 * Subclasses may override it in order to perform some kind of flush.323 */324protectedvoid flush() throws IOException {
325 }
326327/**328 * Called by MetricsRecordImpl.update(). Creates or updates a row in329 * the internal table of metric data.330 */331protectedvoid update(MetricsRecordImpl record) {
332333 String recordName = record.getRecordName();
334TagMap tagTable = record.getTagTable();
335 Map<String,MetricValue> metricUpdates = record.getMetricTable();
336337RecordMap recordMap = getRecordMap(recordName);
338synchronized (recordMap) {
339MetricMap metricMap = recordMap.get(tagTable);
340if (metricMap == null) {
341 metricMap = newMetricMap();
342TagMap tagMap = newTagMap(tagTable); // clone tags343 recordMap.put(tagMap, metricMap);
344 }
345346 Set<Entry<String, MetricValue>> entrySet = metricUpdates.entrySet();
347for (Entry<String, MetricValue> entry : entrySet) {
348 String metricName = entry.getKey ();
349 MetricValue updateValue = entry.getValue ();
350 Number updateNumber = updateValue.getNumber();
351 Number currentNumber = metricMap.get(metricName);
352if (currentNumber == null || updateValue.isAbsolute()) {
353 metricMap.put(metricName, updateNumber);
354 }
355else {
356 Number newNumber = sum(updateNumber, currentNumber);
357 metricMap.put(metricName, newNumber);
358 metricMap.put(metricName+"_raw", updateNumber);
359if (computeRate ) {
360double rate = updateNumber.doubleValue() * 60.0 / period;
361 metricMap.put(metricName+"_rate", rate);
362 }
363 computeRate = true;
364 }
365 }
366 }
367 }
368369privatesynchronizedRecordMap getRecordMap(String recordName) {
370return bufferedData.get(recordName);
371 }
372373/**374 * Adds two numbers, coercing the second to the type of the first.375 *376 */377private Number sum(Number a, Number b) {
378if (a instanceof Integer) {
379return Integer.valueOf(a.intValue() + b.intValue());
380 }
381elseif (a instanceof Float) {
382returnnew Float(a.floatValue() + b.floatValue());
383 }
384elseif (a instanceof Short) {
385return Short.valueOf((short)(a.shortValue() + b.shortValue()));
386 }
387elseif (a instanceof Byte) {
388return Byte.valueOf((byte)(a.byteValue() + b.byteValue()));
389 }
390elseif (a instanceof Long) {
391return Long.valueOf((a.longValue() + b.longValue()));
392 }
393else {
394// should never happen395thrownew MetricsException("Invalid number type");
396 }
397398 }
399400/**401 * Called by MetricsRecordImpl.remove(). Removes all matching rows in402 * the internal table of metric data. A row matches if it has the same403 * tag names and values as record, but it may also have additional404 * tags.405 */406protectedvoid remove(MetricsRecordImpl record) {
407 String recordName = record.getRecordName();
408TagMap tagTable = record.getTagTable();
409410RecordMap recordMap = getRecordMap(recordName);
411synchronized (recordMap) {
412 Iterator<TagMap> it = recordMap.keySet().iterator();
413while (it.hasNext()) {
414TagMap rowTags = it.next();
415if (rowTags.containsAll(tagTable)) {
416 it.remove();
417 }
418 }
419 }
420 }
421422/**423 * Returns the timer period.424 */425publicint getPeriod() {
426return period;
427 }
428429/**430 * Sets the timer period431 */432protectedvoid setPeriod(int period) {
433this.period = period;
434 }
435 }