This project has retired. For details please refer to its Attic page.
AbstractMetricsContext xref
View Javadoc

1   /*
2    * AbstractMetricsContext.java
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  
21  package org.apache.hadoop.metrics.spi;
22  
23  import java.io.IOException;
24  import java.util.ArrayList;
25  import java.util.Collection;
26  import java.util.HashMap;
27  import java.util.HashSet;
28  import java.util.Iterator;
29  import java.util.Map;
30  import java.util.Set;
31  import java.util.Timer;
32  import java.util.TimerTask;
33  import java.util.TreeMap;
34  import java.util.Map.Entry;
35  
36  import org.apache.hadoop.metrics.ContextFactory;
37  import org.apache.hadoop.metrics.MetricsContext;
38  import org.apache.hadoop.metrics.MetricsException;
39  import org.apache.hadoop.metrics.MetricsRecord;
40  import org.apache.hadoop.metrics.Updater;
41  
42  /**
43   * The main class of the Service Provider Interface.  This class should be
44   * extended in order to integrate the Metrics API with a specific metrics
45   * client library.
46   *
47   * This class implements the internal table of metric data, and the timer
48   * on which data is to be sent to the metrics system.  Subclasses must
49   * override the abstract <code>emitRecord</code> method in order to transmit
50   * the data.
51   */
52  public abstract class AbstractMetricsContext implements MetricsContext {
53      
54    private int period = MetricsContext.DEFAULT_PERIOD;
55    private Timer timer = null;
56    private boolean computeRate = true;    
57    private Set<Updater> updaters = new HashSet<Updater>(1);
58    private volatile boolean isMonitoring = false;
59      
60    private ContextFactory factory = null;
61    private String contextName = null;
62      
63    static class TagMap extends TreeMap<String,Object> {
64      private static final long serialVersionUID = 3546309335061952993L;
65      TagMap() {
66        super();
67      }
68      TagMap(TagMap orig) {
69        super(orig);
70      }
71      /**
72       * Returns true if this tagmap contains every tag in other.
73       * @param other 
74       * @return 
75       */
76      public boolean containsAll(TagMap other) {
77        for (Map.Entry<String,Object> entry : other.entrySet()) {
78          Object value = get(entry.getKey());
79          if (value == null || !value.equals(entry.getValue())) {
80            // either key does not exist here, or the value is different
81            return false;
82          }
83        }
84        return true;
85      }
86    }
87    
88    static class MetricMap extends TreeMap<String,Number> {
89      private static final long serialVersionUID = -7495051861141631609L;
90    }
91              
92    static class RecordMap extends HashMap<TagMap,MetricMap> {
93      private static final long serialVersionUID = 259835619700264611L;
94    }
95      
96    private Map<String,RecordMap> bufferedData = new HashMap<String,RecordMap>();
97      
98  
99    /**
100    * Creates a new instance of AbstractMetricsContext
101    */
102   protected AbstractMetricsContext() {
103   }
104     
105   /**
106    * Initializes the context.
107    */
108   public void init(String contextName, ContextFactory factory) 
109   {
110     this.contextName = contextName;
111     this.factory = factory;
112   }
113     
114   /**
115    * Convenience method for subclasses to access factory attributes.
116    */
117   protected String getAttribute(String attributeName) {
118     String factoryAttribute = contextName + "." + attributeName;
119     return (String) factory.getAttribute(factoryAttribute);  
120   }
121     
122   /**
123    * Returns an attribute-value map derived from the factory attributes
124    * by finding all factory attributes that begin with 
125    * <i>contextName</i>.<i>tableName</i>.  The returned map consists of
126    * those attributes with the contextName and tableName stripped off.
127    */
128   protected Map<String,String> getAttributeTable(String tableName) {
129     String prefix = contextName + "." + tableName + ".";
130     Map<String,String> result = new HashMap<String,String>();
131     for (String attributeName : factory.getAttributeNames()) {
132       if (attributeName.startsWith(prefix)) {
133         String name = attributeName.substring(prefix.length());
134         String value = (String) factory.getAttribute(attributeName);
135         result.put(name, value);
136       }
137     }
138     return result;
139   }
140     
141   /**
142    * Returns the context name.
143    */
144   public String getContextName() {
145     return contextName;
146   }
147     
148   /**
149    * Returns the factory by which this context was created.
150    * @return 
151    */
152   public ContextFactory getContextFactory() {
153     return factory;
154   }
155     
156   /**
157    * Starts or restarts monitoring, the emitting of metrics records.
158    */
159   public synchronized void startMonitoring()
160     throws IOException {
161     if (!isMonitoring) {
162       startTimer();
163       isMonitoring = true;
164     }
165   }
166     
167   /**
168    * Stops monitoring.  This does not free buffered data. 
169    * @see #close()
170    */
171   public synchronized void stopMonitoring() {
172     if (isMonitoring) {
173       stopTimer();
174       isMonitoring = false;
175     }
176   }
177     
178   /**
179    * Returns true if monitoring is currently in progress.
180    */
181   public boolean isMonitoring() {
182     return isMonitoring;
183   }
184     
185   /**
186    * Stops monitoring and frees buffered data, returning this
187    * object to its initial state.  
188    */
189   public synchronized void close() {
190     stopMonitoring();
191     clearUpdaters();
192   } 
193     
194   /**
195    * Creates a new AbstractMetricsRecord instance with the given <code>recordName</code>.
196    * Throws an exception if the metrics implementation is configured with a fixed
197    * set of record names and <code>recordName</code> is not in that set.
198    * 
199    * @param recordName the name of the record
200    * @throws MetricsException if recordName conflicts with configuration data
201    */
202   public final synchronized MetricsRecord createRecord(String recordName) {
203     if (bufferedData.get(recordName) == null) {
204       bufferedData.put(recordName, new RecordMap());
205     }
206     return newRecord(recordName);
207   }
208     
209   /**
210    * Subclasses should override this if they subclass MetricsRecordImpl.
211    * @param recordName the name of the record
212    * @return newly created instance of MetricsRecordImpl or subclass
213    */
214   protected MetricsRecord newRecord(String recordName) {
215     return new MetricsRecordImpl(recordName, this);
216   }
217     
218   /**
219    * Registers a callback to be called at time intervals determined by
220    * the configuration.
221    *
222    * @param updater object to be run periodically; it should update
223    * some metrics records 
224    */
225   public synchronized void registerUpdater(final Updater updater) {
226     if (!updaters.contains(updater)) {
227       updaters.add(updater);
228     }
229   }
230     
231   /**
232    * Removes a callback, if it exists.
233    *
234    * @param updater object to be removed from the callback list
235    */
236   public synchronized void unregisterUpdater(Updater updater) {
237     updaters.remove(updater);
238   }
239     
240   private synchronized void clearUpdaters() {
241     updaters.clear();
242   }
243     
244   /**
245    * Starts timer if it is not already started
246    */
247   private synchronized void startTimer() {
248     if (timer == null) {
249       timer = new Timer("Timer thread for monitoring " + getContextName(), 
250                         true);
251       TimerTask task = new TimerTask() {
252           public void run() {
253             try {
254               timerEvent();
255             }
256             catch (IOException ioe) {
257               ioe.printStackTrace();
258             }
259           }
260         };
261       long millis = period * 1000;
262       timer.scheduleAtFixedRate(task, millis, millis);
263     }
264   }
265     
266   /**
267    * Stops timer if it is running
268    */
269   private synchronized void stopTimer() {
270     if (timer != null) {
271       timer.cancel();
272       timer = null;
273     }
274   }
275     
276   /**
277    * Timer callback.
278    */
279   private void timerEvent() throws IOException {
280     if (isMonitoring) {
281       Collection<Updater> myUpdaters;
282       synchronized (this) {
283         myUpdaters = new ArrayList<Updater>(updaters);
284       }     
285       // Run all the registered updates without holding a lock
286       // on this context
287       for (Updater updater : myUpdaters) {
288         try {
289           updater.doUpdates(this);
290         }
291         catch (Throwable throwable) {
292           throwable.printStackTrace();
293         }
294       }
295       emitRecords();
296     }
297   }
298     
299   /**
300    *  Emits the records.
301    */
302   private synchronized void emitRecords() throws IOException {
303     for (Entry<String, RecordMap> record : bufferedData.entrySet()) {
304       String recordName = record.getKey();
305       RecordMap recordMap = record.getValue();
306       for (Entry<TagMap, MetricMap> entry : record.getValue().entrySet()) {
307         OutputRecord outRec = new OutputRecord(entry.getKey(), entry.getValue());
308         emitRecord(contextName, recordName, outRec);
309       }
310     }
311     flush();
312   }
313 
314   /**
315    * Sends a record to the metrics system.
316    */
317   protected abstract void emitRecord(String contextName, String recordName, 
318                                      OutputRecord outRec) throws IOException;
319     
320   /**
321    * Called each period after all records have been emitted, this method does nothing.
322    * Subclasses may override it in order to perform some kind of flush.
323    */
324   protected void flush() throws IOException {
325   }
326     
327   /**
328    * Called by MetricsRecordImpl.update().  Creates or updates a row in
329    * the internal table of metric data.
330    */
331   protected void update(MetricsRecordImpl record) {
332     
333     String recordName = record.getRecordName();
334     TagMap tagTable = record.getTagTable();
335     Map<String,MetricValue> metricUpdates = record.getMetricTable();
336         
337     RecordMap recordMap = getRecordMap(recordName);
338     synchronized (recordMap) {
339       MetricMap metricMap = recordMap.get(tagTable);
340       if (metricMap == null) {
341         metricMap = new MetricMap();
342         TagMap tagMap = new TagMap(tagTable); // clone tags
343         recordMap.put(tagMap, metricMap);
344       }
345 
346       Set<Entry<String, MetricValue>> entrySet = metricUpdates.entrySet();
347       for (Entry<String, MetricValue> entry : entrySet) {
348         String metricName = entry.getKey ();
349         MetricValue updateValue = entry.getValue ();
350         Number updateNumber = updateValue.getNumber();
351         Number currentNumber = metricMap.get(metricName);
352         if (currentNumber == null || updateValue.isAbsolute()) {
353           metricMap.put(metricName, updateNumber);
354         }
355         else {
356           Number newNumber = sum(updateNumber, currentNumber);
357           metricMap.put(metricName, newNumber);
358           metricMap.put(metricName+"_raw", updateNumber);
359           if (computeRate ) {
360               double rate = updateNumber.doubleValue() * 60.0 / period;
361               metricMap.put(metricName+"_rate", rate);
362             }
363           computeRate = true;
364         }
365       }
366     }
367   }
368     
369   private synchronized RecordMap getRecordMap(String recordName) {
370     return bufferedData.get(recordName);
371   }
372     
373   /**
374    * Adds two numbers, coercing the second to the type of the first.
375    *
376    */
377   private Number sum(Number a, Number b) {
378     if (a instanceof Integer) {
379       return Integer.valueOf(a.intValue() + b.intValue());
380     }
381     else if (a instanceof Float) {
382       return new Float(a.floatValue() + b.floatValue());
383     }
384     else if (a instanceof Short) {
385       return Short.valueOf((short)(a.shortValue() + b.shortValue()));
386     }
387     else if (a instanceof Byte) {
388       return Byte.valueOf((byte)(a.byteValue() + b.byteValue()));
389     }
390     else if (a instanceof Long) {
391       return Long.valueOf((a.longValue() + b.longValue()));
392     }
393     else {
394       // should never happen
395       throw new MetricsException("Invalid number type");
396     }
397             
398   }
399     
400   /**
401    * Called by MetricsRecordImpl.remove().  Removes all matching rows in
402    * the internal table of metric data.  A row matches if it has the same
403    * tag names and values as record, but it may also have additional
404    * tags.
405    */    
406   protected void remove(MetricsRecordImpl record) {
407     String recordName = record.getRecordName();
408     TagMap tagTable = record.getTagTable();
409         
410     RecordMap recordMap = getRecordMap(recordName);
411     synchronized (recordMap) {
412       Iterator<TagMap> it = recordMap.keySet().iterator();
413       while (it.hasNext()) {
414         TagMap rowTags = it.next();
415         if (rowTags.containsAll(tagTable)) {
416           it.remove();
417         }
418       }
419     }
420   }
421     
422   /**
423    * Returns the timer period.
424    */
425   public int getPeriod() {
426     return period;
427   }
428     
429   /**
430    * Sets the timer period
431    */
432   protected void setPeriod(int period) {
433     this.period = period;
434   }
435 }