istio

Форк
0
/
deploymentcontroller.go 
700 строк · 26.0 Кб
1
// Copyright Istio Authors
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14

15
package gateway
16

17
import (
18
	"context"
19
	"encoding/json"
20
	"fmt"
21
	"strconv"
22
	"strings"
23

24
	appsv1 "k8s.io/api/apps/v1"
25
	corev1 "k8s.io/api/core/v1"
26
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
27
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
28
	klabels "k8s.io/apimachinery/pkg/labels"
29
	"k8s.io/apimachinery/pkg/runtime/schema"
30
	"k8s.io/apimachinery/pkg/types"
31
	gateway "sigs.k8s.io/gateway-api/apis/v1beta1"
32
	"sigs.k8s.io/yaml"
33

34
	"istio.io/api/label"
35
	meshapi "istio.io/api/mesh/v1alpha1"
36
	"istio.io/istio/pilot/pkg/features"
37
	"istio.io/istio/pilot/pkg/model"
38
	"istio.io/istio/pkg/cluster"
39
	"istio.io/istio/pkg/config/constants"
40
	"istio.io/istio/pkg/config/protocol"
41
	"istio.io/istio/pkg/config/schema/gvk"
42
	"istio.io/istio/pkg/config/schema/gvr"
43
	common_features "istio.io/istio/pkg/features"
44
	"istio.io/istio/pkg/kube"
45
	"istio.io/istio/pkg/kube/controllers"
46
	"istio.io/istio/pkg/kube/inject"
47
	"istio.io/istio/pkg/kube/kclient"
48
	istiolog "istio.io/istio/pkg/log"
49
	"istio.io/istio/pkg/revisions"
50
	"istio.io/istio/pkg/test/util/tmpl"
51
	"istio.io/istio/pkg/test/util/yml"
52
	"istio.io/istio/pkg/util/sets"
53
)
54

55
// DeploymentController implements a controller that materializes a Gateway into an in cluster gateway proxy
56
// to serve requests from. This is implemented with a Deployment and Service today.
57
// The implementation makes a few non-obvious choices - namely using Server Side Apply from go templates
58
// and not using controller-runtime.
59
//
60
// controller-runtime has a number of constraints that make it inappropriate for usage here, despite this
61
// seeming to be the bread and butter of the library:
62
// * It is not readily possible to bring existing Informers, which would require extra watches (#1668)
63
// * Goroutine leaks (#1655)
64
// * Excessive API-server calls at startup which have no benefit to us (#1603)
65
// * Hard to use with SSA (#1669)
66
// While these can be worked around, at some point it isn't worth the effort.
67
//
68
// Server Side Apply with go templates is an odd choice (no one likes YAML templating...) but is one of the few
69
// remaining options after all others are ruled out.
70
//   - Merge patch/Update cannot be used. If we always enforce that our object is *exactly* the same as
71
//     the in-cluster object we will get in endless loops due to other controllers that like to add annotations, etc.
72
//     If we chose to allow any unknown fields, then we would never be able to remove fields we added, as
73
//     we cannot tell if we created it or someone else did. SSA fixes these issues
74
//   - SSA using client-go Apply libraries is almost a good choice, but most third-party clients (Istio, MCS, and gateway-api)
75
//     do not provide these libraries.
76
//   - SSA using standard API types doesn't work well either: https://github.com/kubernetes-sigs/controller-runtime/issues/1669
77
//   - This leaves YAML templates, converted to unstructured types and Applied with the dynamic client.
78
type DeploymentController struct {
79
	client         kube.Client
80
	clusterID      cluster.ID
81
	env            *model.Environment
82
	queue          controllers.Queue
83
	patcher        patcher
84
	gateways       kclient.Client[*gateway.Gateway]
85
	gatewayClasses kclient.Client[*gateway.GatewayClass]
86

87
	clients         map[schema.GroupVersionResource]getter
88
	injectConfig    func() inject.WebhookConfig
89
	deployments     kclient.Client[*appsv1.Deployment]
90
	services        kclient.Client[*corev1.Service]
91
	serviceAccounts kclient.Client[*corev1.ServiceAccount]
92
	namespaces      kclient.Client[*corev1.Namespace]
93
	tagWatcher      revisions.TagWatcher
94
	revision        string
95
}
96

97
// Patcher is a function that abstracts patching logic. This is largely because client-go fakes do not handle patching
98
type patcher func(gvr schema.GroupVersionResource, name string, namespace string, data []byte, subresources ...string) error
99

100
// classInfo holds information about a gateway class
101
type classInfo struct {
102
	// controller name for this class
103
	controller string
104
	// description for this class
105
	description string
106
	// The key in the templates to use for this class
107
	templates string
108

109
	// defaultServiceType sets the default service type if one is not explicit set
110
	defaultServiceType corev1.ServiceType
111

112
	// disableRouteGeneration, if set, will make it so the controller ignores this class.
113
	disableRouteGeneration bool
114

115
	// addressType is the default address type to report
116
	addressType gateway.AddressType
117
}
118

119
var classInfos = getClassInfos()
120

121
var builtinClasses = getBuiltinClasses()
122

123
func getBuiltinClasses() map[gateway.ObjectName]gateway.GatewayController {
124
	res := map[gateway.ObjectName]gateway.GatewayController{
125
		gateway.ObjectName(features.GatewayAPIDefaultGatewayClass): gateway.GatewayController(features.ManagedGatewayController),
126
	}
127

128
	if features.MultiNetworkGatewayAPI {
129
		res[constants.RemoteGatewayClassName] = constants.UnmanagedGatewayController
130
	}
131

132
	if features.EnableAmbientWaypoints {
133
		res[constants.WaypointGatewayClassName] = constants.ManagedGatewayMeshController
134
	}
135
	return res
136
}
137

138
func getClassInfos() map[gateway.GatewayController]classInfo {
139
	m := map[gateway.GatewayController]classInfo{
140
		gateway.GatewayController(features.ManagedGatewayController): {
141
			controller:         features.ManagedGatewayController,
142
			description:        "The default Istio GatewayClass",
143
			templates:          "kube-gateway",
144
			defaultServiceType: corev1.ServiceTypeLoadBalancer,
145
			addressType:        gateway.HostnameAddressType,
146
		},
147
	}
148

149
	if features.MultiNetworkGatewayAPI {
150
		m[constants.UnmanagedGatewayController] = classInfo{
151
			// This represents a gateway that our control plane cannot discover directly via the API server.
152
			// We shouldn't generate Istio resources for it. We aren't programming this gateway.
153
			controller:             constants.UnmanagedGatewayController,
154
			description:            "Remote to this cluster. Does not deploy or affect configuration.",
155
			disableRouteGeneration: true,
156
			addressType:            gateway.HostnameAddressType,
157
		}
158
	}
159
	if features.EnableAmbientWaypoints {
160
		m[constants.ManagedGatewayMeshController] = classInfo{
161
			controller:         constants.ManagedGatewayMeshController,
162
			description:        "The default Istio waypoint GatewayClass",
163
			templates:          "waypoint",
164
			defaultServiceType: corev1.ServiceTypeClusterIP,
165
			addressType:        gateway.IPAddressType,
166
		}
167
	}
168
	return m
169
}
170

171
// NewDeploymentController constructs a DeploymentController and registers required informers.
172
// The controller will not start until Run() is called.
173
func NewDeploymentController(client kube.Client, clusterID cluster.ID, env *model.Environment,
174
	webhookConfig func() inject.WebhookConfig, injectionHandler func(fn func()), tw revisions.TagWatcher, revision string,
175
) *DeploymentController {
176
	filter := kclient.Filter{ObjectFilter: kube.FilterIfEnhancedFilteringEnabled(client)}
177
	gateways := kclient.NewFiltered[*gateway.Gateway](client, filter)
178
	gatewayClasses := kclient.New[*gateway.GatewayClass](client)
179
	dc := &DeploymentController{
180
		client:    client,
181
		clusterID: clusterID,
182
		clients:   map[schema.GroupVersionResource]getter{},
183
		env:       env,
184
		patcher: func(gvr schema.GroupVersionResource, name string, namespace string, data []byte, subresources ...string) error {
185
			c := client.Dynamic().Resource(gvr).Namespace(namespace)
186
			t := true
187
			_, err := c.Patch(context.Background(), name, types.ApplyPatchType, data, metav1.PatchOptions{
188
				Force:        &t,
189
				FieldManager: features.ManagedGatewayController,
190
			}, subresources...)
191
			return err
192
		},
193
		gateways:       gateways,
194
		gatewayClasses: gatewayClasses,
195
		injectConfig:   webhookConfig,
196
		tagWatcher:     tw,
197
		revision:       revision,
198
	}
199
	dc.queue = controllers.NewQueue("gateway deployment",
200
		controllers.WithReconciler(dc.Reconcile),
201
		controllers.WithMaxAttempts(5))
202

203
	// Set up a handler that will add the parent Gateway object onto the queue.
204
	// The queue will only handle Gateway objects; if child resources (Service, etc) are updated we re-add
205
	// the Gateway to the queue and reconcile the state of the world.
206
	parentHandler := controllers.ObjectHandler(controllers.EnqueueForParentHandler(dc.queue, gvk.KubernetesGateway))
207

208
	dc.services = kclient.NewFiltered[*corev1.Service](client, filter)
209
	dc.services.AddEventHandler(parentHandler)
210
	dc.clients[gvr.Service] = NewUntypedWrapper(dc.services)
211

212
	dc.deployments = kclient.NewFiltered[*appsv1.Deployment](client, filter)
213
	dc.deployments.AddEventHandler(parentHandler)
214
	dc.clients[gvr.Deployment] = NewUntypedWrapper(dc.deployments)
215

216
	dc.serviceAccounts = kclient.NewFiltered[*corev1.ServiceAccount](client, filter)
217
	dc.serviceAccounts.AddEventHandler(parentHandler)
218
	dc.clients[gvr.ServiceAccount] = NewUntypedWrapper(dc.serviceAccounts)
219

220
	dc.namespaces = kclient.NewFiltered[*corev1.Namespace](client, filter)
221
	dc.namespaces.AddEventHandler(controllers.ObjectHandler(func(o controllers.Object) {
222
		// TODO: make this more intelligent, checking if something we care about has changed
223
		// requeue this namespace
224
		for _, gw := range dc.gateways.List(o.GetName(), klabels.Everything()) {
225
			dc.queue.AddObject(gw)
226
		}
227
	}))
228

229
	gateways.AddEventHandler(controllers.ObjectHandler(dc.queue.AddObject))
230
	gatewayClasses.AddEventHandler(controllers.ObjectHandler(func(o controllers.Object) {
231
		for _, g := range dc.gateways.List(metav1.NamespaceAll, klabels.Everything()) {
232
			if string(g.Spec.GatewayClassName) == o.GetName() {
233
				dc.queue.AddObject(g)
234
			}
235
		}
236
	}))
237

238
	// On injection template change, requeue all gateways
239
	injectionHandler(func() {
240
		for _, gw := range dc.gateways.List(metav1.NamespaceAll, klabels.Everything()) {
241
			dc.queue.AddObject(gw)
242
		}
243
	})
244

245
	dc.tagWatcher.AddHandler(dc.HandleTagChange)
246

247
	return dc
248
}
249

250
func (d *DeploymentController) Run(stop <-chan struct{}) {
251
	kube.WaitForCacheSync(
252
		"deployment controller",
253
		stop,
254
		d.namespaces.HasSynced,
255
		d.deployments.HasSynced,
256
		d.services.HasSynced,
257
		d.serviceAccounts.HasSynced,
258
		d.gateways.HasSynced,
259
		d.gatewayClasses.HasSynced,
260
		d.tagWatcher.HasSynced,
261
	)
262
	d.queue.Run(stop)
263
	controllers.ShutdownAll(d.namespaces, d.deployments, d.services, d.serviceAccounts, d.gateways, d.gatewayClasses)
264
}
265

266
// Reconcile takes in the name of a Gateway and ensures the cluster is in the desired state
267
func (d *DeploymentController) Reconcile(req types.NamespacedName) error {
268
	log := log.WithLabels("gateway", req)
269

270
	gw := d.gateways.Get(req.Name, req.Namespace)
271
	if gw == nil {
272
		log.Debugf("gateway no longer exists")
273
		// we'll ignore not-found errors, since they can't be fixed by an immediate
274
		// requeue (we'll need to wait for a new notification), and we can get them
275
		// on deleted requests.
276
		return nil
277
	}
278

279
	var controller gateway.GatewayController
280
	if gc := d.gatewayClasses.Get(string(gw.Spec.GatewayClassName), ""); gc != nil {
281
		controller = gc.Spec.ControllerName
282
	} else {
283
		if builtin, f := builtinClasses[gw.Spec.GatewayClassName]; f {
284
			controller = builtin
285
		}
286
	}
287
	ci, f := classInfos[controller]
288
	if !f {
289
		log.Debugf("skipping unknown controller %q", controller)
290
		return nil
291
	}
292

293
	// find the tag or revision indicated by the object
294
	selectedTag, ok := gw.Labels[label.IoIstioRev.Name]
295
	if !ok {
296
		ns := d.namespaces.Get(gw.Namespace, "")
297
		if ns == nil {
298
			log.Debugf("gateway is not for this revision, skipping")
299
			return nil
300
		}
301
		selectedTag = ns.Labels[label.IoIstioRev.Name]
302
	}
303
	myTags := d.tagWatcher.GetMyTags()
304
	if !myTags.Contains(selectedTag) && !(selectedTag == "" && myTags.Contains("default")) {
305
		log.Debugf("gateway is not for this revision, skipping")
306
		return nil
307
	}
308
	// TODO: Here we could check if the tag is set and matches no known tags, and handle that if we are default.
309

310
	// Matched class, reconcile it
311
	return d.configureIstioGateway(log, *gw, ci)
312
}
313

314
func (d *DeploymentController) configureIstioGateway(log *istiolog.Scope, gw gateway.Gateway, gi classInfo) error {
315
	// If user explicitly sets addresses, we are assuming they are pointing to an existing deployment.
316
	// We will not manage it in this case
317
	if gi.templates == "" {
318
		log.Debug("skip gateway class without template")
319
		return nil
320
	}
321
	if !IsManaged(&gw.Spec) {
322
		log.Debug("skip disabled gateway")
323
		return nil
324
	}
325
	existingControllerVersion, overwriteControllerVersion, shouldHandle := ManagedGatewayControllerVersion(gw)
326
	if !shouldHandle {
327
		log.Debugf("skipping gateway which is managed by controller version %v", existingControllerVersion)
328
		return nil
329
	}
330
	log.Info("reconciling")
331

332
	var ns *corev1.Namespace
333
	if d.namespaces != nil {
334
		ns = d.namespaces.Get(gw.Namespace, "")
335
	}
336
	proxyUID, proxyGID := inject.GetProxyIDs(ns)
337

338
	defaultName := getDefaultName(gw.Name, &gw.Spec)
339

340
	serviceType := gi.defaultServiceType
341
	if o, f := gw.Annotations[serviceTypeOverride]; f {
342
		serviceType = corev1.ServiceType(o)
343
	}
344

345
	// TODO: Codify this API (i.e how to know if a specific gateway is an Istio waypoint gateway)
346
	isWaypointGateway := strings.Contains(string(gw.Spec.GatewayClassName), "waypoint")
347

348
	// Default the network label for waypoints if not explicitly set in gateway's labels
349
	network := d.injectConfig().Values.Struct().GetGlobal().GetNetwork()
350
	if _, ok := gw.GetLabels()[label.TopologyNetwork.Name]; !ok && network != "" && isWaypointGateway {
351
		if gw.Labels == nil {
352
			gw.Labels = make(map[string]string)
353
		}
354
		gw.Labels[label.TopologyNetwork.Name] = d.injectConfig().Values.Struct().GetGlobal().GetNetwork()
355
	}
356

357
	// Disable ambient redirection for kube-gateway if there is no explicit setting
358
	var hasAmbientAnnotation bool
359
	if _, ok := gw.Annotations[constants.AmbientRedirection]; ok {
360
		hasAmbientAnnotation = true
361
	}
362
	if gw.Spec.Infrastructure != nil {
363
		if _, ok := gw.Spec.Infrastructure.Annotations[constants.AmbientRedirection]; ok {
364
			hasAmbientAnnotation = true
365
		}
366
	}
367
	if features.EnableAmbientWaypoints && !isWaypointGateway && !hasAmbientAnnotation {
368
		if gw.Annotations == nil {
369
			gw.Annotations = make(map[string]string)
370
		}
371
		gw.Annotations[constants.AmbientRedirection] = constants.AmbientRedirectionDisabled
372
	}
373

374
	input := TemplateInput{
375
		Gateway:        &gw,
376
		DeploymentName: model.GetOrDefault(gw.Annotations[gatewayNameOverride], defaultName),
377
		ServiceAccount: model.GetOrDefault(gw.Annotations[gatewaySAOverride], defaultName),
378
		Ports:          extractServicePorts(gw),
379
		ClusterID:      d.clusterID.String(),
380

381
		KubeVersion:               kube.GetVersionAsInt(d.client),
382
		Revision:                  d.revision,
383
		ServiceType:               serviceType,
384
		ProxyUID:                  proxyUID,
385
		ProxyGID:                  proxyGID,
386
		CompliancePolicy:          common_features.CompliancePolicy,
387
		InfrastructureLabels:      gw.GetLabels(),
388
		InfrastructureAnnotations: gw.GetAnnotations(),
389
	}
390

391
	d.setGatewayNameLabel(&input)
392
	// Default to the gateway labels/annotations and overwrite if infrastructure labels/annotations are set
393
	gwInfra := gw.Spec.Infrastructure
394
	if gwInfra != nil && gwInfra.Labels != nil {
395
		infraLabels := make(map[string]string, len(gwInfra.Labels))
396
		for k, v := range gw.Spec.Infrastructure.Labels {
397
			if strings.HasPrefix(string(k), "gateway.networking.k8s.io/") {
398
				continue // ignore this prefix to avoid conflicts
399
			}
400
			infraLabels[string(k)] = string(v)
401
		}
402

403
		// Default the network label for waypoints if not explicitly set in infra labels
404
		// We do this a second time here for correctness since if infra labels are set (according to the gwapi spec),
405
		// the gateway's labels are ignored.
406
		if _, ok := infraLabels[label.TopologyNetwork.Name]; !ok && network != "" && isWaypointGateway {
407
			infraLabels[label.TopologyNetwork.Name] = network
408
		}
409

410
		input.InfrastructureLabels = infraLabels
411
	}
412

413
	if gwInfra != nil && gwInfra.Annotations != nil {
414
		infraAnnotations := make(map[string]string, len(gwInfra.Annotations))
415
		for k, v := range gw.Spec.Infrastructure.Annotations {
416
			if strings.HasPrefix(string(k), "gateway.networking.k8s.io/") {
417
				continue // ignore this prefix to avoid conflicts
418
			}
419
			infraAnnotations[string(k)] = string(v)
420
		}
421
		input.InfrastructureAnnotations = infraAnnotations
422
	}
423

424
	if overwriteControllerVersion {
425
		log.Debugf("write controller version, existing=%v", existingControllerVersion)
426
		if err := d.setGatewayControllerVersion(gw); err != nil {
427
			return fmt.Errorf("update gateway annotation: %v", err)
428
		}
429
	} else {
430
		log.Debugf("controller version existing=%v, no action needed", existingControllerVersion)
431
	}
432

433
	rendered, err := d.render(gi.templates, input)
434
	if err != nil {
435
		return fmt.Errorf("failed to render template: %v", err)
436
	}
437
	for _, t := range rendered {
438
		if err := d.apply(gi.controller, t); err != nil {
439
			return fmt.Errorf("apply failed: %v", err)
440
		}
441
	}
442

443
	log.Info("gateway updated")
444
	return nil
445
}
446

447
const (
	// ControllerVersionAnnotation is the annotation the controller adds to a Gateway to record the
	// "controller version" that owns it. The original intent of this was to work around
	// https://github.com/istio/istio/issues/44164, where we needed to transition from a global owner
	// to a per-revision owner. The newer version number allows forcing ownership, even if the other
	// version was otherwise expected to control the Gateway.
	// The version number has no meaning other than "larger numbers win".
	// Numbers are used to future-proof in case we need to do another migration in the future.
	ControllerVersionAnnotation = "gateway.istio.io/controller-version"

	// ControllerVersion is the current version of our controller logic. Known versions are:
	//
	// * 1.17 and older: version 1 OR no version at all, depending on patch release
	// * 1.18+: version 5
	//
	// 2, 3, and 4 were intentionally skipped to allow for the (unlikely) event we need to insert
	// another version between these
	ControllerVersion = 5
)
465

466
// ManagedGatewayControllerVersion determines the version of the controller managing this Gateway,
467
// and if we should manage this.
468
// See ControllerVersionAnnotation for motivations.
469
func ManagedGatewayControllerVersion(gw gateway.Gateway) (existing string, takeOver bool, manage bool) {
470
	cur, f := gw.Annotations[ControllerVersionAnnotation]
471
	if !f {
472
		// No current owner, we should take it over.
473
		return "", true, true
474
	}
475
	curNum, err := strconv.Atoi(cur)
476
	if err != nil {
477
		// We cannot parse it - must be some new schema we don't know about. We should assume we do not manage it.
478
		// In theory, this should never happen, unless we decide a number was a bad idea in the future.
479
		return cur, false, false
480
	}
481
	if curNum > ControllerVersion {
482
		// A newer version owns this gateway, let them handle it
483
		return cur, false, false
484
	}
485
	if curNum == ControllerVersion {
486
		// We already manage this at this version
487
		// We will manage it, but no need to attempt to apply the version annotation, which could race with newer versions
488
		return cur, false, true
489
	}
490
	// We are either newer or the same version of the last owner - we can take over. We need to actually
491
	// re-apply the annotation
492
	return cur, true, true
493
}
494

495
type derivedInput struct {
496
	TemplateInput
497

498
	// Inserted from injection config
499
	ProxyImage  string
500
	ProxyConfig *meshapi.ProxyConfig
501
	MeshConfig  *meshapi.MeshConfig
502
	Values      map[string]any
503
}
504

505
func (d *DeploymentController) render(templateName string, mi TemplateInput) ([]string, error) {
506
	cfg := d.injectConfig()
507

508
	template := cfg.Templates[templateName]
509
	if template == nil {
510
		return nil, fmt.Errorf("no %q template defined", templateName)
511
	}
512

513
	labelToMatch := map[string]string{constants.GatewayNameLabel: mi.Name, constants.DeprecatedGatewayNameLabel: mi.Name}
514
	proxyConfig := d.env.GetProxyConfigOrDefault(mi.Namespace, labelToMatch, nil, cfg.MeshConfig)
515
	input := derivedInput{
516
		TemplateInput: mi,
517
		ProxyImage: inject.ProxyImage(
518
			cfg.Values.Struct(),
519
			proxyConfig.GetImage(),
520
			mi.Annotations,
521
		),
522
		ProxyConfig: proxyConfig,
523
		MeshConfig:  cfg.MeshConfig,
524
		Values:      cfg.Values.Map(),
525
	}
526
	results, err := tmpl.Execute(template, input)
527
	if err != nil {
528
		return nil, err
529
	}
530

531
	return yml.SplitString(results), nil
532
}
533

534
func (d *DeploymentController) setGatewayControllerVersion(gws gateway.Gateway) error {
535
	patch := fmt.Sprintf(`{"apiVersion":"gateway.networking.k8s.io/v1beta1","kind":"Gateway","metadata":{"annotations":{"%s":"%d"}}}`,
536
		ControllerVersionAnnotation, ControllerVersion)
537

538
	log.Debugf("applying %v", patch)
539
	return d.patcher(gvr.KubernetesGateway, gws.GetName(), gws.GetNamespace(), []byte(patch))
540
}
541

542
// apply server-side applies a template to the cluster.
543
func (d *DeploymentController) apply(controller string, yml string) error {
544
	data := map[string]any{}
545
	err := yaml.Unmarshal([]byte(yml), &data)
546
	if err != nil {
547
		return err
548
	}
549
	us := unstructured.Unstructured{Object: data}
550
	// set managed-by label
551
	clabel := strings.ReplaceAll(controller, "/", "-")
552
	err = unstructured.SetNestedField(us.Object, clabel, "metadata", "labels", constants.ManagedGatewayLabel)
553
	if err != nil {
554
		return err
555
	}
556
	gvr, err := controllers.UnstructuredToGVR(us)
557
	if err != nil {
558
		return err
559
	}
560
	j, err := json.Marshal(us.Object)
561
	if err != nil {
562
		return err
563
	}
564
	canManage, resourceVersion := d.canManage(gvr, us.GetName(), us.GetNamespace())
565
	if !canManage {
566
		log.Debugf("skipping %v/%v/%v, already managed", gvr, us.GetName(), us.GetNamespace())
567
		return nil
568
	}
569
	// Ensure our canManage assertion is not stale
570
	us.SetResourceVersion(resourceVersion)
571

572
	log.Debugf("applying %v", string(j))
573
	if err := d.patcher(gvr, us.GetName(), us.GetNamespace(), j); err != nil {
574
		return fmt.Errorf("patch %v/%v/%v: %v", us.GroupVersionKind(), us.GetNamespace(), us.GetName(), err)
575
	}
576
	return nil
577
}
578

579
// HandleTagChange is the tag watcher callback. It requeues every known Gateway so that each one is
// re-evaluated; the new tag set itself is not inspected here.
func (d *DeploymentController) HandleTagChange(newTags sets.String) {
	all := d.gateways.List(metav1.NamespaceAll, klabels.Everything())
	for i := range all {
		d.queue.AddObject(all[i])
	}
}
584

585
// canManage checks if a resource we are about to write should be managed by us. If the resource already exists
586
// but does not have the ManagedGatewayLabel, we won't overwrite it.
587
// This ensures we don't accidentally take over some resource we weren't supposed to, which could cause outages.
588
// Note K8s doesn't have a perfect way to "conditionally SSA", but its close enough (https://github.com/kubernetes/kubernetes/issues/116156).
589
func (d *DeploymentController) canManage(gvr schema.GroupVersionResource, name, namespace string) (bool, string) {
590
	store, f := d.clients[gvr]
591
	if !f {
592
		log.Warnf("unknown GVR %v", gvr)
593
		// Even though we don't know what it is, allow users to put the resource. We won't be able to
594
		// protect against overwrites though.
595
		return true, ""
596
	}
597
	obj := store.Get(name, namespace)
598
	if obj == nil {
599
		// no object, we can manage it
600
		return true, ""
601
	}
602
	_, managed := obj.GetLabels()[constants.ManagedGatewayLabel]
603
	// If object already exists, we can only manage it if it has the label
604
	return managed, obj.GetResourceVersion()
605
}
606

607
// setGatewayNameLabel sets either the new or deprecated gateway name label
608
// based on the template input
609
func (d *DeploymentController) setGatewayNameLabel(ti *TemplateInput) {
610
	ti.GatewayNameLabel = constants.GatewayNameLabel // default to the new gateway name label
611
	store, f := d.clients[gvr.Deployment]            // Use deployment since those matchlabels are immutable
612
	if !f {
613
		log.Warnf("deployment gvr not found in deployment controller clients; defaulting to the new gateway name label")
614
		return
615
	}
616
	dep := store.Get(ti.DeploymentName, ti.Namespace)
617
	if dep == nil {
618
		log.Debugf("deployment %s/%s not found in store; using to the new gateway name label", ti.DeploymentName, ti.Namespace)
619
		return
620
	}
621

622
	// Base label choice on the deployment's selector
623
	_, exists := dep.(*appsv1.Deployment).Spec.Selector.MatchLabels[constants.DeprecatedGatewayNameLabel]
624
	if !exists {
625
		// The old label doesn't already exist on the deployment; use the new label
626
		return
627
	}
628

629
	// The old label exists on the deployment; use the old label
630
	ti.GatewayNameLabel = constants.DeprecatedGatewayNameLabel
631
}
632

633
type TemplateInput struct {
634
	*gateway.Gateway
635
	DeploymentName            string
636
	ServiceAccount            string
637
	Ports                     []corev1.ServicePort
638
	ServiceType               corev1.ServiceType
639
	ClusterID                 string
640
	KubeVersion               int
641
	Revision                  string
642
	ProxyUID                  int64
643
	ProxyGID                  int64
644
	CompliancePolicy          string
645
	InfrastructureLabels      map[string]string
646
	InfrastructureAnnotations map[string]string
647
	GatewayNameLabel          string
648
}
649

650
func extractServicePorts(gw gateway.Gateway) []corev1.ServicePort {
651
	tcp := strings.ToLower(string(protocol.TCP))
652
	svcPorts := make([]corev1.ServicePort, 0, len(gw.Spec.Listeners)+1)
653
	svcPorts = append(svcPorts, corev1.ServicePort{
654
		Name:        "status-port",
655
		Port:        int32(15021),
656
		AppProtocol: &tcp,
657
	})
658
	portNums := sets.New[int32]()
659
	for i, l := range gw.Spec.Listeners {
660
		if portNums.Contains(int32(l.Port)) {
661
			continue
662
		}
663
		portNums.Insert(int32(l.Port))
664
		name := string(l.Name)
665
		if name == "" {
666
			// Should not happen since name is required, but in case an invalid resource gets in...
667
			name = fmt.Sprintf("%s-%d", strings.ToLower(string(l.Protocol)), i)
668
		}
669
		appProtocol := strings.ToLower(string(l.Protocol))
670
		svcPorts = append(svcPorts, corev1.ServicePort{
671
			Name:        name,
672
			Port:        int32(l.Port),
673
			AppProtocol: &appProtocol,
674
		})
675
	}
676
	return svcPorts
677
}
678

679
// UntypedWrapper wraps a typed reader to an untyped one, since Go cannot do it automatically.
680
type UntypedWrapper[T controllers.ComparableObject] struct {
681
	reader kclient.Reader[T]
682
}
683
type getter interface {
684
	Get(name, namespace string) controllers.Object
685
}
686

687
func NewUntypedWrapper[T controllers.ComparableObject](c kclient.Client[T]) getter {
688
	return UntypedWrapper[T]{c}
689
}
690

691
func (u UntypedWrapper[T]) Get(name, namespace string) controllers.Object {
692
	// DO NOT return u.reader.Get directly, or we run into issues with https://go.dev/tour/methods/12
693
	res := u.reader.Get(name, namespace)
694
	if controllers.IsNil(res) {
695
		return nil
696
	}
697
	return res
698
}
699

700
var _ getter = UntypedWrapper[*corev1.Service]{}
701

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.