istio

Форк
0
/
instance.go 
171 строка · 4.1 Кб
1
// Copyright Istio Authors
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14

15
package queue
16

17
import (
18
	"sync"
19
	"time"
20

21
	"go.uber.org/atomic"
22
	"k8s.io/apimachinery/pkg/util/rand"
23

24
	"istio.io/istio/pkg/log"
25
)
26

27
// Task to be performed.
28
type Task func() error
29

30
type queueTask struct {
31
	task        Task
32
	enqueueTime time.Time
33
	startTime   time.Time
34
}
35

36
// Instance of work tickets processed using a rate-limiting loop
37
type baseInstance interface {
38
	// Push a task.
39
	Push(task Task)
40
	// Run the loop until a signal on the channel
41
	Run(<-chan struct{})
42
	// Closed returns a chan that will be signaled when the Instance has stopped processing tasks.
43
	Closed() <-chan struct{}
44
}
45

46
type Instance interface {
47
	baseInstance
48
	// HasSynced returns true once the queue has synced.
49
	// Syncing indicates that all items in the queue *before* Run was called have been processed.
50
	HasSynced() bool
51
}
52

53
type queueImpl struct {
54
	delay     time.Duration
55
	tasks     []*queueTask
56
	cond      *sync.Cond
57
	closing   bool
58
	closed    chan struct{}
59
	closeOnce *sync.Once
60
	// initialSync indicates the queue has initially "synced".
61
	initialSync *atomic.Bool
62
	id          string
63
	metrics     *queueMetrics
64
}
65

66
// NewQueue instantiates a queue with a processing function
67
func NewQueue(errorDelay time.Duration) Instance {
68
	return NewQueueWithID(errorDelay, rand.String(10))
69
}
70

71
func NewQueueWithID(errorDelay time.Duration, name string) Instance {
72
	return &queueImpl{
73
		delay:       errorDelay,
74
		tasks:       make([]*queueTask, 0),
75
		closing:     false,
76
		closed:      make(chan struct{}),
77
		closeOnce:   &sync.Once{},
78
		initialSync: atomic.NewBool(false),
79
		cond:        sync.NewCond(&sync.Mutex{}),
80
		id:          name,
81
		metrics:     newQueueMetrics(name),
82
	}
83
}
84

85
func (q *queueImpl) Push(item Task) {
86
	q.cond.L.Lock()
87
	defer q.cond.L.Unlock()
88
	if !q.closing {
89
		q.tasks = append(q.tasks, &queueTask{task: item, enqueueTime: time.Now()})
90
		q.metrics.depth.RecordInt(int64(len(q.tasks)))
91
	}
92
	q.cond.Signal()
93
}
94

95
func (q *queueImpl) Closed() <-chan struct{} {
96
	return q.closed
97
}
98

99
// get blocks until it can return a task to be processed. If shutdown = true,
100
// the processing go routine should stop.
101
func (q *queueImpl) get() (task *queueTask, shutdown bool) {
102
	q.cond.L.Lock()
103
	defer q.cond.L.Unlock()
104
	// wait for closing to be set, or a task to be pushed
105
	for !q.closing && len(q.tasks) == 0 {
106
		q.cond.Wait()
107
	}
108

109
	if q.closing && len(q.tasks) == 0 {
110
		// We must be shutting down.
111
		return nil, true
112
	}
113
	task = q.tasks[0]
114
	// Slicing will not free the underlying elements of the array, so explicitly clear them out here
115
	q.tasks[0] = nil
116
	q.tasks = q.tasks[1:]
117

118
	task.startTime = time.Now()
119
	q.metrics.depth.RecordInt(int64(len(q.tasks)))
120
	q.metrics.latency.Record(time.Since(task.enqueueTime).Seconds())
121

122
	return task, false
123
}
124

125
func (q *queueImpl) processNextItem() bool {
126
	// Wait until there is a new item in the queue
127
	task, shuttingdown := q.get()
128
	if shuttingdown {
129
		return false
130
	}
131

132
	// Run the task.
133
	if err := task.task(); err != nil {
134
		delay := q.delay
135
		log.Infof("Work item handle failed (%v), retry after delay %v", err, delay)
136
		time.AfterFunc(delay, func() {
137
			q.Push(task.task)
138
		})
139
	}
140
	q.metrics.workDuration.Record(time.Since(task.startTime).Seconds())
141

142
	return true
143
}
144

145
func (q *queueImpl) HasSynced() bool {
146
	return q.initialSync.Load()
147
}
148

149
func (q *queueImpl) Run(stop <-chan struct{}) {
150
	log.Debugf("started queue %s", q.id)
151
	defer func() {
152
		q.closeOnce.Do(func() {
153
			log.Debugf("closed queue %s", q.id)
154
			close(q.closed)
155
		})
156
	}()
157
	go func() {
158
		<-stop
159
		q.cond.L.Lock()
160
		q.cond.Signal()
161
		q.closing = true
162
		q.cond.L.Unlock()
163
	}()
164

165
	q.Push(func() error {
166
		q.initialSync.Store(true)
167
		return nil
168
	})
169
	for q.processNextItem() {
170
	}
171
}
172

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.