This project has retired. For details please refer to its Attic page.
OffsetStatsManager xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.chukwa.datacollection;
19  
20  import org.apache.log4j.Logger;
21  
22  import java.util.Map;
23  import java.util.LinkedList;
24  import java.util.Date;
25  import java.util.concurrent.ConcurrentHashMap;
26  
27  /**
28   * Manages stats for multiple objects of type T. T can be any class that is used
29   * as a key for offset statistics (i.e. Agent, Collector, etc.). A client would
30   * create an instance of this class and call <code>addOffsetDataPoint<code>
31   * repeatedly over time. Then <code>calcAverageRate</code> can be called to
32   * retrieve the average offset-unit per second over a given time interval.
33   * <P>
34   * For a given object T that is actively adding data points, stats are kept for
35   * up to 20 minutes.
36   * <P>
37   * Care should be taken to always call <code>remove()</code> when old T objects
38   * should no longer be tracked.
39   */
40  public class OffsetStatsManager<T> {
41    protected Logger log = Logger.getLogger(getClass());
42  
43    /*
44     * This value is how far back we keep data for. Old data is purge when new
45     * data is added.
46     */
47    private static long DEFAULT_STATS_DATA_TTL = 20L * 60L * 1000L; // 20 minutes
48  
49    /**
50     * How far back can our data be to be considered fresh enough, relative to the
51     * interval requests. For example if this value is 0.25 and interval requested
52     * is 60 seconds, our most recent data point must be no more than 15 seconds old.
53     */
54    private static double DEFAULT_STALE_THRESHOLD = 0.25;
55  
56  
57    /**
58     * How far back do we need to have historical data for, relative to the
59     * interval requested. For example if this value is 0.25 and the interval
60     * requested is 60 seconds, our most oldest data point must be within 15
61     * seconds of the most recent data point - 60.
62     */
63    private static double DEFAULT_AGE_THRESHOLD = 0.25;
64  
65    // These can be made configurable if someone needs to do so
66    private long statsDataTTL = DEFAULT_STATS_DATA_TTL;
67    private double staleThresholdPercent = DEFAULT_STALE_THRESHOLD;
68    private double ageThresholdPercent = DEFAULT_AGE_THRESHOLD;
69  
70    private Map<T, OffsetDataStats> offsetStatsMap =
71            new ConcurrentHashMap<T, OffsetDataStats>();
72  
73    public OffsetStatsManager() {
74      this(DEFAULT_STATS_DATA_TTL);
75    }
76  
77    public OffsetStatsManager(long statsDataTTL) {
78      this.statsDataTTL = statsDataTTL;
79    }
80  
81    /**
82     * Record that at a given point in time an object key had a given offset.
83     * @param key Object to key this data point to
84     * @param offset How much of an offset to record
85     * @param timestamp The time the offset occured
86     */
87    public void addOffsetDataPoint(T key, long offset, long timestamp) {
88      OffsetDataStats stats = null;
89  
90      synchronized (offsetStatsMap) {
91        if (offsetStatsMap.get(key) == null)
92          offsetStatsMap.put(key, new OffsetDataStats());
93  
94        stats = offsetStatsMap.get(key);
95      }
96  
97      stats.add(new OffsetData(offset, timestamp));
98      stats.prune(statsDataTTL);
99  
100     if (log.isDebugEnabled())
101       log.debug("Added offset - key=" + key + ", offset=" + offset +
102                 ", time=" + new Date(timestamp) + ", dataCount=" +
103                 stats.getOffsetDataList().size());
104   }
105 
106   public double calcAverageRate(T key, long timeIntervalSecs) {
107     OffsetDataStats stats = get(key);
108     if (stats == null) {
109       if (log.isDebugEnabled())
110         log.debug("No stats data found key=" + key);
111       return -1;
112     }
113 
114     // first get the most recent data point to see if we're stale
115     long now = System.currentTimeMillis();
116     long mostRecentThreashold = now -
117             timeIntervalSecs * (long)(staleThresholdPercent * 1000);
118     OffsetData newestOffsetData = stats.mostRecentDataPoint();
119 
120     if (newestOffsetData == null || newestOffsetData.olderThan(mostRecentThreashold)) {
121       if (log.isDebugEnabled())
122         log.debug("Stats data too stale for key=" + key);
123 
124       return -1; // data is too stale
125     }
126 
127     // then get the oldest data point to see if we have enough coverage
128     long then = newestOffsetData.getTimestamp() - timeIntervalSecs * 1000L;
129     long thenDelta = timeIntervalSecs * (long)(ageThresholdPercent * 1000);
130 
131     OffsetData oldestOffsetData = null;
132     long minDiff = -1;
133     long lastDiff = -1;
134     for (OffsetData offsetData : stats.getOffsetDataList()) {
135       long diff = offsetData.within(then, thenDelta);
136 
137       if (diff < 0) continue;
138 
139       if (minDiff == -1 || minDiff < diff) {
140         // this is the data point closest to our target then time
141         minDiff = diff;
142         oldestOffsetData = offsetData;
143       }
144 
145       // optimize so is we got a minDiff, but the diffs are getting worse, then
146       // we've found the closet point and we can move on
147       if (minDiff != -1 && lastDiff != -1 && diff > lastDiff) {
148         break;
149       }
150 
151       lastDiff = diff;
152     }
153 
154     if (oldestOffsetData == null) {
155       if (log.isDebugEnabled())
156         log.debug("Stats data history too short for key=" + key);
157 
158       return -1;
159     }
160 
161     return newestOffsetData.averageRate(oldestOffsetData);
162   }
163 
164   public OffsetData oldestDataPoint(T key) {
165     OffsetDataStats stats = get(key);
166     return stats.oldestDataPoint();
167   }
168 
169   public OffsetData mostRecentDataPoint(T key) {
170     OffsetDataStats stats = get(key);
171     return stats.mostRecentDataPoint();
172   }
173 
174   /**
175    * Remove key from the set of objects that we're tracking stats for.
176    * @param key key of stats to be removed
177    */
178   public void remove(T key) {
179     synchronized (offsetStatsMap) {
180       offsetStatsMap.remove(key);
181     }
182   }
183 
184   /**
185    * Remove all objectst that we're tracking stats for.
186    */
187   public void clear() {
188     synchronized (offsetStatsMap) {
189       offsetStatsMap.clear();
190     }
191   }
192 
193   /**
194    * Fetch OffsetDataStats for key.
195    * @param key key that stats are to be returned for
196    */
197   private OffsetDataStats get(T key) {
198     synchronized (offsetStatsMap) {
199       return offsetStatsMap.get(key);
200     }
201   }
202 
203   public class OffsetData {
204     private long offset;
205     private long timestamp;
206 
207     private OffsetData(long offset, long timestamp) {
208       this.offset = offset;
209       this.timestamp = timestamp;
210     }
211 
212     public long getOffset() { return offset; }
213     public long getTimestamp() { return timestamp; }
214 
215     public double averageRate(OffsetData previous) {
216       if (previous == null) return -1;
217 
218       return new Double((offset - previous.getOffset())) /
219              new Double((timestamp - previous.getTimestamp())) * 1000L;
220     }
221 
222     public boolean olderThan(long timestamp) {
223       return this.timestamp < timestamp;
224     }
225 
226     public long within(long timestamp, long delta) {
227 
228       long diff = Math.abs(this.timestamp - timestamp);
229 
230       if (diff < delta) return diff;
231       return -1;
232     }
233   }
234 
235   private class OffsetDataStats {
236     private volatile LinkedList<OffsetData> offsetDataList = new LinkedList<OffsetData>();
237 
238     public LinkedList<OffsetData> getOffsetDataList() {
239       return offsetDataList;
240     }
241 
242     public void add(OffsetData offsetData) {
243       synchronized(offsetDataList) {
244         offsetDataList.add(offsetData);
245       }
246     }
247 
248     public OffsetData oldestDataPoint() {
249       synchronized(offsetDataList) {
250         return offsetDataList.peekFirst();
251       }
252     }
253 
254     public OffsetData mostRecentDataPoint() {
255       synchronized(offsetDataList) {
256         return offsetDataList.peekLast();
257       }
258     }
259 
260     public void prune(long ttl) {
261       long cutoff = System.currentTimeMillis() - ttl;
262 
263       OffsetData data;
264       synchronized(offsetDataList) {
265         while ((data = offsetDataList.peekFirst()) != null) {
266           if (data.getTimestamp() > cutoff) break;
267 
268           offsetDataList.removeFirst();
269         }
270       }
271     }
272   }
273 }