// Copyright Istio Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
23
"istio.io/istio/pkg/log"
26
type delayTask struct {
34
var _ heap.Interface = &pq{}
36
// pq implements an internal priority queue so that tasks with the soonest expiry will be run first.
37
// Methods on pq are not threadsafe, access should be protected.
38
// much of this is taken from the example at https://golang.org/pkg/container/heap/
41
func (q pq) Len() int {
45
func (q pq) Less(i, j int) bool {
46
return q[i].runAt.Before(q[j].runAt)
49
func (q *pq) Swap(i, j int) {
50
(*q)[i], (*q)[j] = (*q)[j], (*q)[i]
53
func (q *pq) Push(x any) {
54
*q = append(*q, x.(*delayTask))
57
func (q *pq) Pop() any {
61
// Shrink the capacity of task queue.
62
if n < c/2 && c > 32 {
63
npq := make(pq, n, c/2)
71
old[n-1] = nil // avoid memory leak
76
// Peek is not managed by the container/heap package, so we return the 0th element in the list.
77
func (q *pq) Peek() any {
84
// Delayed implements queue such that tasks are executed after a specified delay.
85
type Delayed interface {
87
PushDelayed(t Task, delay time.Duration)
90
var _ Delayed = &delayQueue{}
92
// DelayQueueOption configure the behavior of the queue. Must be applied before Run.
93
type DelayQueueOption func(*delayQueue)
95
// DelayQueueBuffer sets maximum number of tasks awaiting execution. If this limit is reached, Push and PushDelayed
96
// will block until there is room.
97
func DelayQueueBuffer(bufferSize int) DelayQueueOption {
98
return func(queue *delayQueue) {
99
if queue.enqueue != nil {
102
queue.enqueue = make(chan *delayTask, bufferSize)
106
// DelayQueueWorkers sets the number of background worker goroutines await tasks to execute. Effectively the
107
// maximum number of concurrent tasks.
108
func DelayQueueWorkers(workers int) DelayQueueOption {
109
return func(queue *delayQueue) {
110
queue.workers = workers
114
// workerChanBuf determines whether the channel of a worker should be a buffered channel
115
// to get the best performance.
116
var workerChanBuf = func() int {
117
// Use blocking channel if GOMAXPROCS=1.
118
// This switches context from sender to receiver immediately,
119
// which results in higher performance.
121
if n = runtime.GOMAXPROCS(0); n == 1 {
125
// Make channel non-blocking and set up its capacity with GOMAXPROCS if GOMAXPROCS>1,
126
// otherwise the sender might be dragged down if the receiver is CPU-bound.
128
// GOMAXPROCS determines how many goroutines can run in parallel,
129
// which makes it the best choice as the channel capacity,
133
// NewDelayed gives a Delayed queue with maximum concurrency specified by workers.
134
func NewDelayed(opts ...DelayQueueOption) Delayed {
138
execute: make(chan *delayTask, workerChanBuf),
139
enqueue: make(chan *delayTask, 100),
141
for _, o := range opts {
147
type delayQueue struct {
149
workerStopped []chan struct{}
152
enqueue chan *delayTask
154
execute chan *delayTask
160
// Push will execute the task as soon as possible
161
func (d *delayQueue) Push(task Task) {
162
d.pushInternal(&delayTask{do: task, runAt: time.Now()})
165
// PushDelayed will execute the task after waiting for the delay
166
func (d *delayQueue) PushDelayed(t Task, delay time.Duration) {
167
task := &delayTask{do: t, runAt: time.Now().Add(delay)}
171
// pushInternal will enqueue the delayTask with retries.
172
func (d *delayQueue) pushInternal(task *delayTask) {
174
case d.enqueue <- task:
175
// buffer has room to enqueue
177
// TODO warn and resize buffer
178
// if the buffer is full, we take the more expensive route of locking and pushing directly to the heap
180
heap.Push(d.queue, task)
185
func (d *delayQueue) Closed() <-chan struct{} {
186
done := make(chan struct{})
188
for _, ch := range d.workerStopped {
196
func (d *delayQueue) Run(stop <-chan struct{}) {
197
for i := 0; i < d.workers; i++ {
198
d.workerStopped = append(d.workerStopped, d.work(stop))
201
push := func(t *delayTask) bool {
213
if head := d.queue.Peek(); head != nil {
214
task = head.(*delayTask)
220
delay := time.Until(task.runAt)
222
// execute now and continue processing incoming enqueues/tasks
227
// not ready yet, don't block enqueueing
228
await := time.NewTimer(delay)
230
case t := <-d.enqueue:
232
heap.Push(d.queue, t)
233
// put the old "head" back on the queue, it may be scheduled to execute after the one
234
// that was just pushed
235
heap.Push(d.queue, task)
248
// no items, wait for Push or stop
250
case t := <-d.enqueue:
261
// work takes a channel that signals to stop, and returns a channel that signals the worker has fully stopped
262
func (d *delayQueue) work(stop <-chan struct{}) (stopped chan struct{}) {
263
stopped = make(chan struct{})
268
case t := <-d.execute:
269
if err := t.do(); err != nil {
270
if t.retries < maxTaskRetry {
272
log.Warnf("Work item handle failed: %v %d times, retry it", err, t.retries)
276
log.Errorf("Work item handle failed: %v, reaching the maximum retry times: %d, drop it", err, maxTaskRetry)