cubefs
116 строк · 2.1 Кб
1package rolling
2
3import (
4"sync"
5"time"
6)
7
8// Number tracks a numberBucket over a bounded number of
9// time buckets. Currently the buckets are one second long and only the last 10 seconds are kept.
10type Number struct {
11Buckets map[int64]*numberBucket
12Mutex *sync.RWMutex
13}
14
15type numberBucket struct {
16Value float64
17}
18
19// NewNumber initializes a RollingNumber struct.
20func NewNumber() *Number {
21r := &Number{
22Buckets: make(map[int64]*numberBucket),
23Mutex: &sync.RWMutex{},
24}
25return r
26}
27
28func (r *Number) getCurrentBucket() *numberBucket {
29now := time.Now().Unix()
30var bucket *numberBucket
31var ok bool
32
33if bucket, ok = r.Buckets[now]; !ok {
34bucket = &numberBucket{}
35r.Buckets[now] = bucket
36}
37
38return bucket
39}
40
41func (r *Number) removeOldBuckets() {
42now := time.Now().Unix() - 10
43
44for timestamp := range r.Buckets {
45// TODO: configurable rolling window
46if timestamp <= now {
47delete(r.Buckets, timestamp)
48}
49}
50}
51
52// Increment increments the number in current timeBucket.
53func (r *Number) Increment(i float64) {
54if i == 0 {
55return
56}
57
58r.Mutex.Lock()
59defer r.Mutex.Unlock()
60
61b := r.getCurrentBucket()
62b.Value += i
63r.removeOldBuckets()
64}
65
66// UpdateMax updates the maximum value in the current bucket.
67func (r *Number) UpdateMax(n float64) {
68r.Mutex.Lock()
69defer r.Mutex.Unlock()
70
71b := r.getCurrentBucket()
72if n > b.Value {
73b.Value = n
74}
75r.removeOldBuckets()
76}
77
78// Sum sums the values over the buckets in the last 10 seconds.
79func (r *Number) Sum(now time.Time) float64 {
80sum := float64(0)
81
82r.Mutex.RLock()
83defer r.Mutex.RUnlock()
84
85for timestamp, bucket := range r.Buckets {
86// TODO: configurable rolling window
87if timestamp >= now.Unix()-10 {
88sum += bucket.Value
89}
90}
91
92return sum
93}
94
95// Max returns the maximum value seen in the last 10 seconds.
96func (r *Number) Max(now time.Time) float64 {
97var max float64
98
99r.Mutex.RLock()
100defer r.Mutex.RUnlock()
101
102for timestamp, bucket := range r.Buckets {
103// TODO: configurable rolling window
104if timestamp >= now.Unix()-10 {
105if bucket.Value > max {
106max = bucket.Value
107}
108}
109}
110
111return max
112}
113
114func (r *Number) Avg(now time.Time) float64 {
115return r.Sum(now) / 10
116}
117