istio

Форк
0
437 строк · 13.4 Кб
1
// Copyright Istio Authors
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14

15
package aggregate
16

17
import (
18
	"net/netip"
19
	"sync"
20

21
	"istio.io/istio/pilot/pkg/features"
22
	"istio.io/istio/pilot/pkg/model"
23
	"istio.io/istio/pilot/pkg/serviceregistry"
24
	"istio.io/istio/pilot/pkg/serviceregistry/provider"
25
	"istio.io/istio/pkg/cluster"
26
	"istio.io/istio/pkg/config/host"
27
	"istio.io/istio/pkg/config/labels"
28
	"istio.io/istio/pkg/config/mesh"
29
	"istio.io/istio/pkg/log"
30
	"istio.io/istio/pkg/maps"
31
	"istio.io/istio/pkg/util/sets"
32
)
33

34
// The aggregate controller does not implement serviceregistry.Instance since it may be comprised of various
35
// providers and clusters.
36
var (
37
	_ model.ServiceDiscovery    = &Controller{}
38
	_ model.AggregateController = &Controller{}
39
)
40

41
// Controller aggregates data across different registries and monitors for changes
42
type Controller struct {
43
	meshHolder mesh.Holder
44

45
	// The lock is used to protect the registries and controller's running status.
46
	storeLock  sync.RWMutex
47
	registries []*registryEntry
48
	// indicates whether the controller has run.
49
	// if true, all the registries added later should be run manually.
50
	running bool
51

52
	handlers          model.ControllerHandlers
53
	handlersByCluster map[cluster.ID]*model.ControllerHandlers
54
	model.NetworkGatewaysHandler
55
}
56

57
func (c *Controller) ServicesForWaypoint(key model.WaypointKey) []model.ServiceInfo {
58
	if !features.EnableAmbientControllers {
59
		return nil
60
	}
61
	var res []model.ServiceInfo
62
	for _, p := range c.GetRegistries() {
63
		res = append(res, p.ServicesForWaypoint(key)...)
64
	}
65
	return res
66
}
67

68
func (c *Controller) Waypoint(network, address string) []netip.Addr {
69
	if !features.EnableAmbientWaypoints {
70
		return nil
71
	}
72
	var res []netip.Addr
73
	for _, p := range c.GetRegistries() {
74
		res = append(res, p.Waypoint(network, address)...)
75
	}
76
	return res
77
}
78

79
func (c *Controller) WorkloadsForWaypoint(key model.WaypointKey) []model.WorkloadInfo {
80
	if !features.EnableAmbientWaypoints {
81
		return nil
82
	}
83
	var res []model.WorkloadInfo
84
	for _, p := range c.GetRegistries() {
85
		res = append(res, p.WorkloadsForWaypoint(key)...)
86
	}
87
	return res
88
}
89

90
func (c *Controller) AdditionalPodSubscriptions(proxy *model.Proxy, addr, cur sets.String) sets.String {
91
	if !features.EnableAmbientControllers {
92
		return nil
93
	}
94
	res := sets.New[string]()
95
	for _, p := range c.GetRegistries() {
96
		res = res.Merge(p.AdditionalPodSubscriptions(proxy, addr, cur))
97
	}
98
	return res
99
}
100

101
func (c *Controller) Policies(requested sets.Set[model.ConfigKey]) []model.WorkloadAuthorization {
102
	var res []model.WorkloadAuthorization
103
	if !features.EnableAmbientControllers {
104
		return res
105
	}
106
	for _, p := range c.GetRegistries() {
107
		res = append(res, p.Policies(requested)...)
108
	}
109
	return res
110
}
111

112
func (c *Controller) AddressInformation(addresses sets.String) ([]model.AddressInfo, sets.String) {
113
	i := []model.AddressInfo{}
114
	if !features.EnableAmbientControllers {
115
		return i, nil
116
	}
117
	removed := sets.String{}
118
	for _, p := range c.GetRegistries() {
119
		wis, r := p.AddressInformation(addresses)
120
		i = append(i, wis...)
121
		removed.Merge(r)
122
	}
123
	// We may have 'removed' it in one registry but found it in another
124
	for _, wl := range i {
125
		// TODO(@hzxuzhonghu) This is not right for workload, we may search workload by ip, but the resource name is uid.
126
		if removed.Contains(wl.ResourceName()) {
127
			removed.Delete(wl.ResourceName())
128
		}
129
	}
130
	return i, removed
131
}
132

133
type registryEntry struct {
134
	serviceregistry.Instance
135
	// stop if not nil is the per-registry stop chan. If null, the server stop chan should be used to Run the registry.
136
	stop <-chan struct{}
137
}
138

139
type Options struct {
140
	MeshHolder mesh.Holder
141
}
142

143
// NewController creates a new Aggregate controller
144
func NewController(opt Options) *Controller {
145
	return &Controller{
146
		registries:        make([]*registryEntry, 0),
147
		meshHolder:        opt.MeshHolder,
148
		running:           false,
149
		handlersByCluster: map[cluster.ID]*model.ControllerHandlers{},
150
	}
151
}
152

153
func (c *Controller) addRegistry(registry serviceregistry.Instance, stop <-chan struct{}) {
154
	c.registries = append(c.registries, &registryEntry{Instance: registry, stop: stop})
155

156
	// Observe the registry for events.
157
	registry.AppendNetworkGatewayHandler(c.NotifyGatewayHandlers)
158
	registry.AppendServiceHandler(c.handlers.NotifyServiceHandlers)
159
	registry.AppendServiceHandler(func(prev, curr *model.Service, event model.Event) {
160
		for _, handlers := range c.getClusterHandlers() {
161
			handlers.NotifyServiceHandlers(prev, curr, event)
162
		}
163
	})
164
}
165

166
func (c *Controller) getClusterHandlers() []*model.ControllerHandlers {
167
	c.storeLock.Lock()
168
	defer c.storeLock.Unlock()
169
	return maps.Values(c.handlersByCluster)
170
}
171

172
// AddRegistry adds registries into the aggregated controller.
173
// If the aggregated controller is already Running, the given registry will never be started.
174
func (c *Controller) AddRegistry(registry serviceregistry.Instance) {
175
	c.storeLock.Lock()
176
	defer c.storeLock.Unlock()
177
	c.addRegistry(registry, nil)
178
}
179

180
// AddRegistryAndRun adds registries into the aggregated controller and makes sure it is Run.
181
// If the aggregated controller is running, the given registry is Run immediately.
182
// Otherwise, the given registry is Run when the aggregate controller is Run, using the given stop.
183
func (c *Controller) AddRegistryAndRun(registry serviceregistry.Instance, stop <-chan struct{}) {
184
	if stop == nil {
185
		log.Warnf("nil stop channel passed to AddRegistryAndRun for registry %s/%s", registry.Provider(), registry.Cluster())
186
	}
187
	c.storeLock.Lock()
188
	defer c.storeLock.Unlock()
189
	c.addRegistry(registry, stop)
190
	if c.running {
191
		go registry.Run(stop)
192
	}
193
}
194

195
// DeleteRegistry deletes specified registry from the aggregated controller
196
func (c *Controller) DeleteRegistry(clusterID cluster.ID, providerID provider.ID) {
197
	c.storeLock.Lock()
198
	defer c.storeLock.Unlock()
199

200
	if len(c.registries) == 0 {
201
		log.Warnf("Registry list is empty, nothing to delete")
202
		return
203
	}
204
	index, ok := c.getRegistryIndex(clusterID, providerID)
205
	if !ok {
206
		log.Warnf("Registry %s/%s is not found in the registries list, nothing to delete", providerID, clusterID)
207
		return
208
	}
209
	c.registries[index] = nil
210
	c.registries = append(c.registries[:index], c.registries[index+1:]...)
211
	log.Infof("%s registry for the cluster %s has been deleted.", providerID, clusterID)
212
}
213

214
// GetRegistries returns a copy of all registries
215
func (c *Controller) GetRegistries() []serviceregistry.Instance {
216
	c.storeLock.RLock()
217
	defer c.storeLock.RUnlock()
218

219
	// copy registries to prevent race, no need to deep copy here.
220
	out := make([]serviceregistry.Instance, len(c.registries))
221
	for i := range c.registries {
222
		out[i] = c.registries[i]
223
	}
224
	return out
225
}
226

227
func (c *Controller) getRegistryIndex(clusterID cluster.ID, provider provider.ID) (int, bool) {
228
	for i, r := range c.registries {
229
		if r.Cluster().Equals(clusterID) && r.Provider() == provider {
230
			return i, true
231
		}
232
	}
233
	return 0, false
234
}
235

236
// Services lists services from all platforms
237
func (c *Controller) Services() []*model.Service {
238
	// smap is a map of hostname (string) to service index, used to identify services that
239
	// are installed in multiple clusters.
240
	smap := make(map[host.Name]int)
241
	index := 0
242
	services := make([]*model.Service, 0)
243
	// Locking Registries list while walking it to prevent inconsistent results
244
	for _, r := range c.GetRegistries() {
245
		svcs := r.Services()
246
		if r.Provider() != provider.Kubernetes {
247
			index += len(svcs)
248
			services = append(services, svcs...)
249
		} else {
250
			for _, s := range svcs {
251
				previous, ok := smap[s.Hostname]
252
				if !ok {
253
					// First time we see a service. The result will have a single service per hostname
254
					// The first cluster will be listed first, so the services in the primary cluster
255
					// will be used for default settings. If a service appears in multiple clusters,
256
					// the order is less clear.
257
					smap[s.Hostname] = index
258
					index++
259
					services = append(services, s)
260
				} else {
261
					// We must deepcopy before merge, and after merging, the ClusterVips length will be >= 2.
262
					// This is an optimization to prevent deepcopy multi-times
263
					if services[previous].ClusterVIPs.Len() < 2 {
264
						// Deep copy before merging, otherwise there is a case
265
						// a service in remote cluster can be deleted, but the ClusterIP left.
266
						services[previous] = services[previous].DeepCopy()
267
					}
268
					// If it is seen second time, that means it is from a different cluster, update cluster VIPs.
269
					mergeService(services[previous], s, r)
270
				}
271
			}
272
		}
273
	}
274
	return services
275
}
276

277
// GetService retrieves a service by hostname if exists
278
func (c *Controller) GetService(hostname host.Name) *model.Service {
279
	var out *model.Service
280
	for _, r := range c.GetRegistries() {
281
		service := r.GetService(hostname)
282
		if service == nil {
283
			continue
284
		}
285
		if r.Provider() != provider.Kubernetes {
286
			return service
287
		}
288
		if out == nil {
289
			out = service.DeepCopy()
290
		} else {
291
			// If we are seeing the service for the second time, it means it is available in multiple clusters.
292
			mergeService(out, service, r)
293
		}
294
	}
295
	return out
296
}
297

298
// mergeService only merges two clusters' k8s services
299
func mergeService(dst, src *model.Service, srcRegistry serviceregistry.Instance) {
300
	if !src.Ports.Equals(dst.Ports) {
301
		log.Debugf("service %s defined from cluster %s is different from others", src.Hostname, srcRegistry.Cluster())
302
	}
303
	// Prefer the k8s HostVIPs where possible
304
	clusterID := srcRegistry.Cluster()
305
	if len(dst.ClusterVIPs.GetAddressesFor(clusterID)) == 0 {
306
		newAddresses := src.ClusterVIPs.GetAddressesFor(clusterID)
307
		dst.ClusterVIPs.SetAddressesFor(clusterID, newAddresses)
308
	}
309
}
310

311
// NetworkGateways merges the service-based cross-network gateways from each registry.
312
func (c *Controller) NetworkGateways() []model.NetworkGateway {
313
	var gws []model.NetworkGateway
314
	for _, r := range c.GetRegistries() {
315
		gws = append(gws, r.NetworkGateways()...)
316
	}
317
	return gws
318
}
319

320
func (c *Controller) MCSServices() []model.MCSServiceInfo {
321
	var out []model.MCSServiceInfo
322
	for _, r := range c.GetRegistries() {
323
		out = append(out, r.MCSServices()...)
324
	}
325
	return out
326
}
327

328
func nodeClusterID(node *model.Proxy) cluster.ID {
329
	if node.Metadata == nil || node.Metadata.ClusterID == "" {
330
		return ""
331
	}
332
	return node.Metadata.ClusterID
333
}
334

335
// Skip the service registry when there won't be a match
336
// because the proxy is in a different cluster.
337
func skipSearchingRegistryForProxy(nodeClusterID cluster.ID, r serviceregistry.Instance) bool {
338
	// Always search non-kube (usually serviceentry) registry.
339
	// Check every registry if cluster ID isn't specified.
340
	if r.Provider() != provider.Kubernetes || nodeClusterID == "" {
341
		return false
342
	}
343

344
	return !r.Cluster().Equals(nodeClusterID)
345
}
346

347
// GetProxyServiceTargets lists service instances co-located with a given proxy
348
func (c *Controller) GetProxyServiceTargets(node *model.Proxy) []model.ServiceTarget {
349
	out := make([]model.ServiceTarget, 0)
350
	nodeClusterID := nodeClusterID(node)
351
	for _, r := range c.GetRegistries() {
352
		if skipSearchingRegistryForProxy(nodeClusterID, r) {
353
			log.Debugf("GetProxyServiceTargets(): not searching registry %v: proxy %v CLUSTER_ID is %v",
354
				r.Cluster(), node.ID, nodeClusterID)
355
			continue
356
		}
357

358
		instances := r.GetProxyServiceTargets(node)
359
		if len(instances) > 0 {
360
			out = append(out, instances...)
361
		}
362
	}
363

364
	return out
365
}
366

367
func (c *Controller) GetProxyWorkloadLabels(proxy *model.Proxy) labels.Instance {
368
	clusterID := nodeClusterID(proxy)
369
	for _, r := range c.GetRegistries() {
370
		// If proxy clusterID unset, we may find incorrect workload label.
371
		// This can not happen in k8s env.
372
		if clusterID == "" || clusterID == r.Cluster() {
373
			lbls := r.GetProxyWorkloadLabels(proxy)
374
			if lbls != nil {
375
				return lbls
376
			}
377
		}
378
	}
379

380
	return nil
381
}
382

383
// Run starts all the controllers
384
func (c *Controller) Run(stop <-chan struct{}) {
385
	c.storeLock.Lock()
386
	for _, r := range c.registries {
387
		// prefer the per-registry stop channel
388
		registryStop := stop
389
		if s := r.stop; s != nil {
390
			registryStop = s
391
		}
392
		go r.Run(registryStop)
393
	}
394
	c.running = true
395
	c.storeLock.Unlock()
396

397
	<-stop
398
	log.Info("Registry Aggregator terminated")
399
}
400

401
// HasSynced returns true when all registries have synced
402
func (c *Controller) HasSynced() bool {
403
	for _, r := range c.GetRegistries() {
404
		if !r.HasSynced() {
405
			log.Debugf("registry %s is syncing", r.Cluster())
406
			return false
407
		}
408
	}
409
	return true
410
}
411

412
func (c *Controller) AppendServiceHandler(f model.ServiceHandler) {
413
	c.handlers.AppendServiceHandler(f)
414
}
415

416
func (c *Controller) AppendWorkloadHandler(f func(*model.WorkloadInstance, model.Event)) {
417
	// Currently, it is not used.
418
	// Note: take care when you want to enable it, it will register the handlers to all registries
419
	// c.handlers.AppendWorkloadHandler(f)
420
}
421

422
func (c *Controller) AppendServiceHandlerForCluster(id cluster.ID, f model.ServiceHandler) {
423
	c.storeLock.Lock()
424
	defer c.storeLock.Unlock()
425
	handler, ok := c.handlersByCluster[id]
426
	if !ok {
427
		c.handlersByCluster[id] = &model.ControllerHandlers{}
428
		handler = c.handlersByCluster[id]
429
	}
430
	handler.AppendServiceHandler(f)
431
}
432

433
func (c *Controller) UnRegisterHandlersForCluster(id cluster.ID) {
434
	c.storeLock.Lock()
435
	defer c.storeLock.Unlock()
436
	delete(c.handlersByCluster, id)
437
}
438

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.