// Copyright Istio Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package controller

import (
	"sync"

	"github.com/hashicorp/go-multierror"
	corev1 "k8s.io/api/core/v1"
	v1 "k8s.io/api/discovery/v1"
	"k8s.io/api/discovery/v1beta1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	klabels "k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/selection"
	"k8s.io/apimachinery/pkg/types"
	mcs "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1"

	"istio.io/istio/pilot/pkg/features"
	"istio.io/istio/pilot/pkg/model"
	"istio.io/istio/pkg/config"
	"istio.io/istio/pkg/config/host"
	"istio.io/istio/pkg/config/schema/kind"
	"istio.io/istio/pkg/config/visibility"
	"istio.io/istio/pkg/kube/kclient"
	"istio.io/istio/pkg/util/sets"
)

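// endpointSliceController watches Kubernetes EndpointSlices and translates them into EDS
// (and, for headless services, full/NDS) pushes for the owning Controller. Converted
// endpoints are cached in endpointCache, keyed by service hostname and slice name.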
type endpointSliceController struct {
	endpointCache *endpointSliceCache
	slices        kclient.Client[*v1.EndpointSlice]
	c             *Controller
}

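// endpointSliceRequirement filters out EndpointSlices that carry the MCS service-name label;
// such slices presumably belong to multi-cluster (MCS) services and are therefore excluded
// from every selector built in this file.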
var (
	endpointSliceRequirement = labelRequirement(mcs.LabelServiceName, selection.DoesNotExist, nil)
	endpointSliceSelector    = klabels.NewSelector().Add(*endpointSliceRequirement)
)

func newEndpointSliceController(c *Controller) *endpointSliceController {
	slices := kclient.NewFiltered[*v1.EndpointSlice](c.client, kclient.Filter{ObjectFilter: c.client.ObjectFilter()})
	out := &endpointSliceController{
		c:             c,
		slices:        slices,
		endpointCache: newEndpointSliceCache(),
	}
	registerHandlers[*v1.EndpointSlice](c, slices, "EndpointSlice", out.onEvent, nil)
	return out
}

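// podArrived re-processes the EndpointSlice identified by (name, ns). It is expected to be
// called by the pod cache once a Pod queued via queueEndpointEventOnPodArrival shows up;
// a nil lookup simply means the slice has been deleted in the meantime.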
func (esc *endpointSliceController) podArrived(name, ns string) error {
	ep := esc.slices.Get(name, ns)
	if ep == nil {
		return nil
	}
	return esc.onEvent(nil, ep, model.EventAdd)
}

// initializeNamespace initializes endpoints for a given namespace.
func (esc *endpointSliceController) initializeNamespace(ns string, filtered bool) error {
	var err *multierror.Error
	var endpoints []*v1.EndpointSlice
	if filtered {
		endpoints = esc.slices.List(ns, klabels.Everything())
	} else {
		endpoints = esc.slices.ListUnfiltered(ns, klabels.Everything())
	}
	log.Debugf("initializing %d endpointslices", len(endpoints))
	for _, s := range endpoints {
		err = multierror.Append(err, esc.onEvent(nil, s, model.EventAdd))
	}
	return err.ErrorOrNil()
}

func (esc *endpointSliceController) onEvent(_, ep *v1.EndpointSlice, event model.Event) error {
	esc.onEventInternal(nil, ep, event)
	return nil
}

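// onEventInternal updates the endpoint cache for the slice and pushes EDS for every hostname
// the owning Service maps to. For headless (ClusterIP: None, non-ExternalName) services it
// additionally triggers a full push: ServiceEntry kind for services with non-HTTP ports,
// DNSName kind for pure-HTTP services so that only NDS is regenerated.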
func (esc *endpointSliceController) onEventInternal(_, ep *v1.EndpointSlice, event model.Event) {
	esLabels := ep.GetLabels()
	if !endpointSliceSelector.Matches(klabels.Set(esLabels)) {
		return
	}
	// Update the internal endpoint cache regardless of the kind of service, even for headless services.
	// As for gateways, the cluster discovery type is `EDS` even for headless services.
	namespacedName := getServiceNamespacedName(ep)
	log.Debugf("Handle EDS endpoint %s %s in namespace %s", namespacedName.Name, event, namespacedName.Namespace)
	if event == model.EventDelete {
		esc.deleteEndpointSlice(ep)
	} else {
		esc.updateEndpointSlice(ep)
	}
	hostnames := esc.c.hostNamesForNamespacedName(namespacedName)
	// Trigger EDS push for all hostnames.
	esc.pushEDS(hostnames, namespacedName.Namespace)

	name := serviceNameForEndpointSlice(esLabels)
	namespace := ep.GetNamespace()
	svc := esc.c.services.Get(name, namespace)
	if svc == nil || svc.Spec.ClusterIP != corev1.ClusterIPNone || svc.Spec.Type == corev1.ServiceTypeExternalName {
		return
	}

	configs := []types.NamespacedName{}
	pureHTTP := true
	for _, modelSvc := range esc.c.servicesForNamespacedName(config.NamespacedName(svc)) {
		// Skip the push if the service is not exported.
		if modelSvc.Attributes.ExportTo.Contains(visibility.None) {
			continue
		}

		configs = append(configs, types.NamespacedName{Name: modelSvc.Hostname.String(), Namespace: svc.Namespace})

		for _, p := range modelSvc.Ports {
			if !p.Protocol.IsHTTP() {
				pureHTTP = false
				break
			}
		}
	}

	configsUpdated := sets.New[model.ConfigKey]()
	for _, config := range configs {
		if !pureHTTP {
			configsUpdated.Insert(model.ConfigKey{Kind: kind.ServiceEntry, Name: config.Name, Namespace: config.Namespace})
		} else {
			// Pure HTTP headless services should not need a full push, since they do not
			// require a Listener based on IP: https://github.com/istio/istio/issues/48207
			configsUpdated.Insert(model.ConfigKey{Kind: kind.DNSName, Name: config.Name, Namespace: config.Namespace})
		}
	}

	if len(configsUpdated) > 0 {
		// For headless services, trigger a full push.
		// If EnableHeadlessService is true and the service ports are not pure HTTP, we need to regenerate listeners per endpoint.
		// Otherwise we only need to push NDS: the push is still marked full, but all xDS except NDS is skipped during it.
		esc.c.opts.XDSUpdater.ConfigUpdate(&model.PushRequest{
			Full:           true,
			ConfigsUpdated: configsUpdated,
			Reason:         model.NewReasonStats(model.HeadlessEndpointUpdate),
		})
	}
}

// GetProxyServiceTargets returns service instances co-located with a given proxy.
// TODO: this code does not return k8s service instances when the proxy's IP is a workload entry.
// To tackle this, we need an ip2instance map like the one we have in service entry.
func (esc *endpointSliceController) GetProxyServiceTargets(proxy *model.Proxy) []model.ServiceTarget {
	eps := esc.slices.List(proxy.Metadata.Namespace, endpointSliceSelector)
	var out []model.ServiceTarget
	for _, ep := range eps {
		instances := esc.serviceTargets(ep, proxy)
		out = append(out, instances...)
	}

	return out
}

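// serviceNameForEndpointSlice returns the name of the Service that owns the slice, read from
// the standard "kubernetes.io/service-name" label set on every managed EndpointSlice.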
func serviceNameForEndpointSlice(labels map[string]string) string {
	return labels[v1.LabelServiceName]
}

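// serviceTargets builds the ServiceTargets for this slice that are co-located with the proxy,
// i.e. cached endpoints whose address matches one of the proxy's IP addresses.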
func (esc *endpointSliceController) serviceTargets(ep *v1.EndpointSlice, proxy *model.Proxy) []model.ServiceTarget {
	var out []model.ServiceTarget
	esc.endpointCache.mu.RLock()
	defer esc.endpointCache.mu.RUnlock()
	for _, svc := range esc.c.servicesForNamespacedName(getServiceNamespacedName(ep)) {
		for _, instance := range esc.endpointCache.get(svc.Hostname) {
			port, f := svc.Ports.Get(instance.ServicePortName)
			if !f {
				log.Warnf("unexpected state, svc %v missing port %v", svc.Hostname, instance.ServicePortName)
				continue
			}
			// consider multiple IP scenarios
			for _, ip := range proxy.IPAddresses {
				if ip != instance.Address {
					continue
				}
				// If the endpoint isn't ready, report this
				if instance.HealthStatus == model.UnHealthy && esc.c.opts.Metrics != nil {
					esc.c.opts.Metrics.AddMetric(model.ProxyStatusEndpointNotReady, proxy.ID, proxy.ID, "")
				}
				si := model.ServiceTarget{
					Service: svc,
					Port: model.ServiceInstancePort{
						ServicePort: port,
						TargetPort:  instance.EndpointPort,
					},
				}
				out = append(out, si)
			}
		}
	}
	return out
}

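// deleteEndpointSlice notifies the pod cache that every address in the slice is gone and
// removes the slice's entries from the per-hostname endpoint cache.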
func (esc *endpointSliceController) deleteEndpointSlice(slice *v1.EndpointSlice) {
	key := config.NamespacedName(slice)
	for _, e := range slice.Endpoints {
		for _, a := range e.Addresses {
			esc.c.pods.endpointDeleted(key, a)
		}
	}

	esc.endpointCache.mu.Lock()
	defer esc.endpointCache.mu.Unlock()
	for _, hostName := range esc.c.hostNamesForNamespacedName(getServiceNamespacedName(slice)) {
		// endpointSlice cache update
		if esc.endpointCache.has(hostName) {
			esc.endpointCache.delete(hostName, slice.Name)
		}
	}
}

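// updateEndpointSlice rebuilds the cached endpoints of this slice for every hostname
// the owning Service resolves to.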
func (esc *endpointSliceController) updateEndpointSlice(slice *v1.EndpointSlice) {
	for _, hostname := range esc.c.hostNamesForNamespacedName(getServiceNamespacedName(slice)) {
		esc.updateEndpointCacheForSlice(hostname, slice)
	}
}

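// endpointHealthStatus maps EndpointSlice conditions to an Istio health state: endpoints that
// are Ready (or have no Ready condition) are Healthy. A non-ready endpoint is reported as
// Draining when persistent sessions are enabled for the service and the endpoint is still
// serving while terminating (unset conditions are treated as true); otherwise it is UnHealthy.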
func endpointHealthStatus(svc *model.Service, e v1.Endpoint) model.HealthStatus {
	if e.Conditions.Ready == nil || *e.Conditions.Ready {
		return model.Healthy
	}

	if features.PersistentSessionLabel != "" &&
		svc != nil &&
		svc.Attributes.Labels[features.PersistentSessionLabel] != "" &&
		(e.Conditions.Serving == nil || *e.Conditions.Serving) &&
		(e.Conditions.Terminating == nil || *e.Conditions.Terminating) {
		return model.Draining
	}

	return model.UnHealthy
}

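// updateEndpointCacheForSlice converts each address/port combination in the slice into an
// IstioEndpoint (skipping FQDN-type slices and addresses whose expected Pod is not yet known)
// and stores the result in the endpoint cache under hostName, keyed by the slice name.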
func (esc *endpointSliceController) updateEndpointCacheForSlice(hostName host.Name, slice *v1.EndpointSlice) {
	var endpoints []*model.IstioEndpoint
	if slice.AddressType == v1.AddressTypeFQDN {
		// TODO(https://github.com/istio/istio/issues/34995) support FQDN endpointslice
		return
	}
	svc := esc.c.GetService(hostName)
	discoverabilityPolicy := esc.c.exports.EndpointDiscoverabilityPolicy(svc)

	for _, e := range slice.Endpoints {
		// Draining tracking is only enabled if persistent sessions are enabled.
		// If we start using it for other features, this can be adjusted.
		healthStatus := endpointHealthStatus(svc, e)
		for _, a := range e.Addresses {
			pod, expectedPod := getPod(esc.c, a, &metav1.ObjectMeta{Name: slice.Name, Namespace: slice.Namespace}, e.TargetRef, hostName)
			if pod == nil && expectedPod {
				continue
			}
			builder := NewEndpointBuilder(esc.c, pod)
			// EDS and ServiceEntry use name for service port - ADS will need to map to numbers.
			for _, port := range slice.Ports {
				var portNum int32
				if port.Port != nil {
					portNum = *port.Port
				}
				var portName string
				if port.Name != nil {
					portName = *port.Name
				}

				istioEndpoint := builder.buildIstioEndpoint(a, portNum, portName, discoverabilityPolicy, healthStatus)
				endpoints = append(endpoints, istioEndpoint)
			}
		}
	}
	esc.endpointCache.Update(hostName, slice.Name, endpoints)
}

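// buildIstioEndpointsWithService returns the cached IstioEndpoints for hostName, optionally
// rebuilding the cache from all EndpointSlices of the (name, namespace) Service first.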
func (esc *endpointSliceController) buildIstioEndpointsWithService(name, namespace string, hostName host.Name, updateCache bool) []*model.IstioEndpoint {
	esLabelSelector := endpointSliceSelectorForService(name)
	slices := esc.slices.List(namespace, esLabelSelector)
	if len(slices) == 0 {
		log.Debugf("endpoint slices of (%s, %s) not found", name, namespace)
		return nil
	}

	if updateCache {
		// A cache update was requested. Rebuild the endpoints for these slices.
		for _, slice := range slices {
			esc.updateEndpointCacheForSlice(hostName, slice)
		}
	}

	return esc.endpointCache.Get(hostName)
}

func getServiceNamespacedName(slice *v1.EndpointSlice) types.NamespacedName {
	return types.NamespacedName{
		Namespace: slice.GetNamespace(),
		Name:      serviceNameForEndpointSlice(slice.GetLabels()),
	}
}

// endpointKey uniquely identifies an endpoint by IP and port name.
// This is used for deduping endpoints across slices.
type endpointKey struct {
	ip   string
	port string
}

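// endpointSliceCache stores converted IstioEndpoints per service hostname, grouped by the
// EndpointSlice that produced them (hostname -> slice name -> endpoints). Keeping a per-slice
// inner map lets one slice be updated or removed without touching endpoints contributed by
// the service's other slices.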
type endpointSliceCache struct {
	mu                         sync.RWMutex
	endpointsByServiceAndSlice map[host.Name]map[string][]*model.IstioEndpoint
}

func newEndpointSliceCache() *endpointSliceCache {
	out := &endpointSliceCache{
		endpointsByServiceAndSlice: make(map[host.Name]map[string][]*model.IstioEndpoint),
	}
	return out
}

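// Update replaces the cached endpoints of the given slice under hostname, dropping the slice
// entry entirely when the new endpoint list is empty.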
func (e *endpointSliceCache) Update(hostname host.Name, slice string, endpoints []*model.IstioEndpoint) {
	e.mu.Lock()
	defer e.mu.Unlock()
	e.update(hostname, slice, endpoints)
}

func (e *endpointSliceCache) update(hostname host.Name, slice string, endpoints []*model.IstioEndpoint) {
	if len(endpoints) == 0 {
		delete(e.endpointsByServiceAndSlice[hostname], slice)
	}
	if _, f := e.endpointsByServiceAndSlice[hostname]; !f {
		e.endpointsByServiceAndSlice[hostname] = make(map[string][]*model.IstioEndpoint)
	}
	// We will always overwrite. A conflict here means an endpoint is transitioning
	// from one slice to another. See
	// https://github.com/kubernetes/website/blob/master/content/en/docs/concepts/services-networking/endpoint-slices.md#duplicate-endpoints
	// In this case, we can always assume the update is fresh, although older slices
	// for which we have not gotten updates may be stale; therefore we always take the new
	// update.
	e.endpointsByServiceAndSlice[hostname][slice] = endpoints
}

func (e *endpointSliceCache) Delete(hostname host.Name, slice string) {
	e.mu.Lock()
	defer e.mu.Unlock()
	e.delete(hostname, slice)
}

func (e *endpointSliceCache) delete(hostname host.Name, slice string) {
	delete(e.endpointsByServiceAndSlice[hostname], slice)
	if len(e.endpointsByServiceAndSlice[hostname]) == 0 {
		delete(e.endpointsByServiceAndSlice, hostname)
	}
}

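// Get returns the endpoints of hostname across all of its slices, de-duplicated by
// (address, service port name).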
func (e *endpointSliceCache) Get(hostname host.Name) []*model.IstioEndpoint {
	e.mu.RLock()
	defer e.mu.RUnlock()
	return e.get(hostname)
}

func (e *endpointSliceCache) get(hostname host.Name) []*model.IstioEndpoint {
	var endpoints []*model.IstioEndpoint
	found := sets.New[endpointKey]()
	for _, eps := range e.endpointsByServiceAndSlice[hostname] {
		for _, ep := range eps {
			key := endpointKey{ep.Address, ep.ServicePortName}
			if found.InsertContains(key) {
				// This is a duplicate. Update() already handles conflict resolution, so we don't
				// need to pick the "right" one here.
				continue
			}
			endpoints = append(endpoints, ep)
		}
	}
	return endpoints
}

func (e *endpointSliceCache) Has(hostname host.Name) bool {
	e.mu.RLock()
	defer e.mu.RUnlock()
	return e.has(hostname)
}

func (e *endpointSliceCache) has(hostname host.Name) bool {
	_, found := e.endpointsByServiceAndSlice[hostname]
	return found
}

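// endpointSliceSelectorForService matches the EndpointSlices owned by the named Service
// (via the service-name label) while still excluding MCS-managed slices.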
func endpointSliceSelectorForService(name string) klabels.Selector {
	return klabels.Set(map[string]string{
		v1beta1.LabelServiceName: name,
	}).AsSelectorPreValidated().Add(*endpointSliceRequirement)
}

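// pushEDS sends an EDS update for every given hostname, merging in workload-entry endpoints
// when EnableK8SServiceSelectWorkloadEntries is set.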
func (esc *endpointSliceController) pushEDS(hostnames []host.Name, namespace string) {
	shard := model.ShardKeyFromRegistry(esc.c)
	// Even though we just read from the cache, we need the full lock to ensure pushEDS
	// runs sequentially when `EnableK8SServiceSelectWorkloadEntries` is enabled. Otherwise,
	// EDS updates may go out of order with workload entry updates, causing incorrect
	// endpoints. For regular endpoint updates, pushEDS is already serialized
	// because the events are queued.
	esc.endpointCache.mu.Lock()
	defer esc.endpointCache.mu.Unlock()
	for _, hostname := range hostnames {
		endpoints := esc.endpointCache.get(hostname)
		if features.EnableK8SServiceSelectWorkloadEntries {
			svc := esc.c.GetService(hostname)
			if svc != nil {
				fep := esc.c.collectWorkloadInstanceEndpoints(svc)
				endpoints = append(endpoints, fep...)
			} else {
				log.Debugf("Handle EDS endpoint: skip collecting workload entry endpoints, service %s/ has not been populated",
					hostname)
			}
		}

		esc.c.opts.XDSUpdater.EDSUpdate(shard, string(hostname), namespace, endpoints)
	}
}

// getPod fetches a pod by name or IP address.
// A pod may be missing (nil) for two reasons:
//   - It is an endpoint without an associated Pod. In this case, expectPod will be false.
//   - It is an endpoint with an associated Pod, but the Pod is not found. In this case, expectPod will be true.
//     This may happen due to eventual consistency, out-of-order events, etc. In this case, the caller
//     should not proceed with the endpoint, or inaccurate information would be sent, which may have impacts on
//     correctness and security.
//
// Note: this is only used by the endpointslice controller.
func getPod(c *Controller, ip string, ep *metav1.ObjectMeta, targetRef *corev1.ObjectReference, host host.Name) (*corev1.Pod, bool) {
	var expectPod bool
	pod := c.getPod(ip, ep.Namespace, targetRef)
	if targetRef != nil && targetRef.Kind == "Pod" {
		expectPod = true
		if pod == nil {
			c.registerEndpointResync(ep, ip, host)
		}
	}

	return pod, expectPod
}

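// registerEndpointResync records that an endpoint was observed before its Pod and asks the
// pod cache to replay the endpoint event once the Pod finally arrives.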
func (c *Controller) registerEndpointResync(ep *metav1.ObjectMeta, ip string, host host.Name) {
	// This means the endpoint event has arrived before the pod event.
	// This might happen because PodCache is eventually consistent.
	log.Debugf("Endpoint without pod %s %s.%s", ip, ep.Name, ep.Namespace)
	endpointsWithNoPods.Increment()
	if c.opts.Metrics != nil {
		c.opts.Metrics.AddMetric(model.EndpointNoPod, string(host), "", ip)
	}
	// Tell the pod cache we want to queue the endpoint event when this pod arrives.
	c.pods.queueEndpointEventOnPodArrival(config.NamespacedName(ep), ip)
}

// getPod fetches a pod by name or IP address.
// A pod may be missing (nil) for two reasons:
// * It is an endpoint without an associated Pod.
// * It is an endpoint with an associated Pod, but the Pod is not found.
func (c *Controller) getPod(ip string, namespace string, targetRef *corev1.ObjectReference) *corev1.Pod {
	if targetRef != nil && targetRef.Kind == "Pod" {
		key := types.NamespacedName{Name: targetRef.Name, Namespace: targetRef.Namespace}
		pod := c.pods.getPodByKey(key)
		return pod
	}
	// This means the endpoint is manually controlled.
	// We want to look up a pod to find metadata like the service account, labels, etc. But for hostNetwork, we just get a raw IP,
	// and the IP may be shared by many pods. The best we can do is guess.
	pods := c.pods.getPodsByIP(ip)
	for _, p := range pods {
		if p.Namespace == namespace {
			// Might not be right, but it's the best we can do.
			return p
		}
	}
	return nil
}