1
// Copyright Istio Authors
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
7
// http://www.apache.org/licenses/LICENSE-2.0
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
15
// Package crdclient provides an implementation of the config store and cache
16
// using Kubernetes Custom Resources and the informer framework from Kubernetes
18
// This code relies heavily on code generation for performance reasons; to implement the
19
// Istio store interface, we need to take dynamic inputs. Using the dynamic informers results in poor
20
// performance, as the cache will store unstructured objects which need to be marshaled on each Get/List call.
21
// Using istio/client-go directly will cache objects marshaled, allowing us to have cheap Get/List calls,
22
// at the expense of some code gen.
30
jsonmerge "github.com/evanphx/json-patch/v5"
32
"gomodules.xyz/jsonpatch/v2"
33
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
34
klabels "k8s.io/apimachinery/pkg/labels"
35
"k8s.io/apimachinery/pkg/runtime"
36
"k8s.io/apimachinery/pkg/types"
37
"k8s.io/apimachinery/pkg/util/json"
38
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp" // import GKE cluster authentication plugin
39
_ "k8s.io/client-go/plugin/pkg/client/auth/oidc" // import OIDC cluster authentication plugin, e.g. for Tectonic
41
"istio.io/istio/pilot/pkg/features"
42
"istio.io/istio/pilot/pkg/model"
43
"istio.io/istio/pkg/config"
44
"istio.io/istio/pkg/config/schema/collection"
45
"istio.io/istio/pkg/config/schema/collections"
46
"istio.io/istio/pkg/config/schema/resource"
47
"istio.io/istio/pkg/kube"
48
"istio.io/istio/pkg/kube/controllers"
49
"istio.io/istio/pkg/kube/kclient"
50
"istio.io/istio/pkg/kube/kubetypes"
51
"istio.io/istio/pkg/log"
52
"istio.io/istio/pkg/maps"
53
"istio.io/istio/pkg/queue"
54
"istio.io/istio/pkg/slices"
55
"istio.io/istio/pkg/util/sets"
58
var scope = log.RegisterScope("kube", "Kubernetes client messages")
60
// Client is a client for Istio CRDs, implementing config store cache
61
// This is used for CRUD operators on Istio configuration, as well as handling of events on config changes
63
// schemas defines the set of schemas used by this client.
64
// Note: this must be a subset of the schemas defined in the codegen
65
schemas collection.Schemas
67
// domainSuffix for the config metadata
70
// revision for this control plane instance. We will only read configs that match this revision.
73
// kinds keeps track of all cache handlers for known types
74
kinds map[config.GroupVersionKind]kclient.Untyped
77
// a flag indicates whether this client has been run, it is to prevent run queue twice
80
// handlers defines a list of event handlers per-type
81
handlers map[config.GroupVersionKind][]model.EventHandler
83
schemasByCRDName map[string]resource.Schema
87
// namespacesFilter is only used to initiate filtered informer.
88
filtersByGVK map[config.GroupVersionKind]kubetypes.Filter
95
FiltersByGVK map[config.GroupVersionKind]kubetypes.Filter
98
var _ model.ConfigStoreController = &Client{}
100
func New(client kube.Client, opts Option) *Client {
101
schemas := collections.Pilot
102
if features.EnableGatewayAPI {
103
schemas = collections.PilotGatewayAPI()
105
return NewForSchemas(client, opts, schemas)
108
func NewForSchemas(client kube.Client, opts Option, schemas collection.Schemas) *Client {
109
schemasByCRDName := map[string]resource.Schema{}
110
for _, s := range schemas.All() {
111
// From the spec: "Its name MUST be in the format <.spec.name>.<.spec.group>."
112
name := fmt.Sprintf("%s.%s", s.Plural(), s.Group())
113
schemasByCRDName[name] = s
116
domainSuffix: opts.DomainSuffix,
118
schemasByCRDName: schemasByCRDName,
119
revision: opts.Revision,
120
queue: queue.NewQueue(1 * time.Second),
121
started: atomic.NewBool(false),
122
kinds: map[config.GroupVersionKind]kclient.Untyped{},
123
handlers: map[config.GroupVersionKind][]model.EventHandler{},
125
logger: scope.WithLabels("controller", opts.Identifier),
126
filtersByGVK: opts.FiltersByGVK,
129
for _, s := range out.schemas.All() {
130
// From the spec: "Its name MUST be in the format <.spec.name>.<.spec.group>."
131
name := fmt.Sprintf("%s.%s", s.Plural(), s.Group())
138
func (cl *Client) RegisterEventHandler(kind config.GroupVersionKind, handler model.EventHandler) {
139
cl.handlers[kind] = append(cl.handlers[kind], handler)
142
// Run the queue and all informers. Callers should wait for HasSynced() before depending on results.
143
func (cl *Client) Run(stop <-chan struct{}) {
144
if cl.started.Swap(true) {
145
// was already started by other thread
150
cl.logger.Infof("Starting Pilot K8S CRD controller")
152
if !kube.WaitForCacheSync("crdclient", stop, cl.informerSynced) {
153
cl.logger.Errorf("Failed to sync Pilot K8S CRD controller cache")
156
cl.logger.Infof("Pilot K8S CRD controller synced in %v", time.Since(t0))
158
cl.logger.Infof("controller terminated")
161
func (cl *Client) informerSynced() bool {
162
for gk, ctl := range cl.allKinds() {
163
if !ctl.HasSynced() {
164
cl.logger.Infof("controller %q is syncing...", gk)
171
func (cl *Client) HasSynced() bool {
172
return cl.queue.HasSynced()
175
// Schemas for the store
176
func (cl *Client) Schemas() collection.Schemas {
180
// Get implements store interface
181
func (cl *Client) Get(typ config.GroupVersionKind, name, namespace string) *config.Config {
184
cl.logger.Warnf("unknown type: %s", typ)
187
obj := h.Get(name, namespace)
189
cl.logger.Debugf("couldn't find %s/%s in informer index", namespace, name)
193
cfg := TranslateObject(obj, typ, cl.domainSuffix)
197
// Create implements store interface
198
func (cl *Client) Create(cfg config.Config) (string, error) {
200
return "", fmt.Errorf("nil spec for %v/%v", cfg.Name, cfg.Namespace)
203
meta, err := create(cl.client, cfg, getObjectMetadata(cfg))
207
return meta.GetResourceVersion(), nil
210
// Update implements store interface
211
func (cl *Client) Update(cfg config.Config) (string, error) {
213
return "", fmt.Errorf("nil spec for %v/%v", cfg.Name, cfg.Namespace)
216
meta, err := update(cl.client, cfg, getObjectMetadata(cfg))
220
return meta.GetResourceVersion(), nil
223
func (cl *Client) UpdateStatus(cfg config.Config) (string, error) {
224
if cfg.Status == nil {
225
return "", fmt.Errorf("nil status for %v/%v on updateStatus()", cfg.Name, cfg.Namespace)
228
meta, err := updateStatus(cl.client, cfg, getObjectMetadata(cfg))
232
return meta.GetResourceVersion(), nil
235
// Patch applies only the modifications made in the PatchFunc rather than doing a full replace. Useful to avoid
236
// read-modify-write conflicts when there are many concurrent-writers to the same resource.
237
func (cl *Client) Patch(orig config.Config, patchFn config.PatchFunc) (string, error) {
238
modified, patchType := patchFn(orig.DeepCopy())
240
meta, err := patch(cl.client, orig, getObjectMetadata(orig), modified, getObjectMetadata(modified), patchType)
244
return meta.GetResourceVersion(), nil
247
// Delete implements store interface
248
// `resourceVersion` must be matched before deletion is carried out. If not possible, a 409 Conflict status will be
249
func (cl *Client) Delete(typ config.GroupVersionKind, name, namespace string, resourceVersion *string) error {
250
return delete(cl.client, typ, name, namespace, resourceVersion)
253
// List implements store interface
254
func (cl *Client) List(kind config.GroupVersionKind, namespace string) []config.Config {
255
h, f := cl.kind(kind)
260
list := h.List(namespace, klabels.Everything())
262
out := make([]config.Config, 0, len(list))
263
for _, item := range list {
264
cfg := TranslateObject(item, kind, cl.domainSuffix)
265
out = append(out, cfg)
271
func (cl *Client) allKinds() map[config.GroupVersionKind]kclient.Untyped {
273
defer cl.kindsMu.RUnlock()
274
return maps.Clone(cl.kinds)
277
func (cl *Client) kind(r config.GroupVersionKind) (kclient.Untyped, bool) {
279
defer cl.kindsMu.RUnlock()
280
ch, ok := cl.kinds[r]
284
func TranslateObject(r runtime.Object, gvk config.GroupVersionKind, domainSuffix string) config.Config {
285
translateFunc, f := translationMap[gvk]
287
scope.Errorf("unknown type %v", gvk)
288
return config.Config{}
290
c := translateFunc(r)
291
c.Domain = domainSuffix
295
func getObjectMetadata(config config.Config) metav1.ObjectMeta {
296
return metav1.ObjectMeta{
298
Namespace: config.Namespace,
299
Labels: config.Labels,
300
Annotations: config.Annotations,
301
ResourceVersion: config.ResourceVersion,
302
OwnerReferences: config.OwnerReferences,
303
UID: types.UID(config.UID),
307
func genPatchBytes(oldRes, modRes runtime.Object, patchType types.PatchType) ([]byte, error) {
308
oldJSON, err := json.Marshal(oldRes)
310
return nil, fmt.Errorf("failed marhsalling original resource: %v", err)
312
newJSON, err := json.Marshal(modRes)
314
return nil, fmt.Errorf("failed marhsalling modified resource: %v", err)
317
case types.JSONPatchType:
318
ops, err := jsonpatch.CreatePatch(oldJSON, newJSON)
322
return json.Marshal(ops)
323
case types.MergePatchType:
324
return jsonmerge.CreateMergePatch(oldJSON, newJSON)
326
return nil, fmt.Errorf("unsupported patch type: %v. must be one of JSONPatchType or MergePatchType", patchType)
330
func (cl *Client) addCRD(name string) {
331
cl.logger.Debugf("adding CRD %q", name)
332
s, f := cl.schemasByCRDName[name]
334
cl.logger.Debugf("added resource that we are not watching: %v", name)
337
resourceGVK := s.GroupVersionKind()
338
gvr := s.GroupVersionResource()
341
defer cl.kindsMu.Unlock()
342
if _, f := cl.kinds[resourceGVK]; f {
343
cl.logger.Debugf("added resource that already exists: %v", resourceGVK)
347
// We need multiple filters:
348
// 1. Is it in this revision?
349
// 2. Does it match the discovery selector?
350
// 3. Does it have a special per-type object filter?
351
var extraFilter func(obj any) bool
352
if of, f := cl.filtersByGVK[resourceGVK]; f && of.ObjectFilter != nil {
353
extraFilter = of.ObjectFilter.Filter
355
filter := kubetypes.Filter{ObjectFilter: composeFilters(kube.FilterIfEnhancedFilteringEnabled(cl.client), cl.inRevision, extraFilter)}
357
var kc kclient.Untyped
359
kc = kclient.NewUntypedInformer(cl.client, gvr, filter)
361
kc = kclient.NewDelayedInformer[controllers.Object](
364
kubetypes.StandardInformer,
370
kc.AddEventHandler(controllers.EventHandler[controllers.Object]{
371
AddFunc: func(obj controllers.Object) {
372
incrementEvent(kind, "add")
373
cl.queue.Push(func() error {
374
cl.onEvent(resourceGVK, nil, obj, model.EventAdd)
378
UpdateFunc: func(old, cur controllers.Object) {
379
incrementEvent(kind, "update")
380
cl.queue.Push(func() error {
381
cl.onEvent(resourceGVK, old, cur, model.EventUpdate)
385
DeleteFunc: func(obj controllers.Object) {
386
incrementEvent(kind, "delete")
387
cl.queue.Push(func() error {
388
cl.onEvent(resourceGVK, nil, obj, model.EventDelete)
394
cl.kinds[resourceGVK] = kc
397
// composedFilter offers a way to join multiple different object filters into a single one
398
type composedFilter struct {
399
// The primary filter, which has a handler. Optional
400
filter kubetypes.DynamicObjectFilter
401
// Secondary filters (no handler allowed)
402
extra []func(obj any) bool
405
func (f composedFilter) Filter(obj any) bool {
406
for _, filter := range f.extra {
412
return f.filter.Filter(obj)
417
func (f composedFilter) AddHandler(fn func(selected, deselected sets.String)) {
419
f.filter.AddHandler(fn)
423
func composeFilters(filter kubetypes.DynamicObjectFilter, extra ...func(obj any) bool) kubetypes.DynamicObjectFilter {
424
return composedFilter{
426
extra: slices.FilterInPlace(extra, func(f func(obj any) bool) bool {
432
func (cl *Client) inRevision(obj any) bool {
433
return config.LabelsInRevision(obj.(controllers.Object).GetLabels(), cl.revision)
436
func (cl *Client) onEvent(resourceGVK config.GroupVersionKind, old controllers.Object, curr controllers.Object, event model.Event) {
437
currItem := controllers.ExtractObject(curr)
442
currConfig := TranslateObject(currItem, resourceGVK, cl.domainSuffix)
444
var oldConfig config.Config
446
oldConfig = TranslateObject(old, resourceGVK, cl.domainSuffix)
449
for _, f := range cl.handlers[resourceGVK] {
450
f(oldConfig, currConfig, event)