istio

Форк
0
452 строки · 13.5 Кб
1
// Copyright Istio Authors
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14

15
// Package crdclient provides an implementation of the config store and cache
16
// using Kubernetes Custom Resources and the informer framework from Kubernetes.
17
//
18
// This code relies heavily on code generation for performance reasons; to implement the
19
// Istio store interface, we need to take dynamic inputs. Using the dynamic informers results in poor
20
// performance, as the cache will store unstructured objects which need to be marshaled on each Get/List call.
21
// Using istio/client-go directly will cache objects marshaled, allowing us to have cheap Get/List calls,
22
// at the expense of some code gen.
23
package crdclient
24

25
import (
26
	"fmt"
27
	"sync"
28
	"time"
29

30
	jsonmerge "github.com/evanphx/json-patch/v5"
31
	"go.uber.org/atomic"
32
	"gomodules.xyz/jsonpatch/v2"
33
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
34
	klabels "k8s.io/apimachinery/pkg/labels"
35
	"k8s.io/apimachinery/pkg/runtime"
36
	"k8s.io/apimachinery/pkg/types"
37
	"k8s.io/apimachinery/pkg/util/json"
38
	_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"  // import GKE cluster authentication plugin
39
	_ "k8s.io/client-go/plugin/pkg/client/auth/oidc" // import OIDC cluster authentication plugin, e.g. for Tectonic
40

41
	"istio.io/istio/pilot/pkg/features"
42
	"istio.io/istio/pilot/pkg/model"
43
	"istio.io/istio/pkg/config"
44
	"istio.io/istio/pkg/config/schema/collection"
45
	"istio.io/istio/pkg/config/schema/collections"
46
	"istio.io/istio/pkg/config/schema/resource"
47
	"istio.io/istio/pkg/kube"
48
	"istio.io/istio/pkg/kube/controllers"
49
	"istio.io/istio/pkg/kube/kclient"
50
	"istio.io/istio/pkg/kube/kubetypes"
51
	"istio.io/istio/pkg/log"
52
	"istio.io/istio/pkg/maps"
53
	"istio.io/istio/pkg/queue"
54
	"istio.io/istio/pkg/slices"
55
	"istio.io/istio/pkg/util/sets"
56
)
57

58
// scope is the package-level log scope, registered under the name "kube".
// Client derives its per-controller logger from it.
var scope = log.RegisterScope("kube", "Kubernetes client messages")
59

60
// Client is a client for Istio CRDs, implementing config store cache
61
// This is used for CRUD operators on Istio configuration, as well as handling of events on config changes
62
type Client struct {
63
	// schemas defines the set of schemas used by this client.
64
	// Note: this must be a subset of the schemas defined in the codegen
65
	schemas collection.Schemas
66

67
	// domainSuffix for the config metadata
68
	domainSuffix string
69

70
	// revision for this control plane instance. We will only read configs that match this revision.
71
	revision string
72

73
	// kinds keeps track of all cache handlers for known types
74
	kinds   map[config.GroupVersionKind]kclient.Untyped
75
	kindsMu sync.RWMutex
76
	queue   queue.Instance
77
	// a flag indicates whether this client has been run, it is to prevent run queue twice
78
	started *atomic.Bool
79

80
	// handlers defines a list of event handlers per-type
81
	handlers map[config.GroupVersionKind][]model.EventHandler
82

83
	schemasByCRDName map[string]resource.Schema
84
	client           kube.Client
85
	logger           *log.Scope
86

87
	// namespacesFilter is only used to initiate filtered informer.
88
	filtersByGVK map[config.GroupVersionKind]kubetypes.Filter
89
}
90

91
type Option struct {
92
	Revision     string
93
	DomainSuffix string
94
	Identifier   string
95
	FiltersByGVK map[config.GroupVersionKind]kubetypes.Filter
96
}
97

98
// Compile-time check that *Client satisfies model.ConfigStoreController.
var _ model.ConfigStoreController = (*Client)(nil)
99

100
// New builds a Client for the default Pilot schema set. When
// features.EnableGatewayAPI is set, the schema set returned by
// collections.PilotGatewayAPI() is used instead.
func New(client kube.Client, opts Option) *Client {
	if features.EnableGatewayAPI {
		return NewForSchemas(client, opts, collections.PilotGatewayAPI())
	}
	return NewForSchemas(client, opts, collections.Pilot)
}
107

108
func NewForSchemas(client kube.Client, opts Option, schemas collection.Schemas) *Client {
109
	schemasByCRDName := map[string]resource.Schema{}
110
	for _, s := range schemas.All() {
111
		// From the spec: "Its name MUST be in the format <.spec.name>.<.spec.group>."
112
		name := fmt.Sprintf("%s.%s", s.Plural(), s.Group())
113
		schemasByCRDName[name] = s
114
	}
115
	out := &Client{
116
		domainSuffix:     opts.DomainSuffix,
117
		schemas:          schemas,
118
		schemasByCRDName: schemasByCRDName,
119
		revision:         opts.Revision,
120
		queue:            queue.NewQueue(1 * time.Second),
121
		started:          atomic.NewBool(false),
122
		kinds:            map[config.GroupVersionKind]kclient.Untyped{},
123
		handlers:         map[config.GroupVersionKind][]model.EventHandler{},
124
		client:           client,
125
		logger:           scope.WithLabels("controller", opts.Identifier),
126
		filtersByGVK:     opts.FiltersByGVK,
127
	}
128

129
	for _, s := range out.schemas.All() {
130
		// From the spec: "Its name MUST be in the format <.spec.name>.<.spec.group>."
131
		name := fmt.Sprintf("%s.%s", s.Plural(), s.Group())
132
		out.addCRD(name)
133
	}
134

135
	return out
136
}
137

138
// RegisterEventHandler appends handler to the list of handlers invoked for
// events on the given kind. The handlers map is modified without any locking.
func (cl *Client) RegisterEventHandler(kind config.GroupVersionKind, handler model.EventHandler) {
	registered := cl.handlers[kind]
	cl.handlers[kind] = append(registered, handler)
}
141

142
// Run the queue and all informers. Callers should  wait for HasSynced() before depending on results.
143
func (cl *Client) Run(stop <-chan struct{}) {
144
	if cl.started.Swap(true) {
145
		// was already started by other thread
146
		return
147
	}
148

149
	t0 := time.Now()
150
	cl.logger.Infof("Starting Pilot K8S CRD controller")
151

152
	if !kube.WaitForCacheSync("crdclient", stop, cl.informerSynced) {
153
		cl.logger.Errorf("Failed to sync Pilot K8S CRD controller cache")
154
		return
155
	}
156
	cl.logger.Infof("Pilot K8S CRD controller synced in %v", time.Since(t0))
157
	cl.queue.Run(stop)
158
	cl.logger.Infof("controller terminated")
159
}
160

161
// informerSynced reports whether every registered informer has synced. It
// logs and stops at the first informer that is still syncing.
func (cl *Client) informerSynced() bool {
	for gvk, informer := range cl.allKinds() {
		if informer.HasSynced() {
			continue
		}
		cl.logger.Infof("controller %q is syncing...", gvk)
		return false
	}
	return true
}
170

171
// HasSynced reports the sync state of the client's event queue.
func (cl *Client) HasSynced() bool {
	synced := cl.queue.HasSynced()
	return synced
}
174

175
// Schemas for the store
176
func (cl *Client) Schemas() collection.Schemas {
177
	return cl.schemas
178
}
179

180
// Get implements store interface
181
func (cl *Client) Get(typ config.GroupVersionKind, name, namespace string) *config.Config {
182
	h, f := cl.kind(typ)
183
	if !f {
184
		cl.logger.Warnf("unknown type: %s", typ)
185
		return nil
186
	}
187
	obj := h.Get(name, namespace)
188
	if obj == nil {
189
		cl.logger.Debugf("couldn't find %s/%s in informer index", namespace, name)
190
		return nil
191
	}
192

193
	cfg := TranslateObject(obj, typ, cl.domainSuffix)
194
	return &cfg
195
}
196

197
// Create implements store interface
198
func (cl *Client) Create(cfg config.Config) (string, error) {
199
	if cfg.Spec == nil {
200
		return "", fmt.Errorf("nil spec for %v/%v", cfg.Name, cfg.Namespace)
201
	}
202

203
	meta, err := create(cl.client, cfg, getObjectMetadata(cfg))
204
	if err != nil {
205
		return "", err
206
	}
207
	return meta.GetResourceVersion(), nil
208
}
209

210
// Update implements store interface
211
func (cl *Client) Update(cfg config.Config) (string, error) {
212
	if cfg.Spec == nil {
213
		return "", fmt.Errorf("nil spec for %v/%v", cfg.Name, cfg.Namespace)
214
	}
215

216
	meta, err := update(cl.client, cfg, getObjectMetadata(cfg))
217
	if err != nil {
218
		return "", err
219
	}
220
	return meta.GetResourceVersion(), nil
221
}
222

223
// UpdateStatus writes the status of cfg through the Kubernetes client and
// returns the resulting resource version. Configs with a nil Status are
// rejected.
func (cl *Client) UpdateStatus(cfg config.Config) (string, error) {
	if cfg.Status == nil {
		return "", fmt.Errorf("nil status for %v/%v on updateStatus()", cfg.Name, cfg.Namespace)
	}

	objMeta := getObjectMetadata(cfg)
	updated, err := updateStatus(cl.client, cfg, objMeta)
	if err != nil {
		return "", err
	}
	return updated.GetResourceVersion(), nil
}
234

235
// Patch applies only the modifications made in the PatchFunc rather than doing a full replace. Useful to avoid
236
// read-modify-write conflicts when there are many concurrent-writers to the same resource.
237
func (cl *Client) Patch(orig config.Config, patchFn config.PatchFunc) (string, error) {
238
	modified, patchType := patchFn(orig.DeepCopy())
239

240
	meta, err := patch(cl.client, orig, getObjectMetadata(orig), modified, getObjectMetadata(modified), patchType)
241
	if err != nil {
242
		return "", err
243
	}
244
	return meta.GetResourceVersion(), nil
245
}
246

247
// Delete implements store interface
248
// `resourceVersion` must be matched before deletion is carried out. If not possible, a 409 Conflict status will be
249
func (cl *Client) Delete(typ config.GroupVersionKind, name, namespace string, resourceVersion *string) error {
250
	return delete(cl.client, typ, name, namespace, resourceVersion)
251
}
252

253
// List implements store interface
254
func (cl *Client) List(kind config.GroupVersionKind, namespace string) []config.Config {
255
	h, f := cl.kind(kind)
256
	if !f {
257
		return nil
258
	}
259

260
	list := h.List(namespace, klabels.Everything())
261

262
	out := make([]config.Config, 0, len(list))
263
	for _, item := range list {
264
		cfg := TranslateObject(item, kind, cl.domainSuffix)
265
		out = append(out, cfg)
266
	}
267

268
	return out
269
}
270

271
// allKinds returns a copy of the kinds map taken under the read lock, so
// callers can iterate it without holding kindsMu.
func (cl *Client) allKinds() map[config.GroupVersionKind]kclient.Untyped {
	cl.kindsMu.RLock()
	defer cl.kindsMu.RUnlock()

	snapshot := maps.Clone(cl.kinds)
	return snapshot
}
276

277
// kind looks up the informer registered for r under the read lock. The
// boolean reports whether the type is registered.
func (cl *Client) kind(r config.GroupVersionKind) (kclient.Untyped, bool) {
	cl.kindsMu.RLock()
	defer cl.kindsMu.RUnlock()

	informer, found := cl.kinds[r]
	return informer, found
}
283

284
// TranslateObject converts r into a config.Config using the translation
// function registered for gvk in translationMap, and sets the result's Domain
// to domainSuffix. For a gvk with no registered translation it logs an error
// and returns an empty config.Config.
func TranslateObject(r runtime.Object, gvk config.GroupVersionKind, domainSuffix string) config.Config {
	if translate, known := translationMap[gvk]; known {
		cfg := translate(r)
		cfg.Domain = domainSuffix
		return cfg
	}

	scope.Errorf("unknown type %v", gvk)
	return config.Config{}
}
294

295
func getObjectMetadata(config config.Config) metav1.ObjectMeta {
296
	return metav1.ObjectMeta{
297
		Name:            config.Name,
298
		Namespace:       config.Namespace,
299
		Labels:          config.Labels,
300
		Annotations:     config.Annotations,
301
		ResourceVersion: config.ResourceVersion,
302
		OwnerReferences: config.OwnerReferences,
303
		UID:             types.UID(config.UID),
304
	}
305
}
306

307
func genPatchBytes(oldRes, modRes runtime.Object, patchType types.PatchType) ([]byte, error) {
308
	oldJSON, err := json.Marshal(oldRes)
309
	if err != nil {
310
		return nil, fmt.Errorf("failed marhsalling original resource: %v", err)
311
	}
312
	newJSON, err := json.Marshal(modRes)
313
	if err != nil {
314
		return nil, fmt.Errorf("failed marhsalling modified resource: %v", err)
315
	}
316
	switch patchType {
317
	case types.JSONPatchType:
318
		ops, err := jsonpatch.CreatePatch(oldJSON, newJSON)
319
		if err != nil {
320
			return nil, err
321
		}
322
		return json.Marshal(ops)
323
	case types.MergePatchType:
324
		return jsonmerge.CreateMergePatch(oldJSON, newJSON)
325
	default:
326
		return nil, fmt.Errorf("unsupported patch type: %v. must be one of JSONPatchType or MergePatchType", patchType)
327
	}
328
}
329

330
func (cl *Client) addCRD(name string) {
331
	cl.logger.Debugf("adding CRD %q", name)
332
	s, f := cl.schemasByCRDName[name]
333
	if !f {
334
		cl.logger.Debugf("added resource that we are not watching: %v", name)
335
		return
336
	}
337
	resourceGVK := s.GroupVersionKind()
338
	gvr := s.GroupVersionResource()
339

340
	cl.kindsMu.Lock()
341
	defer cl.kindsMu.Unlock()
342
	if _, f := cl.kinds[resourceGVK]; f {
343
		cl.logger.Debugf("added resource that already exists: %v", resourceGVK)
344
		return
345
	}
346

347
	// We need multiple filters:
348
	// 1. Is it in this revision?
349
	// 2. Does it match the discovery selector?
350
	// 3. Does it have a special per-type object filter?
351
	var extraFilter func(obj any) bool
352
	if of, f := cl.filtersByGVK[resourceGVK]; f && of.ObjectFilter != nil {
353
		extraFilter = of.ObjectFilter.Filter
354
	}
355
	filter := kubetypes.Filter{ObjectFilter: composeFilters(kube.FilterIfEnhancedFilteringEnabled(cl.client), cl.inRevision, extraFilter)}
356

357
	var kc kclient.Untyped
358
	if s.IsBuiltin() {
359
		kc = kclient.NewUntypedInformer(cl.client, gvr, filter)
360
	} else {
361
		kc = kclient.NewDelayedInformer[controllers.Object](
362
			cl.client,
363
			gvr,
364
			kubetypes.StandardInformer,
365
			filter,
366
		)
367
	}
368

369
	kind := s.Kind()
370
	kc.AddEventHandler(controllers.EventHandler[controllers.Object]{
371
		AddFunc: func(obj controllers.Object) {
372
			incrementEvent(kind, "add")
373
			cl.queue.Push(func() error {
374
				cl.onEvent(resourceGVK, nil, obj, model.EventAdd)
375
				return nil
376
			})
377
		},
378
		UpdateFunc: func(old, cur controllers.Object) {
379
			incrementEvent(kind, "update")
380
			cl.queue.Push(func() error {
381
				cl.onEvent(resourceGVK, old, cur, model.EventUpdate)
382
				return nil
383
			})
384
		},
385
		DeleteFunc: func(obj controllers.Object) {
386
			incrementEvent(kind, "delete")
387
			cl.queue.Push(func() error {
388
				cl.onEvent(resourceGVK, nil, obj, model.EventDelete)
389
				return nil
390
			})
391
		},
392
	})
393

394
	cl.kinds[resourceGVK] = kc
395
}
396

397
// composedFilter offers a way to join multiple different object filters into a single one
398
type composedFilter struct {
399
	// The primary filter, which has a handler. Optional
400
	filter kubetypes.DynamicObjectFilter
401
	// Secondary filters (no handler allowed)
402
	extra []func(obj any) bool
403
}
404

405
// Filter reports whether obj is accepted by all filters. The secondary
// predicates are checked first; the primary filter, if set, is consulted only
// when all of them accept the object.
func (f composedFilter) Filter(obj any) bool {
	for i := range f.extra {
		if accept := f.extra[i]; !accept(obj) {
			return false
		}
	}
	if f.filter == nil {
		return true
	}
	return f.filter.Filter(obj)
}
416

417
// AddHandler forwards fn to the primary filter. It is a no-op when no primary
// filter is set; the secondary predicates never receive handlers.
func (f composedFilter) AddHandler(fn func(selected, deselected sets.String)) {
	if f.filter == nil {
		return
	}
	f.filter.AddHandler(fn)
}
422

423
func composeFilters(filter kubetypes.DynamicObjectFilter, extra ...func(obj any) bool) kubetypes.DynamicObjectFilter {
424
	return composedFilter{
425
		filter: filter,
426
		extra: slices.FilterInPlace(extra, func(f func(obj any) bool) bool {
427
			return f != nil
428
		}),
429
	}
430
}
431

432
// inRevision reports whether obj's labels place it in this client's revision.
// obj is type-asserted to controllers.Object without a check, so any other
// type panics.
func (cl *Client) inRevision(obj any) bool {
	object := obj.(controllers.Object)
	return config.LabelsInRevision(object.GetLabels(), cl.revision)
}
435

436
// onEvent translates the objects of an informer event into configs and calls
// every handler registered for resourceGVK with (old, current, event). When
// old is nil the handlers receive a zero-value config as the old config.
// Nothing happens if controllers.ExtractObject returns nil for curr.
func (cl *Client) onEvent(resourceGVK config.GroupVersionKind, old controllers.Object, curr controllers.Object, event model.Event) {
	extracted := controllers.ExtractObject(curr)
	if extracted == nil {
		return
	}
	current := TranslateObject(extracted, resourceGVK, cl.domainSuffix)

	previous := config.Config{}
	if old != nil {
		previous = TranslateObject(old, resourceGVK, cl.domainSuffix)
	}

	handlers := cl.handlers[resourceGVK]
	for i := range handlers {
		handlers[i](previous, current, event)
	}
}
453

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.