istio

Форк
0
/
manager.go 
125 строк · 4.2 Кб
1
/*
2
 Copyright Istio Authors
3

4
 Licensed under the Apache License, Version 2.0 (the "License");
5
 you may not use this file except in compliance with the License.
6
 You may obtain a copy of the License at
7

8
     http://www.apache.org/licenses/LICENSE-2.0
9

10
 Unless required by applicable law or agreed to in writing, software
11
 distributed under the License is distributed on an "AS IS" BASIS,
12
 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 See the License for the specific language governing permissions and
14
 limitations under the License.
15
*/
16

17
package status
18

19
import (
20
	"istio.io/api/meta/v1alpha1"
21
	"istio.io/istio/pilot/pkg/features"
22
	"istio.io/istio/pilot/pkg/model"
23
	"istio.io/istio/pkg/config"
24
	"istio.io/istio/pkg/config/schema/gvk"
25
)
26

27
// Manager allows multiple controllers to provide input into configuration
28
// status without needlessly doubling the number of writes, or overwriting
29
// one another.  Each status controller calls newController, passing in
30
// an arbitrary status modification function, and then calls EnqueueStatusUpdate
31
// when an individual resource is ready to be updated with the relevant data.
32
type Manager struct {
33
	// TODO: is Resource the right abstraction?
34
	store   model.ConfigStore
35
	workers WorkerQueue
36
}
37

38
func NewManager(store model.ConfigStore) *Manager {
39
	writeFunc := func(m *config.Config, istatus any) {
40
		scope.Debugf("writing status for resource %s/%s", m.Namespace, m.Name)
41
		status := istatus.(GenerationProvider)
42
		m.Status = status.Unwrap()
43
		_, err := store.UpdateStatus(*m)
44
		if err != nil {
45
			// TODO: need better error handling
46
			scope.Errorf("Encountered unexpected error updating status for %v, will try again later: %s", m, err)
47
			return
48
		}
49
	}
50
	retrieveFunc := func(resource Resource) *config.Config {
51
		scope.Debugf("retrieving config for status update: %s/%s", resource.Namespace, resource.Name)
52
		k, ok := gvk.FromGVR(resource.GroupVersionResource)
53
		if !ok {
54
			scope.Warnf("GVR %v could not be identified", resource.GroupVersionResource)
55
			return nil
56
		}
57

58
		current := store.Get(k, resource.Name, resource.Namespace)
59
		return current
60
	}
61
	return &Manager{
62
		store:   store,
63
		workers: NewWorkerPool(writeFunc, retrieveFunc, uint(features.StatusMaxWorkers)),
64
	}
65
}
66

67
func (m *Manager) Start(stop <-chan struct{}) {
68
	scope.Info("Starting status manager")
69

70
	ctx := NewIstioContext(stop)
71
	m.workers.Run(ctx)
72
}
73

74
// CreateGenericController provides an interface for a status update function to be
75
// called in series with other controllers, minimizing the number of actual
76
// api server writes sent from various status controllers.  The UpdateFunc
77
// must take the target resource status and arbitrary context information as
78
// parameters, and return the updated status value.  Multiple controllers
79
// will be called in series, so the input status may not have been written
80
// to the API server yet, and the output status may be modified by other
81
// controllers before it is written to the server.
82
func (m *Manager) CreateGenericController(fn UpdateFunc) *Controller {
83
	result := &Controller{
84
		fn:      fn,
85
		workers: m.workers,
86
	}
87
	return result
88
}
89

90
func (m *Manager) CreateIstioStatusController(fn func(status *v1alpha1.IstioStatus, context any) *v1alpha1.IstioStatus) *Controller {
91
	wrapper := func(status any, context any) GenerationProvider {
92
		var input *v1alpha1.IstioStatus
93
		if status != nil {
94
			converted := status.(*IstioGenerationProvider)
95
			input = converted.IstioStatus
96
		}
97
		result := fn(input, context)
98
		return &IstioGenerationProvider{result}
99
	}
100
	result := &Controller{
101
		fn:      wrapper,
102
		workers: m.workers,
103
	}
104
	return result
105
}
106

107
type UpdateFunc func(status any, context any) GenerationProvider
108

109
type Controller struct {
110
	fn      UpdateFunc
111
	workers WorkerQueue
112
}
113

114
// EnqueueStatusUpdateResource informs the manager that this controller would like to
115
// update the status of target, using the information in context.  Once the status
116
// workers are ready to perform this update, the controller's UpdateFunc
117
// will be called with target and context as input.
118
func (c *Controller) EnqueueStatusUpdateResource(context any, target Resource) {
119
	// TODO: buffer this with channel
120
	c.workers.Push(target, c, context)
121
}
122

123
func (c *Controller) Delete(r Resource) {
124
	c.workers.Delete(r)
125
}
126

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.