// Copyright Istio Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package serviceentry

import (
	"fmt"
	"hash/fnv"
	"strconv"
	"sync"
	"time"

	"k8s.io/apimachinery/pkg/types"

	networking "istio.io/api/networking/v1alpha3"
	"istio.io/istio/pilot/pkg/features"
	"istio.io/istio/pilot/pkg/model"
	"istio.io/istio/pilot/pkg/model/status"
	"istio.io/istio/pilot/pkg/serviceregistry"
	"istio.io/istio/pilot/pkg/serviceregistry/provider"
	"istio.io/istio/pilot/pkg/serviceregistry/util/workloadinstances"
	"istio.io/istio/pkg/cluster"
	"istio.io/istio/pkg/config"
	"istio.io/istio/pkg/config/constants"
	"istio.io/istio/pkg/config/host"
	"istio.io/istio/pkg/config/labels"
	"istio.io/istio/pkg/config/schema/gvk"
	"istio.io/istio/pkg/config/schema/kind"
	istiolog "istio.io/istio/pkg/log"
	"istio.io/istio/pkg/maps"
	"istio.io/istio/pkg/network"
	"istio.io/istio/pkg/queue"
	"istio.io/istio/pkg/slices"
	"istio.io/istio/pkg/util/protomarshal"
	"istio.io/istio/pkg/util/sets"
)

var (
	_   serviceregistry.Instance = &Controller{}
	log                          = istiolog.RegisterScope("serviceentry", "ServiceEntry registry")
)

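// prime is the largest prime below maxIPs and is used as the step size for the
// double-hashing collision resolution in autoAllocateIPs below.
// maxIPs bounds how many addresses can be auto-allocated from the reserved
// 240.240.0.0/16 block.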
var (
	prime  = 65011     // Used for secondary hash function.
	maxIPs = 256 * 254 // Maximum possible IPs for address allocation.
)

// instancesKey acts as a key to identify all instances for a given hostname/namespace pair
// This is mostly used as an index
type instancesKey struct {
	hostname  host.Name
	namespace string
}

type octetPair struct {
	thirdOctet  int
	fourthOctet int
}

func makeInstanceKey(i *model.ServiceInstance) instancesKey {
	return instancesKey{i.Service.Hostname, i.Service.Attributes.Namespace}
}

type configType int

const (
	serviceEntryConfigType configType = iota
	workloadEntryConfigType
	podConfigType
)

// configKey uniquely identifies a config object managed by this registry (ServiceEntry and WorkloadEntry)
type configKey struct {
	kind      configType
	name      string
	namespace string
}

// Controller communicates with ServiceEntry CRDs and monitors for changes.
type Controller struct {
	XdsUpdater model.XDSUpdater

	store     model.ConfigStore
	clusterID cluster.ID

	// This lock protects multi-step operations on the stores below. For example, some cases
	// require deleting all instances and then adding new ones.
	mutex sync.RWMutex

	serviceInstances serviceInstancesStore
	// NOTE: historically, one index for both WorkloadEntry(s) and Pod(s);
	//       beware of naming collisions
	workloadInstances workloadinstances.Index
	services          serviceStore

	// Ensures EDS updates run serially, so that a stale update cannot override a newer one
	// when edsUpdate is called concurrently.
	// Sharing a single lock for everything would cause an obvious performance downgrade
	// across all threads.
	edsQueue queue.Instance

	workloadHandlers []func(*model.WorkloadInstance, model.Event)

	// callback function used to get the networkID according to workload IP and labels.
	networkIDCallback func(IP string, labels labels.Instance) network.ID

	// Indicates whether this controller is for workload entries.
	workloadEntryController bool

	model.NoopAmbientIndexes
	model.NetworkGatewaysHandler
}

type Option func(*Controller)

func WithClusterID(clusterID cluster.ID) Option {
	return func(o *Controller) {
		o.clusterID = clusterID
	}
}

func WithNetworkIDCb(cb func(endpointIP string, labels labels.Instance) network.ID) Option {
	return func(o *Controller) {
		o.networkIDCallback = cb
	}
}

// NewController creates a new ServiceEntry discovery service.
func NewController(configController model.ConfigStoreController, xdsUpdater model.XDSUpdater,
	options ...Option,
) *Controller {
	s := newController(configController, xdsUpdater, options...)
	if configController != nil {
		configController.RegisterEventHandler(gvk.ServiceEntry, s.serviceEntryHandler)
		configController.RegisterEventHandler(gvk.WorkloadEntry, s.workloadEntryHandler)
	}
	return s
}

// NewWorkloadEntryController creates a new WorkloadEntry discovery service.
func NewWorkloadEntryController(configController model.ConfigStoreController, xdsUpdater model.XDSUpdater,
	options ...Option,
) *Controller {
	s := newController(configController, xdsUpdater, options...)
	// Disable service entry processing for workload entry controller.
	s.workloadEntryController = true
	for _, o := range options {
		o(s)
	}

	if configController != nil {
		configController.RegisterEventHandler(gvk.WorkloadEntry, s.workloadEntryHandler)
	}
	return s
}

func newController(store model.ConfigStore, xdsUpdater model.XDSUpdater, options ...Option) *Controller {
	s := &Controller{
		XdsUpdater: xdsUpdater,
		store:      store,
		serviceInstances: serviceInstancesStore{
			ip2instance:            map[string][]*model.ServiceInstance{},
			instances:              map[instancesKey]map[configKey][]*model.ServiceInstance{},
			instancesBySE:          map[types.NamespacedName]map[configKey][]*model.ServiceInstance{},
			instancesByHostAndPort: sets.New[hostPort](),
		},
		workloadInstances: workloadinstances.NewIndex(),
		services: serviceStore{
			servicesBySE: map[types.NamespacedName][]*model.Service{},
		},
		edsQueue: queue.NewQueue(time.Second),
	}
	for _, o := range options {
		o(s)
	}
	return s
}

// ConvertServiceEntry converts se from Config.Spec.
func ConvertServiceEntry(cfg config.Config) *networking.ServiceEntry {
	se := cfg.Spec.(*networking.ServiceEntry)
	if se == nil {
		return nil
	}

	// shallow copy
	copied := &networking.ServiceEntry{}
	protomarshal.ShallowCopy(copied, se)
	return copied
}

// ConvertWorkloadEntry converts wle from Config.Spec and populates the metadata labels into it.
func ConvertWorkloadEntry(cfg config.Config) *networking.WorkloadEntry {
	wle := cfg.Spec.(*networking.WorkloadEntry)
	if wle == nil {
		return nil
	}

	// we will merge labels from metadata with spec, with precedence to the metadata
	labels := maps.MergeCopy(wle.Labels, cfg.Labels)
	// shallow copy
	copied := &networking.WorkloadEntry{}
	protomarshal.ShallowCopy(copied, wle)
	copied.Labels = labels
	return copied
}

// workloadEntryHandler defines the handler for workload entries
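// It converts the WorkloadEntry into a WorkloadInstance, finds the ServiceEntries in the same
// namespace that select it (and those that used to select it, if the labels changed), updates
// the instance indexes accordingly, and then triggers either an EDS-only update or, for
// DNS-resolution ServiceEntries, a full push.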
func (s *Controller) workloadEntryHandler(old, curr config.Config, event model.Event) {
	log.Debugf("Handle event %s for workload entry %s/%s", event, curr.Namespace, curr.Name)
	var oldWle *networking.WorkloadEntry
	if old.Spec != nil {
		oldWle = ConvertWorkloadEntry(old)
	}
	wle := ConvertWorkloadEntry(curr)
	curr.Spec = wle
	key := configKey{
		kind:      workloadEntryConfigType,
		name:      curr.Name,
		namespace: curr.Namespace,
	}

	// If an entry is unhealthy, we will mark this as a delete instead
	// This ensures we do not track unhealthy endpoints
	if features.WorkloadEntryHealthChecks && !isHealthy(curr) {
		event = model.EventDelete
	}

	wi := s.convertWorkloadEntryToWorkloadInstance(curr, s.Cluster())
	if wi != nil && !wi.DNSServiceEntryOnly {
		// fire off the k8s handlers
		s.NotifyWorkloadInstanceHandlers(wi, event)
	}

	// includes new, updated, and unchanged instances; in other words, the current state.
	instancesUpdated := []*model.ServiceInstance{}
	instancesDeleted := []*model.ServiceInstance{}
	fullPush := false
	configsUpdated := sets.New[model.ConfigKey]()

	addConfigs := func(se *networking.ServiceEntry, services []*model.Service) {
		// If the ServiceEntry's resolution is DNS, make a full push
		// TODO: maybe cds?
		if se.Resolution == networking.ServiceEntry_DNS || se.Resolution == networking.ServiceEntry_DNS_ROUND_ROBIN {
			fullPush = true
			for key, value := range getUpdatedConfigs(services) {
				configsUpdated[key] = value
			}
		}
	}

	cfgs := s.store.List(gvk.ServiceEntry, curr.Namespace)
	currSes := getWorkloadServiceEntries(cfgs, wle)
	var oldSes map[types.NamespacedName]*config.Config
	if oldWle != nil {
		if labels.Instance(oldWle.Labels).Equals(curr.Labels) {
			oldSes = currSes
		} else {
			// labels update should trigger proxy update
			s.XdsUpdater.ProxyUpdate(s.Cluster(), wle.Address)
			oldSes = getWorkloadServiceEntries(cfgs, oldWle)
		}
	}
	unSelected := difference(oldSes, currSes)
	log.Debugf("workloadEntry %s/%s selected %v, unSelected %v serviceEntry", curr.Namespace, curr.Name, currSes, unSelected)
	s.mutex.Lock()
	for namespacedName, cfg := range currSes {
		services := s.services.getServices(namespacedName)
		se := cfg.Spec.(*networking.ServiceEntry)
		if wi.DNSServiceEntryOnly && se.Resolution != networking.ServiceEntry_DNS &&
			se.Resolution != networking.ServiceEntry_DNS_ROUND_ROBIN {
			log.Debugf("skip selecting workload instance %v/%v for DNS service entry %v", wi.Namespace, wi.Name, se.Hosts)
			continue
		}
		instance := s.convertWorkloadEntryToServiceInstances(wle, services, se, &key, s.Cluster())
		instancesUpdated = append(instancesUpdated, instance...)
		if event == model.EventDelete {
			s.serviceInstances.deleteServiceEntryInstances(namespacedName, key)
		} else {
			s.serviceInstances.updateServiceEntryInstancesPerConfig(namespacedName, key, instance)
		}
		addConfigs(se, services)
	}

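	// Clean up instances for ServiceEntries that previously selected this workload but no longer do.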
	for _, namespacedName := range unSelected {
		services := s.services.getServices(namespacedName)
		cfg := oldSes[namespacedName]
		se := cfg.Spec.(*networking.ServiceEntry)
		if wi.DNSServiceEntryOnly && se.Resolution != networking.ServiceEntry_DNS &&
			se.Resolution != networking.ServiceEntry_DNS_ROUND_ROBIN {
			log.Debugf("skip selecting workload instance %v/%v for DNS service entry %v", wi.Namespace, wi.Name, se.Hosts)
			continue
		}
		instance := s.convertWorkloadEntryToServiceInstances(wle, services, se, &key, s.Cluster())
		instancesDeleted = append(instancesDeleted, instance...)
		s.serviceInstances.deleteServiceEntryInstances(namespacedName, key)
		addConfigs(se, services)
	}

	s.serviceInstances.deleteInstanceKeys(key, instancesDeleted)
	if event == model.EventDelete {
		s.workloadInstances.Delete(wi)
		s.serviceInstances.deleteInstanceKeys(key, instancesUpdated)
	} else {
		s.workloadInstances.Insert(wi)
		s.serviceInstances.updateInstances(key, instancesUpdated)
	}
	s.mutex.Unlock()

	allInstances := append(instancesUpdated, instancesDeleted...)
	if !fullPush {
		// trigger full xds push to the related sidecar proxy
		if event == model.EventAdd {
			s.XdsUpdater.ProxyUpdate(s.Cluster(), wle.Address)
		}
		s.edsUpdate(allInstances)
		return
	}

	// update eds cache only
	s.edsCacheUpdate(allInstances)

	pushReq := &model.PushRequest{
		Full:           true,
		ConfigsUpdated: configsUpdated,
		Reason:         model.NewReasonStats(model.EndpointUpdate),
	}
	// trigger a full push
	s.XdsUpdater.ConfigUpdate(pushReq)
}

func (s *Controller) NotifyWorkloadInstanceHandlers(wi *model.WorkloadInstance, event model.Event) {
	for _, h := range s.workloadHandlers {
		h(wi, event)
	}
}

// getUpdatedConfigs returns the config keys of the related service entries, used when doing a full push
func getUpdatedConfigs(services []*model.Service) sets.Set[model.ConfigKey] {
	configsUpdated := sets.New[model.ConfigKey]()
	for _, svc := range services {
		configsUpdated.Insert(model.ConfigKey{
			Kind:      kind.ServiceEntry,
			Name:      string(svc.Hostname),
			Namespace: svc.Attributes.Namespace,
		})
	}
	return configsUpdated
}

// serviceEntryHandler defines the handler for service entries
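// It converts the ServiceEntry into model.Services, diffs them against the previously stored
// services (added/deleted/updated/unchanged), rebuilds the service instance indexes, and then
// triggers either an EDS-only update or a full push when the set of services has changed or
// DNS resolution is in use.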
func (s *Controller) serviceEntryHandler(old, curr config.Config, event model.Event) {
	log.Debugf("Handle event %s for service entry %s/%s", event, curr.Namespace, curr.Name)
	currentServiceEntry := curr.Spec.(*networking.ServiceEntry)
	cs := convertServices(curr)
	configsUpdated := sets.New[model.ConfigKey]()
	key := curr.NamespacedName()

	s.mutex.Lock()
	// If it is an add/delete event we should always do a full push. If it is an update event, we should
	// do a full push only when services have changed - otherwise, just push endpoint updates.
	var addedSvcs, deletedSvcs, updatedSvcs, unchangedSvcs []*model.Service
	switch event {
	case model.EventUpdate:
		addedSvcs, deletedSvcs, updatedSvcs, unchangedSvcs = servicesDiff(s.services.getServices(key), cs)
		oldServiceEntry := old.Spec.(*networking.ServiceEntry)
		// Also check if target ports are changed since they are not included in `model.Service`
		if !slices.EqualFunc(oldServiceEntry.Ports, currentServiceEntry.Ports, func(a, b *networking.ServicePort) bool {
			return a.TargetPort == b.TargetPort
		}) {
			// Note: If the length of ports is changed, unchangedSvcs will be nil, so this is a no-op
			updatedSvcs = append(updatedSvcs, unchangedSvcs...)
			unchangedSvcs = nil
		}
		s.services.updateServices(key, cs)
	case model.EventDelete:
		deletedSvcs = cs
		s.services.deleteServices(key)
	case model.EventAdd:
		addedSvcs = cs
		s.services.updateServices(key, cs)
	default:
		// this should not happen
		unchangedSvcs = cs
	}

	serviceInstancesByConfig, serviceInstances := s.buildServiceInstances(curr, cs)
	oldInstances := s.serviceInstances.getServiceEntryInstances(key)
	for configKey, old := range oldInstances {
		s.serviceInstances.deleteInstanceKeys(configKey, old)
	}
	if event == model.EventDelete {
		s.serviceInstances.deleteAllServiceEntryInstances(key)
	} else {
		// Update the indexes with new instances.
		for ckey, value := range serviceInstancesByConfig {
			s.serviceInstances.addInstances(ckey, value)
		}
		s.serviceInstances.updateServiceEntryInstances(key, serviceInstancesByConfig)
	}

	shard := model.ShardKeyFromRegistry(s)

	for _, svc := range addedSvcs {
		s.XdsUpdater.SvcUpdate(shard, string(svc.Hostname), svc.Attributes.Namespace, model.EventAdd)
		configsUpdated.Insert(makeConfigKey(svc))
	}

	for _, svc := range updatedSvcs {
		s.XdsUpdater.SvcUpdate(shard, string(svc.Hostname), svc.Attributes.Namespace, model.EventUpdate)
		configsUpdated.Insert(makeConfigKey(svc))
	}
	// If the service entry is deleted, call SvcUpdate to clean up endpoint shards for the services.
	for _, svc := range deletedSvcs {
		instanceKey := instancesKey{namespace: svc.Attributes.Namespace, hostname: svc.Hostname}
		// There can be multiple service entries for the same host in the same namespace.
		// Delete endpoint shards only if there are no service instances.
		if len(s.serviceInstances.getByKey(instanceKey)) == 0 {
			s.XdsUpdater.SvcUpdate(shard, string(svc.Hostname), svc.Attributes.Namespace, model.EventDelete)
		} else {
			// If there are some endpoints remaining for the host, add svc to updatedSvcs to trigger an eds cache update
			updatedSvcs = append(updatedSvcs, svc)
		}
		configsUpdated.Insert(makeConfigKey(svc))
	}

	// If a service is updated and is not part of updatedSvcs, that means its endpoints might have changed.
	// If this service entry had endpoints with IPs (i.e. resolution STATIC), then we do an EDS update.
	// If the service entry had endpoints with FQDNs (i.e. resolution DNS), then we need to do a
	// full push (as fqdn endpoints go via strict_dns clusters in cds).
	if len(unchangedSvcs) > 0 {
		if currentServiceEntry.Resolution == networking.ServiceEntry_DNS || currentServiceEntry.Resolution == networking.ServiceEntry_DNS_ROUND_ROBIN {
			for _, svc := range unchangedSvcs {
				configsUpdated.Insert(makeConfigKey(svc))
			}
		}
	}
	s.mutex.Unlock()

	fullPush := len(configsUpdated) > 0
	// If no full push is needed, at least one service is unchanged - just push endpoint updates.
	if !fullPush {
		s.edsUpdate(serviceInstances)
		return
	}

	// When doing a full push, the non-DNS added, updated, and unchanged services trigger an eds update
	// so that endpoint shards are updated.
	allServices := make([]*model.Service, 0, len(addedSvcs)+len(updatedSvcs)+len(unchangedSvcs))
	allServices = append(allServices, addedSvcs...)
	allServices = append(allServices, updatedSvcs...)
	allServices = append(allServices, unchangedSvcs...)

	// non-DNS service instances
	keys := sets.NewWithLength[instancesKey](len(allServices))
	for _, svc := range allServices {
		keys.Insert(instancesKey{hostname: svc.Hostname, namespace: curr.Namespace})
	}

	s.queueEdsEvent(keys, s.doEdsCacheUpdate)

	pushReq := &model.PushRequest{
		Full:           true,
		ConfigsUpdated: configsUpdated,
		Reason:         model.NewReasonStats(model.ServiceUpdate),
	}
	s.XdsUpdater.ConfigUpdate(pushReq)
}

// WorkloadInstanceHandler defines the handler for service instances generated by other registries
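// These instances typically originate from the Kubernetes registry (pods). The handler indexes the
// instance and re-evaluates every ServiceEntry with a workloadSelector in the same namespace,
// adding or removing the corresponding service instances and pushing EDS (or a full push for
// DNS-resolution entries).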
func (s *Controller) WorkloadInstanceHandler(wi *model.WorkloadInstance, event model.Event) {
	log.Debugf("Handle event %s for workload instance (%s/%s) in namespace %s", event,
		wi.Kind, wi.Endpoint.Address, wi.Namespace)
	var oldWi *model.WorkloadInstance
	key := configKey{
		kind:      podConfigType,
		name:      wi.Name,
		namespace: wi.Namespace,
	}
	// Used to indicate if this event was fired for a pod->workloadentry conversion
	// and that the event can be ignored due to no relevant change in the workloadentry
	redundantEventForPod := false

	// Used to indicate if the wi labels changed and we need to recheck all instances
	labelsChanged := false

	var addressToDelete string
	s.mutex.Lock()
	// this is from a pod. Store it in a separate map so that
	// the refreshIndexes function can use these as well as the store ones.
	switch event {
	case model.EventDelete:
		redundantEventForPod = s.workloadInstances.Delete(wi) == nil
	default: // add or update
		if oldWi = s.workloadInstances.Insert(wi); oldWi != nil {
			if oldWi.Endpoint.Address != wi.Endpoint.Address {
				addressToDelete = oldWi.Endpoint.Address
			}
			// Check if the old labels still match the new labels. If they don't then we need to
			// refresh the list of instances for this wi
			if !oldWi.Endpoint.Labels.Equals(wi.Endpoint.Labels) {
				labelsChanged = true
			}
			// If multiple k8s services select the same pod or a service has multiple ports,
			// we may get multiple events; ignore them, as we only care about the endpoint IP itself.
			if model.WorkloadInstancesEqual(oldWi, wi) {
				// ignore the update as nothing has changed
				redundantEventForPod = true
			}
		}
	}

	if redundantEventForPod {
		s.mutex.Unlock()
		return
	}

	// We will only select entries in the same namespace
	cfgs := s.store.List(gvk.ServiceEntry, wi.Namespace)
	if len(cfgs) == 0 {
		s.mutex.Unlock()
		return
	}

	instances := []*model.ServiceInstance{}
	instancesDeleted := []*model.ServiceInstance{}
	configsUpdated := sets.New[model.ConfigKey]()
	fullPush := false
	for _, cfg := range cfgs {
		se := cfg.Spec.(*networking.ServiceEntry)
		if se.WorkloadSelector == nil || (!labelsChanged && !labels.Instance(se.WorkloadSelector.Labels).Match(wi.Endpoint.Labels)) {
			// If the labels didn't change and the new SE doesn't match, then the old one didn't match
			// either and we can skip processing it.
			continue
		}

		// If we are here, then there are 3 possible cases :
		// Case 1 : The new wi is a subset of se
		// Case 2 : The labelsChanged and the new wi is still a subset of se
		// Case 3 : The labelsChanged and the new wi is NOT a subset of se anymore

		seNamespacedName := cfg.NamespacedName()
		services := s.services.getServices(seNamespacedName)
		currInstance := convertWorkloadInstanceToServiceInstance(wi, services, se)

		// We check if the wi is still a subset of se. This would cover Case 1 and Case 2 from above.
		if labels.Instance(se.WorkloadSelector.Labels).Match(wi.Endpoint.Labels) {
			// If the workload instance still matches. We take care of the possible events.
			instances = append(instances, currInstance...)
			if addressToDelete != "" {
				for _, i := range currInstance {
					di := i.DeepCopy()
					di.Endpoint.Address = addressToDelete
					instancesDeleted = append(instancesDeleted, di)
				}
				s.serviceInstances.deleteServiceEntryInstances(seNamespacedName, key)
			} else if event == model.EventDelete {
				s.serviceInstances.deleteServiceEntryInstances(seNamespacedName, key)
			} else {
				s.serviceInstances.updateServiceEntryInstancesPerConfig(seNamespacedName, key, currInstance)
			}
			// If the ServiceEntry's resolution is DNS, make a full push
			// TODO: maybe cds?
			if (se.Resolution == networking.ServiceEntry_DNS || se.Resolution == networking.ServiceEntry_DNS_ROUND_ROBIN) &&
				se.WorkloadSelector != nil {

				fullPush = true
				for _, inst := range currInstance {
					configsUpdated[model.ConfigKey{
						Kind:      kind.ServiceEntry,
						Name:      string(inst.Service.Hostname),
						Namespace: cfg.Namespace,
					}] = struct{}{}
				}
			}
		} else if labels.Instance(se.WorkloadSelector.Labels).Match(oldWi.Endpoint.Labels) {
			// If we're here, it means that the labels changed and the new labels don't match the SE anymore (Case 3 from above) and the oldWi did
			// match the SE.
			// Since the instance doesn't match the SE anymore, we remove it from the list.
			oldInstance := convertWorkloadInstanceToServiceInstance(oldWi, services, se)
			instancesDeleted = append(instancesDeleted, oldInstance...)
			s.serviceInstances.deleteServiceEntryInstances(seNamespacedName, key)
		}
	}

	if len(instancesDeleted) > 0 {
		s.serviceInstances.deleteInstanceKeys(key, instancesDeleted)
	}

	if event == model.EventDelete {
		s.serviceInstances.deleteInstanceKeys(key, instances)
	} else {
		s.serviceInstances.updateInstances(key, instances)
	}
	s.mutex.Unlock()

	s.edsUpdate(append(instances, instancesDeleted...))

	// ServiceEntry with WorkloadEntry results in STRICT_DNS cluster with hardcoded endpoints
	// need to update CDS to refresh endpoints
	// https://github.com/istio/istio/issues/39505
	if fullPush {
		log.Debugf("Full push triggered during event %s for workload instance (%s/%s) in namespace %s", event,
			wi.Kind, wi.Endpoint.Address, wi.Namespace)
		pushReq := &model.PushRequest{
			Full:           true,
			ConfigsUpdated: configsUpdated,
			Reason:         model.NewReasonStats(model.EndpointUpdate),
		}
		s.XdsUpdater.ConfigUpdate(pushReq)
	}
}

func (s *Controller) Provider() provider.ID {
	return provider.External
}

func (s *Controller) Cluster() cluster.ID {
	return s.clusterID
}

// AppendServiceHandler adds a service resource event handler. ServiceEntry does not use these handlers.
func (s *Controller) AppendServiceHandler(_ model.ServiceHandler) {}

// AppendWorkloadHandler adds an instance event handler.
func (s *Controller) AppendWorkloadHandler(h func(*model.WorkloadInstance, model.Event)) {
	s.workloadHandlers = append(s.workloadHandlers, h)
}

// Run is used by some controllers to execute background jobs after init is done.
func (s *Controller) Run(stopCh <-chan struct{}) {
	s.edsQueue.Run(stopCh)
}

// HasSynced always returns true for SE
func (s *Controller) HasSynced() bool {
	return true
}

// Services lists declarations of all services in the system
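// Auto-allocated IPs are (re)assigned lazily here, only when the set of services has changed
// (allocateNeeded); callers get shallow copies to avoid data races on the allocated addresses.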
func (s *Controller) Services() []*model.Service {
	s.mutex.Lock()
	allServices := s.services.getAllServices()
	out := make([]*model.Service, 0, len(allServices))
	if s.services.allocateNeeded {
		autoAllocateIPs(allServices)
		s.services.allocateNeeded = false
	}
	s.mutex.Unlock()
	for _, svc := range allServices {
		// shallow copy, copy `AutoAllocatedIPv4Address` and `AutoAllocatedIPv6Address`
		// if we returned the pointer directly, there would be a race with `BuildNameTable`
		// nolint: govet
		shallowSvc := *svc
		out = append(out, &shallowSvc)
	}
	return out
}

// GetService retrieves a service by host name if it exists.
// NOTE: The service entry implementation is used only for tests.
func (s *Controller) GetService(hostname host.Name) *model.Service {
	if s.workloadEntryController {
		return nil
	}
	// TODO(@hzxuzhonghu): only get the specific service instead of converting all the serviceEntries
	services := s.Services()
	for _, service := range services {
		if service.Hostname == hostname {
			return service
		}
	}

	return nil
}

// ResyncEDS will do a full EDS update. This is needed for some tests where we have many configs loaded without calling
// the config handlers.
// This should probably not be used in production code.
func (s *Controller) ResyncEDS() {
	s.mutex.RLock()
	allInstances := s.serviceInstances.getAll()
	s.mutex.RUnlock()
	s.edsUpdate(allInstances)
	// HACK to workaround Service syncing after WorkloadEntry: https://github.com/istio/istio/issues/45114
	s.workloadInstances.ForEach(func(wi *model.WorkloadInstance) {
		s.NotifyWorkloadInstanceHandlers(wi, model.EventAdd)
	})
}

// edsUpdate triggers an EDS push serially so that instances read at t1 cannot accidentally
// override those read at t2 when multiple threads are running this function.
// Queueing ensures the latest update wins.
func (s *Controller) edsUpdate(instances []*model.ServiceInstance) {
	// Find all keys we need to lookup
	keys := map[instancesKey]struct{}{}
	for _, i := range instances {
		keys[makeInstanceKey(i)] = struct{}{}
	}
	s.queueEdsEvent(keys, s.doEdsUpdate)
}

// edsCacheUpdate updates the EDS cache serially so that instances read at t1 cannot accidentally
// override those read at t2 when multiple threads are running this function.
// Queueing ensures the latest update wins.
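// Unlike edsUpdate, this only updates the endpoint shards in the cache and does not trigger an EDS push.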
func (s *Controller) edsCacheUpdate(instances []*model.ServiceInstance) {
	// Find all keys we need to lookup
	keys := map[instancesKey]struct{}{}
	for _, i := range instances {
		keys[makeInstanceKey(i)] = struct{}{}
	}
	s.queueEdsEvent(keys, s.doEdsCacheUpdate)
}

// queueEdsEvent processes eds events sequentially for the passed keys and invokes the passed function.
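// Note that it blocks the caller until the queued task has run (or the queue has been closed).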
func (s *Controller) queueEdsEvent(keys sets.Set[instancesKey], edsFn func(keys sets.Set[instancesKey])) {
	// wait for the cache update to finish
	waitCh := make(chan struct{})
	// trigger update of the eds endpoint shards
	s.edsQueue.Push(func() error {
		defer close(waitCh)
		edsFn(keys)
		return nil
	})
	select {
	case <-waitCh:
		return
	// To prevent a goroutine leak in tests
	// in case the queue is stopped but the task has not been executed.
	case <-s.edsQueue.Closed():
		return
	}
}

// doEdsCacheUpdate invokes XdsUpdater's EDSCacheUpdate to update endpoint shards.
func (s *Controller) doEdsCacheUpdate(keys sets.Set[instancesKey]) {
	endpoints := s.buildEndpoints(keys)
	shard := model.ShardKeyFromRegistry(s)
	// This is a delete.
	if len(endpoints) == 0 {
		for k := range keys {
			s.XdsUpdater.EDSCacheUpdate(shard, string(k.hostname), k.namespace, nil)
		}
	} else {
		for k, eps := range endpoints {
			s.XdsUpdater.EDSCacheUpdate(shard, string(k.hostname), k.namespace, eps)
		}
	}
}

// doEdsUpdate invokes XdsUpdater's eds update to trigger an eds push.
func (s *Controller) doEdsUpdate(keys sets.Set[instancesKey]) {
	endpoints := s.buildEndpoints(keys)
	shard := model.ShardKeyFromRegistry(s)
	// This is a delete.
	if len(endpoints) == 0 {
		for k := range keys {
			s.XdsUpdater.EDSUpdate(shard, string(k.hostname), k.namespace, nil)
		}
	} else {
		for k, eps := range endpoints {
			s.XdsUpdater.EDSUpdate(shard, string(k.hostname), k.namespace, eps)
		}
	}
}

// buildEndpoints builds endpoints for the instance keys.
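// A nil result means no instances remain for any of the keys; callers treat that as a delete.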
func (s *Controller) buildEndpoints(keys map[instancesKey]struct{}) map[instancesKey][]*model.IstioEndpoint {
	var endpoints map[instancesKey][]*model.IstioEndpoint
	allInstances := []*model.ServiceInstance{}
	s.mutex.RLock()
	for key := range keys {
		i := s.serviceInstances.getByKey(key)
		allInstances = append(allInstances, i...)
	}
	s.mutex.RUnlock()

	if len(allInstances) > 0 {
		endpoints = make(map[instancesKey][]*model.IstioEndpoint)
		for _, instance := range allInstances {
			key := makeInstanceKey(instance)
			endpoints[key] = append(endpoints[key], instance.Endpoint)
		}

	}
	return endpoints
}

// GetProxyServiceTargets lists service targets co-located with a given proxy
// NOTE: The service objects in these instances do not have the auto allocated IP set.
func (s *Controller) GetProxyServiceTargets(node *model.Proxy) []model.ServiceTarget {
	out := make([]model.ServiceTarget, 0)
	s.mutex.RLock()
	defer s.mutex.RUnlock()
	for _, ip := range node.IPAddresses {
		instances := s.serviceInstances.getByIP(ip)
		for _, i := range instances {
			// Insert all instances for this IP for services within the same namespace. This ensures we
			// match Kubernetes logic where Services do not cross namespace boundaries and avoids the
			// possibility of other namespaces inserting service instances into namespaces they do not
			// control.
			if node.Metadata.Namespace == "" || i.Service.Attributes.Namespace == node.Metadata.Namespace {
				out = append(out, model.ServiceInstanceToTarget(i))
			}
		}
	}
	return out
}

func (s *Controller) GetProxyWorkloadLabels(proxy *model.Proxy) labels.Instance {
	s.mutex.RLock()
	defer s.mutex.RUnlock()
	for _, ip := range proxy.IPAddresses {
		instances := s.serviceInstances.getByIP(ip)
		for _, i := range instances {
			// Return the labels of the first instance for this IP for services within the same namespace. This ensures we
			// match Kubernetes logic where Services do not cross namespace boundaries and avoids the
			// possibility of other namespaces inserting service instances into namespaces they do not
			// control.
			// All instances should have the same labels so we just return the first
			if proxy.Metadata.Namespace == "" || i.Service.Attributes.Namespace == proxy.Metadata.Namespace {
				return i.Endpoint.Labels
			}
		}
	}
	return nil
}

func (s *Controller) NetworkGateways() []model.NetworkGateway {
	// TODO implement mesh networks loading logic from kube controller if needed
	return nil
}

func (s *Controller) MCSServices() []model.MCSServiceInfo {
	return nil
}

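// servicesDiff compares the stored services with the newly converted ones by hostname and
// returns them partitioned into (added, deleted, updated, unchanged).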
func servicesDiff(os []*model.Service, ns []*model.Service) ([]*model.Service, []*model.Service, []*model.Service, []*model.Service) {
	var added, deleted, updated, unchanged []*model.Service

	oldServiceHosts := make(map[host.Name]*model.Service, len(os))
	for _, s := range os {
		oldServiceHosts[s.Hostname] = s
	}

	for _, s := range ns {
		oldSvc, ok := oldServiceHosts[s.Hostname]
		if ok && s.Equals(oldSvc) {
			unchanged = append(unchanged, s)
		} else if ok {
			updated = append(updated, s)
		} else {
			added = append(added, s)
		}
		delete(oldServiceHosts, s.Hostname)
	}
	deleted = maps.Values(oldServiceHosts)

	return added, deleted, updated, unchanged
}

// autoAllocateIPs automatically allocates IPs for service entry services WITHOUT an
// address field, if the hostname is not a wildcard and resolution
// is not NONE. The IPs are allocated from the reserved Class E subnet
// (240.240.0.0/16) that is not reachable outside the pod, or from the reserved
// Benchmarking IP range (2001:2::/48) in RFC5180. When DNS
// capture is enabled, Envoy will resolve the DNS to these IPs. The
// listeners for TCP services will also be set up on these IPs. The
// IPs allocated to a service entry may differ from istiod to istiod
// but it does not matter because these IPs only affect the listener
// IPs on a given proxy managed by a given istiod.
//
// NOTE: If DNS capture is not enabled by the proxy, the automatically
// allocated IP addresses do not take effect.
//
// The current algorithm to allocate IPs is deterministic across all istiods.
func autoAllocateIPs(services []*model.Service) []*model.Service {
	hashedServices := make([]*model.Service, maxIPs)
	hash := fnv.New32a()
	// First iterate through the range of services and determine its position by hash
	// so that we can deterministically allocate an IP.
	// We use "Double Hashing" for collision detection.
	// The hash algorithm is
	// - h1(k) = Sum32 hash of the service key (namespace + "/" + hostname)
	// - Check if we have an empty slot for h1(k) % MAXIPS. Use it if available.
	// - If there is a collision, apply second hash i.e. h2(k) = PRIME - (Key % PRIME)
	//   where PRIME is the max prime number below MAXIPS.
	// - Calculate a new hash iteratively till we find an empty slot with (h1(k) + i*h2(k)) % MAXIPS
	j := 0
	for _, svc := range services {
		// we can allocate IPs only if
		// 1. the service has resolution set to static/dns. We cannot allocate
		//   for NONE because we will not know the original DST IP that the application requested.
		// 2. the address is not set (0.0.0.0)
		// 3. the hostname is not a wildcard
		if svc.DefaultAddress == constants.UnspecifiedIP && !svc.Hostname.IsWildCarded() &&
			svc.Resolution != model.Passthrough {
			if j >= maxIPs {
				log.Errorf("out of IPs to allocate for service entries. maxips:= %d", maxIPs)
				break
			}
			// First hash is calculated by hashing the service key i.e. (namespace + "/" + hostname).
			hash.Write([]byte(makeServiceKey(svc)))
			s := hash.Sum32()
			firstHash := s % uint32(maxIPs)
			// Check if there is a service with this hash first. If there is no service
			// at this location - then we can safely assign this position for this service.
			if hashedServices[firstHash] == nil {
				hashedServices[firstHash] = svc
			} else {
				// This means we have a collision. Resolve the collision by "double hashing".
				i := uint32(1)
				secondHash := uint32(prime) - (s % uint32(prime))
				for {
					nh := (s + i*secondHash) % uint32(maxIPs-1)
					if hashedServices[nh] == nil {
						hashedServices[nh] = svc
						break
					}
					i++
				}
			}
			hash.Reset()
			j++
		}
	}

	x := 0
	hnMap := make(map[string]octetPair)
	for _, svc := range hashedServices {
		x++
		if svc == nil {
			// There is no service in the slot. Just increment x and move forward.
			continue
		}
		n := makeServiceKey(svc)
		// To avoid allocating 240.240.(i).0, if X % 255 is 0, increment X.
		// For example, when X=510, the resulting IP would be 240.240.2.0 (invalid)
		// So we bump X to 511, so that the resulting IP is 240.240.2.1
		if x%255 == 0 {
			x++
		}
		if v, ok := hnMap[n]; ok {
			log.Debugf("Reuse IP for domain %s", n)
			setAutoAllocatedIPs(svc, v)
		} else {
			var thirdOctet, fourthOctet int
			thirdOctet = x / 255
			fourthOctet = x % 255
			pair := octetPair{thirdOctet, fourthOctet}
			setAutoAllocatedIPs(svc, pair)
			hnMap[n] = pair
		}
	}
	return services
}

func makeServiceKey(svc *model.Service) string {
	return svc.Attributes.Namespace + "/" + svc.Hostname.String()
}

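// setAutoAllocatedIPs maps an octet pair onto the reserved IPv4 and IPv6 ranges.
// For example, {thirdOctet: 0, fourthOctet: 1} yields 240.240.0.1 / 2001:2::f0f0:1,
// and {thirdOctet: 1, fourthOctet: 1} yields 240.240.1.1 / 2001:2::f0f0:11.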
func setAutoAllocatedIPs(svc *model.Service, octets octetPair) {
	a := octets.thirdOctet
	b := octets.fourthOctet
	svc.AutoAllocatedIPv4Address = fmt.Sprintf("240.240.%d.%d", a, b)
	if a == 0 {
		svc.AutoAllocatedIPv6Address = fmt.Sprintf("2001:2::f0f0:%x", b)
	} else {
		svc.AutoAllocatedIPv6Address = fmt.Sprintf("2001:2::f0f0:%x%x", a, b)
	}
}

func makeConfigKey(svc *model.Service) model.ConfigKey {
	return model.ConfigKey{
		Kind:      kind.ServiceEntry,
		Name:      string(svc.Hostname),
		Namespace: svc.Attributes.Namespace,
	}
}

// isHealthy checks that the provided WorkloadEntry is healthy. If health checks are not enabled,
// it is assumed to always be healthy
func isHealthy(cfg config.Config) bool {
	if parseHealthAnnotation(cfg.Annotations[status.WorkloadEntryHealthCheckAnnotation]) {
		// We default to false if the condition is not set. This ensures newly created WorkloadEntries
		// are treated as unhealthy until we prove they are healthy by probe success.
		return status.GetBoolConditionFromSpec(cfg, status.ConditionHealthy, false)
	}
	// If health check is not enabled, assume it's healthy
	return true
}

func parseHealthAnnotation(s string) bool {
	if s == "" {
		return false
	}
	p, err := strconv.ParseBool(s)
	if err != nil {
		return false
	}
	return p
}

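// buildServiceInstances returns the service instances for the given ServiceEntry, keyed by the
// config (WorkloadEntry, Pod, or the ServiceEntry itself) that produced them, together with the
// flattened list of all instances.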
func (s *Controller) buildServiceInstances(
	curr config.Config,
	services []*model.Service,
) (map[configKey][]*model.ServiceInstance, []*model.ServiceInstance) {
	currentServiceEntry := curr.Spec.(*networking.ServiceEntry)
	var serviceInstances []*model.ServiceInstance
	serviceInstancesByConfig := map[configKey][]*model.ServiceInstance{}
	// for service entry with labels
	if currentServiceEntry.WorkloadSelector != nil {
		selector := workloadinstances.ByServiceSelector(curr.Namespace, currentServiceEntry.WorkloadSelector.Labels)
		workloadInstances := workloadinstances.FindAllInIndex(s.workloadInstances, selector)
		for _, wi := range workloadInstances {
			if wi.DNSServiceEntryOnly && currentServiceEntry.Resolution != networking.ServiceEntry_DNS &&
				currentServiceEntry.Resolution != networking.ServiceEntry_DNS_ROUND_ROBIN {
				log.Debugf("skip selecting workload instance %v/%v for DNS service entry %v", wi.Namespace, wi.Name,
					currentServiceEntry.Hosts)
				continue
			}
			instances := convertWorkloadInstanceToServiceInstance(wi, services, currentServiceEntry)
			serviceInstances = append(serviceInstances, instances...)
			ckey := configKey{namespace: wi.Namespace, name: wi.Name}
			if wi.Kind == model.PodKind {
				ckey.kind = podConfigType
			} else {
				ckey.kind = workloadEntryConfigType
			}
			serviceInstancesByConfig[ckey] = instances
		}
	} else {
		serviceInstances = s.convertServiceEntryToInstances(curr, services)
		ckey := configKey{
			kind:      serviceEntryConfigType,
			name:      curr.Name,
			namespace: curr.Namespace,
		}
		serviceInstancesByConfig[ckey] = serviceInstances
	}

	return serviceInstancesByConfig, serviceInstances
}