// Copyright Istio Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package controller

import (
	"sync"

	"github.com/hashicorp/go-multierror"
	corev1 "k8s.io/api/core/v1"
	v1 "k8s.io/api/discovery/v1"
	"k8s.io/api/discovery/v1beta1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	klabels "k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/selection"
	"k8s.io/apimachinery/pkg/types"
	mcs "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1"

	"istio.io/istio/pilot/pkg/features"
	"istio.io/istio/pilot/pkg/model"
	"istio.io/istio/pkg/config"
	"istio.io/istio/pkg/config/host"
	"istio.io/istio/pkg/config/schema/kind"
	"istio.io/istio/pkg/config/visibility"
	"istio.io/istio/pkg/kube/kclient"
	"istio.io/istio/pkg/util/sets"
)

type endpointSliceController struct {
	endpointCache *endpointSliceCache
	slices        kclient.Client[*v1.EndpointSlice]
	c             *Controller
}

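// Exclude EndpointSlices that carry the MCS service-name label; this
// controller only processes slices owned by plain Kubernetes Services.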
var (
	endpointSliceRequirement = labelRequirement(mcs.LabelServiceName, selection.DoesNotExist, nil)
	endpointSliceSelector    = klabels.NewSelector().Add(*endpointSliceRequirement)
)

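// newEndpointSliceController creates a filtered EndpointSlice informer and
// registers out.onEvent as its event handler.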
func newEndpointSliceController(c *Controller) *endpointSliceController {
	slices := kclient.NewFiltered[*v1.EndpointSlice](c.client, kclient.Filter{ObjectFilter: c.client.ObjectFilter()})
	out := &endpointSliceController{
		c:             c,
		slices:        slices,
		endpointCache: newEndpointSliceCache(),
	}
	registerHandlers[*v1.EndpointSlice](c, slices, "EndpointSlice", out.onEvent, nil)
	return out
}

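// podArrived replays an add event for the named EndpointSlice, if it exists,
// once a pod it references shows up in the pod cache (slice events can arrive
// before the corresponding pod event).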
func (esc *endpointSliceController) podArrived(name, ns string) error {
	ep := esc.slices.Get(name, ns)
	if ep == nil {
		return nil
	}
	return esc.onEvent(nil, ep, model.EventAdd)
}

// initializeNamespace initializes endpoints for a given namespace.
func (esc *endpointSliceController) initializeNamespace(ns string, filtered bool) error {
	var err *multierror.Error
	var endpoints []*v1.EndpointSlice
	if filtered {
		endpoints = esc.slices.List(ns, klabels.Everything())
	} else {
		endpoints = esc.slices.ListUnfiltered(ns, klabels.Everything())
	}
	log.Debugf("initializing %d endpointslices", len(endpoints))
	for _, s := range endpoints {
		err = multierror.Append(err, esc.onEvent(nil, s, model.EventAdd))
	}
	return err.ErrorOrNil()
}

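// onEvent adapts onEventInternal to the error-returning signature expected by
// the registered handlers; it never returns an error.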
func (esc *endpointSliceController) onEvent(_, ep *v1.EndpointSlice, event model.Event) error {
	esc.onEventInternal(nil, ep, event)
	return nil
}

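// onEventInternal updates the endpoint cache from the slice, triggers an EDS
// push for every affected hostname, and, for headless services, additionally
// requests a config push.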
func (esc *endpointSliceController) onEventInternal(_, ep *v1.EndpointSlice, event model.Event) {
	esLabels := ep.GetLabels()
	if !endpointSliceSelector.Matches(klabels.Set(esLabels)) {
		return
	}
	// Update the internal endpoint cache no matter what kind of service it is, even a headless service.
	// As for gateways, the cluster discovery type is `EDS` even for headless services.
	namespacedName := getServiceNamespacedName(ep)
	log.Debugf("Handle EDS endpoint %s %s in namespace %s", namespacedName.Name, event, namespacedName.Namespace)
	if event == model.EventDelete {
		esc.deleteEndpointSlice(ep)
	} else {
		esc.updateEndpointSlice(ep)
	}
	hostnames := esc.c.hostNamesForNamespacedName(namespacedName)
	// Trigger an EDS push for all hostnames.
	esc.pushEDS(hostnames, namespacedName.Namespace)

	name := serviceNameForEndpointSlice(esLabels)
	namespace := ep.GetNamespace()
	svc := esc.c.services.Get(name, namespace)
	if svc == nil || svc.Spec.ClusterIP != corev1.ClusterIPNone || svc.Spec.Type == corev1.ServiceTypeExternalName {
		return
	}

	configs := []types.NamespacedName{}
	pureHTTP := true
	for _, modelSvc := range esc.c.servicesForNamespacedName(config.NamespacedName(svc)) {
		// Skip the push if the service is not exported.
		if modelSvc.Attributes.ExportTo.Contains(visibility.None) {
			continue
		}

		configs = append(configs, types.NamespacedName{Name: modelSvc.Hostname.String(), Namespace: svc.Namespace})

		for _, p := range modelSvc.Ports {
			if !p.Protocol.IsHTTP() {
				pureHTTP = false
				break
			}
		}
	}

	configsUpdated := sets.New[model.ConfigKey]()
	for _, config := range configs {
		if !pureHTTP {
			configsUpdated.Insert(model.ConfigKey{Kind: kind.ServiceEntry, Name: config.Name, Namespace: config.Namespace})
		} else {
			// Pure HTTP headless services should not need a full push, since they do not
			// require a Listener based on IP: https://github.com/istio/istio/issues/48207
			configsUpdated.Insert(model.ConfigKey{Kind: kind.DNSName, Name: config.Name, Namespace: config.Namespace})
		}
	}

	if len(configsUpdated) > 0 {
		// For headless services, trigger a full push.
		// If EnableHeadlessService is true and the service's ports are not pure HTTP, we need to
		// regenerate listeners per endpoint. Otherwise we only need to push NDS: the push is still
		// marked full, but everything except NDS is skipped during the push.
		esc.c.opts.XDSUpdater.ConfigUpdate(&model.PushRequest{
			Full:           true,
			ConfigsUpdated: configsUpdated,
			Reason:         model.NewReasonStats(model.HeadlessEndpointUpdate),
		})
	}
}

// GetProxyServiceTargets returns the service instances co-located with a given proxy.
// TODO: this code does not return k8s service instances when the proxy's IP is a workload entry.
// To tackle this, we need an ip2instance map like the one we have in the service entry registry.
func (esc *endpointSliceController) GetProxyServiceTargets(proxy *model.Proxy) []model.ServiceTarget {
	eps := esc.slices.List(proxy.Metadata.Namespace, endpointSliceSelector)
	var out []model.ServiceTarget
	for _, ep := range eps {
		instances := esc.serviceTargets(ep, proxy)
		out = append(out, instances...)
	}

	return out
}

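// serviceNameForEndpointSlice returns the name of the Service that owns the
// slice, taken from the well-known kubernetes.io/service-name label.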
func serviceNameForEndpointSlice(labels map[string]string) string {
	return labels[v1.LabelServiceName]
}

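// serviceTargets returns the ServiceTargets for the given slice whose cached
// endpoint address matches one of the proxy's IP addresses.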
func (esc *endpointSliceController) serviceTargets(ep *v1.EndpointSlice, proxy *model.Proxy) []model.ServiceTarget {
	var out []model.ServiceTarget
	esc.endpointCache.mu.RLock()
	defer esc.endpointCache.mu.RUnlock()
	for _, svc := range esc.c.servicesForNamespacedName(getServiceNamespacedName(ep)) {
		for _, instance := range esc.endpointCache.get(svc.Hostname) {
			port, f := svc.Ports.Get(instance.ServicePortName)
			if !f {
				log.Warnf("unexpected state, svc %v missing port %v", svc.Hostname, instance.ServicePortName)
				continue
			}
			// consider multiple IP scenarios
			for _, ip := range proxy.IPAddresses {
				if ip != instance.Address {
					continue
				}
				// If the endpoint isn't ready, report this
				if instance.HealthStatus == model.UnHealthy && esc.c.opts.Metrics != nil {
					esc.c.opts.Metrics.AddMetric(model.ProxyStatusEndpointNotReady, proxy.ID, proxy.ID, "")
				}
				si := model.ServiceTarget{
					Service: svc,
					Port: model.ServiceInstancePort{
						ServicePort: port,
						TargetPort:  instance.EndpointPort,
					},
				}
				out = append(out, si)
			}
		}
	}
	return out
}

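// deleteEndpointSlice notifies the pod cache that each address in the slice is
// gone and removes the slice's entries from the endpoint cache.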
func (esc *endpointSliceController) deleteEndpointSlice(slice *v1.EndpointSlice) {
	key := config.NamespacedName(slice)
	for _, e := range slice.Endpoints {
		for _, a := range e.Addresses {
			esc.c.pods.endpointDeleted(key, a)
		}
	}

	esc.endpointCache.mu.Lock()
	defer esc.endpointCache.mu.Unlock()
	for _, hostName := range esc.c.hostNamesForNamespacedName(getServiceNamespacedName(slice)) {
		// endpointSlice cache update
		if esc.endpointCache.has(hostName) {
			esc.endpointCache.delete(hostName, slice.Name)
		}
	}
}

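// updateEndpointSlice rebuilds the cached endpoints of this slice for every
// hostname derived from its owning service.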
func (esc *endpointSliceController) updateEndpointSlice(slice *v1.EndpointSlice) {
	for _, hostname := range esc.c.hostNamesForNamespacedName(getServiceNamespacedName(slice)) {
		esc.updateEndpointCacheForSlice(hostname, slice)
	}
}

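// endpointHealthStatus maps EndpointSlice conditions to Istio health states:
// Ready (or unset) endpoints are Healthy; not-ready endpoints that are still
// serving while terminating are Draining when the service opts in to
// persistent sessions; everything else is UnHealthy.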
func endpointHealthStatus(svc *model.Service, e v1.Endpoint) model.HealthStatus {
	if e.Conditions.Ready == nil || *e.Conditions.Ready {
		return model.Healthy
	}

	if features.PersistentSessionLabel != "" &&
		svc != nil &&
		svc.Attributes.Labels[features.PersistentSessionLabel] != "" &&
		(e.Conditions.Serving == nil || *e.Conditions.Serving) &&
		(e.Conditions.Terminating == nil || *e.Conditions.Terminating) {
		return model.Draining
	}

	return model.UnHealthy
}

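// updateEndpointCacheForSlice converts each address/port pair in the slice
// into an IstioEndpoint and stores the result in the endpoint cache under
// hostName. FQDN-type slices are not yet supported and are skipped.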
func (esc *endpointSliceController) updateEndpointCacheForSlice(hostName host.Name, slice *v1.EndpointSlice) {
	var endpoints []*model.IstioEndpoint
	if slice.AddressType == v1.AddressTypeFQDN {
		// TODO(https://github.com/istio/istio/issues/34995) support FQDN endpointslice
		return
	}
	svc := esc.c.GetService(hostName)
	discoverabilityPolicy := esc.c.exports.EndpointDiscoverabilityPolicy(svc)

	for _, e := range slice.Endpoints {
		// Draining tracking is only enabled if persistent sessions are enabled.
		// If we start using it for other features, this can be adjusted.
		healthStatus := endpointHealthStatus(svc, e)
		for _, a := range e.Addresses {
			pod, expectedPod := getPod(esc.c, a, &metav1.ObjectMeta{Name: slice.Name, Namespace: slice.Namespace}, e.TargetRef, hostName)
			if pod == nil && expectedPod {
				continue
			}
			builder := NewEndpointBuilder(esc.c, pod)
			// EDS and ServiceEntry use the name for the service port - ADS will need to map it to numbers.
			for _, port := range slice.Ports {
				var portNum int32
				if port.Port != nil {
					portNum = *port.Port
				}
				var portName string
				if port.Name != nil {
					portName = *port.Name
				}

				istioEndpoint := builder.buildIstioEndpoint(a, portNum, portName, discoverabilityPolicy, healthStatus)
				endpoints = append(endpoints, istioEndpoint)
			}
		}
	}
	esc.endpointCache.Update(hostName, slice.Name, endpoints)
}

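// buildIstioEndpointsWithService returns the cached endpoints for hostName,
// optionally rebuilding the cache from the service's slices first.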
func (esc *endpointSliceController) buildIstioEndpointsWithService(name, namespace string, hostName host.Name, updateCache bool) []*model.IstioEndpoint {
	esLabelSelector := endpointSliceSelectorForService(name)
	slices := esc.slices.List(namespace, esLabelSelector)
	if len(slices) == 0 {
		log.Debugf("endpoint slices of (%s, %s) not found", name, namespace)
		return nil
	}

	if updateCache {
		// A cache update was requested. Rebuild the endpoints for these slices.
		for _, slice := range slices {
			esc.updateEndpointCacheForSlice(hostName, slice)
		}
	}

	return esc.endpointCache.Get(hostName)
}

func getServiceNamespacedName(slice *v1.EndpointSlice) types.NamespacedName {
	return types.NamespacedName{
		Namespace: slice.GetNamespace(),
		Name:      serviceNameForEndpointSlice(slice.GetLabels()),
	}
}

// endpointKey uniquely identifies an endpoint by IP and port name.
// This is used for deduping endpoints across slices.
type endpointKey struct {
	ip   string
	port string
}

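// endpointSliceCache caches endpoints per service hostname, further keyed by
// slice name so that updates to one slice do not clobber endpoints from
// another slice of the same service.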
type endpointSliceCache struct {
	mu                         sync.RWMutex
	endpointsByServiceAndSlice map[host.Name]map[string][]*model.IstioEndpoint
}

func newEndpointSliceCache() *endpointSliceCache {
	out := &endpointSliceCache{
		endpointsByServiceAndSlice: make(map[host.Name]map[string][]*model.IstioEndpoint),
	}
	return out
}

func (e *endpointSliceCache) Update(hostname host.Name, slice string, endpoints []*model.IstioEndpoint) {
	e.mu.Lock()
	defer e.mu.Unlock()
	e.update(hostname, slice, endpoints)
}

func (e *endpointSliceCache) update(hostname host.Name, slice string, endpoints []*model.IstioEndpoint) {
	if len(endpoints) == 0 {
		delete(e.endpointsByServiceAndSlice[hostname], slice)
	}
	if _, f := e.endpointsByServiceAndSlice[hostname]; !f {
		e.endpointsByServiceAndSlice[hostname] = make(map[string][]*model.IstioEndpoint)
	}
	// We will always overwrite. A conflict here means an endpoint is transitioning
	// from one slice to another. See
	// https://github.com/kubernetes/website/blob/master/content/en/docs/concepts/services-networking/endpoint-slices.md#duplicate-endpoints
	// In this case, we can always assume an update is fresh, although older slices
	// for which we have not gotten updates may be stale; therefore we always take
	// the new update.
	e.endpointsByServiceAndSlice[hostname][slice] = endpoints
}

func (e *endpointSliceCache) Delete(hostname host.Name, slice string) {
	e.mu.Lock()
	defer e.mu.Unlock()
	e.delete(hostname, slice)
}

func (e *endpointSliceCache) delete(hostname host.Name, slice string) {
	delete(e.endpointsByServiceAndSlice[hostname], slice)
	if len(e.endpointsByServiceAndSlice[hostname]) == 0 {
		delete(e.endpointsByServiceAndSlice, hostname)
	}
}

func (e *endpointSliceCache) Get(hostname host.Name) []*model.IstioEndpoint {
	e.mu.RLock()
	defer e.mu.RUnlock()
	return e.get(hostname)
}

func (e *endpointSliceCache) get(hostname host.Name) []*model.IstioEndpoint {
	var endpoints []*model.IstioEndpoint
	found := sets.New[endpointKey]()
	for _, eps := range e.endpointsByServiceAndSlice[hostname] {
		for _, ep := range eps {
			key := endpointKey{ep.Address, ep.ServicePortName}
			if found.InsertContains(key) {
				// This is a duplicate. Update() already handles conflict resolution, so we don't
				// need to pick the "right" one here.
				continue
			}
			endpoints = append(endpoints, ep)
		}
	}
	return endpoints
}

func (e *endpointSliceCache) Has(hostname host.Name) bool {
	e.mu.RLock()
	defer e.mu.RUnlock()
	return e.has(hostname)
}

func (e *endpointSliceCache) has(hostname host.Name) bool {
	_, found := e.endpointsByServiceAndSlice[hostname]
	return found
}

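// endpointSliceSelectorForService selects the slices belonging to a Service,
// excluding MCS-managed slices. For a service "foo" (a hypothetical example),
// the resulting selector is roughly
// "kubernetes.io/service-name=foo,!multicluster.x-k8s.io/service-name".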
func endpointSliceSelectorForService(name string) klabels.Selector {
	return klabels.Set(map[string]string{
		v1beta1.LabelServiceName: name,
	}).AsSelectorPreValidated().Add(*endpointSliceRequirement)
}

func (esc *endpointSliceController) pushEDS(hostnames []host.Name, namespace string) {
	shard := model.ShardKeyFromRegistry(esc.c)
	// Even though we just read from the cache, we need the full lock to ensure pushEDS
	// runs sequentially when `EnableK8SServiceSelectWorkloadEntries` is enabled. Otherwise,
	// EDS updates could go out of order with workload entry updates, causing incorrect
	// endpoints. For regular endpoint updates, pushEDS is already serialized
	// because the events are queued.
	esc.endpointCache.mu.Lock()
	defer esc.endpointCache.mu.Unlock()
	for _, hostname := range hostnames {
		endpoints := esc.endpointCache.get(hostname)
		if features.EnableK8SServiceSelectWorkloadEntries {
			svc := esc.c.GetService(hostname)
			if svc != nil {
				fep := esc.c.collectWorkloadInstanceEndpoints(svc)
				endpoints = append(endpoints, fep...)
			} else {
				log.Debugf("Handle EDS endpoint: skip collecting workload entry endpoints, service %s/ has not been populated",
					hostname)
			}
		}

		esc.c.opts.XDSUpdater.EDSUpdate(shard, string(hostname), namespace, endpoints)
	}
}

// getPod fetches a pod by name or IP address.
// A pod may be missing (nil) for two reasons:
//   - It is an endpoint without an associated Pod. In this case, expectPod will be false.
//   - It is an endpoint with an associated Pod, but the Pod is not found. In this case, expectPod will
//     be true. This may happen due to eventual consistency, out-of-order events, etc. In this case, the
//     caller should not proceed with the endpoint, or inaccurate information would be sent, which may
//     impact correctness and security.
//
// Note: this is only used by the endpointslice controller.
func getPod(c *Controller, ip string, ep *metav1.ObjectMeta, targetRef *corev1.ObjectReference, host host.Name) (*corev1.Pod, bool) {
	var expectPod bool
	pod := c.getPod(ip, ep.Namespace, targetRef)
	if targetRef != nil && targetRef.Kind == "Pod" {
		expectPod = true
		if pod == nil {
			c.registerEndpointResync(ep, ip, host)
		}
	}

	return pod, expectPod
}

func (c *Controller) registerEndpointResync(ep *metav1.ObjectMeta, ip string, host host.Name) {
	// This means the endpoint event arrived before the pod event.
	// This might happen because PodCache is eventually consistent.
	log.Debugf("Endpoint without pod %s %s.%s", ip, ep.Name, ep.Namespace)
	endpointsWithNoPods.Increment()
	if c.opts.Metrics != nil {
		c.opts.Metrics.AddMetric(model.EndpointNoPod, string(host), "", ip)
	}
	// Tell the pod cache to queue the endpoint event when this pod arrives.
	c.pods.queueEndpointEventOnPodArrival(config.NamespacedName(ep), ip)
}

// getPod fetches a pod by name or IP address.
// A pod may be missing (nil) for two reasons:
//   - It is an endpoint without an associated Pod.
//   - It is an endpoint with an associated Pod, but the Pod is not found.
func (c *Controller) getPod(ip string, namespace string, targetRef *corev1.ObjectReference) *corev1.Pod {
	if targetRef != nil && targetRef.Kind == "Pod" {
		key := types.NamespacedName{Name: targetRef.Name, Namespace: targetRef.Namespace}
		pod := c.pods.getPodByKey(key)
		return pod
	}
	// This means the endpoint is manually controlled.
	// We will want to look up a pod to find metadata like the service account, labels, etc. But for
	// hostNetwork, we just get a raw IP, and the IP may be shared by many pods. The best we can do is guess.
	pods := c.pods.getPodsByIP(ip)
	for _, p := range pods {
		if p.Namespace == namespace {
			// Might not be right, but the best we can do.
			return p
		}
	}
	return nil
}