cubefs
148 строк · 3.2 Кб
1package rolling
2
3import (
4"math"
5"sort"
6"sync"
7"time"
8)
9
10// Timing maintains time Durations for each time bucket.
11// The Durations are kept in an array to allow for a variety of
12// statistics to be calculated from the source data.
13type Timing struct {
14Buckets map[int64]*timingBucket
15Mutex *sync.RWMutex
16
17CachedSortedDurations []time.Duration
18LastCachedTime int64
19}
20
// timingBucket holds every duration recorded under a single bucket key.
type timingBucket struct {
	// Durations lists the samples in the order they were appended.
	Durations []time.Duration
}
24
25// NewTiming creates a RollingTiming struct.
26func NewTiming() *Timing {
27r := &Timing{
28Buckets: make(map[int64]*timingBucket),
29Mutex: &sync.RWMutex{},
30}
31return r
32}
33
// byDuration adapts a slice of durations to sort.Interface so that
// sort.Sort orders it from shortest to longest.
type byDuration []time.Duration

// Len reports how many durations the slice holds.
func (d byDuration) Len() int {
	return len(d)
}

// Swap exchanges the durations at positions i and j.
func (d byDuration) Swap(i, j int) {
	d[j], d[i] = d[i], d[j]
}

// Less reports whether the duration at i is strictly shorter than the one at j.
func (d byDuration) Less(i, j int) bool {
	return d[j] > d[i]
}
39
40// SortedDurations returns an array of time.Duration sorted from shortest
41// to longest that have occurred in the last 60 seconds.
42func (r *Timing) SortedDurations() []time.Duration {
43r.Mutex.RLock()
44t := r.LastCachedTime
45r.Mutex.RUnlock()
46
47if t+time.Duration(1*time.Second).Nanoseconds() > time.Now().UnixNano() {
48// don't recalculate if current cache is still fresh
49return r.CachedSortedDurations
50}
51
52var durations byDuration
53now := time.Now()
54
55r.Mutex.Lock()
56defer r.Mutex.Unlock()
57
58for timestamp, b := range r.Buckets {
59// TODO: configurable rolling window
60if timestamp >= now.Unix()-60 {
61for _, d := range b.Durations {
62durations = append(durations, d)
63}
64}
65}
66
67sort.Sort(durations)
68
69r.CachedSortedDurations = durations
70r.LastCachedTime = time.Now().UnixNano()
71
72return r.CachedSortedDurations
73}
74
75func (r *Timing) getCurrentBucket() *timingBucket {
76r.Mutex.RLock()
77now := time.Now()
78bucket, exists := r.Buckets[now.Unix()]
79r.Mutex.RUnlock()
80
81if !exists {
82r.Mutex.Lock()
83defer r.Mutex.Unlock()
84
85r.Buckets[now.Unix()] = &timingBucket{}
86bucket = r.Buckets[now.Unix()]
87}
88
89return bucket
90}
91
92func (r *Timing) removeOldBuckets() {
93now := time.Now()
94
95for timestamp := range r.Buckets {
96// TODO: configurable rolling window
97if timestamp <= now.Unix()-60 {
98delete(r.Buckets, timestamp)
99}
100}
101}
102
103// Add appends the time.Duration given to the current time bucket.
104func (r *Timing) Add(duration time.Duration) {
105b := r.getCurrentBucket()
106
107r.Mutex.Lock()
108defer r.Mutex.Unlock()
109
110b.Durations = append(b.Durations, duration)
111r.removeOldBuckets()
112}
113
114// Percentile computes the percentile given with a linear interpolation.
115func (r *Timing) Percentile(p float64) uint32 {
116sortedDurations := r.SortedDurations()
117length := len(sortedDurations)
118if length <= 0 {
119return 0
120}
121
122pos := r.ordinal(len(sortedDurations), p) - 1
123return uint32(sortedDurations[pos].Nanoseconds() / 1000000)
124}
125
126func (r *Timing) ordinal(length int, percentile float64) int64 {
127if percentile == 0 && length > 0 {
128return 1
129}
130
131return int64(math.Ceil((percentile / float64(100)) * float64(length)))
132}
133
134// Mean computes the average timing in the last 60 seconds.
135func (r *Timing) Mean() uint32 {
136sortedDurations := r.SortedDurations()
137var sum time.Duration
138for _, d := range sortedDurations {
139sum += d
140}
141
142length := int64(len(sortedDurations))
143if length == 0 {
144return 0
145}
146
147return uint32(sum.Nanoseconds()/length) / 1000000
148}
149