/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.chukwa.datacollection;

import org.apache.log4j.Logger;

import java.util.ArrayList;
import java.util.Date;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

2627/**28 * Manages stats for multiple objects of type T. T can be any class that is used29 * as a key for offset statistics (i.e. Agent, Collector, etc.). A client would30 * create an instance of this class and call <code>addOffsetDataPoint<code>31 * repeatedly over time. Then <code>calcAverageRate</code> can be called to32 * retrieve the average offset-unit per second over a given time interval.33 * <P>34 * For a given object T that is actively adding data points, stats are kept for35 * up to 20 minutes.36 * <P>37 * Care should be taken to always call <code>remove()</code> when old T objects38 * should no longer be tracked.39 */40publicclass OffsetStatsManager<T> {
41protected Logger log = Logger.getLogger(getClass());
4243/*44 * This value is how far back we keep data for. Old data is purge when new45 * data is added.46 */47privatestaticlong DEFAULT_STATS_DATA_TTL = 20L * 60L * 1000L; // 20 minutes4849/**50 * How far back can our data be to be considered fresh enough, relative to the51 * interval requests. For example if this value is 0.25 and interval requested52 * is 60 seconds, our most recent data point must be no more than 15 seconds old.53 */54privatestaticdouble DEFAULT_STALE_THRESHOLD = 0.25;
555657/**58 * How far back do we need to have historical data for, relative to the59 * interval requested. For example if this value is 0.25 and the interval60 * requested is 60 seconds, our most oldest data point must be within 1561 * seconds of the most recent data point - 60.62 */63privatestaticdouble DEFAULT_AGE_THRESHOLD = 0.25;
6465// These can be made configurable if someone needs to do so66privatelong statsDataTTL = DEFAULT_STATS_DATA_TTL;
67privatedouble staleThresholdPercent = DEFAULT_STALE_THRESHOLD;
68privatedouble ageThresholdPercent = DEFAULT_AGE_THRESHOLD;
6970private Map<T, OffsetDataStats> offsetStatsMap =
71new ConcurrentHashMap<T, OffsetDataStats>();
7273publicOffsetStatsManager() {
74this(DEFAULT_STATS_DATA_TTL);
75 }
7677publicOffsetStatsManager(long statsDataTTL) {
78this.statsDataTTL = statsDataTTL;
79 }
8081/**82 * Record that at a given point in time an object key had a given offset.83 * @param key Object to key this data point to84 * @param offset How much of an offset to record85 * @param timestamp The time the offset occured86 */87publicvoid addOffsetDataPoint(T key, long offset, long timestamp) {
88OffsetDataStats stats = null;
8990synchronized (offsetStatsMap) {
91if (offsetStatsMap.get(key) == null)
92 offsetStatsMap.put(key, newOffsetDataStats());
9394 stats = offsetStatsMap.get(key);
95 }
9697 stats.add(newOffsetData(offset, timestamp));
98 stats.prune(statsDataTTL);
99100if (log.isDebugEnabled())
101 log.debug("Added offset - key=" + key + ", offset=" + offset +
102", time=" + new Date(timestamp) + ", dataCount=" +
103 stats.getOffsetDataList().size());
104 }
105106publicdouble calcAverageRate(T key, long timeIntervalSecs) {
107OffsetDataStats stats = get(key);
108if (stats == null) {
109if (log.isDebugEnabled())
110 log.debug("No stats data found key=" + key);
111return -1;
112 }
113114// first get the most recent data point to see if we're stale115long now = System.currentTimeMillis();
116long mostRecentThreashold = now -
117 timeIntervalSecs * (long)(staleThresholdPercent * 1000);
118OffsetData newestOffsetData = stats.mostRecentDataPoint();
119120if (newestOffsetData == null || newestOffsetData.olderThan(mostRecentThreashold)) {
121if (log.isDebugEnabled())
122 log.debug("Stats data too stale for key=" + key);
123124return -1; // data is too stale125 }
126127// then get the oldest data point to see if we have enough coverage128long then = newestOffsetData.getTimestamp() - timeIntervalSecs * 1000L;
129long thenDelta = timeIntervalSecs * (long)(ageThresholdPercent * 1000);
130131OffsetData oldestOffsetData = null;
132long minDiff = -1;
133long lastDiff = -1;
134for (OffsetData offsetData : stats.getOffsetDataList()) {
135long diff = offsetData.within(then, thenDelta);
136137if (diff < 0) continue;
138139if (minDiff == -1 || minDiff < diff) {
140// this is the data point closest to our target then time141 minDiff = diff;
142 oldestOffsetData = offsetData;
143 }
144145// optimize so is we got a minDiff, but the diffs are getting worse, then146// we've found the closet point and we can move on147if (minDiff != -1 && lastDiff != -1 && diff > lastDiff) {
148break;
149 }
150151 lastDiff = diff;
152 }
153154if (oldestOffsetData == null) {
155if (log.isDebugEnabled())
156 log.debug("Stats data history too short for key=" + key);
157158return -1;
159 }
160161return newestOffsetData.averageRate(oldestOffsetData);
162 }
163164publicOffsetData oldestDataPoint(T key) {
165OffsetDataStats stats = get(key);
166return stats.oldestDataPoint();
167 }
168169publicOffsetData mostRecentDataPoint(T key) {
170OffsetDataStats stats = get(key);
171return stats.mostRecentDataPoint();
172 }
173174/**175 * Remove key from the set of objects that we're tracking stats for.176 * @param key key of stats to be removed177 */178publicvoid remove(T key) {
179synchronized (offsetStatsMap) {
180 offsetStatsMap.remove(key);
181 }
182 }
183184/**185 * Remove all objectst that we're tracking stats for.186 */187publicvoid clear() {
188synchronized (offsetStatsMap) {
189 offsetStatsMap.clear();
190 }
191 }
192193/**194 * Fetch OffsetDataStats for key.195 * @param key key that stats are to be returned for196 */197privateOffsetDataStats get(T key) {
198synchronized (offsetStatsMap) {
199return offsetStatsMap.get(key);
200 }
201 }
202203publicclassOffsetData {
204privatelong offset;
205privatelong timestamp;
206207privateOffsetData(long offset, long timestamp) {
208this.offset = offset;
209this.timestamp = timestamp;
210 }
211212publiclong getOffset() { return offset; }
213publiclong getTimestamp() { return timestamp; }
214215publicdouble averageRate(OffsetData previous) {
216if (previous == null) return -1;
217218returnnew Double((offset - previous.getOffset())) /
219new Double((timestamp - previous.getTimestamp())) * 1000L;
220 }
221222publicboolean olderThan(long timestamp) {
223returnthis.timestamp < timestamp;
224 }
225226publiclong within(long timestamp, long delta) {
227228long diff = Math.abs(this.timestamp - timestamp);
229230if (diff < delta) return diff;
231return -1;
232 }
233 }
234235privateclassOffsetDataStats {
236privatevolatile LinkedList<OffsetData> offsetDataList = new LinkedList<OffsetData>();
237238public LinkedList<OffsetData> getOffsetDataList() {
239return offsetDataList;
240 }
241242publicvoid add(OffsetData offsetData) {
243synchronized(offsetDataList) {
244 offsetDataList.add(offsetData);
245 }
246 }
247248publicOffsetData oldestDataPoint() {
249synchronized(offsetDataList) {
250return offsetDataList.peekFirst();
251 }
252 }
253254publicOffsetData mostRecentDataPoint() {
255synchronized(offsetDataList) {
256return offsetDataList.peekLast();
257 }
258 }
259260publicvoid prune(long ttl) {
261long cutoff = System.currentTimeMillis() - ttl;
262263OffsetData data;
264synchronized(offsetDataList) {
265while ((data = offsetDataList.peekFirst()) != null) {
266if (data.getTimestamp() > cutoff) break;
267268 offsetDataList.removeFirst();
269 }
270 }
271 }
272 }
273 }