istio

Форк
0
/
serviceimportcache.go 
315 строк · 10.2 Кб
1
// Copyright Istio Authors
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14

15
package controller
16

17
import (
18
	"sort"
19
	"strings"
20

21
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
22
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
23
	klabels "k8s.io/apimachinery/pkg/labels"
24
	"k8s.io/apimachinery/pkg/types"
25

26
	"istio.io/istio/pilot/pkg/features"
27
	"istio.io/istio/pilot/pkg/model"
28
	"istio.io/istio/pilot/pkg/serviceregistry/kube"
29
	"istio.io/istio/pkg/cluster"
30
	"istio.io/istio/pkg/config"
31
	"istio.io/istio/pkg/config/constants"
32
	"istio.io/istio/pkg/config/host"
33
	"istio.io/istio/pkg/config/schema/kind"
34
	"istio.io/istio/pkg/kube/controllers"
35
	"istio.io/istio/pkg/kube/kclient"
36
	"istio.io/istio/pkg/kube/kubetypes"
37
	"istio.io/istio/pkg/kube/mcs"
38
	"istio.io/istio/pkg/slices"
39
	netutil "istio.io/istio/pkg/util/net"
40
	"istio.io/istio/pkg/util/sets"
41
)
42

43
const (
44
	mcsDomainSuffix = "." + constants.DefaultClusterSetLocalDomain
45
)
46

47
type importedService struct {
48
	namespacedName types.NamespacedName
49
	clusterSetVIP  string
50
}
51

52
// serviceImportCache reads and processes Kubernetes Multi-Cluster Services (MCS) ServiceImport
53
// resources.
54
//
55
// An MCS controller is responsible for reading ServiceExport resources in one cluster and generating
56
// ServiceImport in all clusters of the ClusterSet (i.e. mesh). While the serviceExportCache reads
57
// ServiceExport to control the discoverability policy for individual endpoints, this controller
58
// reads ServiceImport in the cluster in order to extract the ClusterSet VIP and generate a
59
// synthetic service for the MCS host (i.e. clusterset.local). The aggregate.Controller will then
60
// merge together the MCS services from all the clusters, filling out the full map of Cluster IPs.
61
//
62
// The synthetic MCS service is a copy of the real k8s Service (e.g. cluster.local) with the same
63
// namespaced name, but with the hostname and VIPs changed to the appropriate ClusterSet values.
64
// The real k8s Service can live anywhere in the mesh and does not have to reside in the same
65
// cluster as the ServiceImport.
66
type serviceImportCache interface {
67
	Run(stop <-chan struct{})
68
	HasSynced() bool
69
	ImportedServices() []importedService
70
}
71

72
// newServiceImportCache creates a new cache of ServiceImport resources in the cluster.
73
func newServiceImportCache(c *Controller) serviceImportCache {
74
	if features.EnableMCSHost {
75
		sic := &serviceImportCacheImpl{
76
			Controller: c,
77
		}
78

79
		sic.serviceImports = kclient.NewDelayedInformer[controllers.Object](sic.client, mcs.ServiceImportGVR, kubetypes.DynamicInformer, kclient.Filter{
80
			ObjectFilter: sic.client.ObjectFilter(),
81
		})
82
		// Register callbacks for events.
83
		registerHandlers(sic.Controller, sic.serviceImports, "ServiceImports", sic.onServiceImportEvent, nil)
84
		sic.opts.MeshServiceController.AppendServiceHandlerForCluster(sic.Cluster(), sic.onServiceEvent)
85

86
		return sic
87
	}
88

89
	// MCS Service discovery is disabled. Use a placeholder cache.
90
	return disabledServiceImportCache{}
91
}
92

93
// serviceImportCacheImpl reads ServiceImport resources for a single cluster.
94
type serviceImportCacheImpl struct {
95
	*Controller
96

97
	serviceImports kclient.Untyped
98
}
99

100
// onServiceEvent is called when the controller receives an event for the kube Service (i.e. cluster.local).
101
// When this happens, we need to update the state of the associated synthetic MCS service.
102
func (ic *serviceImportCacheImpl) onServiceEvent(_, curr *model.Service, event model.Event) {
103
	if strings.HasSuffix(curr.Hostname.String(), mcsDomainSuffix) {
104
		// Ignore events for MCS services that were triggered by this controller.
105
		return
106
	}
107

108
	// This method is called concurrently from each cluster's queue. Process it in `this` cluster's queue
109
	// in order to synchronize event processing.
110
	ic.queue.Push(func() error {
111
		namespacedName := namespacedNameForService(curr)
112

113
		// Lookup the previous MCS service if there was one.
114
		mcsHost := serviceClusterSetLocalHostname(namespacedName)
115
		prevMcsService := ic.GetService(mcsHost)
116

117
		// Get the ClusterSet VIPs for this service in this cluster. Will only be populated if the
118
		// service has a ServiceImport in this cluster.
119
		vips := ic.getClusterSetIPs(namespacedName)
120
		name := namespacedName.Name
121
		ns := namespacedName.Namespace
122

123
		if len(vips) == 0 || (event == model.EventDelete &&
124
			ic.opts.MeshServiceController.GetService(kube.ServiceHostname(name, ns, ic.opts.DomainSuffix)) == nil) {
125
			if prevMcsService != nil {
126
				// There are no vips in this cluster. Just delete the MCS service now.
127
				ic.deleteService(prevMcsService)
128
			}
129
			return nil
130
		}
131

132
		if prevMcsService != nil {
133
			event = model.EventUpdate
134
		} else {
135
			event = model.EventAdd
136
		}
137

138
		mcsService := ic.genMCSService(curr, mcsHost, vips)
139
		ic.addOrUpdateService(nil, nil, mcsService, event, false)
140
		return nil
141
	})
142
}
143

144
func (ic *serviceImportCacheImpl) onServiceImportEvent(_, obj controllers.Object, event model.Event) error {
145
	si := controllers.Extract[*unstructured.Unstructured](obj)
146
	if si == nil {
147
		return nil
148
	}
149

150
	// We need a full push if the cluster VIP changes.
151
	needsFullPush := false
152

153
	// Get the updated MCS service.
154
	mcsHost := serviceClusterSetLocalHostnameForKR(si)
155
	mcsService := ic.GetService(mcsHost)
156

157
	ips := GetServiceImportIPs(si)
158
	if mcsService == nil {
159
		if event == model.EventDelete || len(ips) == 0 {
160
			// We never created the service. Nothing to delete.
161
			return nil
162
		}
163

164
		// The service didn't exist prior. Treat it as an add.
165
		event = model.EventAdd
166

167
		// Create the MCS service, based on the cluster.local service. We get the merged, mesh-wide service
168
		// from the aggregate controller so that we don't rely on the service existing in this cluster.
169
		realService := ic.opts.MeshServiceController.GetService(kube.ServiceHostnameForKR(si, ic.opts.DomainSuffix))
170
		if realService == nil {
171
			log.Warnf("failed processing %s event for ServiceImport %s/%s in cluster %s. No matching service found in cluster",
172
				event, si.GetNamespace(), si.GetName(), ic.Cluster())
173
			return nil
174
		}
175

176
		// Create the MCS service from the cluster.local service.
177
		mcsService = ic.genMCSService(realService, mcsHost, ips)
178
	} else {
179
		if event == model.EventDelete || len(ips) == 0 {
180
			ic.deleteService(mcsService)
181
			return nil
182
		}
183

184
		// The service already existed. Treat it as an update.
185
		event = model.EventUpdate
186
		mcsService = mcsService.DeepCopy()
187
		if ic.updateIPs(mcsService, ips) {
188
			needsFullPush = true
189
		}
190
	}
191

192
	// Always force a rebuild of the endpoint cache in case this import caused
193
	// a change to the discoverability policy.
194
	ic.addOrUpdateService(nil, nil, mcsService, event, true)
195

196
	// TODO: do we really need a full push, we should do it in `addOrUpdateService`.
197
	if needsFullPush {
198
		ic.doFullPush(mcsHost, si.GetNamespace())
199
	}
200

201
	return nil
202
}
203

204
func (ic *serviceImportCacheImpl) updateIPs(mcsService *model.Service, ips []string) (updated bool) {
205
	prevIPs := mcsService.ClusterVIPs.GetAddressesFor(ic.Cluster())
206
	if !slices.Equal(prevIPs, ips) {
207
		// Update the VIPs
208
		mcsService.ClusterVIPs.SetAddressesFor(ic.Cluster(), ips)
209
		updated = true
210
	}
211
	return
212
}
213

214
func (ic *serviceImportCacheImpl) doFullPush(mcsHost host.Name, ns string) {
215
	pushReq := &model.PushRequest{
216
		Full:           true,
217
		ConfigsUpdated: sets.New(model.ConfigKey{Kind: kind.ServiceEntry, Name: mcsHost.String(), Namespace: ns}),
218

219
		Reason: model.NewReasonStats(model.ServiceUpdate),
220
	}
221
	ic.opts.XDSUpdater.ConfigUpdate(pushReq)
222
}
223

224
// GetServiceImportIPs returns the list of ClusterSet IPs for the ServiceImport.
225
// Exported for testing only.
226
func GetServiceImportIPs(si *unstructured.Unstructured) []string {
227
	var ips []string
228
	if spec, ok := si.Object["spec"].(map[string]any); ok {
229
		if rawIPs, ok := spec["ips"].([]any); ok {
230
			for _, rawIP := range rawIPs {
231
				ip := rawIP.(string)
232
				if netutil.IsValidIPAddress(ip) {
233
					ips = append(ips, ip)
234
				}
235
			}
236
		}
237
	}
238
	sort.Strings(ips)
239
	return ips
240
}
241

242
// genMCSService generates an MCS service based on the given real k8s service. The list of vips must be non-empty.
243
func (ic *serviceImportCacheImpl) genMCSService(realService *model.Service, mcsHost host.Name, vips []string) *model.Service {
244
	mcsService := realService.DeepCopy()
245
	mcsService.Hostname = mcsHost
246
	mcsService.DefaultAddress = vips[0]
247
	mcsService.ClusterVIPs.Addresses = map[cluster.ID][]string{
248
		ic.Cluster(): vips,
249
	}
250

251
	return mcsService
252
}
253

254
func (ic *serviceImportCacheImpl) getClusterSetIPs(name types.NamespacedName) []string {
255
	si := ic.serviceImports.Get(name.Name, name.Namespace)
256
	if si != nil {
257
		return GetServiceImportIPs(si.(*unstructured.Unstructured))
258
	}
259
	return nil
260
}
261

262
func (ic *serviceImportCacheImpl) ImportedServices() []importedService {
263
	sis := ic.serviceImports.List(metav1.NamespaceAll, klabels.Everything())
264

265
	// Iterate over the ServiceImport resources in this cluster.
266
	out := make([]importedService, 0, len(sis))
267

268
	ic.RLock()
269
	for _, si := range sis {
270
		usi := si.(*unstructured.Unstructured)
271
		info := importedService{
272
			namespacedName: config.NamespacedName(usi),
273
		}
274

275
		// Lookup the synthetic MCS service.
276
		hostName := serviceClusterSetLocalHostnameForKR(usi)
277
		svc := ic.servicesMap[hostName]
278
		if svc != nil {
279
			if vips := svc.ClusterVIPs.GetAddressesFor(ic.Cluster()); len(vips) > 0 {
280
				info.clusterSetVIP = vips[0]
281
			}
282
		}
283

284
		out = append(out, info)
285
	}
286
	ic.RUnlock()
287

288
	return out
289
}
290

291
func (ic *serviceImportCacheImpl) Run(stop <-chan struct{}) {
292
}
293

294
func (ic *serviceImportCacheImpl) HasSynced() bool {
295
	return ic.serviceImports.HasSynced()
296
}
297

298
type disabledServiceImportCache struct{}
299

300
var _ serviceImportCache = disabledServiceImportCache{}
301

302
func (c disabledServiceImportCache) Run(stop <-chan struct{}) {}
303

304
func (c disabledServiceImportCache) HasSynced() bool {
305
	return true
306
}
307

308
func (c disabledServiceImportCache) ImportedServices() []importedService {
309
	// MCS is disabled - returning `nil`, which is semantically different here than an empty list.
310
	return nil
311
}
312

313
func (c disabledServiceImportCache) HasCRDInstalled() bool {
314
	return false
315
}
316

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.