kraken

Форк
0
/
limiter.go 
139 строк · 3.0 Кб
1
// Copyright (c) 2016-2019 Uber Technologies, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
package dedup
15

16
import (
17
	"sync"
18
	"time"
19

20
	"github.com/andres-erbsen/clock"
21
)
22

23
// TaskGCInterval is the interval in which garbage collection of old tasks runs.
24
const TaskGCInterval = time.Minute
25

26
// TaskRunner runs against some input and produces some output w/ a ttl.
27
type TaskRunner interface {
28
	Run(input interface{}) (output interface{}, ttl time.Duration)
29
}
30

31
type task struct {
32
	input interface{}
33

34
	cond      *sync.Cond
35
	running   bool
36
	output    interface{}
37
	expiresAt time.Time
38
}
39

40
func newTask(input interface{}) *task {
41
	return &task{
42
		input: input,
43
		cond:  sync.NewCond(new(sync.Mutex)),
44
	}
45
}
46

47
func (t *task) expired(now time.Time) bool {
48
	return now.After(t.expiresAt)
49
}
50

51
// Limiter deduplicates the running of a common task within a given limit. Tasks
52
// are deduplicated based on input equality.
53
type Limiter struct {
54
	sync.RWMutex
55
	clk    clock.Clock
56
	runner TaskRunner
57
	tasks  map[interface{}]*task
58
	gc     *IntervalTrap
59
}
60

61
// NewLimiter creates a new Limiter for tasks. The limit is determined per task
62
// via the TaskRunner.
63
func NewLimiter(clk clock.Clock, runner TaskRunner) *Limiter {
64
	l := &Limiter{
65
		clk:    clk,
66
		runner: runner,
67
		tasks:  make(map[interface{}]*task),
68
	}
69
	l.gc = NewIntervalTrap(TaskGCInterval, clk, &limiterTaskGC{l})
70
	return l
71
}
72

73
// Run runs a task with input.
74
func (l *Limiter) Run(input interface{}) interface{} {
75
	l.gc.Trap()
76

77
	l.RLock()
78
	t, ok := l.tasks[input]
79
	l.RUnlock()
80
	if !ok {
81
		// Slow path, must initialize task struct under global write lock.
82
		l.Lock()
83
		t, ok = l.tasks[input]
84
		if !ok {
85
			t = newTask(input)
86
			l.tasks[input] = t
87
		}
88
		l.Unlock()
89
	}
90
	return l.getOutput(t)
91
}
92

93
func (l *Limiter) getOutput(t *task) interface{} {
94
	t.cond.L.Lock()
95

96
	if !t.expired(l.clk.Now()) {
97
		defer t.cond.L.Unlock()
98
		return t.output
99
	}
100

101
	if t.running {
102
		t.cond.Wait()
103
		defer t.cond.L.Unlock()
104
		return t.output
105
	}
106

107
	t.running = true
108
	t.cond.L.Unlock()
109

110
	output, ttl := l.runner.Run(t.input)
111

112
	t.cond.L.Lock()
113
	t.output = output
114
	t.expiresAt = l.clk.Now().Add(ttl)
115
	t.running = false
116
	t.cond.L.Unlock()
117

118
	t.cond.Broadcast()
119

120
	return output
121
}
122

123
type limiterTaskGC struct {
124
	limiter *Limiter
125
}
126

127
func (gc *limiterTaskGC) Run() {
128
	gc.limiter.Lock()
129
	defer gc.limiter.Unlock()
130

131
	for input, t := range gc.limiter.tasks {
132
		t.cond.L.Lock()
133
		expired := t.expired(gc.limiter.clk.Now()) && !t.running
134
		t.cond.L.Unlock()
135
		if expired {
136
			delete(gc.limiter.tasks, input)
137
		}
138
	}
139
}
140

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.