24
appsv1 "k8s.io/api/apps/v1"
25
corev1 "k8s.io/api/core/v1"
26
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
27
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
28
klabels "k8s.io/apimachinery/pkg/labels"
29
"k8s.io/apimachinery/pkg/runtime/schema"
30
"k8s.io/apimachinery/pkg/types"
31
gateway "sigs.k8s.io/gateway-api/apis/v1beta1"
35
meshapi "istio.io/api/mesh/v1alpha1"
36
"istio.io/istio/pilot/pkg/features"
37
"istio.io/istio/pilot/pkg/model"
38
"istio.io/istio/pkg/cluster"
39
"istio.io/istio/pkg/config/constants"
40
"istio.io/istio/pkg/config/protocol"
41
"istio.io/istio/pkg/config/schema/gvk"
42
"istio.io/istio/pkg/config/schema/gvr"
43
common_features "istio.io/istio/pkg/features"
44
"istio.io/istio/pkg/kube"
45
"istio.io/istio/pkg/kube/controllers"
46
"istio.io/istio/pkg/kube/inject"
47
"istio.io/istio/pkg/kube/kclient"
48
istiolog "istio.io/istio/pkg/log"
49
"istio.io/istio/pkg/revisions"
50
"istio.io/istio/pkg/test/util/tmpl"
51
"istio.io/istio/pkg/test/util/yml"
52
"istio.io/istio/pkg/util/sets"
78
type DeploymentController struct {
81
env *model.Environment
82
queue controllers.Queue
84
gateways kclient.Client[*gateway.Gateway]
85
gatewayClasses kclient.Client[*gateway.GatewayClass]
87
clients map[schema.GroupVersionResource]getter
88
injectConfig func() inject.WebhookConfig
89
deployments kclient.Client[*appsv1.Deployment]
90
services kclient.Client[*corev1.Service]
91
serviceAccounts kclient.Client[*corev1.ServiceAccount]
92
namespaces kclient.Client[*corev1.Namespace]
93
tagWatcher revisions.TagWatcher
98
// patcher is the function signature used to send a patch to the API server.
// It receives the target resource (gvr, name, namespace), the serialized patch
// body in data, and an optional list of subresources, and returns any error.
type patcher func(gvr schema.GroupVersionResource, name string, namespace string, data []byte, subresources ...string) error
101
type classInfo struct {
110
defaultServiceType corev1.ServiceType
113
disableRouteGeneration bool
116
addressType gateway.AddressType
119
// classInfos maps each known controller name to its classInfo. It is computed
// once, at package initialization, by getClassInfos.
var classInfos = getClassInfos()
121
// builtinClasses maps each built-in GatewayClass name to its controller name.
// It is computed once, at package initialization, by getBuiltinClasses.
var builtinClasses = getBuiltinClasses()
123
func getBuiltinClasses() map[gateway.ObjectName]gateway.GatewayController {
124
res := map[gateway.ObjectName]gateway.GatewayController{
125
gateway.ObjectName(features.GatewayAPIDefaultGatewayClass): gateway.GatewayController(features.ManagedGatewayController),
128
if features.MultiNetworkGatewayAPI {
129
res[constants.RemoteGatewayClassName] = constants.UnmanagedGatewayController
132
if features.EnableAmbientWaypoints {
133
res[constants.WaypointGatewayClassName] = constants.ManagedGatewayMeshController
138
func getClassInfos() map[gateway.GatewayController]classInfo {
139
m := map[gateway.GatewayController]classInfo{
140
gateway.GatewayController(features.ManagedGatewayController): {
141
controller: features.ManagedGatewayController,
142
description: "The default Istio GatewayClass",
143
templates: "kube-gateway",
144
defaultServiceType: corev1.ServiceTypeLoadBalancer,
145
addressType: gateway.HostnameAddressType,
149
if features.MultiNetworkGatewayAPI {
150
m[constants.UnmanagedGatewayController] = classInfo{
153
controller: constants.UnmanagedGatewayController,
154
description: "Remote to this cluster. Does not deploy or affect configuration.",
155
disableRouteGeneration: true,
156
addressType: gateway.HostnameAddressType,
159
if features.EnableAmbientWaypoints {
160
m[constants.ManagedGatewayMeshController] = classInfo{
161
controller: constants.ManagedGatewayMeshController,
162
description: "The default Istio waypoint GatewayClass",
163
templates: "waypoint",
164
defaultServiceType: corev1.ServiceTypeClusterIP,
165
addressType: gateway.IPAddressType,
173
// NewDeploymentController builds a DeploymentController.
//
// It creates informer-backed clients for Gateways, Services, Deployments,
// ServiceAccounts and Namespaces (all using the same object filter) and for
// GatewayClasses (unfiltered), a work queue whose reconciler is dc.Reconcile
// with at most 5 attempts, and event handlers that enqueue the affected
// Gateways.
//
// NOTE(review): this listing contains bare line-number residue and several
// original lines are missing (struct fields, patch options, closing braces).
// Restore from the upstream file before building.
func NewDeploymentController(client kube.Client, clusterID cluster.ID, env *model.Environment,
174
webhookConfig func() inject.WebhookConfig, injectionHandler func(fn func()), tw revisions.TagWatcher, revision string,
175
) *DeploymentController {
176
filter := kclient.Filter{ObjectFilter: kube.FilterIfEnhancedFilteringEnabled(client)}
177
gateways := kclient.NewFiltered[*gateway.Gateway](client, filter)
178
gatewayClasses := kclient.New[*gateway.GatewayClass](client)
179
dc := &DeploymentController{
181
clusterID: clusterID,
182
clients: map[schema.GroupVersionResource]getter{},
184
// The patcher sends data through the dynamic client as an apply patch.
patcher: func(gvr schema.GroupVersionResource, name string, namespace string, data []byte, subresources ...string) error {
185
c := client.Dynamic().Resource(gvr).Namespace(namespace)
187
_, err := c.Patch(context.Background(), name, types.ApplyPatchType, data, metav1.PatchOptions{
189
FieldManager: features.ManagedGatewayController,
194
gatewayClasses: gatewayClasses,
195
injectConfig: webhookConfig,
199
dc.queue = controllers.NewQueue("gateway deployment",
200
controllers.WithReconciler(dc.Reconcile),
201
controllers.WithMaxAttempts(5))
206
// Events on Services, Deployments and ServiceAccounts are routed through
// EnqueueForParentHandler for the KubernetesGateway kind.
parentHandler := controllers.ObjectHandler(controllers.EnqueueForParentHandler(dc.queue, gvk.KubernetesGateway))
208
dc.services = kclient.NewFiltered[*corev1.Service](client, filter)
209
dc.services.AddEventHandler(parentHandler)
210
dc.clients[gvr.Service] = NewUntypedWrapper(dc.services)
212
dc.deployments = kclient.NewFiltered[*appsv1.Deployment](client, filter)
213
dc.deployments.AddEventHandler(parentHandler)
214
dc.clients[gvr.Deployment] = NewUntypedWrapper(dc.deployments)
216
dc.serviceAccounts = kclient.NewFiltered[*corev1.ServiceAccount](client, filter)
217
dc.serviceAccounts.AddEventHandler(parentHandler)
218
dc.clients[gvr.ServiceAccount] = NewUntypedWrapper(dc.serviceAccounts)
220
dc.namespaces = kclient.NewFiltered[*corev1.Namespace](client, filter)
221
// A namespace event re-enqueues every Gateway in that namespace.
dc.namespaces.AddEventHandler(controllers.ObjectHandler(func(o controllers.Object) {
224
for _, gw := range dc.gateways.List(o.GetName(), klabels.Everything()) {
225
dc.queue.AddObject(gw)
229
gateways.AddEventHandler(controllers.ObjectHandler(dc.queue.AddObject))
230
// A GatewayClass event re-enqueues every Gateway that references that class by name.
gatewayClasses.AddEventHandler(controllers.ObjectHandler(func(o controllers.Object) {
231
for _, g := range dc.gateways.List(metav1.NamespaceAll, klabels.Everything()) {
232
if string(g.Spec.GatewayClassName) == o.GetName() {
233
dc.queue.AddObject(g)
239
// The callback registered with injectionHandler re-enqueues all Gateways.
injectionHandler(func() {
240
for _, gw := range dc.gateways.List(metav1.NamespaceAll, klabels.Everything()) {
241
dc.queue.AddObject(gw)
245
dc.tagWatcher.AddHandler(dc.HandleTagChange)
250
// Run waits for the caches of all watched resources and the tag watcher to
// sync, and shuts down the informer-backed clients via ShutdownAll.
//
// NOTE(review): lines between the cache-sync call and ShutdownAll are missing
// from this listing; presumably the queue is run there. Confirm upstream.
func (d *DeploymentController) Run(stop <-chan struct{}) {
251
kube.WaitForCacheSync(
252
"deployment controller",
254
d.namespaces.HasSynced,
255
d.deployments.HasSynced,
256
d.services.HasSynced,
257
d.serviceAccounts.HasSynced,
258
d.gateways.HasSynced,
259
d.gatewayClasses.HasSynced,
260
d.tagWatcher.HasSynced,
263
controllers.ShutdownAll(d.namespaces, d.deployments, d.services, d.serviceAccounts, d.gateways, d.gatewayClasses)
267
// Reconcile processes one queued Gateway key.
//
// It looks up the Gateway, determines its controller (from the GatewayClass
// object, or from builtinClasses), looks up the classInfo for that controller,
// checks that the Gateway's revision tag belongs to this control plane, and
// then delegates to configureIstioGateway.
//
// NOTE(review): the guards around the log statements below (nil checks,
// early returns, else branches) are missing from this listing.
func (d *DeploymentController) Reconcile(req types.NamespacedName) error {
268
log := log.WithLabels("gateway", req)
270
gw := d.gateways.Get(req.Name, req.Namespace)
272
log.Debugf("gateway no longer exists")
279
var controller gateway.GatewayController
280
// GatewayClass is looked up with an empty namespace.
if gc := d.gatewayClasses.Get(string(gw.Spec.GatewayClassName), ""); gc != nil {
281
controller = gc.Spec.ControllerName
283
if builtin, f := builtinClasses[gw.Spec.GatewayClassName]; f {
287
ci, f := classInfos[controller]
289
log.Debugf("skipping unknown controller %q", controller)
294
// The revision tag is read from the Gateway's own label; the namespace's
// label is read below as an alternative source.
selectedTag, ok := gw.Labels[label.IoIstioRev.Name]
296
ns := d.namespaces.Get(gw.Namespace, "")
298
log.Debugf("gateway is not for this revision, skipping")
301
selectedTag = ns.Labels[label.IoIstioRev.Name]
303
myTags := d.tagWatcher.GetMyTags()
304
// An empty tag is accepted only when this control plane owns the "default" tag.
if !myTags.Contains(selectedTag) && !(selectedTag == "" && myTags.Contains("default")) {
305
log.Debugf("gateway is not for this revision, skipping")
311
return d.configureIstioGateway(log, *gw, ci)
314
// configureIstioGateway renders the class's template for gw and applies every
// resulting document.
//
// Steps visible here: skip classes without a template and gateways that are
// not managed; consult ManagedGatewayControllerVersion; pick the Service type
// (class default, overridable by annotation); add a network label to waypoint
// gateways and an ambient-redirection annotation to non-waypoint gateways;
// build the TemplateInput; optionally write the controller-version annotation;
// render and apply.
//
// NOTE(review): gw is passed by value, but its Labels and Annotations maps are
// still shared with the caller's object. The map writes below mutate that
// object unless the caller passed a deep copy — confirm.
// NOTE(review): several original lines are missing from this listing.
func (d *DeploymentController) configureIstioGateway(log *istiolog.Scope, gw gateway.Gateway, gi classInfo) error {
317
if gi.templates == "" {
318
log.Debug("skip gateway class without template")
321
if !IsManaged(&gw.Spec) {
322
log.Debug("skip disabled gateway")
325
existingControllerVersion, overwriteControllerVersion, shouldHandle := ManagedGatewayControllerVersion(gw)
327
log.Debugf("skipping gateway which is managed by controller version %v", existingControllerVersion)
330
log.Info("reconciling")
332
var ns *corev1.Namespace
333
if d.namespaces != nil {
334
ns = d.namespaces.Get(gw.Namespace, "")
336
proxyUID, proxyGID := inject.GetProxyIDs(ns)
338
defaultName := getDefaultName(gw.Name, &gw.Spec)
340
// Service type: class default, unless the override annotation is present.
serviceType := gi.defaultServiceType
341
if o, f := gw.Annotations[serviceTypeOverride]; f {
342
serviceType = corev1.ServiceType(o)
346
// NOTE(review): waypoint detection is a substring match on the class name,
// so any class whose name contains "waypoint" qualifies.
isWaypointGateway := strings.Contains(string(gw.Spec.GatewayClassName), "waypoint")
349
network := d.injectConfig().Values.Struct().GetGlobal().GetNetwork()
350
// Waypoint gateways without a network label get the globally configured network.
if _, ok := gw.GetLabels()[label.TopologyNetwork.Name]; !ok && network != "" && isWaypointGateway {
351
if gw.Labels == nil {
352
gw.Labels = make(map[string]string)
354
// NOTE(review): this re-reads the inject config instead of reusing `network`.
gw.Labels[label.TopologyNetwork.Name] = d.injectConfig().Values.Struct().GetGlobal().GetNetwork()
358
// The ambient-redirection annotation may be set on the Gateway itself or
// under spec.infrastructure.annotations.
var hasAmbientAnnotation bool
359
if _, ok := gw.Annotations[constants.AmbientRedirection]; ok {
360
hasAmbientAnnotation = true
362
if gw.Spec.Infrastructure != nil {
363
if _, ok := gw.Spec.Infrastructure.Annotations[constants.AmbientRedirection]; ok {
364
hasAmbientAnnotation = true
367
// Non-waypoint gateways that did not set the annotation get it set to disabled.
if features.EnableAmbientWaypoints && !isWaypointGateway && !hasAmbientAnnotation {
368
if gw.Annotations == nil {
369
gw.Annotations = make(map[string]string)
371
gw.Annotations[constants.AmbientRedirection] = constants.AmbientRedirectionDisabled
374
input := TemplateInput{
376
DeploymentName: model.GetOrDefault(gw.Annotations[gatewayNameOverride], defaultName),
377
ServiceAccount: model.GetOrDefault(gw.Annotations[gatewaySAOverride], defaultName),
378
Ports: extractServicePorts(gw),
379
ClusterID: d.clusterID.String(),
381
KubeVersion: kube.GetVersionAsInt(d.client),
382
Revision: d.revision,
383
ServiceType: serviceType,
386
CompliancePolicy: common_features.CompliancePolicy,
387
InfrastructureLabels: gw.GetLabels(),
388
InfrastructureAnnotations: gw.GetAnnotations(),
391
d.setGatewayNameLabel(&input)
393
// When spec.infrastructure.labels is set, it replaces the Gateway's own
// labels as the infrastructure labels.
gwInfra := gw.Spec.Infrastructure
394
if gwInfra != nil && gwInfra.Labels != nil {
395
infraLabels := make(map[string]string, len(gwInfra.Labels))
396
for k, v := range gw.Spec.Infrastructure.Labels {
397
if strings.HasPrefix(string(k), "gateway.networking.k8s.io/") {
400
infraLabels[string(k)] = string(v)
406
if _, ok := infraLabels[label.TopologyNetwork.Name]; !ok && network != "" && isWaypointGateway {
407
infraLabels[label.TopologyNetwork.Name] = network
410
input.InfrastructureLabels = infraLabels
413
// Same treatment for spec.infrastructure.annotations.
if gwInfra != nil && gwInfra.Annotations != nil {
414
infraAnnotations := make(map[string]string, len(gwInfra.Annotations))
415
for k, v := range gw.Spec.Infrastructure.Annotations {
416
if strings.HasPrefix(string(k), "gateway.networking.k8s.io/") {
419
infraAnnotations[string(k)] = string(v)
421
input.InfrastructureAnnotations = infraAnnotations
424
if overwriteControllerVersion {
425
log.Debugf("write controller version, existing=%v", existingControllerVersion)
426
if err := d.setGatewayControllerVersion(gw); err != nil {
427
return fmt.Errorf("update gateway annotation: %v", err)
430
log.Debugf("controller version existing=%v, no action needed", existingControllerVersion)
433
rendered, err := d.render(gi.templates, input)
435
return fmt.Errorf("failed to render template: %v", err)
437
// Each rendered document is applied separately; the first failure is returned.
for _, t := range rendered {
438
if err := d.apply(gi.controller, t); err != nil {
439
return fmt.Errorf("apply failed: %v", err)
443
log.Info("gateway updated")
455
// ControllerVersionAnnotation is the Gateway annotation key that holds the
// version of the controller managing the Gateway.
ControllerVersionAnnotation = "gateway.istio.io/controller-version"
463
// ControllerVersion is this controller's version number.
// ManagedGatewayControllerVersion compares it with the annotation value.
ControllerVersion = 5
469
func ManagedGatewayControllerVersion(gw gateway.Gateway) (existing string, takeOver bool, manage bool) {
470
cur, f := gw.Annotations[ControllerVersionAnnotation]
473
return "", true, true
475
curNum, err := strconv.Atoi(cur)
479
return cur, false, false
481
if curNum > ControllerVersion {
483
return cur, false, false
485
if curNum == ControllerVersion {
488
return cur, false, true
492
return cur, true, true
495
// derivedInput is the value passed to template execution in render.
//
// NOTE(review): some fields are missing from this listing; render also sets a
// ProxyImage field.
type derivedInput struct {
500
// ProxyConfig is the effective proxy configuration resolved in render.
ProxyConfig *meshapi.ProxyConfig
501
// MeshConfig is taken from the injection webhook configuration.
MeshConfig *meshapi.MeshConfig
502
// Values is the map form of the injection values.
Values map[string]any
505
// render executes the named injection template with mi and returns the output
// split into individual documents.
//
// It returns an error containing "no %q template defined" for an unknown
// templateName.
//
// NOTE(review): guards and some ProxyImage arguments are missing from this listing.
func (d *DeploymentController) render(templateName string, mi TemplateInput) ([]string, error) {
506
cfg := d.injectConfig()
508
template := cfg.Templates[templateName]
510
return nil, fmt.Errorf("no %q template defined", templateName)
513
// The ProxyConfig lookup is keyed by both the current and the deprecated
// gateway-name labels, each set to the gateway's name.
labelToMatch := map[string]string{constants.GatewayNameLabel: mi.Name, constants.DeprecatedGatewayNameLabel: mi.Name}
514
proxyConfig := d.env.GetProxyConfigOrDefault(mi.Namespace, labelToMatch, nil, cfg.MeshConfig)
515
input := derivedInput{
517
ProxyImage: inject.ProxyImage(
519
proxyConfig.GetImage(),
522
ProxyConfig: proxyConfig,
523
MeshConfig: cfg.MeshConfig,
524
Values: cfg.Values.Map(),
526
results, err := tmpl.Execute(template, input)
531
return yml.SplitString(results), nil
534
func (d *DeploymentController) setGatewayControllerVersion(gws gateway.Gateway) error {
535
patch := fmt.Sprintf(`{"apiVersion":"gateway.networking.k8s.io/v1beta1","kind":"Gateway","metadata":{"annotations":{"%s":"%d"}}}`,
536
ControllerVersionAnnotation, ControllerVersion)
538
log.Debugf("applying %v", patch)
539
return d.patcher(gvr.KubernetesGateway, gws.GetName(), gws.GetNamespace(), []byte(patch))
543
func (d *DeploymentController) apply(controller string, yml string) error {
544
data := map[string]any{}
545
err := yaml.Unmarshal([]byte(yml), &data)
549
us := unstructured.Unstructured{Object: data}
551
clabel := strings.ReplaceAll(controller, "/", "-")
552
err = unstructured.SetNestedField(us.Object, clabel, "metadata", "labels", constants.ManagedGatewayLabel)
556
gvr, err := controllers.UnstructuredToGVR(us)
560
j, err := json.Marshal(us.Object)
564
canManage, resourceVersion := d.canManage(gvr, us.GetName(), us.GetNamespace())
566
log.Debugf("skipping %v/%v/%v, already managed", gvr, us.GetName(), us.GetNamespace())
570
us.SetResourceVersion(resourceVersion)
572
log.Debugf("applying %v", string(j))
573
if err := d.patcher(gvr, us.GetName(), us.GetNamespace(), j); err != nil {
574
return fmt.Errorf("patch %v/%v/%v: %v", us.GroupVersionKind(), us.GetNamespace(), us.GetName(), err)
579
func (d *DeploymentController) HandleTagChange(newTags sets.String) {
580
for _, gw := range d.gateways.List(metav1.NamespaceAll, klabels.Everything()) {
581
d.queue.AddObject(gw)
589
// canManage reports whether the controller may write the object identified by
// gvr/name/namespace, and returns a resource version string alongside.
//
// For an object found in the store, the boolean is whether it carries
// constants.ManagedGatewayLabel, and the string is its current resource version.
//
// NOTE(review): the branches for an unknown GVR and for a missing object are
// missing from this listing; their return values cannot be determined here.
func (d *DeploymentController) canManage(gvr schema.GroupVersionResource, name, namespace string) (bool, string) {
590
store, f := d.clients[gvr]
592
log.Warnf("unknown GVR %v", gvr)
597
obj := store.Get(name, namespace)
602
// Only the presence of the label is checked, not its value.
_, managed := obj.GetLabels()[constants.ManagedGatewayLabel]
604
return managed, obj.GetResourceVersion()
609
func (d *DeploymentController) setGatewayNameLabel(ti *TemplateInput) {
610
ti.GatewayNameLabel = constants.GatewayNameLabel
611
store, f := d.clients[gvr.Deployment]
613
log.Warnf("deployment gvr not found in deployment controller clients; defaulting to the new gateway name label")
616
dep := store.Get(ti.DeploymentName, ti.Namespace)
618
log.Debugf("deployment %s/%s not found in store; using to the new gateway name label", ti.DeploymentName, ti.Namespace)
623
_, exists := dep.(*appsv1.Deployment).Spec.Selector.MatchLabels[constants.DeprecatedGatewayNameLabel]
630
ti.GatewayNameLabel = constants.DeprecatedGatewayNameLabel
633
// TemplateInput is the per-Gateway data built in configureIstioGateway and
// handed to render.
//
// NOTE(review): some fields are missing from this listing. configureIstioGateway
// also sets ClusterID, KubeVersion and Revision, and render reads Name and Namespace.
type TemplateInput struct {
635
// DeploymentName and ServiceAccount default to the gateway's default name
// unless overridden by annotation.
DeploymentName string
636
ServiceAccount string
637
// Ports is produced by extractServicePorts.
Ports []corev1.ServicePort
638
ServiceType corev1.ServiceType
644
CompliancePolicy string
645
// InfrastructureLabels and InfrastructureAnnotations start as the Gateway's
// own labels/annotations and are replaced by spec.infrastructure values when set.
InfrastructureLabels map[string]string
646
InfrastructureAnnotations map[string]string
647
// GatewayNameLabel is chosen by setGatewayNameLabel.
GatewayNameLabel string
650
// extractServicePorts builds the Service port list for gw: one leading entry
// appended before the loop, followed by entries derived from the Gateway's
// listeners. Listener port numbers are tracked in a set, and each listener
// entry's AppProtocol is the lower-cased listener protocol.
//
// NOTE(review): the fields of the leading entry, the handling of repeated port
// numbers, and the condition that triggers the generated name are missing from
// this listing.
func extractServicePorts(gw gateway.Gateway) []corev1.ServicePort {
651
tcp := strings.ToLower(string(protocol.TCP))
652
// Capacity: one slot per listener plus the leading entry.
svcPorts := make([]corev1.ServicePort, 0, len(gw.Spec.Listeners)+1)
653
svcPorts = append(svcPorts, corev1.ServicePort{
658
portNums := sets.New[int32]()
659
for i, l := range gw.Spec.Listeners {
660
if portNums.Contains(int32(l.Port)) {
663
portNums.Insert(int32(l.Port))
664
name := string(l.Name)
667
// Generated name: lower-cased protocol plus the listener index.
name = fmt.Sprintf("%s-%d", strings.ToLower(string(l.Protocol)), i)
669
appProtocol := strings.ToLower(string(l.Protocol))
670
svcPorts = append(svcPorts, corev1.ServicePort{
673
AppProtocol: &appProtocol,
680
type UntypedWrapper[T controllers.ComparableObject] struct {
681
reader kclient.Reader[T]
683
type getter interface {
684
Get(name, namespace string) controllers.Object
687
func NewUntypedWrapper[T controllers.ComparableObject](c kclient.Client[T]) getter {
688
return UntypedWrapper[T]{c}
691
func (u UntypedWrapper[T]) Get(name, namespace string) controllers.Object {
693
res := u.reader.Get(name, namespace)
694
if controllers.IsNil(res) {
700
// Compile-time check that UntypedWrapper satisfies getter.
var _ getter = UntypedWrapper[*corev1.Service]{}