// Copyright Istio Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package controller

import (
	"fmt"
	"sort"
	"sync"
	"time"

	"github.com/hashicorp/go-multierror"
	"go.uber.org/atomic"
	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	klabels "k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/types"

	"istio.io/api/label"
	"istio.io/istio/pilot/pkg/features"
	"istio.io/istio/pilot/pkg/model"
	"istio.io/istio/pilot/pkg/serviceregistry"
	"istio.io/istio/pilot/pkg/serviceregistry/aggregate"
	"istio.io/istio/pilot/pkg/serviceregistry/kube"
	"istio.io/istio/pilot/pkg/serviceregistry/kube/controller/ambient"
	"istio.io/istio/pilot/pkg/serviceregistry/provider"
	labelutil "istio.io/istio/pilot/pkg/serviceregistry/util/label"
	"istio.io/istio/pilot/pkg/serviceregistry/util/workloadinstances"
	"istio.io/istio/pkg/cluster"
	"istio.io/istio/pkg/config"
	"istio.io/istio/pkg/config/host"
	"istio.io/istio/pkg/config/labels"
	"istio.io/istio/pkg/config/mesh"
	"istio.io/istio/pkg/config/protocol"
	"istio.io/istio/pkg/config/visibility"
	kubelib "istio.io/istio/pkg/kube"
	"istio.io/istio/pkg/kube/controllers"
	"istio.io/istio/pkg/kube/kclient"
	istiolog "istio.io/istio/pkg/log"
	"istio.io/istio/pkg/maps"
	"istio.io/istio/pkg/monitoring"
	"istio.io/istio/pkg/network"
	"istio.io/istio/pkg/ptr"
	"istio.io/istio/pkg/queue"
	"istio.io/istio/pkg/slices"
)

const (
	// NodeRegionLabel is the well-known label for kubernetes node region in beta
	NodeRegionLabel = v1.LabelFailureDomainBetaRegion
	// NodeZoneLabel is the well-known label for kubernetes node zone in beta
	NodeZoneLabel = v1.LabelFailureDomainBetaZone
	// NodeRegionLabelGA is the well-known label for kubernetes node region in GA
	NodeRegionLabelGA = v1.LabelTopologyRegion
	// NodeZoneLabelGA is the well-known label for kubernetes node zone in GA
	NodeZoneLabelGA = v1.LabelTopologyZone

	// DefaultNetworkGatewayPort is the port used by default for cross-network traffic if not otherwise specified
	// by meshNetworks or "networking.istio.io/gatewayPort"
	DefaultNetworkGatewayPort = 15443
)

var log = istiolog.RegisterScope("kube", "kubernetes service registry controller")

var (
	typeTag  = monitoring.CreateLabel("type")
	eventTag = monitoring.CreateLabel("event")

	k8sEvents = monitoring.NewSum(
		"pilot_k8s_reg_events",
		"Events from k8s registry.",
	)

	// nolint: gocritic
	// This is deprecated in favor of `pilot_k8s_endpoints_pending_pod`, which is a gauge indicating the number of
	// currently missing pods. This helps distinguish transient errors from permanent ones.
	endpointsWithNoPods = monitoring.NewSum(
		"pilot_k8s_endpoints_with_no_pods",
		"Endpoints that do not have any corresponding pods.")

	endpointsPendingPodUpdate = monitoring.NewGauge(
		"pilot_k8s_endpoints_pending_pod",
		"Number of endpoints that do not currently have any corresponding pods.",
	)
)

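// incrementEvent bumps the registry event counter for the given resource kind
// and event type; calls with an empty kind or event are ignored.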
func incrementEvent(kind, event string) {
	if kind == "" || event == "" {
		return
	}
	k8sEvents.With(typeTag.Value(kind), eventTag.Value(event)).Increment()
}

// Options stores the configurable attributes of a Controller.
type Options struct {
	SystemNamespace string

	// MeshServiceController is a mesh-wide service Controller.
	MeshServiceController *aggregate.Controller

	DomainSuffix string

	// ClusterID identifies the cluster that the controller communicates with.
	ClusterID cluster.ID

	// ClusterAliases are alias names for clusters. When a proxy connects with a cluster ID
	// that has an alias, the alias is used as the cluster ID for the proxy.
	ClusterAliases map[string]string

	// Metrics for capturing node-based metrics.
	Metrics model.Metrics

	// XDSUpdater will push changes to the xDS server.
	XDSUpdater model.XDSUpdater

	// MeshNetworksWatcher observes changes to the mesh networks config.
	MeshNetworksWatcher mesh.NetworksWatcher

	// MeshWatcher observes changes to the mesh config.
	MeshWatcher mesh.Watcher

	// KubernetesAPIQPS is the maximum QPS when communicating with the kubernetes API.
	KubernetesAPIQPS float32

	// KubernetesAPIBurst is the maximum burst for throttling when communicating with the kubernetes API.
	KubernetesAPIBurst int

	// SyncTimeout, if set, causes HasSynced to report true once this timeout elapses,
	// even if the initial sync has not completed.
	SyncTimeout time.Duration

	// Revision of this Istiod instance.
	Revision string

	ConfigCluster bool
}

// kubernetesNode represents a kubernetes node that is reachable externally
type kubernetesNode struct {
	address string
	labels  labels.Instance
}

// controllerInterface is a simplified interface for the Controller used for testing.
type controllerInterface interface {
	getPodLocality(pod *v1.Pod) string
	Network(endpointIP string, labels labels.Instance) network.ID
	Cluster() cluster.ID
}

var (
	_ controllerInterface      = &Controller{}
	_ serviceregistry.Instance = &Controller{}
)

type ambientIndex = ambient.Index

// Controller is a collection of synchronized resource watchers.
// Caches are thread-safe.
type Controller struct {
	opts Options

	client kubelib.Client

	queue queue.Instance

	namespaces kclient.Client[*v1.Namespace]
	services   kclient.Client[*v1.Service]

	endpoints *endpointSliceController

	// Used to watch nodes accessible from remote clusters.
	// In a multi-cluster (shared control plane, multi-network) scenario, the ingress gateway service can be of NodePort type.
	// With this, we can populate the mesh's gateway addresses with the node IPs.
	nodes kclient.Client[*v1.Node]

	exports serviceExportCache
	imports serviceImportCache
	pods    *PodCache

	crdHandlers                []func(name string)
	handlers                   model.ControllerHandlers
	namespaceDiscoveryHandlers []func(ns string, event model.Event)

	// This is only used for tests.
	stop chan struct{}

	sync.RWMutex
	// servicesMap stores hostname ==> service; it is used to reduce convertService calls.
	servicesMap map[host.Name]*model.Service
	// nodeSelectorsForServices stores hostname ==> label selectors that can be used to
	// refine the set of node port IPs for a service.
	nodeSelectorsForServices map[host.Name]labels.Instance
	// nodeInfoMap maps node name to its address and labels, which is all we need from nodes
	// for VM-to-k8s or cross-cluster traffic. When node port services select specific nodes by labels,
	// we run through the label selectors here to pick only the nodes we need.
	// Only nodes with ExternalIP addresses are included in this map!
	nodeInfoMap map[string]kubernetesNode
	// workloadInstancesIndex is an index over workload instances from workload entries.
	workloadInstancesIndex workloadinstances.Index

	*networkManager

	ambientIndex

	// initialSyncTimedout is set to true if the initial processing times out.
	initialSyncTimedout *atomic.Bool
	meshWatcher         mesh.Watcher

	podsClient kclient.Client[*v1.Pod]

	configCluster bool

	networksHandlerRegistration *mesh.WatcherHandlerRegistration
	meshHandlerRegistration     *mesh.WatcherHandlerRegistration
}

// NewController creates a new Kubernetes controller.
// Created by bootstrap and multicluster (see multicluster.Controller).
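//
// A minimal construction sketch (illustrative only; the real wiring lives in
// bootstrap and multicluster setup, and the option values here are assumptions):
//
//	c := NewController(kubeClient, Options{
//		ClusterID:    "cluster-1",
//		DomainSuffix: "cluster.local",
//		XDSUpdater:   xdsUpdater,
//		MeshWatcher:  meshWatcher,
//	})
//	go c.Run(stop)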
func NewController(kubeClient kubelib.Client, options Options) *Controller {
	c := &Controller{
		opts:                     options,
		client:                   kubeClient,
		queue:                    queue.NewQueueWithID(1*time.Second, string(options.ClusterID)),
		servicesMap:              make(map[host.Name]*model.Service),
		nodeSelectorsForServices: make(map[host.Name]labels.Instance),
		nodeInfoMap:              make(map[string]kubernetesNode),
		workloadInstancesIndex:   workloadinstances.NewIndex(),
		initialSyncTimedout:      atomic.NewBool(false),

		configCluster: options.ConfigCluster,
	}
	c.networkManager = initNetworkManager(c, options)

	c.namespaces = kclient.NewFiltered[*v1.Namespace](kubeClient, kclient.Filter{ObjectFilter: kubeClient.ObjectFilter()})

	if c.opts.SystemNamespace != "" {
		registerHandlers[*v1.Namespace](
			c,
			c.namespaces,
			"Namespaces",
			func(old *v1.Namespace, cur *v1.Namespace, event model.Event) error {
				if cur.Name == c.opts.SystemNamespace {
					return c.onSystemNamespaceEvent(old, cur, event)
				}
				return nil
			},
			nil,
		)
	}

	c.services = kclient.NewFiltered[*v1.Service](kubeClient, kclient.Filter{ObjectFilter: kubeClient.ObjectFilter()})

	registerHandlers[*v1.Service](c, c.services, "Services", c.onServiceEvent, nil)

	c.endpoints = newEndpointSliceController(c)

	// This is for getting the node IPs of a selected set of nodes
	c.nodes = kclient.NewFiltered[*v1.Node](kubeClient, kclient.Filter{ObjectTransform: kubelib.StripNodeUnusedFields})
	registerHandlers[*v1.Node](c, c.nodes, "Nodes", c.onNodeEvent, nil)

	c.podsClient = kclient.NewFiltered[*v1.Pod](kubeClient, kclient.Filter{
		ObjectFilter:    kubeClient.ObjectFilter(),
		ObjectTransform: kubelib.StripPodUnusedFields,
	})
	c.pods = newPodCache(c, c.podsClient, func(key types.NamespacedName) {
		c.queue.Push(func() error {
			return c.endpoints.podArrived(key.Name, key.Namespace)
		})
	})
	registerHandlers[*v1.Pod](c, c.podsClient, "Pods", c.pods.onEvent, c.pods.labelFilter)

	if features.EnableAmbientControllers {
		c.ambientIndex = ambient.New(ambient.Options{
			Client:          kubeClient,
			SystemNamespace: options.SystemNamespace,
			DomainSuffix:    options.DomainSuffix,
			ClusterID:       options.ClusterID,
			Revision:        options.Revision,
			XDSUpdater:      options.XDSUpdater,
			LookupNetwork:   c.Network,
		})
	}
	c.exports = newServiceExportCache(c)
	c.imports = newServiceImportCache(c)

	c.meshWatcher = options.MeshWatcher
	if c.opts.MeshNetworksWatcher != nil {
		c.networksHandlerRegistration = c.opts.MeshNetworksWatcher.AddNetworksHandler(func() {
			c.reloadMeshNetworks()
			c.onNetworkChange()
		})
		c.reloadMeshNetworks()
	}
	return c
}

func (c *Controller) Provider() provider.ID {
	return provider.Kubernetes
}

func (c *Controller) Cluster() cluster.ID {
	return c.opts.ClusterID
}

func (c *Controller) MCSServices() []model.MCSServiceInfo {
	outMap := make(map[types.NamespacedName]model.MCSServiceInfo)

	// Add the ServiceExport info.
	for _, se := range c.exports.ExportedServices() {
		mcsService := outMap[se.namespacedName]
		mcsService.Cluster = c.Cluster()
		mcsService.Name = se.namespacedName.Name
		mcsService.Namespace = se.namespacedName.Namespace
		mcsService.Exported = true
		mcsService.Discoverability = se.discoverability
		outMap[se.namespacedName] = mcsService
	}

	// Add the ServiceImport info.
	for _, si := range c.imports.ImportedServices() {
		mcsService := outMap[si.namespacedName]
		mcsService.Cluster = c.Cluster()
		mcsService.Name = si.namespacedName.Name
		mcsService.Namespace = si.namespacedName.Namespace
		mcsService.Imported = true
		mcsService.ClusterSetVIP = si.clusterSetVIP
		outMap[si.namespacedName] = mcsService
	}

	return maps.Values(outMap)
}

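// Network returns the network ID for an endpoint, checked in order: the
// topology.istio.io/network label on the workload, the network label on the
// system namespace, and finally the static meshNetworks configuration.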
func (c *Controller) Network(endpointIP string, labels labels.Instance) network.ID {
	// 1. check the pod/workloadEntry label
	if nw := labels[label.TopologyNetwork.Name]; nw != "" {
		return network.ID(nw)
	}

	// 2. check the system namespace labels
	if nw := c.networkFromSystemNamespace(); nw != "" {
		return nw
	}

	// 3. check the meshNetworks config
	if nw := c.networkFromMeshNetworks(endpointIP); nw != "" {
		return nw
	}

	return ""
}

func (c *Controller) Cleanup() error {
	if err := queue.WaitForClose(c.queue, 30*time.Second); err != nil {
		log.Warnf("queue for removed kube registry %q may not be done processing: %v", c.Cluster(), err)
	}
	if c.opts.XDSUpdater != nil {
		c.opts.XDSUpdater.RemoveShard(model.ShardKeyFromRegistry(c))
	}

	// Unregister the networks handler
	if c.networksHandlerRegistration != nil {
		c.opts.MeshNetworksWatcher.DeleteNetworksHandler(c.networksHandlerRegistration)
	}

	// Unregister the mesh handler
	if c.meshHandlerRegistration != nil {
		c.opts.MeshWatcher.DeleteMeshHandler(c.meshHandlerRegistration)
	}

	return nil
}

func (c *Controller) onServiceEvent(pre, curr *v1.Service, event model.Event) error {
	log.Debugf("Handle event %s for service %s in namespace %s", event, curr.Name, curr.Namespace)

	// Create the standard (cluster.local) service.
	svcConv := kube.ConvertService(*curr, c.opts.DomainSuffix, c.Cluster())

	switch event {
	case model.EventDelete:
		c.deleteService(svcConv)
	default:
		c.addOrUpdateService(pre, curr, svcConv, event, false)
	}

	return nil
}

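// deleteService removes the service from the local caches, notifies service
// handlers, and triggers a full push if the service was a network gateway.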
func (c *Controller) deleteService(svc *model.Service) {
	c.Lock()
	delete(c.servicesMap, svc.Hostname)
	delete(c.nodeSelectorsForServices, svc.Hostname)
	_, isNetworkGateway := c.networkGatewaysBySvc[svc.Hostname]
	delete(c.networkGatewaysBySvc, svc.Hostname)
	c.Unlock()

	if isNetworkGateway {
		c.NotifyGatewayHandlers()
		// TODO trigger push via handler
		// networks are different, we need to update all eds endpoints
		c.opts.XDSUpdater.ConfigUpdate(&model.PushRequest{Full: true, Reason: model.NewReasonStats(model.NetworksTrigger)})
	}

	shard := model.ShardKeyFromRegistry(c)
	event := model.EventDelete
	c.opts.XDSUpdater.SvcUpdate(shard, string(svc.Hostname), svc.Attributes.Namespace, event)

	c.handlers.NotifyServiceHandlers(nil, svc, event)
}

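// addOrUpdateService updates the local caches and notifies handlers. It also
// extracts gateway addresses from external-IP/LoadBalancer and NodePort gateway
// services, triggering a full push when the gateway set changes.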
func (c *Controller) addOrUpdateService(pre, curr *v1.Service, currConv *model.Service, event model.Event, updateEDSCache bool) {
	needsFullPush := false
	// First, process gateway services: services whose externalIPs are specified
	// and LoadBalancer gateway services, then NodePort gateway services.
	if currConv.Attributes.ClusterExternalAddresses.Len() > 0 {
		needsFullPush = c.extractGatewaysFromService(currConv)
	} else if isNodePortGatewayService(curr) {
		// We need to know which services are using node selectors because during node events,
		// we have to update all the node port services accordingly.
		nodeSelector := getNodeSelectorsForService(curr)
		c.Lock()
		// only add when it is a nodePort gateway service
		c.nodeSelectorsForServices[currConv.Hostname] = nodeSelector
		c.Unlock()
		needsFullPush = c.updateServiceNodePortAddresses(currConv)
	}

	// For ExternalName, we need to update the EndpointIndex, as we will store endpoints just based on the Service.
	if !features.EnableExternalNameAlias && curr != nil && curr.Spec.Type == v1.ServiceTypeExternalName {
		updateEDSCache = true
	}

	c.Lock()
	prevConv := c.servicesMap[currConv.Hostname]
	c.servicesMap[currConv.Hostname] = currConv
	c.Unlock()
	// This full push is needed to update ALL endpoints, even though we do a full push on service add/update,
	// as that full push is only triggered for the specific service.
	if needsFullPush {
		// networks are different, we need to update all eds endpoints
		c.opts.XDSUpdater.ConfigUpdate(&model.PushRequest{Full: true, Reason: model.NewReasonStats(model.NetworksTrigger)})
	}

	shard := model.ShardKeyFromRegistry(c)
	ns := currConv.Attributes.Namespace
	// We also need to update when the Service changes. For Kubernetes, a service change will result in Endpoint updates,
	// but workload entries will also need to be updated.
	// TODO(nmittler): Build different sets of endpoints for cluster.local and clusterset.local.
	if updateEDSCache || features.EnableK8SServiceSelectWorkloadEntries {
		endpoints := c.buildEndpointsForService(currConv, updateEDSCache)
		if len(endpoints) > 0 {
			c.opts.XDSUpdater.EDSCacheUpdate(shard, string(currConv.Hostname), ns, endpoints)
		}
	}

	// filter out no-op service events
	if event == model.EventUpdate && !serviceUpdateNeedsPush(pre, curr, prevConv, currConv) {
		return
	}

	c.opts.XDSUpdater.SvcUpdate(shard, string(currConv.Hostname), ns, event)
	c.handlers.NotifyServiceHandlers(prevConv, currConv, event)
}

func (c *Controller) buildEndpointsForService(svc *model.Service, updateCache bool) []*model.IstioEndpoint {
	endpoints := c.endpoints.buildIstioEndpointsWithService(svc.Attributes.Name, svc.Attributes.Namespace, svc.Hostname, updateCache)
	if features.EnableK8SServiceSelectWorkloadEntries {
		fep := c.collectWorkloadInstanceEndpoints(svc)
		endpoints = append(endpoints, fep...)
	}
	if !features.EnableExternalNameAlias {
		endpoints = append(endpoints, kube.ExternalNameEndpoints(svc)...)
	}
	return endpoints
}

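// onNodeEvent keeps nodeInfoMap in sync with nodes that expose an ExternalIP,
// and refreshes NodePort gateway service addresses when the node set changes.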
func (c *Controller) onNodeEvent(_, node *v1.Node, event model.Event) error {
	var updateNeeded bool
	if event == model.EventDelete {
		updateNeeded = true
		c.Lock()
		delete(c.nodeInfoMap, node.Name)
		c.Unlock()
	} else {
		k8sNode := kubernetesNode{labels: node.Labels}
		for _, address := range node.Status.Addresses {
			if address.Type == v1.NodeExternalIP && address.Address != "" {
				k8sNode.address = address.Address
				break
			}
		}
		if k8sNode.address == "" {
			return nil
		}

		c.Lock()
		// Check if the node exists, as this add event could be due to a controller resync.
		// If the stored object changes, then fire an update event; otherwise, ignore this event.
		currentNode, exists := c.nodeInfoMap[node.Name]
		if !exists || !nodeEquals(currentNode, k8sNode) {
			c.nodeInfoMap[node.Name] = k8sNode
			updateNeeded = true
		}
		c.Unlock()
	}

	// update all related services
	if updateNeeded && c.updateServiceNodePortAddresses() {
		c.opts.XDSUpdater.ConfigUpdate(&model.PushRequest{
			Full:   true,
			Reason: model.NewReasonStats(model.ServiceUpdate),
		})
	}
	return nil
}

// FilterOutFunc is a func for filtering out objects during update callbacks.
type FilterOutFunc[T controllers.Object] func(old, cur T) bool

// registerHandlers registers a handler for a given informer.
// Note: `otype` is used for metrics; if it is empty, no metric will be reported.
func registerHandlers[T controllers.ComparableObject](c *Controller,
	informer kclient.Informer[T], otype string,
	handler func(T, T, model.Event) error, filter FilterOutFunc[T],
) {
	wrappedHandler := func(prev, curr T, event model.Event) error {
		curr = informer.Get(curr.GetName(), curr.GetNamespace())
		if controllers.IsNil(curr) {
			// This can happen when an object is immediately deleted after an update;
			// the delete event will be handled later.
			return nil
		}
		return handler(prev, curr, event)
	}
	informer.AddEventHandler(
		controllers.EventHandler[T]{
			AddFunc: func(obj T) {
				incrementEvent(otype, "add")
				c.queue.Push(func() error {
					return wrappedHandler(ptr.Empty[T](), obj, model.EventAdd)
				})
			},
			UpdateFunc: func(old, cur T) {
				if filter != nil {
					if filter(old, cur) {
						incrementEvent(otype, "updatesame")
						return
					}
				}
				incrementEvent(otype, "update")
				c.queue.Push(func() error {
					return wrappedHandler(old, cur, model.EventUpdate)
				})
			},
			DeleteFunc: func(obj T) {
				incrementEvent(otype, "delete")
				c.queue.Push(func() error {
					return handler(ptr.Empty[T](), obj, model.EventDelete)
				})
			},
		})
}

// HasSynced returns true after the initial state synchronization.
func (c *Controller) HasSynced() bool {
	return c.queue.HasSynced() || c.initialSyncTimedout.Load()
}

func (c *Controller) informersSynced() bool {
	return c.namespaces.HasSynced() &&
		c.services.HasSynced() &&
		c.endpoints.slices.HasSynced() &&
		c.pods.pods.HasSynced() &&
		c.nodes.HasSynced() &&
		c.imports.HasSynced() &&
		c.exports.HasSynced() &&
		c.networkManager.HasSynced()
}

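// syncPods seeds the pod cache by replaying an add event for every pod already
// present in the informer cache, aggregating any per-pod errors.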
func (c *Controller) syncPods() error {
	var err *multierror.Error
	pods := c.podsClient.List(metav1.NamespaceAll, klabels.Everything())
	log.Debugf("initializing %d pods", len(pods))
	for _, s := range pods {
		err = multierror.Append(err, c.pods.onEvent(nil, s, model.EventAdd))
	}
	return err.ErrorOrNil()
}

// Run runs all controllers until a signal is received.
func (c *Controller) Run(stop <-chan struct{}) {
	if c.opts.SyncTimeout != 0 {
		time.AfterFunc(c.opts.SyncTimeout, func() {
			if !c.queue.HasSynced() {
				log.Warnf("kube controller for %s initial sync timed out", c.opts.ClusterID)
				c.initialSyncTimedout.Store(true)
			}
		})
	}
	st := time.Now()

	go c.imports.Run(stop)
	go c.exports.Run(stop)

	kubelib.WaitForCacheSync("kube controller", stop, c.informersSynced)
	log.Infof("kube controller for %s synced after %v", c.opts.ClusterID, time.Since(st))
	// after the in-order sync we can start processing the queue
	c.queue.Run(stop)
	log.Infof("Controller terminated")
}

// Stop stops the controller. Only for tests, to simplify the code (defer c.Stop()).
func (c *Controller) Stop() {
	if c.stop != nil {
		close(c.stop)
	}
}

// Services implements a service catalog operation.
func (c *Controller) Services() []*model.Service {
	c.RLock()
	out := make([]*model.Service, 0, len(c.servicesMap))
	for _, svc := range c.servicesMap {
		out = append(out, svc)
	}
	c.RUnlock()
	sort.Slice(out, func(i, j int) bool { return out[i].Hostname < out[j].Hostname })
	return out
}

// GetService implements a service catalog operation by the specified hostname.
func (c *Controller) GetService(hostname host.Name) *model.Service {
	c.RLock()
	svc := c.servicesMap[hostname]
	c.RUnlock()
	return svc
}

// getPodLocality retrieves the locality for a pod.
func (c *Controller) getPodLocality(pod *v1.Pod) string {
	// if the pod has the `istio-locality` label, use it and skip the node lookup below
	if len(pod.Labels[model.LocalityLabel]) > 0 {
		return model.GetLocalityLabel(pod.Labels[model.LocalityLabel])
	}

	// NodeName is set by the scheduler after the pod is created
	// https://github.com/kubernetes/community/blob/master/contributors/devel/sig-architecture/api-conventions.md#late-initialization
	node := c.nodes.Get(pod.Spec.NodeName, "")
	if node == nil {
		if pod.Spec.NodeName != "" {
			log.Warnf("unable to get node %q for pod %q/%q", pod.Spec.NodeName, pod.Namespace, pod.Name)
		}
		return ""
	}

	region := getLabelValue(node.ObjectMeta, NodeRegionLabelGA, NodeRegionLabel)
	zone := getLabelValue(node.ObjectMeta, NodeZoneLabelGA, NodeZoneLabel)
	subzone := getLabelValue(node.ObjectMeta, label.TopologySubzone.Name, "")

	if region == "" && zone == "" && subzone == "" {
		return ""
	}

	return region + "/" + zone + "/" + subzone // Format: "%s/%s/%s"
}

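// serviceInstancesFromWorkloadInstances builds ServiceInstances for the given
// service port from WorkloadEntries that match the service's label selector.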
func (c *Controller) serviceInstancesFromWorkloadInstances(svc *model.Service, reqSvcPort int) []*model.ServiceInstance {
	// Run through all the workload instances and select the ones that match the service labels,
	// but only if this is an internal Kubernetes service of ClientSideLB (eds) type,
	// as InstancesByPort is called by the aggregate controller. We don't want to include
	// workload instances for any other registry.
	workloadInstancesExist := !c.workloadInstancesIndex.Empty()
	c.RLock()
	_, inRegistry := c.servicesMap[svc.Hostname]
	c.RUnlock()

	// Only select internal Kubernetes services with selectors
	if !inRegistry || !workloadInstancesExist || svc.Attributes.ServiceRegistry != provider.Kubernetes ||
		svc.MeshExternal || svc.Resolution != model.ClientSideLB || svc.Attributes.LabelSelectors == nil {
		return nil
	}

	selector := labels.Instance(svc.Attributes.LabelSelectors)

	// Get the service port name and target port so that we can construct the service instance
	k8sService := c.services.Get(svc.Attributes.Name, svc.Attributes.Namespace)
	// We did not find the k8s service. We cannot get the targetPort
	if k8sService == nil {
		log.Infof("serviceInstancesFromWorkloadInstances(%s.%s) failed to get k8s service",
			svc.Attributes.Name, svc.Attributes.Namespace)
		return nil
	}

	var servicePort *model.Port
	for _, p := range svc.Ports {
		if p.Port == reqSvcPort {
			servicePort = p
			break
		}
	}
	if servicePort == nil {
		return nil
	}

	// Now get the target port for this service port
	targetPort := findServiceTargetPort(servicePort, k8sService)
	if targetPort.num == 0 {
		targetPort.num = servicePort.Port
	}

	out := make([]*model.ServiceInstance, 0)

	c.workloadInstancesIndex.ForEach(func(wi *model.WorkloadInstance) {
		if wi.Namespace != svc.Attributes.Namespace {
			return
		}
		if selector.Match(wi.Endpoint.Labels) {
			instance := serviceInstanceFromWorkloadInstance(svc, servicePort, targetPort, wi)
			if instance != nil {
				out = append(out, instance)
			}
		}
	})
	return out
}

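// serviceInstanceFromWorkloadInstance builds a ServiceInstance for a single
// workload instance, resolving the endpoint port from the numbered target port
// or, for named ports, from the instance's PortMap.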
func serviceInstanceFromWorkloadInstance(svc *model.Service, servicePort *model.Port,
	targetPort serviceTargetPort, wi *model.WorkloadInstance,
) *model.ServiceInstance {
	// create an instance with an endpoint whose service port name matches
	istioEndpoint := wi.Endpoint.ShallowCopy()

	// by default, use the numbered targetPort
	istioEndpoint.EndpointPort = uint32(targetPort.num)

	if targetPort.name != "" {
		// This is a named port; find the corresponding port in the port map
		matchedPort := wi.PortMap[targetPort.name]
		if matchedPort != 0 {
			istioEndpoint.EndpointPort = matchedPort
		} else if targetPort.explicitName {
			// No match found, and the service names the port explicitly; skip this endpoint
			return nil
		}
	}

	istioEndpoint.ServicePortName = servicePort.Name
	return &model.ServiceInstance{
		Service:     svc,
		ServicePort: servicePort,
		Endpoint:    istioEndpoint,
	}
}

// collectWorkloadInstanceEndpoints is a convenience function to collect all workload entry endpoints in updateEDS calls.
func (c *Controller) collectWorkloadInstanceEndpoints(svc *model.Service) []*model.IstioEndpoint {
	workloadInstancesExist := !c.workloadInstancesIndex.Empty()

	if !workloadInstancesExist || svc.Resolution != model.ClientSideLB || len(svc.Ports) == 0 {
		return nil
	}

	endpoints := make([]*model.IstioEndpoint, 0)
	for _, port := range svc.Ports {
		for _, instance := range c.serviceInstancesFromWorkloadInstances(svc, port.Port) {
			endpoints = append(endpoints, instance.Endpoint)
		}
	}

	return endpoints
}

// GetProxyServiceTargets returns service targets co-located with a given proxy.
// TODO: this code does not return k8s service instances when the proxy's IP is a workload entry.
// To tackle this, we need an ip2instance map like what we have in the service entry registry.
func (c *Controller) GetProxyServiceTargets(proxy *model.Proxy) []model.ServiceTarget {
	if len(proxy.IPAddresses) > 0 {
		proxyIP := proxy.IPAddresses[0]
		// look for a WorkloadEntry; if there are multiple WorkloadEntry(s)
		// with the same IP, choose one deterministically
		workload := workloadinstances.GetInstanceForProxy(c.workloadInstancesIndex, proxy, proxyIP)
		if workload != nil {
			return c.serviceInstancesFromWorkloadInstance(workload)
		}
		pod := c.pods.getPodByProxy(proxy)
		if pod != nil && !proxy.IsVM() {
			// we don't want to use this block for our test "VM" which is actually a Pod.

			if !c.isControllerForProxy(proxy) {
				log.Errorf("proxy is in cluster %v, but controller is for cluster %v", proxy.Metadata.ClusterID, c.Cluster())
				return nil
			}

			// 1. find the proxy's services by label selector; if there are none, there may be a
			// headless service without a selector, so fall back to 2
			allServices := c.services.List(pod.Namespace, klabels.Everything())
			if services := getPodServices(allServices, pod); len(services) > 0 {
				out := make([]model.ServiceTarget, 0)
				for _, svc := range services {
					out = append(out, c.GetProxyServiceTargetsByPod(pod, svc)...)
				}
				return out
			}
			// 2. Headless service without selector
			return c.endpoints.GetProxyServiceTargets(proxy)
		}

		// 3. The pod is not present when this is called
		// due to eventual consistency issues. However, we have a lot of information about the pod from the proxy
		// metadata already. Because of this, we can still get most of the information we need.
		// If we cannot accurately construct ServiceTargets from just the metadata, this will return an error and we can
		// attempt to read the real pod.
		out, err := c.GetProxyServiceTargetsFromMetadata(proxy)
		if err != nil {
			log.Warnf("GetProxyServiceTargetsFromMetadata for %v failed: %v", proxy.ID, err)
		}
		return out
	}

	// TODO: This should not happen; remove?
	if c.opts.Metrics != nil {
		c.opts.Metrics.AddMetric(model.ProxyStatusNoService, proxy.ID, proxy.ID, "")
	} else {
		log.Infof("Missing metrics env, empty list of services for pod %s", proxy.ID)
	}
	return nil
}

func (c *Controller) serviceInstancesFromWorkloadInstance(si *model.WorkloadInstance) []model.ServiceTarget {
	out := make([]model.ServiceTarget, 0)
	// find the workload entry's service by label selector
	// rather than scanning through our internal map of model.services, get the services via the k8s apis
	dummyPod := &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{Namespace: si.Namespace, Labels: si.Endpoint.Labels},
	}

	// find the services that map to this workload entry; fire off eds updates if the service is of type client-side lb
	allServices := c.services.List(si.Namespace, klabels.Everything())
	if k8sServices := getPodServices(allServices, dummyPod); len(k8sServices) > 0 {
		for _, k8sSvc := range k8sServices {
			service := c.GetService(kube.ServiceHostname(k8sSvc.Name, k8sSvc.Namespace, c.opts.DomainSuffix))
			// Note that this cannot be an external service because k8s external services do not have label selectors.
			if service == nil || service.Resolution != model.ClientSideLB {
				// may be a headless service
				continue
			}

			for _, servicePort := range service.Ports {
				if servicePort.Protocol == protocol.UDP {
					continue
				}

				// Now get the target port for this service port
				targetPort := findServiceTargetPort(servicePort, k8sSvc)
				if targetPort.num == 0 {
					targetPort.num = servicePort.Port
				}

				instance := serviceInstanceFromWorkloadInstance(service, servicePort, targetPort, si)
				if instance != nil {
					out = append(out, model.ServiceInstanceToTarget(instance))
				}
			}
		}
	}
	return out
}

// WorkloadInstanceHandler defines the handler for service instances generated by other registries.
func (c *Controller) WorkloadInstanceHandler(si *model.WorkloadInstance, event model.Event) {
	c.queue.Push(func() error {
		c.workloadInstanceHandler(si, event)
		return nil
	})
}

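// workloadInstanceHandler maintains the workload instance index and pushes EDS
// updates for every Kubernetes service whose selector matches the instance.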
func (c *Controller) workloadInstanceHandler(si *model.WorkloadInstance, event model.Event) {
	// Ignore malformed workload entries, and ignore any workload entry that does not have labels,
	// as there is no way for us to select them.
	if si.Namespace == "" || len(si.Endpoint.Labels) == 0 {
		return
	}

	// this is from a workload entry. Store it in a separate index so that
	// InstancesByPort can use these as well as the k8s pods.
	switch event {
	case model.EventDelete:
		c.workloadInstancesIndex.Delete(si)
	default: // add or update
		c.workloadInstancesIndex.Insert(si)
	}

	// find the workload entry's service by label selector
	// rather than scanning through our internal map of model.services, get the services via the k8s apis
	dummyPod := &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{Namespace: si.Namespace, Labels: si.Endpoint.Labels},
	}

	// We got an instance update, which probably affects EDS. However, EDS is keyed by Hostname. We need to find all
	// Hostnames (services) that were updated and recompute them.
	// find the services that map to this workload entry; fire off eds updates if the service is of type client-side lb
	allServices := c.services.List(si.Namespace, klabels.Everything())
	matchedServices := getPodServices(allServices, dummyPod)
	matchedHostnames := slices.Map(matchedServices, func(e *v1.Service) host.Name {
		return kube.ServiceHostname(e.Name, e.Namespace, c.opts.DomainSuffix)
	})
	c.endpoints.pushEDS(matchedHostnames, si.Namespace)
}

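// onSystemNamespaceEvent reacts to system namespace updates; if the namespace
// labels change the cluster's network, all pods/endpoints/services are refreshed.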
func (c *Controller) onSystemNamespaceEvent(_, ns *v1.Namespace, ev model.Event) error {
	if ev == model.EventDelete {
		return nil
	}
	if c.setNetworkFromNamespace(ns) {
		// The network changed, which rarely happens;
		// refresh pods/endpoints/services.
		c.onNetworkChange()
	}
	return nil
}

// isControllerForProxy should be used for proxies assumed to be in the kube cluster for this controller. Workload Entries
// may not necessarily pass this check, but we still want to allow kube services to select workload instances.
func (c *Controller) isControllerForProxy(proxy *model.Proxy) bool {
	return proxy.Metadata.ClusterID == "" || proxy.Metadata.ClusterID == c.Cluster()
}

// GetProxyServiceTargetsFromMetadata retrieves ServiceTargets using the proxy Metadata rather than
// from the Pod. This allows retrieving targets immediately, regardless of delays in Kubernetes.
// If the proxy doesn't have enough metadata, an error is returned.
func (c *Controller) GetProxyServiceTargetsFromMetadata(proxy *model.Proxy) ([]model.ServiceTarget, error) {
	if len(proxy.Labels) == 0 {
		return nil, nil
	}

	if !c.isControllerForProxy(proxy) {
		return nil, fmt.Errorf("proxy is in cluster %v, but controller is for cluster %v", proxy.Metadata.ClusterID, c.Cluster())
	}

	// Create a pod with just the information needed to find the associated Services
	dummyPod := &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Namespace: proxy.ConfigNamespace,
			Labels:    proxy.Labels,
		},
	}

	// Find the Services associated with the pod.
	allServices := c.services.List(proxy.ConfigNamespace, klabels.Everything())
	services := getPodServices(allServices, dummyPod)
	if len(services) == 0 {
		return nil, fmt.Errorf("no instances found for %s", proxy.ID)
	}

	out := make([]model.ServiceTarget, 0)
	for _, svc := range services {
		hostname := kube.ServiceHostname(svc.Name, svc.Namespace, c.opts.DomainSuffix)
		modelService := c.GetService(hostname)
		if modelService == nil {
			return nil, fmt.Errorf("failed to find model service for %v", hostname)
		}

		for _, modelService := range c.servicesForNamespacedName(config.NamespacedName(svc)) {
			tps := make(map[model.Port]*model.Port)
			tpsList := make([]model.Port, 0)
			for _, port := range svc.Spec.Ports {
				svcPort, f := modelService.Ports.Get(port.Name)
				if !f {
					return nil, fmt.Errorf("failed to get svc port for %v", port.Name)
				}

				var portNum int
				if len(proxy.Metadata.PodPorts) > 0 {
					var err error
					portNum, err = findPortFromMetadata(port, proxy.Metadata.PodPorts)
					if err != nil {
						return nil, fmt.Errorf("failed to find target port for %v: %v", proxy.ID, err)
					}
				} else {
					// most likely a VM - we assume the WorkloadEntry won't remap any ports
					portNum = port.TargetPort.IntValue()
				}

				// Dedupe the target ports here - the Service might have configured multiple ports to the same target port,
				// and we will have to create only one ingress listener per port and protocol so that we do not end up
				// complaining about listener conflicts.
				targetPort := model.Port{
					Port:     portNum,
					Protocol: svcPort.Protocol,
				}
				if _, exists := tps[targetPort]; !exists {
					tps[targetPort] = svcPort
					tpsList = append(tpsList, targetPort)
				}
			}

			// Iterate over target ports in the same order as defined in the service spec, so that a
			// protocol conflict for a port does not cause unstable protocol selection.
			for _, tp := range tpsList {
				svcPort := tps[tp]
				out = append(out, model.ServiceTarget{
					Service: modelService,
					Port: model.ServiceInstancePort{
						ServicePort: svcPort,
						TargetPort:  uint32(tp.Port),
					},
				})
			}
		}
	}
	return out, nil
}

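// GetProxyServiceTargetsByPod builds ServiceTargets for a pod across all model
// services derived from the given Kubernetes service (including the MCS
// clusterset.local service when enabled), deduplicating target ports.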
func (c *Controller) GetProxyServiceTargetsByPod(pod *v1.Pod, service *v1.Service) []model.ServiceTarget {
	var out []model.ServiceTarget

	for _, svc := range c.servicesForNamespacedName(config.NamespacedName(service)) {
		tps := make(map[model.Port]*model.Port)
		tpsList := make([]model.Port, 0)
		for _, port := range service.Spec.Ports {
			svcPort, exists := svc.Ports.Get(port.Name)
			if !exists {
				continue
			}
			// find target port
			portNum, err := FindPort(pod, &port)
			if err != nil {
				log.Warnf("Failed to find port for service %s/%s: %v", service.Namespace, service.Name, err)
				continue
			}
			// Dedupe the target ports here - the Service might have configured multiple ports to the same target port,
			// and we will have to create only one ingress listener per port and protocol so that we do not end up
			// complaining about listener conflicts.
			targetPort := model.Port{
				Port:     portNum,
				Protocol: svcPort.Protocol,
			}
			if _, exists := tps[targetPort]; !exists {
				tps[targetPort] = svcPort
				tpsList = append(tpsList, targetPort)
			}
		}
		// Iterate over target ports in the same order as defined in the service spec, so that a
		// protocol conflict for a port does not cause unstable protocol selection.
		for _, tp := range tpsList {
			svcPort := tps[tp]
			out = append(out, model.ServiceTarget{
				Service: svc,
				Port: model.ServiceInstancePort{
					ServicePort: svcPort,
					TargetPort:  uint32(tp.Port),
				},
			})
		}
	}

	return out
}

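// GetProxyWorkloadLabels returns the labels of the proxy's pod, augmented with
// cluster ID, locality, network, and, for older proxies without
// Metadata.NodeName, the node name.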
func (c *Controller) GetProxyWorkloadLabels(proxy *model.Proxy) labels.Instance {
	pod := c.pods.getPodByProxy(proxy)
	if pod != nil {
		var locality, nodeName string
		locality = c.getPodLocality(pod)
		if len(proxy.GetNodeName()) == 0 {
			// this can happen for an "old" proxy with no `Metadata.NodeName` set
			// in this case we set the node name in labels on the fly
			// TODO: remove this when 1.16 is EOL?
			nodeName = pod.Spec.NodeName
		}
		if len(locality) == 0 && len(nodeName) == 0 {
			return pod.Labels
		}
		return labelutil.AugmentLabels(pod.Labels, c.clusterID, locality, nodeName, c.network)
	}
	return nil
}

// AppendServiceHandler implements a service catalog operation.
func (c *Controller) AppendServiceHandler(f model.ServiceHandler) {
	c.handlers.AppendServiceHandler(f)
}

// AppendWorkloadHandler implements a service catalog operation.
func (c *Controller) AppendWorkloadHandler(f func(*model.WorkloadInstance, model.Event)) {
	c.handlers.AppendWorkloadHandler(f)
}

// AppendNamespaceDiscoveryHandlers registers handlers for namespaces being selected or
// deselected by discovery selector changes.
func (c *Controller) AppendNamespaceDiscoveryHandlers(f func(string, model.Event)) {
	c.namespaceDiscoveryHandlers = append(c.namespaceDiscoveryHandlers, f)
}

// AppendCrdHandlers registers handlers for CRD events.
func (c *Controller) AppendCrdHandlers(f func(name string)) {
	c.crdHandlers = append(c.crdHandlers, f)
}

// hostNamesForNamespacedName returns all possible hostnames for the given service name.
// If Kubernetes Multi-Cluster Services (MCS) is enabled, this will contain the regular
// hostname as well as the MCS hostname (clusterset.local). Otherwise, only the regular
// hostname will be returned.
func (c *Controller) hostNamesForNamespacedName(name types.NamespacedName) []host.Name {
	if features.EnableMCSHost {
		return []host.Name{
			kube.ServiceHostname(name.Name, name.Namespace, c.opts.DomainSuffix),
			serviceClusterSetLocalHostname(name),
		}
	}
	return []host.Name{
		kube.ServiceHostname(name.Name, name.Namespace, c.opts.DomainSuffix),
	}
}

// servicesForNamespacedName returns all services for the given service name.
// If Kubernetes Multi-Cluster Services (MCS) is enabled, this will contain the regular
// service as well as the MCS service (clusterset.local), if available. Otherwise,
// only the regular service will be returned.
func (c *Controller) servicesForNamespacedName(name types.NamespacedName) []*model.Service {
	if features.EnableMCSHost {
		out := make([]*model.Service, 0, 2)

		c.RLock()
		if svc := c.servicesMap[kube.ServiceHostname(name.Name, name.Namespace, c.opts.DomainSuffix)]; svc != nil {
			out = append(out, svc)
		}

		if svc := c.servicesMap[serviceClusterSetLocalHostname(name)]; svc != nil {
			out = append(out, svc)
		}
		c.RUnlock()

		return out
	}
	if svc := c.GetService(kube.ServiceHostname(name.Name, name.Namespace, c.opts.DomainSuffix)); svc != nil {
		return []*model.Service{svc}
	}
	return nil
}

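// serviceUpdateNeedsPush reports whether a service change requires an xDS push,
// filtering out services that are not exported and updates with no meaningful
// diff; target ports are compared separately since `model.Service` omits them.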
func serviceUpdateNeedsPush(prev, curr *v1.Service, preConv, currConv *model.Service) bool {
	if !features.EnableOptimizedServicePush {
		return true
	}
	if preConv == nil {
		return !currConv.Attributes.ExportTo.Contains(visibility.None)
	}
	// if the service is not exported, no need to push
	if preConv.Attributes.ExportTo.Contains(visibility.None) &&
		currConv.Attributes.ExportTo.Contains(visibility.None) {
		return false
	}
	// Check if there are any changes we care about by comparing the `model.Service`s
	if !preConv.Equals(currConv) {
		return true
	}
	// Also check if target ports have changed, since they are not included in `model.Service`.
	// `preConv.Equals(currConv)` already makes sure the number of ports has not changed.
	if prev != nil && curr != nil {
		if !slices.EqualFunc(prev.Spec.Ports, curr.Spec.Ports, func(a, b v1.ServicePort) bool {
			return a.TargetPort == b.TargetPort
		}) {
			return true
		}
	}
	return false
}