// Copyright Istio Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package serviceentry

import (
    "fmt"
    "hash/fnv"
    "strconv"
    "sync"
    "time"

    "k8s.io/apimachinery/pkg/types"

    networking "istio.io/api/networking/v1alpha3"
    "istio.io/istio/pilot/pkg/features"
    "istio.io/istio/pilot/pkg/model"
    "istio.io/istio/pilot/pkg/model/status"
    "istio.io/istio/pilot/pkg/serviceregistry"
    "istio.io/istio/pilot/pkg/serviceregistry/provider"
    "istio.io/istio/pilot/pkg/serviceregistry/util/workloadinstances"
    "istio.io/istio/pkg/cluster"
    "istio.io/istio/pkg/config"
    "istio.io/istio/pkg/config/constants"
    "istio.io/istio/pkg/config/host"
    "istio.io/istio/pkg/config/labels"
    "istio.io/istio/pkg/config/schema/gvk"
    "istio.io/istio/pkg/config/schema/kind"
    istiolog "istio.io/istio/pkg/log"
    "istio.io/istio/pkg/maps"
    "istio.io/istio/pkg/network"
    "istio.io/istio/pkg/queue"
    "istio.io/istio/pkg/slices"
    "istio.io/istio/pkg/util/protomarshal"
    "istio.io/istio/pkg/util/sets"
)

var (
    _   serviceregistry.Instance = &Controller{}
    log                          = istiolog.RegisterScope("serviceentry", "ServiceEntry registry")
)

var (
    prime  = 65011     // Used for secondary hash function.
    maxIPs = 256 * 254 // Maximum possible IPs for address allocation.
)

// instancesKey acts as a key to identify all instances for a given hostname/namespace pair
// This is mostly used as an index
type instancesKey struct {
    hostname  host.Name
    namespace string
}

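// octetPair holds the third and fourth octets used to construct an auto-allocated IP address.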
type octetPair struct {
    thirdOctet  int
    fourthOctet int
}

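// makeInstanceKey derives the hostname/namespace index key for the given service instance.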
func makeInstanceKey(i *model.ServiceInstance) instancesKey {
    return instancesKey{i.Service.Hostname, i.Service.Attributes.Namespace}
}

type configType int

const (
    serviceEntryConfigType configType = iota
    workloadEntryConfigType
    podConfigType
)

// configKey uniquely identifies a config object managed by this registry (ServiceEntry and WorkloadEntry)
type configKey struct {
    kind      configType
    name      string
    namespace string
}

// Controller communicates with ServiceEntry CRDs and monitors for changes.
type Controller struct {
    XdsUpdater model.XDSUpdater

    store     model.ConfigStore
    clusterID cluster.ID

    // This lock guards multi-step operations on the stores below, for example cases where
    // all existing instances must be deleted before the new ones are inserted.
    mutex sync.RWMutex

    serviceInstances serviceInstancesStore
    // NOTE: historically, one index for both WorkloadEntry(s) and Pod(s);
    // beware of naming collisions
    workloadInstances workloadinstances.Index
    services          serviceStore

    // edsQueue makes EDS updates run serially, so that a stale update computed earlier
    // cannot override a newer one when edsUpdate is called concurrently.
    // Sharing a single lock for this instead would cause an obvious performance downgrade
    // across all threads.
    edsQueue queue.Instance

    workloadHandlers []func(*model.WorkloadInstance, model.Event)

    // callback function used to get the networkID according to workload ip and labels.
    networkIDCallback func(IP string, labels labels.Instance) network.ID

    // Indicates whether this controller is for workload entries.
    workloadEntryController bool

    model.NoopAmbientIndexes
    model.NetworkGatewaysHandler
}

type Option func(*Controller)

func WithClusterID(clusterID cluster.ID) Option {
    return func(o *Controller) {
        o.clusterID = clusterID
    }
}

func WithNetworkIDCb(cb func(endpointIP string, labels labels.Instance) network.ID) Option {
    return func(o *Controller) {
        o.networkIDCallback = cb
    }
}

// NewController creates a new ServiceEntry discovery service.
func NewController(configController model.ConfigStoreController, xdsUpdater model.XDSUpdater,
    options ...Option,
) *Controller {
    s := newController(configController, xdsUpdater, options...)
    if configController != nil {
        configController.RegisterEventHandler(gvk.ServiceEntry, s.serviceEntryHandler)
        configController.RegisterEventHandler(gvk.WorkloadEntry, s.workloadEntryHandler)
    }
    return s
}

// NewWorkloadEntryController creates a new WorkloadEntry discovery service.
func NewWorkloadEntryController(configController model.ConfigStoreController, xdsUpdater model.XDSUpdater,
    options ...Option,
) *Controller {
    s := newController(configController, xdsUpdater, options...)
    // Disable service entry processing for workload entry controller.
    s.workloadEntryController = true
    for _, o := range options {
        o(s)
    }

    if configController != nil {
        configController.RegisterEventHandler(gvk.WorkloadEntry, s.workloadEntryHandler)
    }
    return s
}

func newController(store model.ConfigStore, xdsUpdater model.XDSUpdater, options ...Option) *Controller {
    s := &Controller{
        XdsUpdater: xdsUpdater,
        store:      store,
        serviceInstances: serviceInstancesStore{
            ip2instance:            map[string][]*model.ServiceInstance{},
            instances:              map[instancesKey]map[configKey][]*model.ServiceInstance{},
            instancesBySE:          map[types.NamespacedName]map[configKey][]*model.ServiceInstance{},
            instancesByHostAndPort: sets.New[hostPort](),
        },
        workloadInstances: workloadinstances.NewIndex(),
        services: serviceStore{
            servicesBySE: map[types.NamespacedName][]*model.Service{},
        },
        edsQueue: queue.NewQueue(time.Second),
    }
    for _, o := range options {
        o(s)
    }
    return s
}

// ConvertServiceEntry converts the ServiceEntry from Config.Spec.
func ConvertServiceEntry(cfg config.Config) *networking.ServiceEntry {
    se := cfg.Spec.(*networking.ServiceEntry)
    if se == nil {
        return nil
    }

    // shallow copy
    copied := &networking.ServiceEntry{}
    protomarshal.ShallowCopy(copied, se)
    return copied
}

// ConvertWorkloadEntry converts the WorkloadEntry from Config.Spec and populates the metadata labels into it.
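// For example, spec labels {"app": "ratings"} merged with metadata labels {"version": "v2"}
// yield {"app": "ratings", "version": "v2"}; when both define the same key, the metadata value wins.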
func ConvertWorkloadEntry(cfg config.Config) *networking.WorkloadEntry {
    wle := cfg.Spec.(*networking.WorkloadEntry)
    if wle == nil {
        return nil
    }

    // we will merge labels from metadata with spec, with precedence to the metadata
    labels := maps.MergeCopy(wle.Labels, cfg.Labels)
    // shallow copy
    copied := &networking.WorkloadEntry{}
    protomarshal.ShallowCopy(copied, wle)
    copied.Labels = labels
    return copied
}

// workloadEntryHandler defines the handler for workload entries
func (s *Controller) workloadEntryHandler(old, curr config.Config, event model.Event) {
    log.Debugf("Handle event %s for workload entry %s/%s", event, curr.Namespace, curr.Name)
    var oldWle *networking.WorkloadEntry
    if old.Spec != nil {
        oldWle = ConvertWorkloadEntry(old)
    }
    wle := ConvertWorkloadEntry(curr)
    curr.Spec = wle
    key := configKey{
        kind:      workloadEntryConfigType,
        name:      curr.Name,
        namespace: curr.Namespace,
    }

    // If an entry is unhealthy, we will mark this as a delete instead.
    // This ensures we do not track unhealthy endpoints.
    if features.WorkloadEntryHealthChecks && !isHealthy(curr) {
        event = model.EventDelete
    }

    wi := s.convertWorkloadEntryToWorkloadInstance(curr, s.Cluster())
    if wi != nil && !wi.DNSServiceEntryOnly {
        // fire off the k8s handlers
        s.NotifyWorkloadInstanceHandlers(wi, event)
    }

    // instancesUpdated includes instances that are new, updated, or unchanged - in other words, the current state.
    instancesUpdated := []*model.ServiceInstance{}
    instancesDeleted := []*model.ServiceInstance{}
    fullPush := false
    configsUpdated := sets.New[model.ConfigKey]()

    addConfigs := func(se *networking.ServiceEntry, services []*model.Service) {
        // If the serviceentry's resolution is DNS, make a full push
        // TODO: maybe cds?
        if se.Resolution == networking.ServiceEntry_DNS || se.Resolution == networking.ServiceEntry_DNS_ROUND_ROBIN {
            fullPush = true
            for key, value := range getUpdatedConfigs(services) {
                configsUpdated[key] = value
            }
        }
    }

    cfgs := s.store.List(gvk.ServiceEntry, curr.Namespace)
    currSes := getWorkloadServiceEntries(cfgs, wle)
    var oldSes map[types.NamespacedName]*config.Config
    if oldWle != nil {
        if labels.Instance(oldWle.Labels).Equals(curr.Labels) {
            oldSes = currSes
        } else {
            // a labels update should trigger a proxy update
            s.XdsUpdater.ProxyUpdate(s.Cluster(), wle.Address)
            oldSes = getWorkloadServiceEntries(cfgs, oldWle)
        }
    }
    unSelected := difference(oldSes, currSes)
    log.Debugf("workloadEntry %s/%s selected %v, unSelected %v serviceEntry", curr.Namespace, curr.Name, currSes, unSelected)
    s.mutex.Lock()
    for namespacedName, cfg := range currSes {
        services := s.services.getServices(namespacedName)
        se := cfg.Spec.(*networking.ServiceEntry)
        if wi.DNSServiceEntryOnly && se.Resolution != networking.ServiceEntry_DNS &&
            se.Resolution != networking.ServiceEntry_DNS_ROUND_ROBIN {
            log.Debugf("skip selecting workload instance %v/%v for DNS service entry %v", wi.Namespace, wi.Name, se.Hosts)
            continue
        }
        instance := s.convertWorkloadEntryToServiceInstances(wle, services, se, &key, s.Cluster())
        instancesUpdated = append(instancesUpdated, instance...)
        if event == model.EventDelete {
            s.serviceInstances.deleteServiceEntryInstances(namespacedName, key)
        } else {
            s.serviceInstances.updateServiceEntryInstancesPerConfig(namespacedName, key, instance)
        }
        addConfigs(se, services)
    }

    for _, namespacedName := range unSelected {
        services := s.services.getServices(namespacedName)
        cfg := oldSes[namespacedName]
        se := cfg.Spec.(*networking.ServiceEntry)
        if wi.DNSServiceEntryOnly && se.Resolution != networking.ServiceEntry_DNS &&
            se.Resolution != networking.ServiceEntry_DNS_ROUND_ROBIN {
            log.Debugf("skip selecting workload instance %v/%v for DNS service entry %v", wi.Namespace, wi.Name, se.Hosts)
            continue
        }
        instance := s.convertWorkloadEntryToServiceInstances(wle, services, se, &key, s.Cluster())
        instancesDeleted = append(instancesDeleted, instance...)
        s.serviceInstances.deleteServiceEntryInstances(namespacedName, key)
        addConfigs(se, services)
    }

    s.serviceInstances.deleteInstanceKeys(key, instancesDeleted)
    if event == model.EventDelete {
        s.workloadInstances.Delete(wi)
        s.serviceInstances.deleteInstanceKeys(key, instancesUpdated)
    } else {
        s.workloadInstances.Insert(wi)
        s.serviceInstances.updateInstances(key, instancesUpdated)
    }
    s.mutex.Unlock()

    allInstances := append(instancesUpdated, instancesDeleted...)
    if !fullPush {
        // trigger full xds push to the related sidecar proxy
        if event == model.EventAdd {
            s.XdsUpdater.ProxyUpdate(s.Cluster(), wle.Address)
        }
        s.edsUpdate(allInstances)
        return
    }

    // update eds cache only
    s.edsCacheUpdate(allInstances)

    pushReq := &model.PushRequest{
        Full:           true,
        ConfigsUpdated: configsUpdated,
        Reason:         model.NewReasonStats(model.EndpointUpdate),
    }
    // trigger a full push
    s.XdsUpdater.ConfigUpdate(pushReq)
}

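// NotifyWorkloadInstanceHandlers invokes every handler registered via AppendWorkloadHandler
// with the given workload instance and event.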
func (s *Controller) NotifyWorkloadInstanceHandlers(wi *model.WorkloadInstance, event model.Event) {
    for _, h := range s.workloadHandlers {
        h(wi, event)
    }
}

// getUpdatedConfigs returns related service entries when full push
func getUpdatedConfigs(services []*model.Service) sets.Set[model.ConfigKey] {
    configsUpdated := sets.New[model.ConfigKey]()
    for _, svc := range services {
        configsUpdated.Insert(model.ConfigKey{
            Kind:      kind.ServiceEntry,
            Name:      string(svc.Hostname),
            Namespace: svc.Attributes.Namespace,
        })
    }
    return configsUpdated
}

// serviceEntryHandler defines the handler for service entries
func (s *Controller) serviceEntryHandler(old, curr config.Config, event model.Event) {
    log.Debugf("Handle event %s for service entry %s/%s", event, curr.Namespace, curr.Name)
    currentServiceEntry := curr.Spec.(*networking.ServiceEntry)
    cs := convertServices(curr)
    configsUpdated := sets.New[model.ConfigKey]()
    key := curr.NamespacedName()

    s.mutex.Lock()
    // If it is an add/delete event we should always do a full push. If it is an update event, we should do a full push
    // only when services have changed - otherwise, just push endpoint updates.
    var addedSvcs, deletedSvcs, updatedSvcs, unchangedSvcs []*model.Service
    switch event {
    case model.EventUpdate:
        addedSvcs, deletedSvcs, updatedSvcs, unchangedSvcs = servicesDiff(s.services.getServices(key), cs)
        oldServiceEntry := old.Spec.(*networking.ServiceEntry)
        // Also check if target ports are changed since they are not included in `model.Service`
        if !slices.EqualFunc(oldServiceEntry.Ports, currentServiceEntry.Ports, func(a, b *networking.ServicePort) bool {
            return a.TargetPort == b.TargetPort
        }) {
            // Note: if the number of ports changed, unchangedSvcs will be nil and this is a no-op
            updatedSvcs = append(updatedSvcs, unchangedSvcs...)
            unchangedSvcs = nil
        }
        s.services.updateServices(key, cs)
    case model.EventDelete:
        deletedSvcs = cs
        s.services.deleteServices(key)
    case model.EventAdd:
        addedSvcs = cs
        s.services.updateServices(key, cs)
    default:
        // this should not happen
        unchangedSvcs = cs
    }

    serviceInstancesByConfig, serviceInstances := s.buildServiceInstances(curr, cs)
    oldInstances := s.serviceInstances.getServiceEntryInstances(key)
    for configKey, old := range oldInstances {
        s.serviceInstances.deleteInstanceKeys(configKey, old)
    }
    if event == model.EventDelete {
        s.serviceInstances.deleteAllServiceEntryInstances(key)
    } else {
        // Update the indexes with new instances.
        for ckey, value := range serviceInstancesByConfig {
            s.serviceInstances.addInstances(ckey, value)
        }
        s.serviceInstances.updateServiceEntryInstances(key, serviceInstancesByConfig)
    }

    shard := model.ShardKeyFromRegistry(s)

    for _, svc := range addedSvcs {
        s.XdsUpdater.SvcUpdate(shard, string(svc.Hostname), svc.Attributes.Namespace, model.EventAdd)
        configsUpdated.Insert(makeConfigKey(svc))
    }

    for _, svc := range updatedSvcs {
        s.XdsUpdater.SvcUpdate(shard, string(svc.Hostname), svc.Attributes.Namespace, model.EventUpdate)
        configsUpdated.Insert(makeConfigKey(svc))
    }
    // If the service entry is deleted, call SvcUpdate to clean up the endpoint shards for its services.
    for _, svc := range deletedSvcs {
        instanceKey := instancesKey{namespace: svc.Attributes.Namespace, hostname: svc.Hostname}
        // Multiple service entries for the same host can reside in the same namespace.
        // Delete endpoint shards only if there are no service instances left.
        if len(s.serviceInstances.getByKey(instanceKey)) == 0 {
            s.XdsUpdater.SvcUpdate(shard, string(svc.Hostname), svc.Attributes.Namespace, model.EventDelete)
        } else {
            // If there are some endpoints remaining for the host, add svc to updatedSvcs to trigger an eds cache update
            updatedSvcs = append(updatedSvcs, svc)
        }
        configsUpdated.Insert(makeConfigKey(svc))
    }

    // If a service is updated and is not part of updatedSvcs, that means its endpoints might have changed.
    // If this service entry had endpoints with IPs (i.e. resolution STATIC), then we do an EDS update.
    // If the service entry had endpoints with FQDNs (i.e. resolution DNS), then we need to do a
    // full push (as fqdn endpoints go via strict_dns clusters in cds).
    if len(unchangedSvcs) > 0 {
        if currentServiceEntry.Resolution == networking.ServiceEntry_DNS || currentServiceEntry.Resolution == networking.ServiceEntry_DNS_ROUND_ROBIN {
            for _, svc := range unchangedSvcs {
                configsUpdated.Insert(makeConfigKey(svc))
            }
        }
    }
    s.mutex.Unlock()

    fullPush := len(configsUpdated) > 0
    // If no full push is needed, at least one service is unchanged; push endpoint updates only.
    if !fullPush {
        s.edsUpdate(serviceInstances)
        return
    }

    // When doing a full push, the non-DNS added, updated, and unchanged services trigger an eds update
    // so that endpoint shards are updated.
    allServices := make([]*model.Service, 0, len(addedSvcs)+len(updatedSvcs)+len(unchangedSvcs))
    allServices = append(allServices, addedSvcs...)
    allServices = append(allServices, updatedSvcs...)
    allServices = append(allServices, unchangedSvcs...)

    // non dns service instances
    keys := sets.NewWithLength[instancesKey](len(allServices))
    for _, svc := range allServices {
        keys.Insert(instancesKey{hostname: svc.Hostname, namespace: curr.Namespace})
    }

    s.queueEdsEvent(keys, s.doEdsCacheUpdate)

    pushReq := &model.PushRequest{
        Full:           true,
        ConfigsUpdated: configsUpdated,
        Reason:         model.NewReasonStats(model.ServiceUpdate),
    }
    s.XdsUpdater.ConfigUpdate(pushReq)
}

// WorkloadInstanceHandler defines the handler for service instances generated by other registries
func (s *Controller) WorkloadInstanceHandler(wi *model.WorkloadInstance, event model.Event) {
    log.Debugf("Handle event %s for workload instance (%s/%s) in namespace %s", event,
        wi.Kind, wi.Endpoint.Address, wi.Namespace)
    var oldWi *model.WorkloadInstance
    key := configKey{
        kind:      podConfigType,
        name:      wi.Name,
        namespace: wi.Namespace,
    }
    // Used to indicate if this event was fired for a pod->workloadentry conversion
    // and that the event can be ignored due to no relevant change in the workloadentry
    redundantEventForPod := false

    // Used to indicate if the wi labels changed and we need to recheck all instances
    labelsChanged := false

    var addressToDelete string
    s.mutex.Lock()
    // This instance came from a pod. Store it in a separate map so that
    // the refreshIndexes function can use these as well as the ones from the store.
    switch event {
    case model.EventDelete:
        redundantEventForPod = s.workloadInstances.Delete(wi) == nil
    default: // add or update
        if oldWi = s.workloadInstances.Insert(wi); oldWi != nil {
            if oldWi.Endpoint.Address != wi.Endpoint.Address {
                addressToDelete = oldWi.Endpoint.Address
            }
            // Check if the old labels still match the new labels. If they don't, we need to
            // refresh the list of instances for this wi.
            if !oldWi.Endpoint.Labels.Equals(wi.Endpoint.Labels) {
                labelsChanged = true
            }
            // If multiple k8s services select the same pod, or a service has multiple ports,
            // we may be getting multiple events; ignore them as we only care about the Endpoint IP itself.
            if model.WorkloadInstancesEqual(oldWi, wi) {
                // ignore the update as nothing has changed
                redundantEventForPod = true
            }
        }
    }

    if redundantEventForPod {
        s.mutex.Unlock()
        return
    }

    // We will only select entries in the same namespace
    cfgs := s.store.List(gvk.ServiceEntry, wi.Namespace)
    if len(cfgs) == 0 {
        s.mutex.Unlock()
        return
    }

    instances := []*model.ServiceInstance{}
    instancesDeleted := []*model.ServiceInstance{}
    configsUpdated := sets.New[model.ConfigKey]()
    fullPush := false
    for _, cfg := range cfgs {
        se := cfg.Spec.(*networking.ServiceEntry)
        if se.WorkloadSelector == nil || (!labelsChanged && !labels.Instance(se.WorkloadSelector.Labels).Match(wi.Endpoint.Labels)) {
            // If the labels didn't change and the new SE doesn't match, then the old one didn't match either and we can skip processing it.
            continue
        }

        // If we are here, then there are 3 possible cases :
        // Case 1 : The new wi is a subset of se
        // Case 2 : The labelsChanged and the new wi is still a subset of se
        // Case 3 : The labelsChanged and the new wi is NOT a subset of se anymore

        seNamespacedName := cfg.NamespacedName()
        services := s.services.getServices(seNamespacedName)
        currInstance := convertWorkloadInstanceToServiceInstance(wi, services, se)

        // We check if the wi is still a subset of se. This would cover Case 1 and Case 2 from above.
        if labels.Instance(se.WorkloadSelector.Labels).Match(wi.Endpoint.Labels) {
            // The workload instance still matches, so we take care of the possible events.
            instances = append(instances, currInstance...)
            if addressToDelete != "" {
                for _, i := range currInstance {
                    di := i.DeepCopy()
                    di.Endpoint.Address = addressToDelete
                    instancesDeleted = append(instancesDeleted, di)
                }
                s.serviceInstances.deleteServiceEntryInstances(seNamespacedName, key)
            } else if event == model.EventDelete {
                s.serviceInstances.deleteServiceEntryInstances(seNamespacedName, key)
            } else {
                s.serviceInstances.updateServiceEntryInstancesPerConfig(seNamespacedName, key, currInstance)
            }
            // If the serviceentry's resolution is DNS, make a full push
            // TODO: maybe cds?
            if (se.Resolution == networking.ServiceEntry_DNS || se.Resolution == networking.ServiceEntry_DNS_ROUND_ROBIN) &&
                se.WorkloadSelector != nil {

                fullPush = true
                for _, inst := range currInstance {
                    configsUpdated[model.ConfigKey{
                        Kind:      kind.ServiceEntry,
                        Name:      string(inst.Service.Hostname),
                        Namespace: cfg.Namespace,
                    }] = struct{}{}
                }
            }
        } else if labels.Instance(se.WorkloadSelector.Labels).Match(oldWi.Endpoint.Labels) {
            // If we're here, it means that the labels changed and the new labels don't match the SE anymore (Case 3 from above), while the oldWi did
            // match the SE.
            // Since the instance doesn't match the SE anymore, we remove it from the list.
            oldInstance := convertWorkloadInstanceToServiceInstance(oldWi, services, se)
            instancesDeleted = append(instancesDeleted, oldInstance...)
            s.serviceInstances.deleteServiceEntryInstances(seNamespacedName, key)
        }
    }

    if len(instancesDeleted) > 0 {
        s.serviceInstances.deleteInstanceKeys(key, instancesDeleted)
    }

    if event == model.EventDelete {
        s.serviceInstances.deleteInstanceKeys(key, instances)
    } else {
        s.serviceInstances.updateInstances(key, instances)
    }
    s.mutex.Unlock()

    s.edsUpdate(append(instances, instancesDeleted...))

    // A ServiceEntry with WorkloadEntry results in a STRICT_DNS cluster with hardcoded endpoints;
    // we need to update CDS to refresh the endpoints.
    // https://github.com/istio/istio/issues/39505
    if fullPush {
        log.Debugf("Full push triggered during event %s for workload instance (%s/%s) in namespace %s", event,
            wi.Kind, wi.Endpoint.Address, wi.Namespace)
        pushReq := &model.PushRequest{
            Full:           true,
            ConfigsUpdated: configsUpdated,
            Reason:         model.NewReasonStats(model.EndpointUpdate),
        }
        s.XdsUpdater.ConfigUpdate(pushReq)
    }
}

func (s *Controller) Provider() provider.ID {
    return provider.External
}

func (s *Controller) Cluster() cluster.ID {
    return s.clusterID
}

// AppendServiceHandler adds a service resource event handler. Service entries do not use these handlers.
func (s *Controller) AppendServiceHandler(_ model.ServiceHandler) {}

// AppendWorkloadHandler adds an instance event handler. Service entries do not use these handlers.
func (s *Controller) AppendWorkloadHandler(h func(*model.WorkloadInstance, model.Event)) {
    s.workloadHandlers = append(s.workloadHandlers, h)
}

// Run is used by some controllers to execute background jobs after init is done.
func (s *Controller) Run(stopCh <-chan struct{}) {
    s.edsQueue.Run(stopCh)
}

// HasSynced always returns true for SE
func (s *Controller) HasSynced() bool {
    return true
}

// Services lists declarations of all services in the system
func (s *Controller) Services() []*model.Service {
    s.mutex.Lock()
    allServices := s.services.getAllServices()
    out := make([]*model.Service, 0, len(allServices))
    if s.services.allocateNeeded {
        autoAllocateIPs(allServices)
        s.services.allocateNeeded = false
    }
    s.mutex.Unlock()
    for _, svc := range allServices {
        // Return a shallow copy so that `AutoAllocatedIPv4Address` and `AutoAllocatedIPv6Address` are copied;
        // if we returned the pointer directly, there would be a race with `BuildNameTable`.
        // nolint: govet
        shallowSvc := *svc
        out = append(out, &shallowSvc)
    }
    return out
}

// GetService retrieves a service by host name if it exists.
// NOTE: The service entry implementation is used only for tests.
func (s *Controller) GetService(hostname host.Name) *model.Service {
    if s.workloadEntryController {
        return nil
    }
    // TODO(@hzxuzhonghu): only get the specific service instead of converting all the serviceEntries
    services := s.Services()
    for _, service := range services {
        if service.Hostname == hostname {
            return service
        }
    }

    return nil
}

// ResyncEDS will do a full EDS update. This is needed for some tests where we have many configs loaded without calling
// the config handlers.
// This should probably not be used in production code.
func (s *Controller) ResyncEDS() {
    s.mutex.RLock()
    allInstances := s.serviceInstances.getAll()
    s.mutex.RUnlock()
    s.edsUpdate(allInstances)
    // HACK to workaround Service syncing after WorkloadEntry: https://github.com/istio/istio/issues/45114
    s.workloadInstances.ForEach(func(wi *model.WorkloadInstance) {
        s.NotifyWorkloadInstanceHandlers(wi, model.EventAdd)
    })
}

// edsUpdate triggers an EDS push serially, so that instances computed at t1 cannot
// accidentally override those computed at t2 when multiple threads are
// running this function. Queueing ensures the latest update wins.
func (s *Controller) edsUpdate(instances []*model.ServiceInstance) {
    // Find all keys we need to lookup
    keys := map[instancesKey]struct{}{}
    for _, i := range instances {
        keys[makeInstanceKey(i)] = struct{}{}
    }
    s.queueEdsEvent(keys, s.doEdsUpdate)
}

// edsCacheUpdate updates the eds cache serially, so that instances computed at t1 cannot
// accidentally override those computed at t2 when multiple threads are
// running this function. Queueing ensures the latest update wins.
func (s *Controller) edsCacheUpdate(instances []*model.ServiceInstance) {
    // Find all keys we need to lookup
    keys := map[instancesKey]struct{}{}
    for _, i := range instances {
        keys[makeInstanceKey(i)] = struct{}{}
    }
    s.queueEdsEvent(keys, s.doEdsCacheUpdate)
}

// queueEdsEvent processes eds events sequentially for the passed keys and invokes the passed function.
func (s *Controller) queueEdsEvent(keys sets.Set[instancesKey], edsFn func(keys sets.Set[instancesKey])) {
    // wait for the cache update to finish
    waitCh := make(chan struct{})
    // trigger the update of the eds endpoint shards
    s.edsQueue.Push(func() error {
        defer close(waitCh)
        edsFn(keys)
        return nil
    })
    select {
    case <-waitCh:
        return
    // To prevent a goroutine leak in tests,
    // in case the queue is stopped but the task has not been executed.
    case <-s.edsQueue.Closed():
        return
    }
}

// doEdsCacheUpdate invokes XdsUpdater's EDSCacheUpdate to update endpoint shards.
func (s *Controller) doEdsCacheUpdate(keys sets.Set[instancesKey]) {
    endpoints := s.buildEndpoints(keys)
    shard := model.ShardKeyFromRegistry(s)
    // This is delete.
    if len(endpoints) == 0 {
        for k := range keys {
            s.XdsUpdater.EDSCacheUpdate(shard, string(k.hostname), k.namespace, nil)
        }
    } else {
        for k, eps := range endpoints {
            s.XdsUpdater.EDSCacheUpdate(shard, string(k.hostname), k.namespace, eps)
        }
    }
}

// doEdsUpdate invokes XdsUpdater's eds update to trigger eds push.
func (s *Controller) doEdsUpdate(keys sets.Set[instancesKey]) {
    endpoints := s.buildEndpoints(keys)
    shard := model.ShardKeyFromRegistry(s)
    // This is delete.
    if len(endpoints) == 0 {
        for k := range keys {
            s.XdsUpdater.EDSUpdate(shard, string(k.hostname), k.namespace, nil)
        }
    } else {
        for k, eps := range endpoints {
            s.XdsUpdater.EDSUpdate(shard, string(k.hostname), k.namespace, eps)
        }
    }
}

// buildEndpoints builds endpoints for the instance keys.
func (s *Controller) buildEndpoints(keys map[instancesKey]struct{}) map[instancesKey][]*model.IstioEndpoint {
    var endpoints map[instancesKey][]*model.IstioEndpoint
    allInstances := []*model.ServiceInstance{}
    s.mutex.RLock()
    for key := range keys {
        i := s.serviceInstances.getByKey(key)
        allInstances = append(allInstances, i...)
    }
    s.mutex.RUnlock()

    if len(allInstances) > 0 {
        endpoints = make(map[instancesKey][]*model.IstioEndpoint)
        for _, instance := range allInstances {
            key := makeInstanceKey(instance)
            endpoints[key] = append(endpoints[key], instance.Endpoint)
        }
    }
    return endpoints
}

// GetProxyServiceTargets lists service targets co-located with a given proxy
// NOTE: The service objects in these instances do not have the auto allocated IP set.
func (s *Controller) GetProxyServiceTargets(node *model.Proxy) []model.ServiceTarget {
    out := make([]model.ServiceTarget, 0)
    s.mutex.RLock()
    defer s.mutex.RUnlock()
    for _, ip := range node.IPAddresses {
        instances := s.serviceInstances.getByIP(ip)
        for _, i := range instances {
            // Insert all instances for this IP for services within the same namespace. This ensures we
            // match Kubernetes logic where Services do not cross namespace boundaries and avoids
            // possibility of other namespaces inserting service instances into namespaces they do not
            // control.
            if node.Metadata.Namespace == "" || i.Service.Attributes.Namespace == node.Metadata.Namespace {
                out = append(out, model.ServiceInstanceToTarget(i))
            }
        }
    }
    return out
}

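// GetProxyWorkloadLabels returns the endpoint labels of the first service instance matching
// one of the proxy's IP addresses within the proxy's namespace.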
func (s *Controller) GetProxyWorkloadLabels(proxy *model.Proxy) labels.Instance {
    s.mutex.RLock()
    defer s.mutex.RUnlock()
    for _, ip := range proxy.IPAddresses {
        instances := s.serviceInstances.getByIP(ip)
        for _, i := range instances {
            // Consider only instances for this IP for services within the same namespace. This ensures we
            // match Kubernetes logic where Services do not cross namespace boundaries and avoids
            // possibility of other namespaces inserting service instances into namespaces they do not
            // control.
            // All instances should have the same labels, so we just return the first one.
            if proxy.Metadata.Namespace == "" || i.Service.Attributes.Namespace == proxy.Metadata.Namespace {
                return i.Endpoint.Labels
            }
        }
    }
    return nil
}

func (s *Controller) NetworkGateways() []model.NetworkGateway {
    // TODO implement mesh networks loading logic from kube controller if needed
    return nil
}

func (s *Controller) MCSServices() []model.MCSServiceInfo {
    return nil
}

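// servicesDiff compares the old and new services by hostname and returns the
// added, deleted, updated, and unchanged services.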
func servicesDiff(os []*model.Service, ns []*model.Service) ([]*model.Service, []*model.Service, []*model.Service, []*model.Service) {
    var added, deleted, updated, unchanged []*model.Service

    oldServiceHosts := make(map[host.Name]*model.Service, len(os))
    for _, s := range os {
        oldServiceHosts[s.Hostname] = s
    }

    for _, s := range ns {
        oldSvc, ok := oldServiceHosts[s.Hostname]
        if ok && s.Equals(oldSvc) {
            unchanged = append(unchanged, s)
        } else if ok {
            updated = append(updated, s)
        } else {
            added = append(added, s)
        }
        delete(oldServiceHosts, s.Hostname)
    }
    deleted = maps.Values(oldServiceHosts)

    return added, deleted, updated, unchanged
}

// autoAllocateIPs automatically allocates IPs for service entry services WITHOUT an
// address field, when the hostname is not a wildcard and resolution
// is not NONE. The IPs are allocated from the reserved Class E subnet
// (240.240.0.0/16) that is not reachable outside the pod, or from the reserved
// Benchmarking IP range (2001:2::/48) in RFC 5180. When DNS
// capture is enabled, Envoy will resolve the DNS to these IPs. The
// listeners for TCP services will also be set up on these IPs. The
// IPs allocated to a service entry may differ from istiod to istiod,
// but it does not matter because these IPs only affect the listener
// IPs on a given proxy managed by a given istiod.
//
// NOTE: If DNS capture is not enabled by the proxy, the automatically
// allocated IP addresses do not take effect.
//
// The current algorithm to allocate IPs is deterministic across all istiods.
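// As an illustration of the mapping used below: allocation counter x=1 yields 240.240.0.1
// (IPv4) and 2001:2::f0f0:1 (IPv6), while x=257 yields 240.240.1.2, since the third octet
// is x/255 and the fourth octet is x%255; values of x that are multiples of 255 are skipped
// so that the fourth octet is never 0.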
func autoAllocateIPs(services []*model.Service) []*model.Service {
    hashedServices := make([]*model.Service, maxIPs)
    hash := fnv.New32a()
    // First iterate through the range of services and determine its position by hash
    // so that we can deterministically allocate an IP.
    // We use "Double Hashing" for collision detection.
    // The hash algorithm is
    // - h1(k) = Sum32 hash of the service key (namespace + "/" + hostname)
    // - Check if we have an empty slot for h1(x) % MAXIPS. Use it if available.
    // - If there is a collision, apply second hash i.e. h2(x) = PRIME - (Key % PRIME)
    //   where PRIME is the max prime number below MAXIPS.
    // - Calculate new hash iteratively till we find an empty slot with (h1(k) + i*h2(k)) % MAXIPS
    j := 0
    for _, svc := range services {
        // we can allocate IPs only if
        // 1. the service has resolution set to static/dns. We cannot allocate
        //    for NONE because we will not know the original DST IP that the application requested.
        // 2. the address is not set (0.0.0.0)
        // 3. the hostname is not a wildcard
        if svc.DefaultAddress == constants.UnspecifiedIP && !svc.Hostname.IsWildCarded() &&
            svc.Resolution != model.Passthrough {
            if j >= maxIPs {
                log.Errorf("out of IPs to allocate for service entries. maxips:= %d", maxIPs)
                break
            }
            // First hash is calculated by hashing the service key i.e. (namespace + "/" + hostname).
            hash.Write([]byte(makeServiceKey(svc)))
            s := hash.Sum32()
            firstHash := s % uint32(maxIPs)
            // Check if there is a service with this hash first. If there is no service
            // at this location - then we can safely assign this position for this service.
            if hashedServices[firstHash] == nil {
                hashedServices[firstHash] = svc
            } else {
                // This means we have a collision. Resolve collision by "double hashing".
                i := uint32(1)
                secondHash := uint32(prime) - (s % uint32(prime))
                for {
                    nh := (s + i*secondHash) % uint32(maxIPs-1)
                    if hashedServices[nh] == nil {
                        hashedServices[nh] = svc
                        break
                    }
                    i++
                }
            }
            hash.Reset()
            j++
        }
    }

    x := 0
    hnMap := make(map[string]octetPair)
    for _, svc := range hashedServices {
        x++
        if svc == nil {
            // There is no service in the slot. Just increment x and move forward.
            continue
        }
        n := makeServiceKey(svc)
        // To avoid allocating addresses ending in .0, increment x when x % 255 is 0.
        // For example, when x=510, the resulting IP would be 240.240.2.0 (invalid),
        // so we bump x to 511, so that the resulting IP is 240.240.2.1.
        if x%255 == 0 {
            x++
        }
        if v, ok := hnMap[n]; ok {
            log.Debugf("Reuse IP for domain %s", n)
            setAutoAllocatedIPs(svc, v)
        } else {
            var thirdOctet, fourthOctet int
            thirdOctet = x / 255
            fourthOctet = x % 255
            pair := octetPair{thirdOctet, fourthOctet}
            setAutoAllocatedIPs(svc, pair)
            hnMap[n] = pair
        }
    }
    return services
}

func makeServiceKey(svc *model.Service) string {
    return svc.Attributes.Namespace + "/" + svc.Hostname.String()
}

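// setAutoAllocatedIPs sets the auto-allocated IPv4 address (240.240.<third>.<fourth>) and
// the IPv6 address derived from the same octets (2001:2::f0f0:<octets in hex>) on the service.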
func setAutoAllocatedIPs(svc *model.Service, octets octetPair) {
    a := octets.thirdOctet
    b := octets.fourthOctet
    svc.AutoAllocatedIPv4Address = fmt.Sprintf("240.240.%d.%d", a, b)
    if a == 0 {
        svc.AutoAllocatedIPv6Address = fmt.Sprintf("2001:2::f0f0:%x", b)
    } else {
        svc.AutoAllocatedIPv6Address = fmt.Sprintf("2001:2::f0f0:%x%x", a, b)
    }
}

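// makeConfigKey builds the ConfigKey (kind ServiceEntry, keyed by the service hostname)
// used to mark this service as updated in push requests.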
func makeConfigKey(svc *model.Service) model.ConfigKey {
    return model.ConfigKey{
        Kind:      kind.ServiceEntry,
        Name:      string(svc.Hostname),
        Namespace: svc.Attributes.Namespace,
    }
}

// isHealthy checks that the provided WorkloadEntry is healthy. If health checks are not enabled,
// it is assumed to always be healthy
func isHealthy(cfg config.Config) bool {
    if parseHealthAnnotation(cfg.Annotations[status.WorkloadEntryHealthCheckAnnotation]) {
        // We default to false if the condition is not set. This ensures newly created WorkloadEntries
        // are treated as unhealthy until we prove they are healthy by probe success.
        return status.GetBoolConditionFromSpec(cfg, status.ConditionHealthy, false)
    }
    // If health check is not enabled, assume it's healthy
    return true
}

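// parseHealthAnnotation reports whether the health check annotation is set to a value that
// parses as true; an empty or unparsable value is treated as false (health checking disabled).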
func parseHealthAnnotation(s string) bool {
    if s == "" {
        return false
    }
    p, err := strconv.ParseBool(s)
    if err != nil {
        return false
    }
    return p
}

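// buildServiceInstances returns the service instances for the given ServiceEntry, grouped by the
// originating config (the selected WorkloadEntry/Pod for selector-based entries, or the
// ServiceEntry itself otherwise), along with the flattened list of all instances.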
func (s *Controller) buildServiceInstances(
    curr config.Config,
    services []*model.Service,
) (map[configKey][]*model.ServiceInstance, []*model.ServiceInstance) {
    currentServiceEntry := curr.Spec.(*networking.ServiceEntry)
    var serviceInstances []*model.ServiceInstance
    serviceInstancesByConfig := map[configKey][]*model.ServiceInstance{}
    // for service entry with labels
    if currentServiceEntry.WorkloadSelector != nil {
        selector := workloadinstances.ByServiceSelector(curr.Namespace, currentServiceEntry.WorkloadSelector.Labels)
        workloadInstances := workloadinstances.FindAllInIndex(s.workloadInstances, selector)
        for _, wi := range workloadInstances {
            if wi.DNSServiceEntryOnly && currentServiceEntry.Resolution != networking.ServiceEntry_DNS &&
                currentServiceEntry.Resolution != networking.ServiceEntry_DNS_ROUND_ROBIN {
                log.Debugf("skip selecting workload instance %v/%v for DNS service entry %v", wi.Namespace, wi.Name,
                    currentServiceEntry.Hosts)
                continue
            }
            instances := convertWorkloadInstanceToServiceInstance(wi, services, currentServiceEntry)
            serviceInstances = append(serviceInstances, instances...)
            ckey := configKey{namespace: wi.Namespace, name: wi.Name}
            if wi.Kind == model.PodKind {
                ckey.kind = podConfigType
            } else {
                ckey.kind = workloadEntryConfigType
            }
            serviceInstancesByConfig[ckey] = instances
        }
    } else {
        serviceInstances = s.convertServiceEntryToInstances(curr, services)
        ckey := configKey{
            kind:      serviceEntryConfigType,
            name:      curr.Name,
            namespace: curr.Namespace,
        }
        serviceInstancesByConfig[ckey] = serviceInstances
    }

    return serviceInstancesByConfig, serviceInstances
}