1
// Copyright Istio Authors
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
7
// http://www.apache.org/licenses/LICENSE-2.0
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
15
// Package aggregate implements an aggregator for config stores. It is read-only
// unless a writer store is supplied (see MakeWriteableCache).
21
"k8s.io/apimachinery/pkg/types"
23
"istio.io/istio/pilot/pkg/model"
24
"istio.io/istio/pkg/config"
25
"istio.io/istio/pkg/config/schema/collection"
26
"istio.io/istio/pkg/slices"
27
"istio.io/istio/pkg/util/sets"
30
// errorUnsupported is returned by the mutating store methods (Create, Update,
// UpdateStatus, Patch, Delete) when no writer store is configured.
var errorUnsupported = errors.New("unsupported operation: the config aggregator is read-only")
32
// makeStore creates an aggregate config store from several config stores and
33
// unifies their schemas. For every schema kind it records, in input order,
// each store that advertises that kind.
// NOTE(review): how writer is used is not visible in this excerpt; presumably
// it is kept for the mutating operations — confirm.
34
func makeStore(stores []model.ConfigStore, writer model.ConfigStore) (model.ConfigStore, error) {
35
union := collection.NewSchemasBuilder()
36
// storeTypes maps each kind to the stores that serve it, in the order given.
storeTypes := make(map[config.GroupVersionKind][]model.ConfigStore)
37
for _, store := range stores {
38
for _, s := range store.Schemas().All() {
39
// Add a schema to the union only the first time its kind is seen, so a
// kind served by several stores is added once.
if len(storeTypes[s.GroupVersionKind()]) == 0 {
40
if err := union.Add(s); err != nil {
44
storeTypes[s.GroupVersionKind()] = append(storeTypes[s.GroupVersionKind()], store)
48
schemas := union.Build()
49
// Validate the combined schema set before it is used.
if err := schemas.Validate(); err != nil {
61
// MakeWriteableCache creates an aggregate config store cache from several config store caches. An additional
62
// `writer` config store is passed, which may or may not be part of `caches`.
63
func MakeWriteableCache(caches []model.ConfigStoreController, writer model.ConfigStore) (model.ConfigStoreController, error) {
64
// makeStore takes a []model.ConfigStore, so each controller is appended
// individually to a slice of that type.
stores := make([]model.ConfigStore, 0, len(caches))
65
for _, cache := range caches {
66
stores = append(stores, cache)
68
store, err := makeStore(stores, writer)
78
// MakeCache creates an aggregate config store cache from several config store
80
func MakeCache(caches []model.ConfigStoreController) (model.ConfigStoreController, error) {
81
return MakeWriteableCache(caches, nil)
85
// schemas is the unified set of schemas.
// NOTE(review): presumably the union built by makeStore; the assignment is
// not visible in this excerpt — confirm.
86
schemas collection.Schemas
88
// stores maps each config type to the stores that serve it; Get and List
// consult them in slice order.
89
stores map[config.GroupVersionKind][]model.ConfigStore
91
// writer, when non-nil, receives Create, Update, UpdateStatus, Patch and
// Delete calls; when nil those methods return errorUnsupported.
writer model.ConfigStore
94
// Schemas returns the schema collection of the aggregate store.
// NOTE(review): the method body is not visible in this excerpt; presumably it
// returns cr.schemas — confirm.
func (cr *store) Schemas() collection.Schemas {
98
// Get returns the first config found in the stores registered for typ,
// querying them in slice order with the given name and namespace.
// NOTE(review): the code that selects and returns the result is not visible
// in this excerpt.
99
func (cr *store) Get(typ config.GroupVersionKind, name, namespace string) *config.Config {
100
for _, store := range cr.stores[typ] {
101
// NOTE: the local name config shadows the imported config package from here on.
config := store.Get(typ, name, namespace)
109
// List gathers the configs of type typ in namespace from every store
// registered for typ, concatenates them in store order, and filters out
// configs whose namespaced name has already been seen.
110
func (cr *store) List(typ config.GroupVersionKind, namespace string) []config.Config {
111
stores := cr.stores[typ]
112
if len(stores) == 0 {
117
configs []config.Config
118
// storeConfigs holds one result slice per store, in store order.
storeConfigs = make([][]config.Config, 0, len(stores))
122
// First pass: collect each store's results and count them so the combined
// slice and the de-duplication set can be sized up front.
for _, store := range stores {
123
curConfigs := store.List(typ, namespace)
124
storeConfigs = append(storeConfigs, curConfigs)
125
configCnt += len(curConfigs)
128
configs = make([]config.Config, 0, configCnt)
129
// Tracks the namespaced names already seen; used to remove duplicated configs.
130
configMap := sets.NewWithLength[types.NamespacedName](configCnt)
131
for _, curConfigs := range storeConfigs {
132
configs = append(configs, curConfigs...)
134
// NOTE(review): presumably InsertContains reports whether the key was already
// present, so only the first config per namespaced name is kept — confirm
// against the sets and slices packages.
configs = slices.FilterInPlace[config.Config](configs, func(cfg config.Config) bool {
135
return !configMap.InsertContains(cfg.NamespacedName())
141
func (cr *store) Delete(typ config.GroupVersionKind, name, namespace string, resourceVersion *string) error {
142
if cr.writer == nil {
143
return errorUnsupported
145
return cr.writer.Delete(typ, name, namespace, resourceVersion)
148
func (cr *store) Create(c config.Config) (string, error) {
149
if cr.writer == nil {
150
return "", errorUnsupported
152
return cr.writer.Create(c)
155
func (cr *store) Update(c config.Config) (string, error) {
156
if cr.writer == nil {
157
return "", errorUnsupported
159
return cr.writer.Update(c)
162
func (cr *store) UpdateStatus(c config.Config) (string, error) {
163
if cr.writer == nil {
164
return "", errorUnsupported
166
return cr.writer.UpdateStatus(c)
169
func (cr *store) Patch(orig config.Config, patchFn config.PatchFunc) (string, error) {
170
if cr.writer == nil {
171
return "", errorUnsupported
173
return cr.writer.Patch(orig, patchFn)
176
// storeCache aggregates several config store controllers.
type storeCache struct {
178
// caches are the underlying controllers that HasSynced, RegisterEventHandler
// and Run iterate over.
caches []model.ConfigStoreController
181
// HasSynced checks HasSynced on each underlying cache in turn.
// NOTE(review): the return statements are not visible in this excerpt;
// presumably it returns false at the first unsynced cache and true otherwise
// — confirm.
func (cr *storeCache) HasSynced() bool {
182
for _, cache := range cr.caches {
183
if !cache.HasSynced() {
190
func (cr *storeCache) RegisterEventHandler(kind config.GroupVersionKind, handler model.EventHandler) {
191
for _, cache := range cr.caches {
192
if _, exists := cache.Schemas().FindByGroupVersionKind(kind); exists {
193
cache.RegisterEventHandler(kind, handler)
198
func (cr *storeCache) Run(stop <-chan struct{}) {
199
for _, cache := range cr.caches {