istio

Форк
0
/
delay.go 
284 строки · 6.7 Кб
1
// Copyright Istio Authors
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14

15
package queue
16

17
import (
18
	"container/heap"
19
	"runtime"
20
	"sync"
21
	"time"
22

23
	"istio.io/istio/pkg/log"
24
)
25

26
type delayTask struct {
27
	do      func() error
28
	runAt   time.Time
29
	retries int
30
}
31

32
const maxTaskRetry = 3
33

34
var _ heap.Interface = &pq{}
35

36
// pq implements an internal priority queue so that tasks with the soonest expiry will be run first.
37
// Methods on pq are not threadsafe, access should be protected.
38
// much of this is taken from the example at https://golang.org/pkg/container/heap/
39
type pq []*delayTask
40

41
func (q pq) Len() int {
42
	return len(q)
43
}
44

45
func (q pq) Less(i, j int) bool {
46
	return q[i].runAt.Before(q[j].runAt)
47
}
48

49
func (q *pq) Swap(i, j int) {
50
	(*q)[i], (*q)[j] = (*q)[j], (*q)[i]
51
}
52

53
func (q *pq) Push(x any) {
54
	*q = append(*q, x.(*delayTask))
55
}
56

57
func (q *pq) Pop() any {
58
	old := *q
59
	n := len(old)
60
	c := cap(old)
61
	// Shrink the capacity of task queue.
62
	if n < c/2 && c > 32 {
63
		npq := make(pq, n, c/2)
64
		copy(npq, old)
65
		old = npq
66
	}
67
	if n == 0 {
68
		return nil
69
	}
70
	item := old[n-1]
71
	old[n-1] = nil // avoid memory leak
72
	*q = old[0 : n-1]
73
	return item
74
}
75

76
// Peek is not managed by the container/heap package, so we return the 0th element in the list.
77
func (q *pq) Peek() any {
78
	if q.Len() < 1 {
79
		return nil
80
	}
81
	return (*q)[0]
82
}
83

84
// Delayed implements queue such that tasks are executed after a specified delay.
85
type Delayed interface {
86
	baseInstance
87
	PushDelayed(t Task, delay time.Duration)
88
}
89

90
var _ Delayed = &delayQueue{}
91

92
// DelayQueueOption configure the behavior of the queue. Must be applied before Run.
93
type DelayQueueOption func(*delayQueue)
94

95
// DelayQueueBuffer sets maximum number of tasks awaiting execution. If this limit is reached, Push and PushDelayed
96
// will block until there is room.
97
func DelayQueueBuffer(bufferSize int) DelayQueueOption {
98
	return func(queue *delayQueue) {
99
		if queue.enqueue != nil {
100
			close(queue.enqueue)
101
		}
102
		queue.enqueue = make(chan *delayTask, bufferSize)
103
	}
104
}
105

106
// DelayQueueWorkers sets the number of background worker goroutines await tasks to execute. Effectively the
107
// maximum number of concurrent tasks.
108
func DelayQueueWorkers(workers int) DelayQueueOption {
109
	return func(queue *delayQueue) {
110
		queue.workers = workers
111
	}
112
}
113

114
// workerChanBuf determines whether the channel of a worker should be a buffered channel
115
// to get the best performance.
116
var workerChanBuf = func() int {
117
	// Use blocking channel if GOMAXPROCS=1.
118
	// This switches context from sender to receiver immediately,
119
	// which results in higher performance.
120
	var n int
121
	if n = runtime.GOMAXPROCS(0); n == 1 {
122
		return 0
123
	}
124

125
	// Make channel non-blocking and set up its capacity with GOMAXPROCS if GOMAXPROCS>1,
126
	// otherwise the sender might be dragged down if the receiver is CPU-bound.
127
	//
128
	// GOMAXPROCS determines how many goroutines can run in parallel,
129
	// which makes it the best choice as the channel capacity,
130
	return n
131
}()
132

133
// NewDelayed gives a Delayed queue with maximum concurrency specified by workers.
134
func NewDelayed(opts ...DelayQueueOption) Delayed {
135
	q := &delayQueue{
136
		workers: 1,
137
		queue:   &pq{},
138
		execute: make(chan *delayTask, workerChanBuf),
139
		enqueue: make(chan *delayTask, 100),
140
	}
141
	for _, o := range opts {
142
		o(q)
143
	}
144
	return q
145
}
146

147
type delayQueue struct {
148
	workers       int
149
	workerStopped []chan struct{}
150

151
	// incoming
152
	enqueue chan *delayTask
153
	// outgoing
154
	execute chan *delayTask
155

156
	mu    sync.Mutex
157
	queue *pq
158
}
159

160
// Push will execute the task as soon as possible
161
func (d *delayQueue) Push(task Task) {
162
	d.pushInternal(&delayTask{do: task, runAt: time.Now()})
163
}
164

165
// PushDelayed will execute the task after waiting for the delay
166
func (d *delayQueue) PushDelayed(t Task, delay time.Duration) {
167
	task := &delayTask{do: t, runAt: time.Now().Add(delay)}
168
	d.pushInternal(task)
169
}
170

171
// pushInternal will enqueue the delayTask with retries.
172
func (d *delayQueue) pushInternal(task *delayTask) {
173
	select {
174
	case d.enqueue <- task:
175
	// buffer has room to enqueue
176
	default:
177
		// TODO warn and resize buffer
178
		// if the buffer is full, we take the more expensive route of locking and pushing directly to the heap
179
		d.mu.Lock()
180
		heap.Push(d.queue, task)
181
		d.mu.Unlock()
182
	}
183
}
184

185
func (d *delayQueue) Closed() <-chan struct{} {
186
	done := make(chan struct{})
187
	go func() {
188
		for _, ch := range d.workerStopped {
189
			<-ch
190
		}
191
		close(done)
192
	}()
193
	return done
194
}
195

196
func (d *delayQueue) Run(stop <-chan struct{}) {
197
	for i := 0; i < d.workers; i++ {
198
		d.workerStopped = append(d.workerStopped, d.work(stop))
199
	}
200

201
	push := func(t *delayTask) bool {
202
		select {
203
		case d.execute <- t:
204
			return true
205
		case <-stop:
206
			return false
207
		}
208
	}
209

210
	for {
211
		var task *delayTask
212
		d.mu.Lock()
213
		if head := d.queue.Peek(); head != nil {
214
			task = head.(*delayTask)
215
			heap.Pop(d.queue)
216
		}
217
		d.mu.Unlock()
218

219
		if task != nil {
220
			delay := time.Until(task.runAt)
221
			if delay <= 0 {
222
				// execute now and continue processing incoming enqueues/tasks
223
				if !push(task) {
224
					return
225
				}
226
			} else {
227
				// not ready yet, don't block enqueueing
228
				await := time.NewTimer(delay)
229
				select {
230
				case t := <-d.enqueue:
231
					d.mu.Lock()
232
					heap.Push(d.queue, t)
233
					// put the old "head" back on the queue, it may be scheduled to execute after the one
234
					// that was just pushed
235
					heap.Push(d.queue, task)
236
					d.mu.Unlock()
237
				case <-await.C:
238
					if !push(task) {
239
						return
240
					}
241
				case <-stop:
242
					await.Stop()
243
					return
244
				}
245
				await.Stop()
246
			}
247
		} else {
248
			// no items, wait for Push or stop
249
			select {
250
			case t := <-d.enqueue:
251
				d.mu.Lock()
252
				d.queue.Push(t)
253
				d.mu.Unlock()
254
			case <-stop:
255
				return
256
			}
257
		}
258
	}
259
}
260

261
// work takes a channel that signals to stop, and returns a channel that signals the worker has fully stopped
262
func (d *delayQueue) work(stop <-chan struct{}) (stopped chan struct{}) {
263
	stopped = make(chan struct{})
264
	go func() {
265
		defer close(stopped)
266
		for {
267
			select {
268
			case t := <-d.execute:
269
				if err := t.do(); err != nil {
270
					if t.retries < maxTaskRetry {
271
						t.retries++
272
						log.Warnf("Work item handle failed: %v %d times, retry it", err, t.retries)
273
						d.pushInternal(t)
274
						continue
275
					}
276
					log.Errorf("Work item handle failed: %v, reaching the maximum retry times: %d, drop it", err, maxTaskRetry)
277
				}
278
			case <-stop:
279
				return
280
			}
281
		}
282
	}()
283
	return
284
}
285

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.