istio
1// Copyright Istio Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package queue16
17import (18"sync"19"time"20
21"go.uber.org/atomic"22"k8s.io/apimachinery/pkg/util/rand"23
24"istio.io/istio/pkg/log"25)
26
27// Task to be performed.
28type Task func() error29
30type queueTask struct {31task Task
32enqueueTime time.Time33startTime time.Time34}
35
36// Instance of work tickets processed using a rate-limiting loop
37type baseInstance interface {38// Push a task.39Push(task Task)40// Run the loop until a signal on the channel41Run(<-chan struct{})42// Closed returns a chan that will be signaled when the Instance has stopped processing tasks.43Closed() <-chan struct{}44}
45
46type Instance interface {47baseInstance
48// HasSynced returns true once the queue has synced.49// Syncing indicates that all items in the queue *before* Run was called have been processed.50HasSynced() bool51}
52
53type queueImpl struct {54delay time.Duration55tasks []*queueTask56cond *sync.Cond57closing bool58closed chan struct{}59closeOnce *sync.Once60// initialSync indicates the queue has initially "synced".61initialSync *atomic.Bool62id string63metrics *queueMetrics64}
65
66// NewQueue instantiates a queue with a processing function
67func NewQueue(errorDelay time.Duration) Instance {68return NewQueueWithID(errorDelay, rand.String(10))69}
70
71func NewQueueWithID(errorDelay time.Duration, name string) Instance {72return &queueImpl{73delay: errorDelay,74tasks: make([]*queueTask, 0),75closing: false,76closed: make(chan struct{}),77closeOnce: &sync.Once{},78initialSync: atomic.NewBool(false),79cond: sync.NewCond(&sync.Mutex{}),80id: name,81metrics: newQueueMetrics(name),82}83}
84
85func (q *queueImpl) Push(item Task) {86q.cond.L.Lock()87defer q.cond.L.Unlock()88if !q.closing {89q.tasks = append(q.tasks, &queueTask{task: item, enqueueTime: time.Now()})90q.metrics.depth.RecordInt(int64(len(q.tasks)))91}92q.cond.Signal()93}
94
95func (q *queueImpl) Closed() <-chan struct{} {96return q.closed97}
98
99// get blocks until it can return a task to be processed. If shutdown = true,
100// the processing go routine should stop.
101func (q *queueImpl) get() (task *queueTask, shutdown bool) {102q.cond.L.Lock()103defer q.cond.L.Unlock()104// wait for closing to be set, or a task to be pushed105for !q.closing && len(q.tasks) == 0 {106q.cond.Wait()107}108
109if q.closing && len(q.tasks) == 0 {110// We must be shutting down.111return nil, true112}113task = q.tasks[0]114// Slicing will not free the underlying elements of the array, so explicitly clear them out here115q.tasks[0] = nil116q.tasks = q.tasks[1:]117
118task.startTime = time.Now()119q.metrics.depth.RecordInt(int64(len(q.tasks)))120q.metrics.latency.Record(time.Since(task.enqueueTime).Seconds())121
122return task, false123}
124
125func (q *queueImpl) processNextItem() bool {126// Wait until there is a new item in the queue127task, shuttingdown := q.get()128if shuttingdown {129return false130}131
132// Run the task.133if err := task.task(); err != nil {134delay := q.delay135log.Infof("Work item handle failed (%v), retry after delay %v", err, delay)136time.AfterFunc(delay, func() {137q.Push(task.task)138})139}140q.metrics.workDuration.Record(time.Since(task.startTime).Seconds())141
142return true143}
144
145func (q *queueImpl) HasSynced() bool {146return q.initialSync.Load()147}
148
149func (q *queueImpl) Run(stop <-chan struct{}) {150log.Debugf("started queue %s", q.id)151defer func() {152q.closeOnce.Do(func() {153log.Debugf("closed queue %s", q.id)154close(q.closed)155})156}()157go func() {158<-stop159q.cond.L.Lock()160q.cond.Signal()161q.closing = true162q.cond.L.Unlock()163}()164
165q.Push(func() error {166q.initialSync.Store(true)167return nil168})169for q.processNextItem() {170}171}
172