/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.chukwa.datacollection;
1920import org.apache.log4j.Logger;
2122import java.util.Map;
23import java.util.LinkedList;
24import java.util.Date;
25import java.util.concurrent.ConcurrentHashMap;
26import java.util.concurrent.TimeUnit;
2728/**29 * Manages stats for multiple objects of type T. T can be any class that is used30 * as a key for offset statistics (i.e. Agent, Collector, etc.). A client would31 * create an instance of this class and call <code>addOffsetDataPoint<code>32 * repeatedly over time. Then <code>calcAverageRate</code> can be called to33 * retrieve the average offset-unit per second over a given time interval.34 * <P>35 * For a given object T that is actively adding data points, stats are kept for36 * up to 20 minutes.37 * <P>38 * Care should be taken to always call <code>remove()</code> when old T objects39 * should no longer be tracked.40 * @param <T> 41 */42publicclass OffsetStatsManager<T> {
43protected Logger log = Logger.getLogger(getClass());
4445/*46 * This value is how far back we keep data for. Old data is purge when new47 * data is added.48 */49privatestaticlong DEFAULT_STATS_DATA_TTL = 20L * 60L * 1000L; // 20 minutes5051/**52 * How far back can our data be to be considered fresh enough, relative to the53 * interval requests. For example if this value is 0.25 and interval requested54 * is 60 seconds, our most recent data point must be no more than 15 seconds old.55 */56privatestaticdouble DEFAULT_STALE_THRESHOLD = 0.25;
575859/**60 * How far back do we need to have historical data for, relative to the61 * interval requested. For example if this value is 0.25 and the interval62 * requested is 60 seconds, our most oldest data point must be within 1563 * seconds of the most recent data point - 60.64 */65privatestaticdouble DEFAULT_AGE_THRESHOLD = 0.25;
6667// These can be made configurable if someone needs to do so68privatelong statsDataTTL = DEFAULT_STATS_DATA_TTL;
69privatedouble staleThresholdPercent = DEFAULT_STALE_THRESHOLD;
70privatedouble ageThresholdPercent = DEFAULT_AGE_THRESHOLD;
7172private Map<T, OffsetDataStats> offsetStatsMap =
73new ConcurrentHashMap<T, OffsetDataStats>();
7475publicOffsetStatsManager() {
76this(DEFAULT_STATS_DATA_TTL);
77 }
7879publicOffsetStatsManager(long statsDataTTL) {
80this.statsDataTTL = statsDataTTL;
81 }
8283/**84 * Record that at a given point in time an object key had a given offset.85 * @param key Object to key this data point to86 * @param offset How much of an offset to record87 * @param timestamp The time the offset occured88 */89publicvoid addOffsetDataPoint(T key, long offset, long timestamp) {
90OffsetDataStats stats = null;
9192if (offsetStatsMap.get(key) == null)
93 offsetStatsMap.put(key, newOffsetDataStats());
9495 stats = offsetStatsMap.get(key);
9697 stats.add(newOffsetData(offset, timestamp));
98 stats.prune(statsDataTTL);
99100if (log.isDebugEnabled())
101 log.debug("Added offset - key=" + key + ", offset=" + offset +
102", time=" + new Date(timestamp) + ", dataCount=" +
103 stats.getOffsetDataList().size());
104 }
105106publicdouble calcAverageRate(T key, long timeIntervalSecs) {
107OffsetDataStats stats = get(key);
108if (stats == null) {
109if (log.isDebugEnabled())
110 log.debug("No stats data found key=" + key);
111return -1;
112 }
113114// first get the most recent data point to see if we're stale115long now = System.currentTimeMillis();
116long mostRecentThreashold = now -
117 timeIntervalSecs * (long)(staleThresholdPercent * 1000);
118OffsetData newestOffsetData = stats.mostRecentDataPoint();
119120if (newestOffsetData == null || newestOffsetData.olderThan(mostRecentThreashold)) {
121if (log.isDebugEnabled())
122 log.debug("Stats data too stale for key=" + key);
123124return -1; // data is too stale125 }
126127// then get the oldest data point to see if we have enough coverage128long then = newestOffsetData.getTimestamp() - timeIntervalSecs * 1000L;
129long thenDelta = timeIntervalSecs * (long)(ageThresholdPercent * 1000);
130131OffsetData oldestOffsetData = null;
132long minDiff = -1;
133long lastDiff = -1;
134for (OffsetData offsetData : stats.getOffsetDataList()) {
135long diff = offsetData.within(then, thenDelta);
136137if (diff < 0) continue;
138139if (minDiff == -1 || minDiff < diff) {
140// this is the data point closest to our target then time141 minDiff = diff;
142 oldestOffsetData = offsetData;
143 }
144145// optimize so is we got a minDiff, but the diffs are getting worse, then146// we've found the closet point and we can move on147if (minDiff != -1 && lastDiff != -1 && diff > lastDiff) {
148break;
149 }
150151 lastDiff = diff;
152 }
153154if (oldestOffsetData == null) {
155if (log.isDebugEnabled())
156 log.debug("Stats data history too short for key=" + key);
157158return -1;
159 }
160161return newestOffsetData.averageRate(oldestOffsetData);
162 }
163164publicOffsetData oldestDataPoint(T key) {
165OffsetDataStats stats = get(key);
166return stats.oldestDataPoint();
167 }
168169publicOffsetData mostRecentDataPoint(T key) {
170OffsetDataStats stats = get(key);
171return stats.mostRecentDataPoint();
172 }
173174/**175 * Remove key from the set of objects that we're tracking stats for.176 * @param key key of stats to be removed177 */178publicvoid remove(T key) {
179 offsetStatsMap.remove(key);
180 }
181182/**183 * Remove all objectst that we're tracking stats for.184 */185publicvoid clear() {
186 offsetStatsMap.clear();
187 }
188189/**190 * Fetch OffsetDataStats for key.191 * @param key key that stats are to be returned for192 */193privateOffsetDataStats get(T key) {
194return offsetStatsMap.get(key);
195 }
196197publicclassOffsetData {
198privatelong offset;
199privatelong timestamp;
200201privateOffsetData(long offset, long timestamp) {
202this.offset = offset;
203this.timestamp = timestamp;
204 }
205206publiclong getOffset() { return offset; }
207publiclong getTimestamp() { return timestamp; }
208209publicdouble averageRate(OffsetData previous) {
210if (previous == null) return -1;
211double elapseOffset = offset - previous.getOffset();
212double elapseTime = (timestamp - previous.getTimestamp()) / 1000d;
213double rate = elapseOffset / elapseTime;
214return rate;
215 }
216217publicboolean olderThan(long timestamp) {
218returnthis.timestamp < timestamp;
219 }
220221publiclong within(long timestamp, long delta) {
222223long diff = Math.abs(this.timestamp - timestamp);
224225if (diff < delta) return diff;
226return -1;
227 }
228 }
229230privateclassOffsetDataStats {
231privatevolatile LinkedList<OffsetData> offsetDataList = new LinkedList<OffsetData>();
232233public LinkedList<OffsetData> getOffsetDataList() {
234return offsetDataList;
235 }
236237publicvoid add(OffsetData offsetData) {
238synchronized(offsetDataList) {
239 offsetDataList.add(offsetData);
240 }
241 }
242243publicOffsetData oldestDataPoint() {
244synchronized(offsetDataList) {
245return offsetDataList.peekFirst();
246 }
247 }
248249publicOffsetData mostRecentDataPoint() {
250synchronized(offsetDataList) {
251return offsetDataList.peekLast();
252 }
253 }
254255publicvoid prune(long ttl) {
256long cutoff = System.currentTimeMillis() - ttl;
257258OffsetData data;
259synchronized(offsetDataList) {
260while ((data = offsetDataList.peekFirst()) != null) {
261if (data.getTimestamp() > cutoff) break;
262263 offsetDataList.removeFirst();
264 }
265 }
266 }
267 }
268 }