istio
217 строк · 7.5 Кб
1// Copyright Istio Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package controller
16
17import (
18"strings"
19
20metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
21"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
22klabels "k8s.io/apimachinery/pkg/labels"
23"k8s.io/apimachinery/pkg/types"
24
25"istio.io/istio/pilot/pkg/features"
26"istio.io/istio/pilot/pkg/model"
27kubesr "istio.io/istio/pilot/pkg/serviceregistry/kube"
28"istio.io/istio/pkg/config"
29"istio.io/istio/pkg/config/constants"
30"istio.io/istio/pkg/config/host"
31"istio.io/istio/pkg/kube/controllers"
32"istio.io/istio/pkg/kube/kclient"
33"istio.io/istio/pkg/kube/kubetypes"
34"istio.io/istio/pkg/kube/mcs"
35)
36
37type exportedService struct {
38namespacedName types.NamespacedName
39discoverability map[host.Name]string
40}
41
42// serviceExportCache reads Kubernetes Multi-Cluster Services (MCS) ServiceExport resources in the
43// cluster and generates discoverability policies for the endpoints.
44type serviceExportCache interface {
45// EndpointDiscoverabilityPolicy returns the policy for Service endpoints residing within the current cluster.
46EndpointDiscoverabilityPolicy(svc *model.Service) model.EndpointDiscoverabilityPolicy
47
48// ExportedServices returns the list of services that are exported in this cluster. Used for debugging.
49ExportedServices() []exportedService
50Run(stop <-chan struct{})
51
52// HasSynced indicates whether the kube createClient has synced for the watched resources.
53HasSynced() bool
54}
55
56// newServiceExportCache creates a new serviceExportCache that observes the given cluster.
57func newServiceExportCache(c *Controller) serviceExportCache {
58if features.EnableMCSServiceDiscovery {
59ec := &serviceExportCacheImpl{
60Controller: c,
61}
62
63ec.serviceExports = kclient.NewDelayedInformer[controllers.Object](ec.client, mcs.ServiceExportGVR, kubetypes.DynamicInformer, kclient.Filter{
64ObjectFilter: ec.client.ObjectFilter(),
65})
66// Register callbacks for events.
67registerHandlers(ec.Controller, ec.serviceExports, "ServiceExports", ec.onServiceExportEvent, nil)
68
69// Set the discoverability policy for the clusterset.local host.
70ec.clusterSetLocalPolicySelector = func(svc *model.Service) (policy model.EndpointDiscoverabilityPolicy) {
71// If the service is exported in this cluster, allow the endpoints in this cluster to be discoverable
72// anywhere in the mesh.
73if ec.isExported(namespacedNameForService(svc)) {
74return model.AlwaysDiscoverable
75}
76
77// Otherwise, endpoints are only discoverable from within the same cluster.
78return model.DiscoverableFromSameCluster
79}
80
81// Set the discoverability policy for the cluster.local host.
82if features.EnableMCSClusterLocal {
83// MCS cluster.local mode is enabled. Allow endpoints for the cluster.local host to be
84// discoverable only from within the same cluster.
85ec.clusterLocalPolicySelector = func(svc *model.Service) (policy model.EndpointDiscoverabilityPolicy) {
86return model.DiscoverableFromSameCluster
87}
88} else {
89// MCS cluster.local mode is not enabled, so requests to the cluster.local host are not confined
90// to the same cluster. Use the same discoverability policy as for clusterset.local.
91ec.clusterLocalPolicySelector = ec.clusterSetLocalPolicySelector
92}
93
94return ec
95}
96
97// MCS Service discovery is disabled. Use a placeholder cache.
98return disabledServiceExportCache{}
99}
100
101type discoverabilityPolicySelector func(*model.Service) model.EndpointDiscoverabilityPolicy
102
103// serviceExportCache reads ServiceExport resources for a single cluster.
104type serviceExportCacheImpl struct {
105*Controller
106
107serviceExports kclient.Untyped
108
109// clusterLocalPolicySelector selects an appropriate EndpointDiscoverabilityPolicy for the cluster.local host.
110clusterLocalPolicySelector discoverabilityPolicySelector
111
112// clusterSetLocalPolicySelector selects an appropriate EndpointDiscoverabilityPolicy for the clusterset.local host.
113clusterSetLocalPolicySelector discoverabilityPolicySelector
114}
115
116func (ec *serviceExportCacheImpl) onServiceExportEvent(_, obj controllers.Object, event model.Event) error {
117se := controllers.Extract[*unstructured.Unstructured](obj)
118if se == nil {
119return nil
120}
121
122switch event {
123case model.EventAdd, model.EventDelete:
124ec.updateXDS(se)
125default:
126// Don't care about updates.
127}
128return nil
129}
130
131func (ec *serviceExportCacheImpl) updateXDS(se metav1.Object) {
132for _, svc := range ec.servicesForNamespacedName(config.NamespacedName(se)) {
133// Re-build the endpoints for this service with a new discoverability policy.
134// Also update any internal caching.
135endpoints := ec.buildEndpointsForService(svc, true)
136shard := model.ShardKeyFromRegistry(ec)
137ec.opts.XDSUpdater.EDSUpdate(shard, svc.Hostname.String(), se.GetNamespace(), endpoints)
138}
139}
140
141func (ec *serviceExportCacheImpl) EndpointDiscoverabilityPolicy(svc *model.Service) model.EndpointDiscoverabilityPolicy {
142if svc == nil {
143// Default policy when the service doesn't exist.
144return model.DiscoverableFromSameCluster
145}
146
147if strings.HasSuffix(svc.Hostname.String(), "."+constants.DefaultClusterSetLocalDomain) {
148return ec.clusterSetLocalPolicySelector(svc)
149}
150
151return ec.clusterLocalPolicySelector(svc)
152}
153
154func (ec *serviceExportCacheImpl) isExported(name types.NamespacedName) bool {
155return ec.serviceExports.Get(name.Name, name.Namespace) != nil
156}
157
158func (ec *serviceExportCacheImpl) ExportedServices() []exportedService {
159// List all exports in this cluster.
160exports := ec.serviceExports.List(metav1.NamespaceAll, klabels.Everything())
161
162ec.RLock()
163
164out := make([]exportedService, 0, len(exports))
165for _, export := range exports {
166uExport := export.(*unstructured.Unstructured)
167es := exportedService{
168namespacedName: config.NamespacedName(uExport),
169discoverability: make(map[host.Name]string),
170}
171
172// Generate the map of all hosts for this service to their discoverability policies.
173clusterLocalHost := kubesr.ServiceHostname(uExport.GetName(), uExport.GetNamespace(), ec.opts.DomainSuffix)
174clusterSetLocalHost := serviceClusterSetLocalHostname(es.namespacedName)
175for _, hostName := range []host.Name{clusterLocalHost, clusterSetLocalHost} {
176if svc := ec.servicesMap[hostName]; svc != nil {
177es.discoverability[hostName] = ec.EndpointDiscoverabilityPolicy(svc).String()
178}
179}
180
181out = append(out, es)
182}
183
184ec.RUnlock()
185
186return out
187}
188
189func (ec *serviceExportCacheImpl) Run(stop <-chan struct{}) {
190}
191
192func (ec *serviceExportCacheImpl) HasSynced() bool {
193return ec.serviceExports.HasSynced()
194}
195
196type disabledServiceExportCache struct{}
197
198var _ serviceExportCache = disabledServiceExportCache{}
199
200func (c disabledServiceExportCache) EndpointDiscoverabilityPolicy(*model.Service) model.EndpointDiscoverabilityPolicy {
201return model.AlwaysDiscoverable
202}
203
204func (c disabledServiceExportCache) Run(stop <-chan struct{}) {}
205
206func (c disabledServiceExportCache) HasSynced() bool {
207return true
208}
209
210func (c disabledServiceExportCache) ExportedServices() []exportedService {
211// MCS is disabled - returning `nil`, which is semantically different here than an empty list.
212return nil
213}
214
215func (c disabledServiceExportCache) HasCRDInstalled() bool {
216return false
217}
218