istio

Форк
0
/
serviceexportcache.go 
217 строк · 7.5 Кб
1
// Copyright Istio Authors
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14

15
package controller
16

17
import (
18
	"strings"
19

20
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
21
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
22
	klabels "k8s.io/apimachinery/pkg/labels"
23
	"k8s.io/apimachinery/pkg/types"
24

25
	"istio.io/istio/pilot/pkg/features"
26
	"istio.io/istio/pilot/pkg/model"
27
	kubesr "istio.io/istio/pilot/pkg/serviceregistry/kube"
28
	"istio.io/istio/pkg/config"
29
	"istio.io/istio/pkg/config/constants"
30
	"istio.io/istio/pkg/config/host"
31
	"istio.io/istio/pkg/kube/controllers"
32
	"istio.io/istio/pkg/kube/kclient"
33
	"istio.io/istio/pkg/kube/kubetypes"
34
	"istio.io/istio/pkg/kube/mcs"
35
)
36

37
type exportedService struct {
38
	namespacedName  types.NamespacedName
39
	discoverability map[host.Name]string
40
}
41

42
// serviceExportCache reads Kubernetes Multi-Cluster Services (MCS) ServiceExport resources in the
43
// cluster and generates discoverability policies for the endpoints.
44
type serviceExportCache interface {
45
	// EndpointDiscoverabilityPolicy returns the policy for Service endpoints residing within the current cluster.
46
	EndpointDiscoverabilityPolicy(svc *model.Service) model.EndpointDiscoverabilityPolicy
47

48
	// ExportedServices returns the list of services that are exported in this cluster. Used for debugging.
49
	ExportedServices() []exportedService
50
	Run(stop <-chan struct{})
51

52
	// HasSynced indicates whether the kube createClient has synced for the watched resources.
53
	HasSynced() bool
54
}
55

56
// newServiceExportCache creates a new serviceExportCache that observes the given cluster.
57
func newServiceExportCache(c *Controller) serviceExportCache {
58
	if features.EnableMCSServiceDiscovery {
59
		ec := &serviceExportCacheImpl{
60
			Controller: c,
61
		}
62

63
		ec.serviceExports = kclient.NewDelayedInformer[controllers.Object](ec.client, mcs.ServiceExportGVR, kubetypes.DynamicInformer, kclient.Filter{
64
			ObjectFilter: ec.client.ObjectFilter(),
65
		})
66
		// Register callbacks for events.
67
		registerHandlers(ec.Controller, ec.serviceExports, "ServiceExports", ec.onServiceExportEvent, nil)
68

69
		// Set the discoverability policy for the clusterset.local host.
70
		ec.clusterSetLocalPolicySelector = func(svc *model.Service) (policy model.EndpointDiscoverabilityPolicy) {
71
			// If the service is exported in this cluster, allow the endpoints in this cluster to be discoverable
72
			// anywhere in the mesh.
73
			if ec.isExported(namespacedNameForService(svc)) {
74
				return model.AlwaysDiscoverable
75
			}
76

77
			// Otherwise, endpoints are only discoverable from within the same cluster.
78
			return model.DiscoverableFromSameCluster
79
		}
80

81
		// Set the discoverability policy for the cluster.local host.
82
		if features.EnableMCSClusterLocal {
83
			// MCS cluster.local mode is enabled. Allow endpoints for the cluster.local host to be
84
			// discoverable only from within the same cluster.
85
			ec.clusterLocalPolicySelector = func(svc *model.Service) (policy model.EndpointDiscoverabilityPolicy) {
86
				return model.DiscoverableFromSameCluster
87
			}
88
		} else {
89
			// MCS cluster.local mode is not enabled, so requests to the cluster.local host are not confined
90
			// to the same cluster. Use the same discoverability policy as for clusterset.local.
91
			ec.clusterLocalPolicySelector = ec.clusterSetLocalPolicySelector
92
		}
93

94
		return ec
95
	}
96

97
	// MCS Service discovery is disabled. Use a placeholder cache.
98
	return disabledServiceExportCache{}
99
}
100

101
type discoverabilityPolicySelector func(*model.Service) model.EndpointDiscoverabilityPolicy
102

103
// serviceExportCache reads ServiceExport resources for a single cluster.
104
type serviceExportCacheImpl struct {
105
	*Controller
106

107
	serviceExports kclient.Untyped
108

109
	// clusterLocalPolicySelector selects an appropriate EndpointDiscoverabilityPolicy for the cluster.local host.
110
	clusterLocalPolicySelector discoverabilityPolicySelector
111

112
	// clusterSetLocalPolicySelector selects an appropriate EndpointDiscoverabilityPolicy for the clusterset.local host.
113
	clusterSetLocalPolicySelector discoverabilityPolicySelector
114
}
115

116
func (ec *serviceExportCacheImpl) onServiceExportEvent(_, obj controllers.Object, event model.Event) error {
117
	se := controllers.Extract[*unstructured.Unstructured](obj)
118
	if se == nil {
119
		return nil
120
	}
121

122
	switch event {
123
	case model.EventAdd, model.EventDelete:
124
		ec.updateXDS(se)
125
	default:
126
		// Don't care about updates.
127
	}
128
	return nil
129
}
130

131
func (ec *serviceExportCacheImpl) updateXDS(se metav1.Object) {
132
	for _, svc := range ec.servicesForNamespacedName(config.NamespacedName(se)) {
133
		// Re-build the endpoints for this service with a new discoverability policy.
134
		// Also update any internal caching.
135
		endpoints := ec.buildEndpointsForService(svc, true)
136
		shard := model.ShardKeyFromRegistry(ec)
137
		ec.opts.XDSUpdater.EDSUpdate(shard, svc.Hostname.String(), se.GetNamespace(), endpoints)
138
	}
139
}
140

141
func (ec *serviceExportCacheImpl) EndpointDiscoverabilityPolicy(svc *model.Service) model.EndpointDiscoverabilityPolicy {
142
	if svc == nil {
143
		// Default policy when the service doesn't exist.
144
		return model.DiscoverableFromSameCluster
145
	}
146

147
	if strings.HasSuffix(svc.Hostname.String(), "."+constants.DefaultClusterSetLocalDomain) {
148
		return ec.clusterSetLocalPolicySelector(svc)
149
	}
150

151
	return ec.clusterLocalPolicySelector(svc)
152
}
153

154
func (ec *serviceExportCacheImpl) isExported(name types.NamespacedName) bool {
155
	return ec.serviceExports.Get(name.Name, name.Namespace) != nil
156
}
157

158
func (ec *serviceExportCacheImpl) ExportedServices() []exportedService {
159
	// List all exports in this cluster.
160
	exports := ec.serviceExports.List(metav1.NamespaceAll, klabels.Everything())
161

162
	ec.RLock()
163

164
	out := make([]exportedService, 0, len(exports))
165
	for _, export := range exports {
166
		uExport := export.(*unstructured.Unstructured)
167
		es := exportedService{
168
			namespacedName:  config.NamespacedName(uExport),
169
			discoverability: make(map[host.Name]string),
170
		}
171

172
		// Generate the map of all hosts for this service to their discoverability policies.
173
		clusterLocalHost := kubesr.ServiceHostname(uExport.GetName(), uExport.GetNamespace(), ec.opts.DomainSuffix)
174
		clusterSetLocalHost := serviceClusterSetLocalHostname(es.namespacedName)
175
		for _, hostName := range []host.Name{clusterLocalHost, clusterSetLocalHost} {
176
			if svc := ec.servicesMap[hostName]; svc != nil {
177
				es.discoverability[hostName] = ec.EndpointDiscoverabilityPolicy(svc).String()
178
			}
179
		}
180

181
		out = append(out, es)
182
	}
183

184
	ec.RUnlock()
185

186
	return out
187
}
188

189
func (ec *serviceExportCacheImpl) Run(stop <-chan struct{}) {
190
}
191

192
func (ec *serviceExportCacheImpl) HasSynced() bool {
193
	return ec.serviceExports.HasSynced()
194
}
195

196
type disabledServiceExportCache struct{}
197

198
var _ serviceExportCache = disabledServiceExportCache{}
199

200
func (c disabledServiceExportCache) EndpointDiscoverabilityPolicy(*model.Service) model.EndpointDiscoverabilityPolicy {
201
	return model.AlwaysDiscoverable
202
}
203

204
func (c disabledServiceExportCache) Run(stop <-chan struct{}) {}
205

206
func (c disabledServiceExportCache) HasSynced() bool {
207
	return true
208
}
209

210
func (c disabledServiceExportCache) ExportedServices() []exportedService {
211
	// MCS is disabled - returning `nil`, which is semantically different here than an empty list.
212
	return nil
213
}
214

215
func (c disabledServiceExportCache) HasCRDInstalled() bool {
216
	return false
217
}
218

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.