istio

Форк
0
203 строки · 5.4 Кб
1
// Copyright Istio Authors
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14

15
// Package aggregate implements a read-only aggregator for config stores.
16
package aggregate
17

18
import (
19
	"errors"
20

21
	"k8s.io/apimachinery/pkg/types"
22

23
	"istio.io/istio/pilot/pkg/model"
24
	"istio.io/istio/pkg/config"
25
	"istio.io/istio/pkg/config/schema/collection"
26
	"istio.io/istio/pkg/slices"
27
	"istio.io/istio/pkg/util/sets"
28
)
29

30
// errorUnsupported is returned by the mutating store methods (Create, Update,
// UpdateStatus, Patch, Delete) when the aggregate was built without a writer.
var errorUnsupported = errors.New("unsupported operation: the config aggregator is read-only")
31

32
// makeStore creates an aggregate config store from several config stores and
33
// unifies their descriptors
34
func makeStore(stores []model.ConfigStore, writer model.ConfigStore) (model.ConfigStore, error) {
35
	union := collection.NewSchemasBuilder()
36
	storeTypes := make(map[config.GroupVersionKind][]model.ConfigStore)
37
	for _, store := range stores {
38
		for _, s := range store.Schemas().All() {
39
			if len(storeTypes[s.GroupVersionKind()]) == 0 {
40
				if err := union.Add(s); err != nil {
41
					return nil, err
42
				}
43
			}
44
			storeTypes[s.GroupVersionKind()] = append(storeTypes[s.GroupVersionKind()], store)
45
		}
46
	}
47

48
	schemas := union.Build()
49
	if err := schemas.Validate(); err != nil {
50
		return nil, err
51
	}
52
	result := &store{
53
		schemas: schemas,
54
		stores:  storeTypes,
55
		writer:  writer,
56
	}
57

58
	return result, nil
59
}
60

61
// MakeWriteableCache creates an aggregate config store cache from several config store caches. An additional
62
// `writer` config store is passed, which may or may not be part of `caches`.
63
func MakeWriteableCache(caches []model.ConfigStoreController, writer model.ConfigStore) (model.ConfigStoreController, error) {
64
	stores := make([]model.ConfigStore, 0, len(caches))
65
	for _, cache := range caches {
66
		stores = append(stores, cache)
67
	}
68
	store, err := makeStore(stores, writer)
69
	if err != nil {
70
		return nil, err
71
	}
72
	return &storeCache{
73
		ConfigStore: store,
74
		caches:      caches,
75
	}, nil
76
}
77

78
// MakeCache creates an aggregate config store cache from several config store
79
// caches.
80
func MakeCache(caches []model.ConfigStoreController) (model.ConfigStoreController, error) {
81
	return MakeWriteableCache(caches, nil)
82
}
83

84
type store struct {
85
	// schemas is the unified
86
	schemas collection.Schemas
87

88
	// stores is a mapping from config type to a store
89
	stores map[config.GroupVersionKind][]model.ConfigStore
90

91
	writer model.ConfigStore
92
}
93

94
// Schemas returns the unified schema set stored on the aggregate.
func (cr *store) Schemas() collection.Schemas {
	unified := cr.schemas
	return unified
}
97

98
// Get the first config found in the stores.
99
func (cr *store) Get(typ config.GroupVersionKind, name, namespace string) *config.Config {
100
	for _, store := range cr.stores[typ] {
101
		config := store.Get(typ, name, namespace)
102
		if config != nil {
103
			return config
104
		}
105
	}
106
	return nil
107
}
108

109
// List all configs in the stores.
110
func (cr *store) List(typ config.GroupVersionKind, namespace string) []config.Config {
111
	stores := cr.stores[typ]
112
	if len(stores) == 0 {
113
		return nil
114
	}
115

116
	var (
117
		configs      []config.Config
118
		storeConfigs = make([][]config.Config, 0, len(stores))
119
		configCnt    int
120
	)
121

122
	for _, store := range stores {
123
		curConfigs := store.List(typ, namespace)
124
		storeConfigs = append(storeConfigs, curConfigs)
125
		configCnt += len(curConfigs)
126
	}
127

128
	configs = make([]config.Config, 0, configCnt)
129
	// Used to remove duplicated config
130
	configMap := sets.NewWithLength[types.NamespacedName](configCnt)
131
	for _, curConfigs := range storeConfigs {
132
		configs = append(configs, curConfigs...)
133
	}
134
	configs = slices.FilterInPlace[config.Config](configs, func(cfg config.Config) bool {
135
		return !configMap.InsertContains(cfg.NamespacedName())
136
	})
137

138
	return configs
139
}
140

141
// Delete forwards the deletion to the configured writer and returns its
// result. Without a writer it returns errorUnsupported.
func (cr *store) Delete(typ config.GroupVersionKind, name, namespace string, resourceVersion *string) error {
	if w := cr.writer; w != nil {
		return w.Delete(typ, name, namespace, resourceVersion)
	}
	return errorUnsupported
}
147

148
// Create forwards the config to the configured writer and returns its results
// unchanged. Without a writer it returns an empty string and errorUnsupported.
func (cr *store) Create(c config.Config) (string, error) {
	if w := cr.writer; w != nil {
		return w.Create(c)
	}
	return "", errorUnsupported
}
154

155
// Update forwards the config to the configured writer and returns its results
// unchanged. Without a writer it returns an empty string and errorUnsupported.
func (cr *store) Update(c config.Config) (string, error) {
	if w := cr.writer; w != nil {
		return w.Update(c)
	}
	return "", errorUnsupported
}
161

162
// UpdateStatus forwards the config to the configured writer's UpdateStatus and
// returns its results unchanged. Without a writer it returns an empty string
// and errorUnsupported.
func (cr *store) UpdateStatus(c config.Config) (string, error) {
	if w := cr.writer; w != nil {
		return w.UpdateStatus(c)
	}
	return "", errorUnsupported
}
168

169
// Patch forwards the original config and the patch function to the configured
// writer and returns its results unchanged. Without a writer it returns an
// empty string and errorUnsupported.
func (cr *store) Patch(orig config.Config, patchFn config.PatchFunc) (string, error) {
	if w := cr.writer; w != nil {
		return w.Patch(orig, patchFn)
	}
	return "", errorUnsupported
}
175

176
type storeCache struct {
177
	model.ConfigStore
178
	caches []model.ConfigStoreController
179
}
180

181
// HasSynced reports whether every underlying cache has synced. It stops at the
// first cache that has not, and returns true when there are no caches.
func (cr *storeCache) HasSynced() bool {
	for i := range cr.caches {
		if synced := cr.caches[i].HasSynced(); !synced {
			return false
		}
	}
	return true
}
189

190
// RegisterEventHandler registers `handler` for `kind` on every underlying
// cache whose schemas contain that kind; caches without it are skipped.
func (cr *storeCache) RegisterEventHandler(kind config.GroupVersionKind, handler model.EventHandler) {
	for _, controller := range cr.caches {
		_, supported := controller.Schemas().FindByGroupVersionKind(kind)
		if !supported {
			continue
		}
		controller.RegisterEventHandler(kind, handler)
	}
}
197

198
// Run starts each underlying cache's Run in its own goroutine, passing the
// same stop channel to all of them, and then blocks until a receive on `stop`
// completes.
func (cr *storeCache) Run(stop <-chan struct{}) {
	for i := range cr.caches {
		// The receiver and argument are evaluated here, before the goroutine
		// starts, so each goroutine is bound to its own cache.
		go cr.caches[i].Run(stop)
	}
	<-stop
}
204

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.