cubefs

Форк
0
525 строк · 14.6 Кб
1
// Copyright 2015 The Go Authors. All rights reserved.
2
// Use of this source code is governed by a BSD-style
3
// license that can be found in the LICENSE file.
4

5
// Package timeseries implements a time series structure for stats collection.
6
package timeseries // import "golang.org/x/net/internal/timeseries"
7

8
import (
9
	"fmt"
10
	"log"
11
	"time"
12
)
13

14
const (
15
	timeSeriesNumBuckets       = 64
16
	minuteHourSeriesNumBuckets = 60
17
)
18

19
var timeSeriesResolutions = []time.Duration{
20
	1 * time.Second,
21
	10 * time.Second,
22
	1 * time.Minute,
23
	10 * time.Minute,
24
	1 * time.Hour,
25
	6 * time.Hour,
26
	24 * time.Hour,          // 1 day
27
	7 * 24 * time.Hour,      // 1 week
28
	4 * 7 * 24 * time.Hour,  // 4 weeks
29
	16 * 7 * 24 * time.Hour, // 16 weeks
30
}
31

32
var minuteHourSeriesResolutions = []time.Duration{
33
	1 * time.Second,
34
	1 * time.Minute,
35
}
36

37
// An Observable is a kind of data that can be aggregated in a time series.
38
type Observable interface {
39
	Multiply(ratio float64)    // Multiplies the data in self by a given ratio
40
	Add(other Observable)      // Adds the data from a different observation to self
41
	Clear()                    // Clears the observation so it can be reused.
42
	CopyFrom(other Observable) // Copies the contents of a given observation to self
43
}
44

45
// Float attaches the methods of Observable to a float64.
46
type Float float64
47

48
// NewFloat returns a Float.
49
func NewFloat() Observable {
50
	f := Float(0)
51
	return &f
52
}
53

54
// String returns the float as a string.
55
func (f *Float) String() string { return fmt.Sprintf("%g", f.Value()) }
56

57
// Value returns the float's value.
58
func (f *Float) Value() float64 { return float64(*f) }
59

60
func (f *Float) Multiply(ratio float64) { *f *= Float(ratio) }
61

62
func (f *Float) Add(other Observable) {
63
	o := other.(*Float)
64
	*f += *o
65
}
66

67
func (f *Float) Clear() { *f = 0 }
68

69
func (f *Float) CopyFrom(other Observable) {
70
	o := other.(*Float)
71
	*f = *o
72
}
73

74
// A Clock tells the current time.
75
type Clock interface {
76
	Time() time.Time
77
}
78

79
type defaultClock int
80

81
var defaultClockInstance defaultClock
82

83
func (defaultClock) Time() time.Time { return time.Now() }
84

85
// Information kept per level. Each level consists of a circular list of
86
// observations. The start of the level may be derived from end and the
87
// len(buckets) * sizeInMillis.
88
type tsLevel struct {
89
	oldest   int               // index to oldest bucketed Observable
90
	newest   int               // index to newest bucketed Observable
91
	end      time.Time         // end timestamp for this level
92
	size     time.Duration     // duration of the bucketed Observable
93
	buckets  []Observable      // collections of observations
94
	provider func() Observable // used for creating new Observable
95
}
96

97
func (l *tsLevel) Clear() {
98
	l.oldest = 0
99
	l.newest = len(l.buckets) - 1
100
	l.end = time.Time{}
101
	for i := range l.buckets {
102
		if l.buckets[i] != nil {
103
			l.buckets[i].Clear()
104
			l.buckets[i] = nil
105
		}
106
	}
107
}
108

109
func (l *tsLevel) InitLevel(size time.Duration, numBuckets int, f func() Observable) {
110
	l.size = size
111
	l.provider = f
112
	l.buckets = make([]Observable, numBuckets)
113
}
114

115
// Keeps a sequence of levels. Each level is responsible for storing data at
116
// a given resolution. For example, the first level stores data at a one
117
// minute resolution while the second level stores data at a one hour
118
// resolution.
119

120
// Each level is represented by a sequence of buckets. Each bucket spans an
121
// interval equal to the resolution of the level. New observations are added
122
// to the last bucket.
123
type timeSeries struct {
124
	provider    func() Observable // make more Observable
125
	numBuckets  int               // number of buckets in each level
126
	levels      []*tsLevel        // levels of bucketed Observable
127
	lastAdd     time.Time         // time of last Observable tracked
128
	total       Observable        // convenient aggregation of all Observable
129
	clock       Clock             // Clock for getting current time
130
	pending     Observable        // observations not yet bucketed
131
	pendingTime time.Time         // what time are we keeping in pending
132
	dirty       bool              // if there are pending observations
133
}
134

135
// init initializes a level according to the supplied criteria.
136
func (ts *timeSeries) init(resolutions []time.Duration, f func() Observable, numBuckets int, clock Clock) {
137
	ts.provider = f
138
	ts.numBuckets = numBuckets
139
	ts.clock = clock
140
	ts.levels = make([]*tsLevel, len(resolutions))
141

142
	for i := range resolutions {
143
		if i > 0 && resolutions[i-1] >= resolutions[i] {
144
			log.Print("timeseries: resolutions must be monotonically increasing")
145
			break
146
		}
147
		newLevel := new(tsLevel)
148
		newLevel.InitLevel(resolutions[i], ts.numBuckets, ts.provider)
149
		ts.levels[i] = newLevel
150
	}
151

152
	ts.Clear()
153
}
154

155
// Clear removes all observations from the time series.
156
func (ts *timeSeries) Clear() {
157
	ts.lastAdd = time.Time{}
158
	ts.total = ts.resetObservation(ts.total)
159
	ts.pending = ts.resetObservation(ts.pending)
160
	ts.pendingTime = time.Time{}
161
	ts.dirty = false
162

163
	for i := range ts.levels {
164
		ts.levels[i].Clear()
165
	}
166
}
167

168
// Add records an observation at the current time.
169
func (ts *timeSeries) Add(observation Observable) {
170
	ts.AddWithTime(observation, ts.clock.Time())
171
}
172

173
// AddWithTime records an observation at the specified time.
174
func (ts *timeSeries) AddWithTime(observation Observable, t time.Time) {
175

176
	smallBucketDuration := ts.levels[0].size
177

178
	if t.After(ts.lastAdd) {
179
		ts.lastAdd = t
180
	}
181

182
	if t.After(ts.pendingTime) {
183
		ts.advance(t)
184
		ts.mergePendingUpdates()
185
		ts.pendingTime = ts.levels[0].end
186
		ts.pending.CopyFrom(observation)
187
		ts.dirty = true
188
	} else if t.After(ts.pendingTime.Add(-1 * smallBucketDuration)) {
189
		// The observation is close enough to go into the pending bucket.
190
		// This compensates for clock skewing and small scheduling delays
191
		// by letting the update stay in the fast path.
192
		ts.pending.Add(observation)
193
		ts.dirty = true
194
	} else {
195
		ts.mergeValue(observation, t)
196
	}
197
}
198

199
// mergeValue inserts the observation at the specified time in the past into all levels.
200
func (ts *timeSeries) mergeValue(observation Observable, t time.Time) {
201
	for _, level := range ts.levels {
202
		index := (ts.numBuckets - 1) - int(level.end.Sub(t)/level.size)
203
		if 0 <= index && index < ts.numBuckets {
204
			bucketNumber := (level.oldest + index) % ts.numBuckets
205
			if level.buckets[bucketNumber] == nil {
206
				level.buckets[bucketNumber] = level.provider()
207
			}
208
			level.buckets[bucketNumber].Add(observation)
209
		}
210
	}
211
	ts.total.Add(observation)
212
}
213

214
// mergePendingUpdates applies the pending updates into all levels.
215
func (ts *timeSeries) mergePendingUpdates() {
216
	if ts.dirty {
217
		ts.mergeValue(ts.pending, ts.pendingTime)
218
		ts.pending = ts.resetObservation(ts.pending)
219
		ts.dirty = false
220
	}
221
}
222

223
// advance cycles the buckets at each level until the latest bucket in
224
// each level can hold the time specified.
225
func (ts *timeSeries) advance(t time.Time) {
226
	if !t.After(ts.levels[0].end) {
227
		return
228
	}
229
	for i := 0; i < len(ts.levels); i++ {
230
		level := ts.levels[i]
231
		if !level.end.Before(t) {
232
			break
233
		}
234

235
		// If the time is sufficiently far, just clear the level and advance
236
		// directly.
237
		if !t.Before(level.end.Add(level.size * time.Duration(ts.numBuckets))) {
238
			for _, b := range level.buckets {
239
				ts.resetObservation(b)
240
			}
241
			level.end = time.Unix(0, (t.UnixNano()/level.size.Nanoseconds())*level.size.Nanoseconds())
242
		}
243

244
		for t.After(level.end) {
245
			level.end = level.end.Add(level.size)
246
			level.newest = level.oldest
247
			level.oldest = (level.oldest + 1) % ts.numBuckets
248
			ts.resetObservation(level.buckets[level.newest])
249
		}
250

251
		t = level.end
252
	}
253
}
254

255
// Latest returns the sum of the num latest buckets from the level.
256
func (ts *timeSeries) Latest(level, num int) Observable {
257
	now := ts.clock.Time()
258
	if ts.levels[0].end.Before(now) {
259
		ts.advance(now)
260
	}
261

262
	ts.mergePendingUpdates()
263

264
	result := ts.provider()
265
	l := ts.levels[level]
266
	index := l.newest
267

268
	for i := 0; i < num; i++ {
269
		if l.buckets[index] != nil {
270
			result.Add(l.buckets[index])
271
		}
272
		if index == 0 {
273
			index = ts.numBuckets
274
		}
275
		index--
276
	}
277

278
	return result
279
}
280

281
// LatestBuckets returns a copy of the num latest buckets from level.
282
func (ts *timeSeries) LatestBuckets(level, num int) []Observable {
283
	if level < 0 || level > len(ts.levels) {
284
		log.Print("timeseries: bad level argument: ", level)
285
		return nil
286
	}
287
	if num < 0 || num >= ts.numBuckets {
288
		log.Print("timeseries: bad num argument: ", num)
289
		return nil
290
	}
291

292
	results := make([]Observable, num)
293
	now := ts.clock.Time()
294
	if ts.levels[0].end.Before(now) {
295
		ts.advance(now)
296
	}
297

298
	ts.mergePendingUpdates()
299

300
	l := ts.levels[level]
301
	index := l.newest
302

303
	for i := 0; i < num; i++ {
304
		result := ts.provider()
305
		results[i] = result
306
		if l.buckets[index] != nil {
307
			result.CopyFrom(l.buckets[index])
308
		}
309

310
		if index == 0 {
311
			index = ts.numBuckets
312
		}
313
		index -= 1
314
	}
315
	return results
316
}
317

318
// ScaleBy updates observations by scaling by factor.
319
func (ts *timeSeries) ScaleBy(factor float64) {
320
	for _, l := range ts.levels {
321
		for i := 0; i < ts.numBuckets; i++ {
322
			l.buckets[i].Multiply(factor)
323
		}
324
	}
325

326
	ts.total.Multiply(factor)
327
	ts.pending.Multiply(factor)
328
}
329

330
// Range returns the sum of observations added over the specified time range.
331
// If start or finish times don't fall on bucket boundaries of the same
332
// level, then return values are approximate answers.
333
func (ts *timeSeries) Range(start, finish time.Time) Observable {
334
	return ts.ComputeRange(start, finish, 1)[0]
335
}
336

337
// Recent returns the sum of observations from the last delta.
338
func (ts *timeSeries) Recent(delta time.Duration) Observable {
339
	now := ts.clock.Time()
340
	return ts.Range(now.Add(-delta), now)
341
}
342

343
// Total returns the total of all observations.
344
func (ts *timeSeries) Total() Observable {
345
	ts.mergePendingUpdates()
346
	return ts.total
347
}
348

349
// ComputeRange computes a specified number of values into a slice using
350
// the observations recorded over the specified time period. The return
351
// values are approximate if the start or finish times don't fall on the
352
// bucket boundaries at the same level or if the number of buckets spanning
353
// the range is not an integral multiple of num.
354
func (ts *timeSeries) ComputeRange(start, finish time.Time, num int) []Observable {
355
	if start.After(finish) {
356
		log.Printf("timeseries: start > finish, %v>%v", start, finish)
357
		return nil
358
	}
359

360
	if num < 0 {
361
		log.Printf("timeseries: num < 0, %v", num)
362
		return nil
363
	}
364

365
	results := make([]Observable, num)
366

367
	for _, l := range ts.levels {
368
		if !start.Before(l.end.Add(-l.size * time.Duration(ts.numBuckets))) {
369
			ts.extract(l, start, finish, num, results)
370
			return results
371
		}
372
	}
373

374
	// Failed to find a level that covers the desired range. So just
375
	// extract from the last level, even if it doesn't cover the entire
376
	// desired range.
377
	ts.extract(ts.levels[len(ts.levels)-1], start, finish, num, results)
378

379
	return results
380
}
381

382
// RecentList returns the specified number of values in slice over the most
383
// recent time period of the specified range.
384
func (ts *timeSeries) RecentList(delta time.Duration, num int) []Observable {
385
	if delta < 0 {
386
		return nil
387
	}
388
	now := ts.clock.Time()
389
	return ts.ComputeRange(now.Add(-delta), now, num)
390
}
391

392
// extract returns a slice of specified number of observations from a given
393
// level over a given range.
394
func (ts *timeSeries) extract(l *tsLevel, start, finish time.Time, num int, results []Observable) {
395
	ts.mergePendingUpdates()
396

397
	srcInterval := l.size
398
	dstInterval := finish.Sub(start) / time.Duration(num)
399
	dstStart := start
400
	srcStart := l.end.Add(-srcInterval * time.Duration(ts.numBuckets))
401

402
	srcIndex := 0
403

404
	// Where should scanning start?
405
	if dstStart.After(srcStart) {
406
		advance := int(dstStart.Sub(srcStart) / srcInterval)
407
		srcIndex += advance
408
		srcStart = srcStart.Add(time.Duration(advance) * srcInterval)
409
	}
410

411
	// The i'th value is computed as show below.
412
	// interval = (finish/start)/num
413
	// i'th value = sum of observation in range
414
	//   [ start + i       * interval,
415
	//     start + (i + 1) * interval )
416
	for i := 0; i < num; i++ {
417
		results[i] = ts.resetObservation(results[i])
418
		dstEnd := dstStart.Add(dstInterval)
419
		for srcIndex < ts.numBuckets && srcStart.Before(dstEnd) {
420
			srcEnd := srcStart.Add(srcInterval)
421
			if srcEnd.After(ts.lastAdd) {
422
				srcEnd = ts.lastAdd
423
			}
424

425
			if !srcEnd.Before(dstStart) {
426
				srcValue := l.buckets[(srcIndex+l.oldest)%ts.numBuckets]
427
				if !srcStart.Before(dstStart) && !srcEnd.After(dstEnd) {
428
					// dst completely contains src.
429
					if srcValue != nil {
430
						results[i].Add(srcValue)
431
					}
432
				} else {
433
					// dst partially overlaps src.
434
					overlapStart := maxTime(srcStart, dstStart)
435
					overlapEnd := minTime(srcEnd, dstEnd)
436
					base := srcEnd.Sub(srcStart)
437
					fraction := overlapEnd.Sub(overlapStart).Seconds() / base.Seconds()
438

439
					used := ts.provider()
440
					if srcValue != nil {
441
						used.CopyFrom(srcValue)
442
					}
443
					used.Multiply(fraction)
444
					results[i].Add(used)
445
				}
446

447
				if srcEnd.After(dstEnd) {
448
					break
449
				}
450
			}
451
			srcIndex++
452
			srcStart = srcStart.Add(srcInterval)
453
		}
454
		dstStart = dstStart.Add(dstInterval)
455
	}
456
}
457

458
// resetObservation clears the content so the struct may be reused.
459
func (ts *timeSeries) resetObservation(observation Observable) Observable {
460
	if observation == nil {
461
		observation = ts.provider()
462
	} else {
463
		observation.Clear()
464
	}
465
	return observation
466
}
467

468
// TimeSeries tracks data at granularities from 1 second to 16 weeks.
469
type TimeSeries struct {
470
	timeSeries
471
}
472

473
// NewTimeSeries creates a new TimeSeries using the function provided for creating new Observable.
474
func NewTimeSeries(f func() Observable) *TimeSeries {
475
	return NewTimeSeriesWithClock(f, defaultClockInstance)
476
}
477

478
// NewTimeSeriesWithClock creates a new TimeSeries using the function provided for creating new Observable and the clock for
479
// assigning timestamps.
480
func NewTimeSeriesWithClock(f func() Observable, clock Clock) *TimeSeries {
481
	ts := new(TimeSeries)
482
	ts.timeSeries.init(timeSeriesResolutions, f, timeSeriesNumBuckets, clock)
483
	return ts
484
}
485

486
// MinuteHourSeries tracks data at granularities of 1 minute and 1 hour.
487
type MinuteHourSeries struct {
488
	timeSeries
489
}
490

491
// NewMinuteHourSeries creates a new MinuteHourSeries using the function provided for creating new Observable.
492
func NewMinuteHourSeries(f func() Observable) *MinuteHourSeries {
493
	return NewMinuteHourSeriesWithClock(f, defaultClockInstance)
494
}
495

496
// NewMinuteHourSeriesWithClock creates a new MinuteHourSeries using the function provided for creating new Observable and the clock for
497
// assigning timestamps.
498
func NewMinuteHourSeriesWithClock(f func() Observable, clock Clock) *MinuteHourSeries {
499
	ts := new(MinuteHourSeries)
500
	ts.timeSeries.init(minuteHourSeriesResolutions, f,
501
		minuteHourSeriesNumBuckets, clock)
502
	return ts
503
}
504

505
func (ts *MinuteHourSeries) Minute() Observable {
506
	return ts.timeSeries.Latest(0, 60)
507
}
508

509
func (ts *MinuteHourSeries) Hour() Observable {
510
	return ts.timeSeries.Latest(1, 60)
511
}
512

513
func minTime(a, b time.Time) time.Time {
514
	if a.Before(b) {
515
		return a
516
	}
517
	return b
518
}
519

520
func maxTime(a, b time.Time) time.Time {
521
	if a.After(b) {
522
		return a
523
	}
524
	return b
525
}
526

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.