cubefs

Форк
0
148 строк · 3.2 Кб
1
package rolling
2

3
import (
4
	"math"
5
	"sort"
6
	"sync"
7
	"time"
8
)
9

10
// Timing maintains time Durations for each time bucket.
11
// The Durations are kept in an array to allow for a variety of
12
// statistics to be calculated from the source data.
13
type Timing struct {
14
	Buckets map[int64]*timingBucket
15
	Mutex   *sync.RWMutex
16

17
	CachedSortedDurations []time.Duration
18
	LastCachedTime        int64
19
}
20

21
type timingBucket struct {
22
	Durations []time.Duration
23
}
24

25
// NewTiming creates a RollingTiming struct.
26
func NewTiming() *Timing {
27
	r := &Timing{
28
		Buckets: make(map[int64]*timingBucket),
29
		Mutex:   &sync.RWMutex{},
30
	}
31
	return r
32
}
33

34
type byDuration []time.Duration
35

36
func (c byDuration) Len() int           { return len(c) }
37
func (c byDuration) Swap(i, j int)      { c[i], c[j] = c[j], c[i] }
38
func (c byDuration) Less(i, j int) bool { return c[i] < c[j] }
39

40
// SortedDurations returns an array of time.Duration sorted from shortest
41
// to longest that have occurred in the last 60 seconds.
42
func (r *Timing) SortedDurations() []time.Duration {
43
	r.Mutex.RLock()
44
	t := r.LastCachedTime
45
	r.Mutex.RUnlock()
46

47
	if t+time.Duration(1*time.Second).Nanoseconds() > time.Now().UnixNano() {
48
		// don't recalculate if current cache is still fresh
49
		return r.CachedSortedDurations
50
	}
51

52
	var durations byDuration
53
	now := time.Now()
54

55
	r.Mutex.Lock()
56
	defer r.Mutex.Unlock()
57

58
	for timestamp, b := range r.Buckets {
59
		// TODO: configurable rolling window
60
		if timestamp >= now.Unix()-60 {
61
			for _, d := range b.Durations {
62
				durations = append(durations, d)
63
			}
64
		}
65
	}
66

67
	sort.Sort(durations)
68

69
	r.CachedSortedDurations = durations
70
	r.LastCachedTime = time.Now().UnixNano()
71

72
	return r.CachedSortedDurations
73
}
74

75
func (r *Timing) getCurrentBucket() *timingBucket {
76
	r.Mutex.RLock()
77
	now := time.Now()
78
	bucket, exists := r.Buckets[now.Unix()]
79
	r.Mutex.RUnlock()
80

81
	if !exists {
82
		r.Mutex.Lock()
83
		defer r.Mutex.Unlock()
84

85
		r.Buckets[now.Unix()] = &timingBucket{}
86
		bucket = r.Buckets[now.Unix()]
87
	}
88

89
	return bucket
90
}
91

92
func (r *Timing) removeOldBuckets() {
93
	now := time.Now()
94

95
	for timestamp := range r.Buckets {
96
		// TODO: configurable rolling window
97
		if timestamp <= now.Unix()-60 {
98
			delete(r.Buckets, timestamp)
99
		}
100
	}
101
}
102

103
// Add appends the time.Duration given to the current time bucket.
104
func (r *Timing) Add(duration time.Duration) {
105
	b := r.getCurrentBucket()
106

107
	r.Mutex.Lock()
108
	defer r.Mutex.Unlock()
109

110
	b.Durations = append(b.Durations, duration)
111
	r.removeOldBuckets()
112
}
113

114
// Percentile computes the percentile given with a linear interpolation.
115
func (r *Timing) Percentile(p float64) uint32 {
116
	sortedDurations := r.SortedDurations()
117
	length := len(sortedDurations)
118
	if length <= 0 {
119
		return 0
120
	}
121

122
	pos := r.ordinal(len(sortedDurations), p) - 1
123
	return uint32(sortedDurations[pos].Nanoseconds() / 1000000)
124
}
125

126
func (r *Timing) ordinal(length int, percentile float64) int64 {
127
	if percentile == 0 && length > 0 {
128
		return 1
129
	}
130

131
	return int64(math.Ceil((percentile / float64(100)) * float64(length)))
132
}
133

134
// Mean computes the average timing in the last 60 seconds.
135
func (r *Timing) Mean() uint32 {
136
	sortedDurations := r.SortedDurations()
137
	var sum time.Duration
138
	for _, d := range sortedDurations {
139
		sum += d
140
	}
141

142
	length := int64(len(sortedDurations))
143
	if length == 0 {
144
		return 0
145
	}
146

147
	return uint32(sum.Nanoseconds()/length) / 1000000
148
}
149

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.