1
// Copyright Istio Authors
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
7
// http://www.apache.org/licenses/LICENSE-2.0
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
22
"k8s.io/apimachinery/pkg/runtime/schema"
24
"istio.io/api/meta/v1alpha1"
25
"istio.io/istio/pkg/config"
26
"istio.io/istio/pkg/util/sets"
29
// Task to be performed.
30
type Task func(entry cacheEntry)
32
// WorkerQueue implements an expandable goroutine pool which executes at most one concurrent routine per target
33
// resource. Multiple calls to Push() will not schedule multiple executions per target resource, but will ensure that
34
// the single execution uses the latest value.
35
type WorkerQueue interface {
37
Push(target Resource, controller *Controller, context any)
38
// Run the loop until a signal on the context
39
Run(ctx context.Context)
41
Delete(target Resource)
44
type cacheEntry struct {
45
// the cacheVale represents the latest version of the resource, including ResourceVersion
46
cacheResource Resource
47
// the perControllerStatus represents the latest version of the ResourceStatus
48
perControllerStatus map[*Controller]any
51
type lockResource struct {
52
schema.GroupVersionResource
57
func convert(i Resource) lockResource {
59
GroupVersionResource: i.GroupVersionResource,
60
Namespace: i.Namespace,
65
type WorkQueue struct {
66
// tasks which are not currently executing but need to run
68
// a lock to govern access to data in the cache
70
// for each task, a cacheEntry which can be updated before the task is run so that execution will have latest values
71
cache map[lockResource]cacheEntry
76
func (wq *WorkQueue) Push(target Resource, ctl *Controller, progress any) {
78
key := convert(target)
79
if item, inqueue := wq.cache[key]; inqueue {
80
item.perControllerStatus[ctl] = progress
83
wq.cache[key] = cacheEntry{
84
cacheResource: target,
85
perControllerStatus: map[*Controller]any{ctl: progress},
87
wq.tasks = append(wq.tasks, key)
95
// Pop returns the first item in the queue not in exclusion, along with it's latest progress
96
func (wq *WorkQueue) Pop(exclusion sets.Set[lockResource]) (target Resource, progress map[*Controller]any) {
98
defer wq.lock.Unlock()
99
for i := 0; i < len(wq.tasks); i++ {
100
if !exclusion.Contains(wq.tasks[i]) {
102
t, ok := wq.cache[wq.tasks[i]]
103
wq.tasks = append(wq.tasks[:i], wq.tasks[i+1:]...)
105
return Resource{}, nil
107
return t.cacheResource, t.perControllerStatus
110
return Resource{}, nil
113
func (wq *WorkQueue) Length() int {
115
defer wq.lock.Unlock()
119
func (wq *WorkQueue) Delete(target Resource) {
121
defer wq.lock.Unlock()
122
delete(wq.cache, convert(target))
125
type WorkerPool struct {
127
// indicates the queue is closing
129
// the function which will be run for each task in queue
130
write func(*config.Config, any)
131
// the function to retrieve the initial status
132
get func(Resource) *config.Config
133
// current worker routine count
135
// maximum worker routine count
137
currentlyWorking sets.Set[lockResource]
141
func NewWorkerPool(write func(*config.Config, any), get func(Resource) *config.Config, maxWorkers uint) WorkerQueue {
145
maxWorkers: maxWorkers,
146
currentlyWorking: sets.New[lockResource](),
148
tasks: make([]lockResource, 0),
149
cache: make(map[lockResource]cacheEntry),
155
func (wp *WorkerPool) Delete(target Resource) {
159
func (wp *WorkerPool) Push(target Resource, controller *Controller, context any) {
160
wp.q.Push(target, controller, context)
164
func (wp *WorkerPool) Run(ctx context.Context) {
165
context.AfterFunc(ctx, func() {
172
// maybeAddWorker adds a worker unless we are at maxWorkers. Workers exit when there are no more tasks, except for the
173
// last worker, which stays alive indefinitely.
174
func (wp *WorkerPool) maybeAddWorker() {
176
if wp.workerCount >= wp.maxWorkers || wp.q.Length() == 0 {
185
if wp.closing || wp.q.Length() == 0 {
191
target, perControllerWork := wp.q.Pop(wp.currentlyWorking)
193
if target == (Resource{}) {
194
// continue or return?
195
// could have been deleted, or could be no items in queue not currently worked on. need a way to differentiate.
200
wp.currentlyWorking.Insert(convert(target))
202
// work should be done without holding the lock
203
cfg := wp.get(target)
205
// Check that generation matches
206
if strconv.FormatInt(cfg.Generation, 10) == target.Generation {
207
x, err := GetOGProvider(cfg.Status)
209
// Not all controllers user generation, so we can ignore errors
210
x.SetObservedGeneration(cfg.Generation)
212
for c, i := range perControllerWork {
213
// TODO: this does not guarantee controller order. perhaps it should?
220
wp.currentlyWorking.Delete(convert(target))
226
type GenerationProvider interface {
227
SetObservedGeneration(int64)
231
type IstioGenerationProvider struct {
232
*v1alpha1.IstioStatus
235
func (i *IstioGenerationProvider) SetObservedGeneration(in int64) {
236
i.ObservedGeneration = in
239
func (i *IstioGenerationProvider) Unwrap() any {