This project has retired. For details please refer to its Attic page.
OffsetStatsManager xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.chukwa.datacollection;
19  
20  import org.apache.log4j.Logger;
21  
22  import java.util.Map;
23  import java.util.LinkedList;
24  import java.util.Date;
25  import java.util.concurrent.ConcurrentHashMap;
26  import java.util.concurrent.TimeUnit;
27  
28  /**
29   * Manages stats for multiple objects of type T. T can be any class that is used
30   * as a key for offset statistics (i.e. Agent, Collector, etc.). A client would
31   * create an instance of this class and call <code>addOffsetDataPoint<code>
32   * repeatedly over time. Then <code>calcAverageRate</code> can be called to
33   * retrieve the average offset-unit per second over a given time interval.
34   * <P>
35   * For a given object T that is actively adding data points, stats are kept for
36   * up to 20 minutes.
37   * <P>
38   * Care should be taken to always call <code>remove()</code> when old T objects
39   * should no longer be tracked.
40   * @param <T> 
41   */
42  public class OffsetStatsManager<T> {
43    protected Logger log = Logger.getLogger(getClass());
44  
45    /*
46     * This value is how far back we keep data for. Old data is purge when new
47     * data is added.
48     */
49    private static long DEFAULT_STATS_DATA_TTL = 20L * 60L * 1000L; // 20 minutes
50  
51    /**
52     * How far back can our data be to be considered fresh enough, relative to the
53     * interval requests. For example if this value is 0.25 and interval requested
54     * is 60 seconds, our most recent data point must be no more than 15 seconds old.
55     */
56    private static double DEFAULT_STALE_THRESHOLD = 0.25;
57  
58  
59    /**
60     * How far back do we need to have historical data for, relative to the
61     * interval requested. For example if this value is 0.25 and the interval
62     * requested is 60 seconds, our most oldest data point must be within 15
63     * seconds of the most recent data point - 60.
64     */
65    private static double DEFAULT_AGE_THRESHOLD = 0.25;
66  
67    // These can be made configurable if someone needs to do so
68    private long statsDataTTL = DEFAULT_STATS_DATA_TTL;
69    private double staleThresholdPercent = DEFAULT_STALE_THRESHOLD;
70    private double ageThresholdPercent = DEFAULT_AGE_THRESHOLD;
71  
72    private Map<T, OffsetDataStats> offsetStatsMap =
73            new ConcurrentHashMap<T, OffsetDataStats>();
74  
75    public OffsetStatsManager() {
76      this(DEFAULT_STATS_DATA_TTL);
77    }
78  
79    public OffsetStatsManager(long statsDataTTL) {
80      this.statsDataTTL = statsDataTTL;
81    }
82  
83    /**
84     * Record that at a given point in time an object key had a given offset.
85     * @param key Object to key this data point to
86     * @param offset How much of an offset to record
87     * @param timestamp The time the offset occured
88     */
89    public void addOffsetDataPoint(T key, long offset, long timestamp) {
90      OffsetDataStats stats = null;
91  
92        if (offsetStatsMap.get(key) == null)
93          offsetStatsMap.put(key, new OffsetDataStats());
94  
95        stats = offsetStatsMap.get(key);
96  
97      stats.add(new OffsetData(offset, timestamp));
98      stats.prune(statsDataTTL);
99  
100     if (log.isDebugEnabled())
101       log.debug("Added offset - key=" + key + ", offset=" + offset +
102                 ", time=" + new Date(timestamp) + ", dataCount=" +
103                 stats.getOffsetDataList().size());
104   }
105 
106   public double calcAverageRate(T key, long timeIntervalSecs) {
107     OffsetDataStats stats = get(key);
108     if (stats == null) {
109       if (log.isDebugEnabled())
110         log.debug("No stats data found key=" + key);
111       return -1;
112     }
113 
114     // first get the most recent data point to see if we're stale
115     long now = System.currentTimeMillis();
116     long mostRecentThreashold = now -
117             timeIntervalSecs * (long)(staleThresholdPercent * 1000);
118     OffsetData newestOffsetData = stats.mostRecentDataPoint();
119 
120     if (newestOffsetData == null || newestOffsetData.olderThan(mostRecentThreashold)) {
121       if (log.isDebugEnabled())
122         log.debug("Stats data too stale for key=" + key);
123 
124       return -1; // data is too stale
125     }
126 
127     // then get the oldest data point to see if we have enough coverage
128     long then = newestOffsetData.getTimestamp() - timeIntervalSecs * 1000L;
129     long thenDelta = timeIntervalSecs * (long)(ageThresholdPercent * 1000);
130 
131     OffsetData oldestOffsetData = null;
132     long minDiff = -1;
133     long lastDiff = -1;
134     for (OffsetData offsetData : stats.getOffsetDataList()) {
135       long diff = offsetData.within(then, thenDelta);
136 
137       if (diff < 0) continue;
138 
139       if (minDiff == -1 || minDiff < diff) {
140         // this is the data point closest to our target then time
141         minDiff = diff;
142         oldestOffsetData = offsetData;
143       }
144 
145       // optimize so is we got a minDiff, but the diffs are getting worse, then
146       // we've found the closet point and we can move on
147       if (minDiff != -1 && lastDiff != -1 && diff > lastDiff) {
148         break;
149       }
150 
151       lastDiff = diff;
152     }
153 
154     if (oldestOffsetData == null) {
155       if (log.isDebugEnabled())
156         log.debug("Stats data history too short for key=" + key);
157 
158       return -1;
159     }
160 
161     return newestOffsetData.averageRate(oldestOffsetData);
162   }
163 
164   public OffsetData oldestDataPoint(T key) {
165     OffsetDataStats stats = get(key);
166     return stats.oldestDataPoint();
167   }
168 
169   public OffsetData mostRecentDataPoint(T key) {
170     OffsetDataStats stats = get(key);
171     return stats.mostRecentDataPoint();
172   }
173 
174   /**
175    * Remove key from the set of objects that we're tracking stats for.
176    * @param key key of stats to be removed
177    */
178   public void remove(T key) {
179       offsetStatsMap.remove(key);
180   }
181 
182   /**
183    * Remove all objectst that we're tracking stats for.
184    */
185   public void clear() {
186       offsetStatsMap.clear();
187   }
188 
189   /**
190    * Fetch OffsetDataStats for key.
191    * @param key key that stats are to be returned for
192    */
193   private OffsetDataStats get(T key) {
194       return offsetStatsMap.get(key);
195   }
196 
197   public class OffsetData {
198     private long offset;
199     private long timestamp;
200 
201     private OffsetData(long offset, long timestamp) {
202       this.offset = offset;
203       this.timestamp = timestamp;
204     }
205 
206     public long getOffset() { return offset; }
207     public long getTimestamp() { return timestamp; }
208 
209     public double averageRate(OffsetData previous) {
210       if (previous == null) return -1;
211       double elapseOffset = offset - previous.getOffset();
212       double elapseTime = (timestamp - previous.getTimestamp()) / 1000d;
213       double rate = elapseOffset / elapseTime;
214       return rate;
215     }
216 
217     public boolean olderThan(long timestamp) {
218       return this.timestamp < timestamp;
219     }
220 
221     public long within(long timestamp, long delta) {
222 
223       long diff = Math.abs(this.timestamp - timestamp);
224 
225       if (diff < delta) return diff;
226       return -1;
227     }
228   }
229 
230   private class OffsetDataStats {
231     private volatile LinkedList<OffsetData> offsetDataList = new LinkedList<OffsetData>();
232 
233     public LinkedList<OffsetData> getOffsetDataList() {
234       return offsetDataList;
235     }
236 
237     public void add(OffsetData offsetData) {
238       synchronized(offsetDataList) {
239         offsetDataList.add(offsetData);
240       }
241     }
242 
243     public OffsetData oldestDataPoint() {
244       synchronized(offsetDataList) {
245         return offsetDataList.peekFirst();
246       }
247     }
248 
249     public OffsetData mostRecentDataPoint() {
250       synchronized(offsetDataList) {
251         return offsetDataList.peekLast();
252       }
253     }
254 
255     public void prune(long ttl) {
256       long cutoff = System.currentTimeMillis() - ttl;
257 
258       OffsetData data;
259       synchronized(offsetDataList) {
260         while ((data = offsetDataList.peekFirst()) != null) {
261           if (data.getTimestamp() > cutoff) break;
262 
263           offsetDataList.removeFirst();
264         }
265       }
266     }
267   }
268 }