// Copyright Istio Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package status

import (
	"context"
	"strconv"
	"sync"

	"k8s.io/apimachinery/pkg/runtime/schema"

	"istio.io/api/meta/v1alpha1"
	"istio.io/istio/pkg/config"
	"istio.io/istio/pkg/util/sets"
)

// Task to be performed against a cacheEntry (the latest pushed state of a single resource).
type Task func(entry cacheEntry)

// WorkerQueue implements an expandable goroutine pool which executes at most one concurrent routine per target
// resource.  Multiple calls to Push() will not schedule multiple executions per target resource, but will ensure that
// the single execution uses the latest value.
type WorkerQueue interface {
	// Push a task.
	// NOTE(review): `context` here is opaque per-controller progress data, NOT a
	// context.Context — and the name shadows the context package in implementations.
	Push(target Resource, controller *Controller, context any)
	// Run the loop until a signal on the context
	Run(ctx context.Context)
	// Delete a task
	Delete(target Resource)
}

// cacheEntry is the pending unit of work for one resource: the most recently
// pushed Resource plus the latest progress reported by each controller.
type cacheEntry struct {
	// the cacheResource represents the latest version of the resource, including ResourceVersion
	cacheResource Resource
	// the perControllerStatus represents the latest version of the ResourceStatus
	perControllerStatus map[*Controller]any
}

// lockResource identifies a resource for queueing/locking purposes: GVR plus
// namespace and name. Version/generation fields are intentionally not part of
// the key (see convert), so successive versions of one resource share an entry.
type lockResource struct {
	schema.GroupVersionResource
	Namespace string
	Name      string
}
56

57
func convert(i Resource) lockResource {
58
	return lockResource{
59
		GroupVersionResource: i.GroupVersionResource,
60
		Namespace:            i.Namespace,
61
		Name:                 i.Name,
62
	}
63
}

// WorkQueue is a deduplicating task queue: at most one pending entry exists per
// lockResource key, and repeated pushes merge into that entry so that execution
// always observes the latest values.
type WorkQueue struct {
	// tasks which are not currently executing but need to run
	tasks []lockResource
	// a lock to govern access to data in the cache
	lock sync.Mutex
	// for each task, a cacheEntry which can be updated before the task is run so that execution will have latest values
	cache map[lockResource]cacheEntry

	// OnPush, when non-nil, is invoked after every Push (outside the lock).
	OnPush func()
}
75

76
func (wq *WorkQueue) Push(target Resource, ctl *Controller, progress any) {
77
	wq.lock.Lock()
78
	key := convert(target)
79
	if item, inqueue := wq.cache[key]; inqueue {
80
		item.perControllerStatus[ctl] = progress
81
		wq.cache[key] = item
82
	} else {
83
		wq.cache[key] = cacheEntry{
84
			cacheResource:       target,
85
			perControllerStatus: map[*Controller]any{ctl: progress},
86
		}
87
		wq.tasks = append(wq.tasks, key)
88
	}
89
	wq.lock.Unlock()
90
	if wq.OnPush != nil {
91
		wq.OnPush()
92
	}
93
}
94

95
// Pop returns the first item in the queue not in exclusion, along with it's latest progress
96
func (wq *WorkQueue) Pop(exclusion sets.Set[lockResource]) (target Resource, progress map[*Controller]any) {
97
	wq.lock.Lock()
98
	defer wq.lock.Unlock()
99
	for i := 0; i < len(wq.tasks); i++ {
100
		if !exclusion.Contains(wq.tasks[i]) {
101
			// remove from tasks
102
			t, ok := wq.cache[wq.tasks[i]]
103
			wq.tasks = append(wq.tasks[:i], wq.tasks[i+1:]...)
104
			if !ok {
105
				return Resource{}, nil
106
			}
107
			return t.cacheResource, t.perControllerStatus
108
		}
109
	}
110
	return Resource{}, nil
111
}
112

113
func (wq *WorkQueue) Length() int {
114
	wq.lock.Lock()
115
	defer wq.lock.Unlock()
116
	return len(wq.tasks)
117
}
118

119
func (wq *WorkQueue) Delete(target Resource) {
120
	wq.lock.Lock()
121
	defer wq.lock.Unlock()
122
	delete(wq.cache, convert(target))
123
}

// WorkerPool executes queued status-write tasks on an on-demand pool of
// goroutines, capped at maxWorkers, ensuring any given resource is processed
// by at most one goroutine at a time (tracked via currentlyWorking).
type WorkerPool struct {
	q WorkQueue
	// indicates the queue is closing
	closing bool
	// the function which will be run for each task in queue
	write func(*config.Config, any)
	// the function to retrieve the initial status
	get func(Resource) *config.Config
	// current worker routine count
	workerCount uint
	// maximum worker routine count
	maxWorkers uint
	// resources currently being processed; Pop excludes these so only one
	// worker touches a given resource at a time
	currentlyWorking sets.Set[lockResource]
	// lock guards closing, workerCount, and currentlyWorking (the queue has its own lock)
	lock sync.Mutex
}
140

141
func NewWorkerPool(write func(*config.Config, any), get func(Resource) *config.Config, maxWorkers uint) WorkerQueue {
142
	return &WorkerPool{
143
		write:            write,
144
		get:              get,
145
		maxWorkers:       maxWorkers,
146
		currentlyWorking: sets.New[lockResource](),
147
		q: WorkQueue{
148
			tasks:  make([]lockResource, 0),
149
			cache:  make(map[lockResource]cacheEntry),
150
			OnPush: nil,
151
		},
152
	}
153
}
154

155
func (wp *WorkerPool) Delete(target Resource) {
156
	wp.q.Delete(target)
157
}
158

159
func (wp *WorkerPool) Push(target Resource, controller *Controller, context any) {
160
	wp.q.Push(target, controller, context)
161
	wp.maybeAddWorker()
162
}
163

164
func (wp *WorkerPool) Run(ctx context.Context) {
165
	context.AfterFunc(ctx, func() {
166
		wp.lock.Lock()
167
		wp.closing = true
168
		wp.lock.Unlock()
169
	})
170
}

// maybeAddWorker starts one additional worker goroutine unless the pool is
// already at maxWorkers or the queue is empty. Each worker loops popping
// tasks and exits as soon as the queue is empty or the pool is closing.
// NOTE(review): the original comment claimed the last worker "stays alive
// indefinitely", but the exit condition below applies to every worker.
func (wp *WorkerPool) maybeAddWorker() {
	wp.lock.Lock()
	if wp.workerCount >= wp.maxWorkers || wp.q.Length() == 0 {
		wp.lock.Unlock()
		return
	}
	wp.workerCount++
	wp.lock.Unlock()
	go func() {
		for {
			wp.lock.Lock()
			if wp.closing || wp.q.Length() == 0 {
				wp.workerCount--
				wp.lock.Unlock()
				return
			}

			// Pop excludes resources another worker is already processing.
			target, perControllerWork := wp.q.Pop(wp.currentlyWorking)

			if target == (Resource{}) {
				// continue or return?
				// could have been deleted, or could be no items in queue not currently worked on.  need a way to differentiate.
				wp.lock.Unlock()
				continue
			}
			wp.q.Delete(target)
			// Claim the resource so concurrent workers skip it until we finish.
			wp.currentlyWorking.Insert(convert(target))
			wp.lock.Unlock()
			// work should be done without holding the lock
			cfg := wp.get(target)
			if cfg != nil {
				// Check that generation matches; skip writing status for a stale task.
				if strconv.FormatInt(cfg.Generation, 10) == target.Generation {
					x, err := GetOGProvider(cfg.Status)
					if err == nil {
						// Not all controllers use generation, so we can ignore errors
						x.SetObservedGeneration(cfg.Generation)
					}
					for c, i := range perControllerWork {
						// TODO: this does not guarantee controller order.  perhaps it should?
						x = c.fn(x, i)
					}
					wp.write(cfg, x)
				}
			}
			wp.lock.Lock()
			// Release the claim so future pushes for this resource can be worked on.
			wp.currentlyWorking.Delete(convert(target))
			wp.lock.Unlock()
		}
	}()
}

// GenerationProvider abstracts a status object that can record which spec
// generation it was computed from.
type GenerationProvider interface {
	// SetObservedGeneration records the generation of the spec this status reflects.
	SetObservedGeneration(int64)
	// Unwrap returns the underlying status object for persistence.
	Unwrap() any
}

// IstioGenerationProvider adapts *v1alpha1.IstioStatus to GenerationProvider.
type IstioGenerationProvider struct {
	*v1alpha1.IstioStatus
}

// SetObservedGeneration records the spec generation this status was computed from.
func (i *IstioGenerationProvider) SetObservedGeneration(in int64) {
	i.ObservedGeneration = in
}

// Unwrap returns the wrapped IstioStatus.
func (i *IstioGenerationProvider) Unwrap() any {
	return i.IstioStatus
}