// Copyright Istio Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package controller

import (
	"fmt"
	"sort"
	"sync"
	"time"

	"github.com/hashicorp/go-multierror"
	"go.uber.org/atomic"
	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	klabels "k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/types"

	"istio.io/api/label"
	"istio.io/istio/pilot/pkg/features"
	"istio.io/istio/pilot/pkg/model"
	"istio.io/istio/pilot/pkg/serviceregistry"
	"istio.io/istio/pilot/pkg/serviceregistry/aggregate"
	"istio.io/istio/pilot/pkg/serviceregistry/kube"
	"istio.io/istio/pilot/pkg/serviceregistry/kube/controller/ambient"
	"istio.io/istio/pilot/pkg/serviceregistry/provider"
	labelutil "istio.io/istio/pilot/pkg/serviceregistry/util/label"
	"istio.io/istio/pilot/pkg/serviceregistry/util/workloadinstances"
	"istio.io/istio/pkg/cluster"
	"istio.io/istio/pkg/config"
	"istio.io/istio/pkg/config/host"
	"istio.io/istio/pkg/config/labels"
	"istio.io/istio/pkg/config/mesh"
	"istio.io/istio/pkg/config/protocol"
	"istio.io/istio/pkg/config/visibility"
	kubelib "istio.io/istio/pkg/kube"
	"istio.io/istio/pkg/kube/controllers"
	"istio.io/istio/pkg/kube/kclient"
	istiolog "istio.io/istio/pkg/log"
	"istio.io/istio/pkg/maps"
	"istio.io/istio/pkg/monitoring"
	"istio.io/istio/pkg/network"
	"istio.io/istio/pkg/ptr"
	"istio.io/istio/pkg/queue"
	"istio.io/istio/pkg/slices"
)

const (
	// NodeRegionLabel is the well-known label for kubernetes node region in beta
	NodeRegionLabel = v1.LabelFailureDomainBetaRegion
	// NodeZoneLabel is the well-known label for kubernetes node zone in beta
	NodeZoneLabel = v1.LabelFailureDomainBetaZone
	// NodeRegionLabelGA is the well-known label for kubernetes node region in ga
	NodeRegionLabelGA = v1.LabelTopologyRegion
	// NodeZoneLabelGA is the well-known label for kubernetes node zone in ga
	NodeZoneLabelGA = v1.LabelTopologyZone

	// DefaultNetworkGatewayPort is the port used by default for cross-network traffic if not otherwise specified
	// by meshNetworks or "networking.istio.io/gatewayPort"
	DefaultNetworkGatewayPort = 15443
)

var log = istiolog.RegisterScope("kube", "kubernetes service registry controller")

var (
	typeTag  = monitoring.CreateLabel("type")
	eventTag = monitoring.CreateLabel("event")

	k8sEvents = monitoring.NewSum(
		"pilot_k8s_reg_events",
		"Events from k8s registry.",
	)

	// nolint: gocritic
	// This is deprecated in favor of `pilot_k8s_endpoints_pending_pod`, which is a gauge indicating the number of
	// currently missing pods. This helps distinguish transient errors from permanent ones
	endpointsWithNoPods = monitoring.NewSum(
		"pilot_k8s_endpoints_with_no_pods",
		"Endpoints that does not have any corresponding pods.")

	endpointsPendingPodUpdate = monitoring.NewGauge(
		"pilot_k8s_endpoints_pending_pod",
		"Number of endpoints that do not currently have any corresponding pods.",
	)
)

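// incrementEvent records a k8s registry event metric for the given resource kind and event type;
// it is a no-op when either label is empty.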
func incrementEvent(kind, event string) {
	if kind == "" || event == "" {
		return
	}
	k8sEvents.With(typeTag.Value(kind), eventTag.Value(event)).Increment()
}

// Options stores the configurable attributes of a Controller.
type Options struct {
	SystemNamespace string

	// MeshServiceController is a mesh-wide service Controller.
	MeshServiceController *aggregate.Controller

	DomainSuffix string

	// ClusterID identifies the cluster which the controller communicates with.
	ClusterID cluster.ID

	// ClusterAliases are alias names for clusters. When a proxy connects with a cluster ID
	// that has an alias, we should use the alias as the cluster ID for the proxy.
	ClusterAliases map[string]string

	// Metrics for capturing node-based metrics.
	Metrics model.Metrics

	// XDSUpdater will push changes to the xDS server.
	XDSUpdater model.XDSUpdater

	// MeshNetworksWatcher observes changes to the mesh networks config.
	MeshNetworksWatcher mesh.NetworksWatcher

	// MeshWatcher observes changes to the mesh config
	MeshWatcher mesh.Watcher

	// Maximum QPS when communicating with kubernetes API
	KubernetesAPIQPS float32

	// Maximum burst for throttle when communicating with the kubernetes API
	KubernetesAPIBurst int

	// SyncTimeout, if set, causes HasSynced to return true once the timeout elapses,
	// even if the initial sync has not completed.
	SyncTimeout time.Duration

	// Revision of this Istiod instance
	Revision string

	ConfigCluster bool
}

// kubernetesNode represents a kubernetes node that is reachable externally
type kubernetesNode struct {
	address string
	labels  labels.Instance
}

// controllerInterface is a simplified interface for the Controller used for testing.
type controllerInterface interface {
	getPodLocality(pod *v1.Pod) string
	Network(endpointIP string, labels labels.Instance) network.ID
	Cluster() cluster.ID
}

var (
	_ controllerInterface      = &Controller{}
	_ serviceregistry.Instance = &Controller{}
)

type ambientIndex = ambient.Index

// Controller is a collection of synchronized resource watchers
// Caches are thread-safe
type Controller struct {
	opts Options

	client kubelib.Client

	queue queue.Instance

	namespaces kclient.Client[*v1.Namespace]
	services   kclient.Client[*v1.Service]

	endpoints *endpointSliceController

	// Used to watch nodes accessible from remote clusters.
	// In a multi-cluster (shared control plane, multi-network) scenario, the ingress gateway service can be of NodePort type.
	// With this, we can populate the mesh's gateway addresses with the node IPs.
	nodes kclient.Client[*v1.Node]

	exports serviceExportCache
	imports serviceImportCache
	pods    *PodCache

	crdHandlers                []func(name string)
	handlers                   model.ControllerHandlers
	namespaceDiscoveryHandlers []func(ns string, event model.Event)

	// This is only used for test
	stop chan struct{}

	sync.RWMutex
	// servicesMap stores hostname ==> service; it is used to reduce convertService calls.
	servicesMap map[host.Name]*model.Service
	// nodeSelectorsForServices stores hostname => label selectors that can be used to
	// refine the set of node port IPs for a service.
	nodeSelectorsForServices map[host.Name]labels.Instance
	// map of node name to its address+labels - this is the only thing we need from nodes
	// for VM-to-k8s or cross-cluster traffic. When node port services select specific nodes by labels,
	// we run through the label selectors here to pick only the ones that we need.
	// Only nodes with ExternalIP addresses are included in this map!
	nodeInfoMap map[string]kubernetesNode
	// index over workload instances from workload entries
	workloadInstancesIndex workloadinstances.Index

	*networkManager

	ambientIndex

	// initialSyncTimedout is set to true if the initial sync times out.
	initialSyncTimedout *atomic.Bool
	meshWatcher         mesh.Watcher

	podsClient kclient.Client[*v1.Pod]

	configCluster bool

	networksHandlerRegistration *mesh.WatcherHandlerRegistration
	meshHandlerRegistration     *mesh.WatcherHandlerRegistration
}

// NewController creates a new Kubernetes controller
// Created by bootstrap and multicluster (see multicluster.Controller).
func NewController(kubeClient kubelib.Client, options Options) *Controller {
	c := &Controller{
		opts:                     options,
		client:                   kubeClient,
		queue:                    queue.NewQueueWithID(1*time.Second, string(options.ClusterID)),
		servicesMap:              make(map[host.Name]*model.Service),
		nodeSelectorsForServices: make(map[host.Name]labels.Instance),
		nodeInfoMap:              make(map[string]kubernetesNode),
		workloadInstancesIndex:   workloadinstances.NewIndex(),
		initialSyncTimedout:      atomic.NewBool(false),

		configCluster: options.ConfigCluster,
	}
	c.networkManager = initNetworkManager(c, options)

	c.namespaces = kclient.NewFiltered[*v1.Namespace](kubeClient, kclient.Filter{ObjectFilter: kubeClient.ObjectFilter()})

	if c.opts.SystemNamespace != "" {
		registerHandlers[*v1.Namespace](
			c,
			c.namespaces,
			"Namespaces",
			func(old *v1.Namespace, cur *v1.Namespace, event model.Event) error {
				if cur.Name == c.opts.SystemNamespace {
					return c.onSystemNamespaceEvent(old, cur, event)
				}
				return nil
			},
			nil,
		)
	}

	c.services = kclient.NewFiltered[*v1.Service](kubeClient, kclient.Filter{ObjectFilter: kubeClient.ObjectFilter()})

	registerHandlers[*v1.Service](c, c.services, "Services", c.onServiceEvent, nil)

	c.endpoints = newEndpointSliceController(c)

	// This is for getting the node IPs of a selected set of nodes
	c.nodes = kclient.NewFiltered[*v1.Node](kubeClient, kclient.Filter{ObjectTransform: kubelib.StripNodeUnusedFields})
	registerHandlers[*v1.Node](c, c.nodes, "Nodes", c.onNodeEvent, nil)

	c.podsClient = kclient.NewFiltered[*v1.Pod](kubeClient, kclient.Filter{
		ObjectFilter:    kubeClient.ObjectFilter(),
		ObjectTransform: kubelib.StripPodUnusedFields,
	})
	c.pods = newPodCache(c, c.podsClient, func(key types.NamespacedName) {
		c.queue.Push(func() error {
			return c.endpoints.podArrived(key.Name, key.Namespace)
		})
	})
	registerHandlers[*v1.Pod](c, c.podsClient, "Pods", c.pods.onEvent, c.pods.labelFilter)

	if features.EnableAmbientControllers {
		c.ambientIndex = ambient.New(ambient.Options{
			Client:          kubeClient,
			SystemNamespace: options.SystemNamespace,
			DomainSuffix:    options.DomainSuffix,
			ClusterID:       options.ClusterID,
			Revision:        options.Revision,
			XDSUpdater:      options.XDSUpdater,
			LookupNetwork:   c.Network,
		})
	}
	c.exports = newServiceExportCache(c)
	c.imports = newServiceImportCache(c)

	c.meshWatcher = options.MeshWatcher
	if c.opts.MeshNetworksWatcher != nil {
		c.networksHandlerRegistration = c.opts.MeshNetworksWatcher.AddNetworksHandler(func() {
			c.reloadMeshNetworks()
			c.onNetworkChange()
		})
		c.reloadMeshNetworks()
	}
	return c
}

func (c *Controller) Provider() provider.ID {
	return provider.Kubernetes
}

func (c *Controller) Cluster() cluster.ID {
	return c.opts.ClusterID
}

func (c *Controller) MCSServices() []model.MCSServiceInfo {
	outMap := make(map[types.NamespacedName]model.MCSServiceInfo)

	// Add the ServiceExport info.
	for _, se := range c.exports.ExportedServices() {
		mcsService := outMap[se.namespacedName]
		mcsService.Cluster = c.Cluster()
		mcsService.Name = se.namespacedName.Name
		mcsService.Namespace = se.namespacedName.Namespace
		mcsService.Exported = true
		mcsService.Discoverability = se.discoverability
		outMap[se.namespacedName] = mcsService
	}

	// Add the ServiceImport info.
	for _, si := range c.imports.ImportedServices() {
		mcsService := outMap[si.namespacedName]
		mcsService.Cluster = c.Cluster()
		mcsService.Name = si.namespacedName.Name
		mcsService.Namespace = si.namespacedName.Namespace
		mcsService.Imported = true
		mcsService.ClusterSetVIP = si.clusterSetVIP
		outMap[si.namespacedName] = mcsService
	}

	return maps.Values(outMap)
}

func (c *Controller) Network(endpointIP string, labels labels.Instance) network.ID {
	// 1. check the pod/workloadEntry label
	if nw := labels[label.TopologyNetwork.Name]; nw != "" {
		return network.ID(nw)
	}

	// 2. check the system namespace labels
	if nw := c.networkFromSystemNamespace(); nw != "" {
		return nw
	}

	// 3. check the meshNetworks config
	if nw := c.networkFromMeshNetworks(endpointIP); nw != "" {
		return nw
	}

	return ""
}

func (c *Controller) Cleanup() error {
	if err := queue.WaitForClose(c.queue, 30*time.Second); err != nil {
		log.Warnf("queue for removed kube registry %q may not be done processing: %v", c.Cluster(), err)
	}
	if c.opts.XDSUpdater != nil {
		c.opts.XDSUpdater.RemoveShard(model.ShardKeyFromRegistry(c))
	}

	// Unregister networks handler
	if c.networksHandlerRegistration != nil {
		c.opts.MeshNetworksWatcher.DeleteNetworksHandler(c.networksHandlerRegistration)
	}

	// Unregister mesh handler
	if c.meshHandlerRegistration != nil {
		c.opts.MeshWatcher.DeleteMeshHandler(c.meshHandlerRegistration)
	}

	return nil
}

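// onServiceEvent is the queue handler for Kubernetes Service events; it converts the Service
// into a model.Service and dispatches to deleteService or addOrUpdateService.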
func (c *Controller) onServiceEvent(pre, curr *v1.Service, event model.Event) error {
	log.Debugf("Handle event %s for service %s in namespace %s", event, curr.Name, curr.Namespace)

	// Create the standard (cluster.local) service.
	svcConv := kube.ConvertService(*curr, c.opts.DomainSuffix, c.Cluster())

	switch event {
	case model.EventDelete:
		c.deleteService(svcConv)
	default:
		c.addOrUpdateService(pre, curr, svcConv, event, false)
	}

	return nil
}

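// deleteService removes the service from the controller's internal caches and notifies XDS
// and registered service handlers of the deletion.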
func (c *Controller) deleteService(svc *model.Service) {
	c.Lock()
	delete(c.servicesMap, svc.Hostname)
	delete(c.nodeSelectorsForServices, svc.Hostname)
	_, isNetworkGateway := c.networkGatewaysBySvc[svc.Hostname]
	delete(c.networkGatewaysBySvc, svc.Hostname)
	c.Unlock()

	if isNetworkGateway {
		c.NotifyGatewayHandlers()
		// TODO trigger push via handler
		// networks are different, we need to update all eds endpoints
		c.opts.XDSUpdater.ConfigUpdate(&model.PushRequest{Full: true, Reason: model.NewReasonStats(model.NetworksTrigger)})
	}

	shard := model.ShardKeyFromRegistry(c)
	event := model.EventDelete
	c.opts.XDSUpdater.SvcUpdate(shard, string(svc.Hostname), svc.Attributes.Namespace, event)

	c.handlers.NotifyServiceHandlers(nil, svc, event)
}

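// addOrUpdateService updates the internal service caches, refreshes gateway and node-port state
// as needed, and triggers the appropriate XDS pushes.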
func (c *Controller) addOrUpdateService(pre, curr *v1.Service, currConv *model.Service, event model.Event, updateEDSCache bool) {
	needsFullPush := false
	// First, process nodePort gateway services whose externalIPs are specified,
	// as well as loadbalancer gateway services
	if currConv.Attributes.ClusterExternalAddresses.Len() > 0 {
		needsFullPush = c.extractGatewaysFromService(currConv)
	} else if isNodePortGatewayService(curr) {
		// We need to know which services are using node selectors because during node events,
		// we have to update all the node port services accordingly.
		nodeSelector := getNodeSelectorsForService(curr)
		c.Lock()
		// only add when it is nodePort gateway service
		c.nodeSelectorsForServices[currConv.Hostname] = nodeSelector
		c.Unlock()
		needsFullPush = c.updateServiceNodePortAddresses(currConv)
	}

	// For ExternalName, we need to update the EndpointIndex, as we will store endpoints just based on the Service.
	if !features.EnableExternalNameAlias && curr != nil && curr.Spec.Type == v1.ServiceTypeExternalName {
		updateEDSCache = true
	}

	c.Lock()
	prevConv := c.servicesMap[currConv.Hostname]
	c.servicesMap[currConv.Hostname] = currConv
	c.Unlock()
	// This full push is needed to update ALL endpoints, even though we do a full push on service add/update,
	// as that full push is only triggered for the specific service.
	if needsFullPush {
		// networks are different, we need to update all eds endpoints
		c.opts.XDSUpdater.ConfigUpdate(&model.PushRequest{Full: true, Reason: model.NewReasonStats(model.NetworksTrigger)})
	}

	shard := model.ShardKeyFromRegistry(c)
	ns := currConv.Attributes.Namespace
	// We also need to update when the Service changes. For Kubernetes, a service change will result in Endpoint updates,
	// but workload entries will also need to be updated.
	// TODO(nmittler): Build different sets of endpoints for cluster.local and clusterset.local.
	if updateEDSCache || features.EnableK8SServiceSelectWorkloadEntries {
		endpoints := c.buildEndpointsForService(currConv, updateEDSCache)
		if len(endpoints) > 0 {
			c.opts.XDSUpdater.EDSCacheUpdate(shard, string(currConv.Hostname), ns, endpoints)
		}
	}

	// filter out events where the service did not actually change
	if event == model.EventUpdate && !serviceUpdateNeedsPush(pre, curr, prevConv, currConv) {
		return
	}

	c.opts.XDSUpdater.SvcUpdate(shard, string(currConv.Hostname), ns, event)
	c.handlers.NotifyServiceHandlers(prevConv, currConv, event)
}

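// buildEndpointsForService builds the IstioEndpoints for a service from endpoint slices, adding
// workload entry endpoints and ExternalName endpoints when the corresponding feature flags call for it.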
func (c *Controller) buildEndpointsForService(svc *model.Service, updateCache bool) []*model.IstioEndpoint {
	endpoints := c.endpoints.buildIstioEndpointsWithService(svc.Attributes.Name, svc.Attributes.Namespace, svc.Hostname, updateCache)
	if features.EnableK8SServiceSelectWorkloadEntries {
		fep := c.collectWorkloadInstanceEndpoints(svc)
		endpoints = append(endpoints, fep...)
	}
	if !features.EnableExternalNameAlias {
		endpoints = append(endpoints, kube.ExternalNameEndpoints(svc)...)
	}
	return endpoints
}

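// onNodeEvent keeps nodeInfoMap in sync with node add/update/delete events and refreshes
// node-port gateway services when a node's external address or labels change.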
func (c *Controller) onNodeEvent(_, node *v1.Node, event model.Event) error {
	var updatedNeeded bool
	if event == model.EventDelete {
		updatedNeeded = true
		c.Lock()
		delete(c.nodeInfoMap, node.Name)
		c.Unlock()
	} else {
		k8sNode := kubernetesNode{labels: node.Labels}
		for _, address := range node.Status.Addresses {
			if address.Type == v1.NodeExternalIP && address.Address != "" {
				k8sNode.address = address.Address
				break
			}
		}
		if k8sNode.address == "" {
			return nil
		}

		c.Lock()
		// check if the node exists, as this add event could be due to a controller resync;
		// if the stored object changed, fire an update event. Otherwise, ignore this event.
		currentNode, exists := c.nodeInfoMap[node.Name]
		if !exists || !nodeEquals(currentNode, k8sNode) {
			c.nodeInfoMap[node.Name] = k8sNode
			updatedNeeded = true
		}
		c.Unlock()
	}

	// update all related services
	if updatedNeeded && c.updateServiceNodePortAddresses() {
		c.opts.XDSUpdater.ConfigUpdate(&model.PushRequest{
			Full:   true,
			Reason: model.NewReasonStats(model.ServiceUpdate),
		})
	}
	return nil
}

// FilterOutFunc is a func for filtering out objects during update callbacks
type FilterOutFunc[T controllers.Object] func(old, cur T) bool

// registerHandlers registers a handler for a given informer
// Note: `otype` is used for metrics; if empty, no metric will be reported
func registerHandlers[T controllers.ComparableObject](c *Controller,
	informer kclient.Informer[T], otype string,
	handler func(T, T, model.Event) error, filter FilterOutFunc[T],
) {
	wrappedHandler := func(prev, curr T, event model.Event) error {
		curr = informer.Get(curr.GetName(), curr.GetNamespace())
		if controllers.IsNil(curr) {
			// this can happen when a delete immediately follows an update;
			// the delete event will be handled later
			return nil
		}
		return handler(prev, curr, event)
	}
	informer.AddEventHandler(
		controllers.EventHandler[T]{
			AddFunc: func(obj T) {
				incrementEvent(otype, "add")
				c.queue.Push(func() error {
					return wrappedHandler(ptr.Empty[T](), obj, model.EventAdd)
				})
			},
			UpdateFunc: func(old, cur T) {
				if filter != nil {
					if filter(old, cur) {
						incrementEvent(otype, "updatesame")
						return
					}
				}
				incrementEvent(otype, "update")
				c.queue.Push(func() error {
					return wrappedHandler(old, cur, model.EventUpdate)
				})
			},
			DeleteFunc: func(obj T) {
				incrementEvent(otype, "delete")
				c.queue.Push(func() error {
					return handler(ptr.Empty[T](), obj, model.EventDelete)
				})
			},
		})
}

// HasSynced returns true after the initial state synchronization
func (c *Controller) HasSynced() bool {
	return c.queue.HasSynced() || c.initialSyncTimedout.Load()
}

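// informersSynced reports whether all of the controller's underlying informers and caches have
// completed their initial sync.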
func (c *Controller) informersSynced() bool {
	return c.namespaces.HasSynced() &&
		c.services.HasSynced() &&
		c.endpoints.slices.HasSynced() &&
		c.pods.pods.HasSynced() &&
		c.nodes.HasSynced() &&
		c.imports.HasSynced() &&
		c.exports.HasSynced() &&
		c.networkManager.HasSynced()
}

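// syncPods replays an add event for every pod currently in the informer cache, aggregating any
// per-pod errors into a single error.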
func (c *Controller) syncPods() error {
	var err *multierror.Error
	pods := c.podsClient.List(metav1.NamespaceAll, klabels.Everything())
	log.Debugf("initializing %d pods", len(pods))
	for _, s := range pods {
		err = multierror.Append(err, c.pods.onEvent(nil, s, model.EventAdd))
	}
	return err.ErrorOrNil()
}

// Run all controllers until a signal is received
func (c *Controller) Run(stop <-chan struct{}) {
	if c.opts.SyncTimeout != 0 {
		time.AfterFunc(c.opts.SyncTimeout, func() {
			if !c.queue.HasSynced() {
				log.Warnf("kube controller for %s initial sync timed out", c.opts.ClusterID)
				c.initialSyncTimedout.Store(true)
			}
		})
	}
	st := time.Now()

	go c.imports.Run(stop)
	go c.exports.Run(stop)

	kubelib.WaitForCacheSync("kube controller", stop, c.informersSynced)
	log.Infof("kube controller for %s synced after %v", c.opts.ClusterID, time.Since(st))
	// after the in-order sync we can start processing the queue
	c.queue.Run(stop)
	log.Infof("Controller terminated")
}

// Stop the controller. Only for tests, to simplify the code (defer c.Stop())
func (c *Controller) Stop() {
	if c.stop != nil {
		close(c.stop)
	}
}

// Services implements a service catalog operation
func (c *Controller) Services() []*model.Service {
	c.RLock()
	out := make([]*model.Service, 0, len(c.servicesMap))
	for _, svc := range c.servicesMap {
		out = append(out, svc)
	}
	c.RUnlock()
	sort.Slice(out, func(i, j int) bool { return out[i].Hostname < out[j].Hostname })
	return out
}

// GetService implements a service catalog operation, returning the service for the specified hostname.
func (c *Controller) GetService(hostname host.Name) *model.Service {
	c.RLock()
	svc := c.servicesMap[hostname]
	c.RUnlock()
	return svc
}

// getPodLocality retrieves the locality for a pod.
func (c *Controller) getPodLocality(pod *v1.Pod) string {
	// if the pod has the `istio-locality` label, skip the node lookup below
	if len(pod.Labels[model.LocalityLabel]) > 0 {
		return model.GetLocalityLabel(pod.Labels[model.LocalityLabel])
	}

	// NodeName is set by the scheduler after the pod is created
	// https://github.com/kubernetes/community/blob/master/contributors/devel/sig-architecture/api-conventions.md#late-initialization
	node := c.nodes.Get(pod.Spec.NodeName, "")
	if node == nil {
		if pod.Spec.NodeName != "" {
			log.Warnf("unable to get node %q for pod %q/%q", pod.Spec.NodeName, pod.Namespace, pod.Name)
		}
		return ""
	}

	region := getLabelValue(node.ObjectMeta, NodeRegionLabelGA, NodeRegionLabel)
	zone := getLabelValue(node.ObjectMeta, NodeZoneLabelGA, NodeZoneLabel)
	subzone := getLabelValue(node.ObjectMeta, label.TopologySubzone.Name, "")

	if region == "" && zone == "" && subzone == "" {
		return ""
	}

	return region + "/" + zone + "/" + subzone // Format: "%s/%s/%s"
}

func (c *Controller) serviceInstancesFromWorkloadInstances(svc *model.Service, reqSvcPort int) []*model.ServiceInstance {
	// Run through all the workload instances, selecting the ones that match the service labels,
	// but only if this is an internal Kubernetes service of ClientSideLB (eds) type,
	// as InstancesByPort is called by the aggregate controller. We don't want to include
	// workload instances for any other registry
	workloadInstancesExist := !c.workloadInstancesIndex.Empty()
	c.RLock()
	_, inRegistry := c.servicesMap[svc.Hostname]
	c.RUnlock()

	// Only select internal Kubernetes services with selectors
	if !inRegistry || !workloadInstancesExist || svc.Attributes.ServiceRegistry != provider.Kubernetes ||
		svc.MeshExternal || svc.Resolution != model.ClientSideLB || svc.Attributes.LabelSelectors == nil {
		return nil
	}

	selector := labels.Instance(svc.Attributes.LabelSelectors)

	// Get the service port name and target port so that we can construct the service instance
	k8sService := c.services.Get(svc.Attributes.Name, svc.Attributes.Namespace)
	// We did not find the k8s service. We cannot get the targetPort
	if k8sService == nil {
		log.Infof("serviceInstancesFromWorkloadInstances(%s.%s) failed to get k8s service",
			svc.Attributes.Name, svc.Attributes.Namespace)
		return nil
	}

	var servicePort *model.Port
	for _, p := range svc.Ports {
		if p.Port == reqSvcPort {
			servicePort = p
			break
		}
	}
	if servicePort == nil {
		return nil
	}

	// Now get the target Port for this service port
	targetPort := findServiceTargetPort(servicePort, k8sService)
	if targetPort.num == 0 {
		targetPort.num = servicePort.Port
	}

	out := make([]*model.ServiceInstance, 0)

	c.workloadInstancesIndex.ForEach(func(wi *model.WorkloadInstance) {
		if wi.Namespace != svc.Attributes.Namespace {
			return
		}
		if selector.Match(wi.Endpoint.Labels) {
			instance := serviceInstanceFromWorkloadInstance(svc, servicePort, targetPort, wi)
			if instance != nil {
				out = append(out, instance)
			}
		}
	})
	return out
}

func serviceInstanceFromWorkloadInstance(svc *model.Service, servicePort *model.Port,
	targetPort serviceTargetPort, wi *model.WorkloadInstance,
) *model.ServiceInstance {
	// create an instance with endpoint whose service port name matches
	istioEndpoint := wi.Endpoint.ShallowCopy()

	// by default, use the numbered targetPort
	istioEndpoint.EndpointPort = uint32(targetPort.num)

	if targetPort.name != "" {
		// This is a named port, find the corresponding port in the port map
		matchedPort := wi.PortMap[targetPort.name]
		if matchedPort != 0 {
			istioEndpoint.EndpointPort = matchedPort
		} else if targetPort.explicitName {
			// No match found, and the service names the target port explicitly, so skip this endpoint
			return nil
		}
	}

	istioEndpoint.ServicePortName = servicePort.Name
	return &model.ServiceInstance{
		Service:     svc,
		ServicePort: servicePort,
		Endpoint:    istioEndpoint,
	}
}

// convenience function to collect all workload entry endpoints in updateEDS calls.
func (c *Controller) collectWorkloadInstanceEndpoints(svc *model.Service) []*model.IstioEndpoint {
	workloadInstancesExist := !c.workloadInstancesIndex.Empty()

	if !workloadInstancesExist || svc.Resolution != model.ClientSideLB || len(svc.Ports) == 0 {
		return nil
	}

	endpoints := make([]*model.IstioEndpoint, 0)
	for _, port := range svc.Ports {
		for _, instance := range c.serviceInstancesFromWorkloadInstances(svc, port.Port) {
			endpoints = append(endpoints, instance.Endpoint)
		}
	}

	return endpoints
}

// GetProxyServiceTargets returns service targets co-located with a given proxy
// TODO: this code does not return k8s service instances when the proxy's IP is a workload entry
// To tackle this, we need an ip2instance map like the one we have for service entries.
func (c *Controller) GetProxyServiceTargets(proxy *model.Proxy) []model.ServiceTarget {
	if len(proxy.IPAddresses) > 0 {
		proxyIP := proxy.IPAddresses[0]
		// look up a WorkloadEntry; if there are multiple WorkloadEntries
		// with the same IP, choose one deterministically
		workload := workloadinstances.GetInstanceForProxy(c.workloadInstancesIndex, proxy, proxyIP)
		if workload != nil {
			return c.serviceInstancesFromWorkloadInstance(workload)
		}
		pod := c.pods.getPodByProxy(proxy)
		if pod != nil && !proxy.IsVM() {
			// we don't want to use this block for our test "VM" which is actually a Pod.

			if !c.isControllerForProxy(proxy) {
				log.Errorf("proxy is in cluster %v, but controller is for cluster %v", proxy.Metadata.ClusterID, c.Cluster())
				return nil
			}

			// 1. find the proxy's services by label selector; if none match, there may be a headless service without a selector,
			// so fall back to 2
			allServices := c.services.List(pod.Namespace, klabels.Everything())
			if services := getPodServices(allServices, pod); len(services) > 0 {
				out := make([]model.ServiceTarget, 0)
				for _, svc := range services {
					out = append(out, c.GetProxyServiceTargetsByPod(pod, svc)...)
				}
				return out
			}
			// 2. Headless service without selector
			return c.endpoints.GetProxyServiceTargets(proxy)
		}

		// 3. The pod is not present when this is called
		// due to eventual consistency issues. However, we have a lot of information about the pod from the proxy
		// metadata already. Because of this, we can still get most of the information we need.
		// If we cannot accurately construct ServiceEndpoints from just the metadata, this will return an error and we can
		// attempt to read the real pod.
		out, err := c.GetProxyServiceTargetsFromMetadata(proxy)
		if err != nil {
			log.Warnf("GetProxyServiceTargetsFromMetadata for %v failed: %v", proxy.ID, err)
		}
		return out
	}

	// TODO: This should never happen; remove?
	if c.opts.Metrics != nil {
		c.opts.Metrics.AddMetric(model.ProxyStatusNoService, proxy.ID, proxy.ID, "")
	} else {
		log.Infof("Missing metrics env, empty list of services for pod %s", proxy.ID)
	}
	return nil
}

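// serviceInstancesFromWorkloadInstance returns the service targets for a single workload instance
// by matching the Kubernetes services in its namespace that select its labels.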
func (c *Controller) serviceInstancesFromWorkloadInstance(si *model.WorkloadInstance) []model.ServiceTarget {
	out := make([]model.ServiceTarget, 0)
	// find the workload entry's service by label selector
	// rather than scanning through our internal map of model.services, get the services via the k8s apis
	dummyPod := &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{Namespace: si.Namespace, Labels: si.Endpoint.Labels},
	}

	// find the services that map to this workload entry, fire off eds updates if the service is of type client-side lb
	allServices := c.services.List(si.Namespace, klabels.Everything())
	if k8sServices := getPodServices(allServices, dummyPod); len(k8sServices) > 0 {
		for _, k8sSvc := range k8sServices {
			service := c.GetService(kube.ServiceHostname(k8sSvc.Name, k8sSvc.Namespace, c.opts.DomainSuffix))
			// Note that this cannot be an external service because k8s external services do not have label selectors.
			if service == nil || service.Resolution != model.ClientSideLB {
				// may be a headless service
				continue
			}

			for _, servicePort := range service.Ports {
				if servicePort.Protocol == protocol.UDP {
					continue
				}

				// Now get the target Port for this service port
				targetPort := findServiceTargetPort(servicePort, k8sSvc)
				if targetPort.num == 0 {
					targetPort.num = servicePort.Port
				}

				instance := serviceInstanceFromWorkloadInstance(service, servicePort, targetPort, si)
				if instance != nil {
					out = append(out, model.ServiceInstanceToTarget(instance))
				}
			}
		}
	}
	return out
}

// WorkloadInstanceHandler defines the handler for service instances generated by other registries
func (c *Controller) WorkloadInstanceHandler(si *model.WorkloadInstance, event model.Event) {
	c.queue.Push(func() error {
		c.workloadInstanceHandler(si, event)
		return nil
	})
}

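// workloadInstanceHandler indexes workload instances produced by other registries and pushes EDS
// for the Kubernetes services that select them.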
func (c *Controller) workloadInstanceHandler(si *model.WorkloadInstance, event model.Event) {
	// Ignore malformed workload entries, and ignore any workload entry that does not have labels,
	// as there is no way for us to select them
	if si.Namespace == "" || len(si.Endpoint.Labels) == 0 {
		return
	}

	// This is from a workload entry. Store it in a separate index so that
	// InstancesByPort can use these as well as the k8s pods.
	switch event {
	case model.EventDelete:
		c.workloadInstancesIndex.Delete(si)
	default: // add or update
		c.workloadInstancesIndex.Insert(si)
	}

	// find the workload entry's service by label selector
	// rather than scanning through our internal map of model.services, get the services via the k8s apis
	dummyPod := &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{Namespace: si.Namespace, Labels: si.Endpoint.Labels},
	}

	// We got an instance update, which probably affects EDS. However, EDS is keyed by Hostname. We need to find all
	// Hostnames (services) that were updated and recompute them
	// find the services that map to this workload entry, fire off eds updates if the service is of type client-side lb
	allServices := c.services.List(si.Namespace, klabels.Everything())
	matchedServices := getPodServices(allServices, dummyPod)
	matchedHostnames := slices.Map(matchedServices, func(e *v1.Service) host.Name {
		return kube.ServiceHostname(e.Name, e.Namespace, c.opts.DomainSuffix)
	})
	c.endpoints.pushEDS(matchedHostnames, si.Namespace)
}

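// onSystemNamespaceEvent updates the registry's network from the system namespace and triggers a
// refresh when the network changes.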
func (c *Controller) onSystemNamespaceEvent(_, ns *v1.Namespace, ev model.Event) error {
	if ev == model.EventDelete {
		return nil
	}
	if c.setNetworkFromNamespace(ns) {
		// network changed, which rarely happens;
		// refresh pods/endpoints/services
		c.onNetworkChange()
	}
	return nil
}

// isControllerForProxy should be used for proxies assumed to be in the kube cluster for this controller. Workload Entries
// may not necessarily pass this check, but we still want to allow kube services to select workload instances.
func (c *Controller) isControllerForProxy(proxy *model.Proxy) bool {
	return proxy.Metadata.ClusterID == "" || proxy.Metadata.ClusterID == c.Cluster()
}

// GetProxyServiceTargetsFromMetadata retrieves ServiceTargets using proxy Metadata rather than
// from the Pod. This allows retrieving Instances immediately, regardless of delays in Kubernetes.
// If the proxy doesn't have enough metadata, an error is returned
func (c *Controller) GetProxyServiceTargetsFromMetadata(proxy *model.Proxy) ([]model.ServiceTarget, error) {
	if len(proxy.Labels) == 0 {
		return nil, nil
	}

	if !c.isControllerForProxy(proxy) {
		return nil, fmt.Errorf("proxy is in cluster %v, but controller is for cluster %v", proxy.Metadata.ClusterID, c.Cluster())
	}

	// Create a pod with just the information needed to find the associated Services
	dummyPod := &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Namespace: proxy.ConfigNamespace,
			Labels:    proxy.Labels,
		},
	}

	// Find the Services associated with the pod.
	allServices := c.services.List(proxy.ConfigNamespace, klabels.Everything())
	services := getPodServices(allServices, dummyPod)
	if len(services) == 0 {
		return nil, fmt.Errorf("no instances found for %s", proxy.ID)
	}

	out := make([]model.ServiceTarget, 0)
	for _, svc := range services {
		hostname := kube.ServiceHostname(svc.Name, svc.Namespace, c.opts.DomainSuffix)
		modelService := c.GetService(hostname)
		if modelService == nil {
			return nil, fmt.Errorf("failed to find model service for %v", hostname)
		}

		for _, modelService := range c.servicesForNamespacedName(config.NamespacedName(svc)) {
			tps := make(map[model.Port]*model.Port)
			tpsList := make([]model.Port, 0)
			for _, port := range svc.Spec.Ports {
				svcPort, f := modelService.Ports.Get(port.Name)
				if !f {
					return nil, fmt.Errorf("failed to get svc port for %v", port.Name)
				}

				var portNum int
				if len(proxy.Metadata.PodPorts) > 0 {
					var err error
					portNum, err = findPortFromMetadata(port, proxy.Metadata.PodPorts)
					if err != nil {
						return nil, fmt.Errorf("failed to find target port for %v: %v", proxy.ID, err)
					}
				} else {
					// most likely a VM - we assume the WorkloadEntry won't remap any ports
					portNum = port.TargetPort.IntValue()
				}

				// Dedupe the target ports here - the Service might have configured multiple ports to the same target port,
				// and we will have to create only one ingress listener per port and protocol so that we do not end up
				// complaining about listener conflicts.
				targetPort := model.Port{
					Port:     portNum,
					Protocol: svcPort.Protocol,
				}
				if _, exists := tps[targetPort]; !exists {
					tps[targetPort] = svcPort
					tpsList = append(tpsList, targetPort)
				}
			}

			// Iterate over target ports in the same order as defined in the service spec, in case a
			// protocol conflict for a port causes unstable protocol selection.
			for _, tp := range tpsList {
				svcPort := tps[tp]
				out = append(out, model.ServiceTarget{
					Service: modelService,
					Port: model.ServiceInstancePort{
						ServicePort: svcPort,
						TargetPort:  uint32(tp.Port),
					},
				})
			}
		}
	}
	return out, nil
}

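// GetProxyServiceTargetsByPod builds the service targets for a pod and one of its Kubernetes
// services, deduplicating target ports across service ports.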
func (c *Controller) GetProxyServiceTargetsByPod(pod *v1.Pod, service *v1.Service) []model.ServiceTarget {
	var out []model.ServiceTarget

	for _, svc := range c.servicesForNamespacedName(config.NamespacedName(service)) {
		tps := make(map[model.Port]*model.Port)
		tpsList := make([]model.Port, 0)
		for _, port := range service.Spec.Ports {
			svcPort, exists := svc.Ports.Get(port.Name)
			if !exists {
				continue
			}
			// find target port
			portNum, err := FindPort(pod, &port)
			if err != nil {
				log.Warnf("Failed to find port for service %s/%s: %v", service.Namespace, service.Name, err)
				continue
			}
			// Dedupe the target ports here - the Service might have configured multiple ports to the same target port,
			// and we will have to create only one ingress listener per port and protocol so that we do not end up
			// complaining about listener conflicts.
			targetPort := model.Port{
				Port:     portNum,
				Protocol: svcPort.Protocol,
			}
			if _, exists := tps[targetPort]; !exists {
				tps[targetPort] = svcPort
				tpsList = append(tpsList, targetPort)
			}
		}
		// Iterate over target ports in the same order as defined in the service spec, in case a
		// protocol conflict for a port causes unstable protocol selection.
		for _, tp := range tpsList {
			svcPort := tps[tp]
			out = append(out, model.ServiceTarget{
				Service: svc,
				Port: model.ServiceInstancePort{
					ServicePort: svcPort,
					TargetPort:  uint32(tp.Port),
				},
			})
		}
	}

	return out
}

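// GetProxyWorkloadLabels returns the labels of the pod backing the proxy, augmented with cluster,
// locality, network, and (when the proxy did not report one) node name.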
func (c *Controller) GetProxyWorkloadLabels(proxy *model.Proxy) labels.Instance {
	pod := c.pods.getPodByProxy(proxy)
	if pod != nil {
		var locality, nodeName string
		locality = c.getPodLocality(pod)
		if len(proxy.GetNodeName()) == 0 {
			// this can happen for an "old" proxy with no `Metadata.NodeName` set
			// in this case we set the node name in labels on the fly
			// TODO: remove this when 1.16 is EOL?
			nodeName = pod.Spec.NodeName
		}
		if len(locality) == 0 && len(nodeName) == 0 {
			return pod.Labels
		}
		return labelutil.AugmentLabels(pod.Labels, c.clusterID, locality, nodeName, c.network)
	}
	return nil
}

// AppendServiceHandler implements a service catalog operation
func (c *Controller) AppendServiceHandler(f model.ServiceHandler) {
	c.handlers.AppendServiceHandler(f)
}

// AppendWorkloadHandler implements a service catalog operation
func (c *Controller) AppendWorkloadHandler(f func(*model.WorkloadInstance, model.Event)) {
	c.handlers.AppendWorkloadHandler(f)
}

// AppendNamespaceDiscoveryHandlers registers handlers for namespaces being selected or deselected by discovery selector changes.
func (c *Controller) AppendNamespaceDiscoveryHandlers(f func(string, model.Event)) {
	c.namespaceDiscoveryHandlers = append(c.namespaceDiscoveryHandlers, f)
}

// AppendCrdHandlers registers handlers for CRD events.
func (c *Controller) AppendCrdHandlers(f func(name string)) {
	c.crdHandlers = append(c.crdHandlers, f)
}

// hostNamesForNamespacedName returns all possible hostnames for the given service name.
// If Kubernetes Multi-Cluster Services (MCS) is enabled, this will contain the regular
// hostname as well as the MCS hostname (clusterset.local). Otherwise, only the regular
// hostname will be returned.
func (c *Controller) hostNamesForNamespacedName(name types.NamespacedName) []host.Name {
	if features.EnableMCSHost {
		return []host.Name{
			kube.ServiceHostname(name.Name, name.Namespace, c.opts.DomainSuffix),
			serviceClusterSetLocalHostname(name),
		}
	}
	return []host.Name{
		kube.ServiceHostname(name.Name, name.Namespace, c.opts.DomainSuffix),
	}
}

// servicesForNamespacedName returns all services for the given service name.
// If Kubernetes Multi-Cluster Services (MCS) is enabled, this will contain the regular
// service as well as the MCS service (clusterset.local), if available. Otherwise,
// only the regular service will be returned.
func (c *Controller) servicesForNamespacedName(name types.NamespacedName) []*model.Service {
	if features.EnableMCSHost {
		out := make([]*model.Service, 0, 2)

		c.RLock()
		if svc := c.servicesMap[kube.ServiceHostname(name.Name, name.Namespace, c.opts.DomainSuffix)]; svc != nil {
			out = append(out, svc)
		}

		if svc := c.servicesMap[serviceClusterSetLocalHostname(name)]; svc != nil {
			out = append(out, svc)
		}
		c.RUnlock()

		return out
	}
	if svc := c.GetService(kube.ServiceHostname(name.Name, name.Namespace, c.opts.DomainSuffix)); svc != nil {
		return []*model.Service{svc}
	}
	return nil
}

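// serviceUpdateNeedsPush reports whether a Service update changed anything that requires pushing
// configuration, comparing both the converted model.Service and the raw target ports.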
func serviceUpdateNeedsPush(prev, curr *v1.Service, preConv, currConv *model.Service) bool {
	if !features.EnableOptimizedServicePush {
		return true
	}
	if preConv == nil {
		return !currConv.Attributes.ExportTo.Contains(visibility.None)
	}
	// if the service is not exported, there is no need to push
	if preConv.Attributes.ExportTo.Contains(visibility.None) &&
		currConv.Attributes.ExportTo.Contains(visibility.None) {
		return false
	}
	// Check if there are any changes we care about by comparing `model.Service`s
	if !preConv.Equals(currConv) {
		return true
	}
	// Also check whether the target ports have changed, since they are not included in `model.Service`;
	// `preConv.Equals(currConv)` already ensures the number of ports has not changed
	if prev != nil && curr != nil {
		if !slices.EqualFunc(prev.Spec.Ports, curr.Spec.Ports, func(a, b v1.ServicePort) bool {
			return a.TargetPort == b.TargetPort
		}) {
			return true
		}
	}
	return false
}
