istio

Форк
0
487 строк · 15.7 Кб
1
// Copyright Istio Authors
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14

15
package ambient
16

17
import (
18
	"net/netip"
19
	"strings"
20

21
	v1 "k8s.io/api/core/v1"
22
	"sigs.k8s.io/gateway-api/apis/v1beta1"
23

24
	networkingclient "istio.io/client-go/pkg/apis/networking/v1alpha3"
25
	securityclient "istio.io/client-go/pkg/apis/security/v1beta1"
26
	"istio.io/istio/pilot/pkg/model"
27
	"istio.io/istio/pkg/cluster"
28
	"istio.io/istio/pkg/config/constants"
29
	"istio.io/istio/pkg/config/labels"
30
	"istio.io/istio/pkg/config/schema/gvr"
31
	"istio.io/istio/pkg/config/schema/kind"
32
	kubeclient "istio.io/istio/pkg/kube"
33
	"istio.io/istio/pkg/kube/kclient"
34
	"istio.io/istio/pkg/kube/krt"
35
	"istio.io/istio/pkg/kube/kubetypes"
36
	"istio.io/istio/pkg/log"
37
	"istio.io/istio/pkg/maps"
38
	"istio.io/istio/pkg/network"
39
	"istio.io/istio/pkg/slices"
40
	"istio.io/istio/pkg/util/sets"
41
	"istio.io/istio/pkg/workloadapi"
42
)
43

44
type Index interface {
45
	Lookup(key string) []model.AddressInfo
46
	All() []model.AddressInfo
47
	WorkloadsForWaypoint(key model.WaypointKey) []model.WorkloadInfo
48
	ServicesForWaypoint(key model.WaypointKey) []model.ServiceInfo
49
	Waypoint(network, address string) []netip.Addr
50
	SyncAll()
51
	model.AmbientIndexes
52
}
53

54
// Compile-time check that *index implements Index.
var _ Index = (*index)(nil)
55

56
type workloadsCollection struct {
57
	krt.Collection[model.WorkloadInfo]
58
	ByAddress        *krt.Index[model.WorkloadInfo, networkAddress]
59
	ByServiceKey     *krt.Index[model.WorkloadInfo, string]
60
	ByOwningWaypoint *krt.Index[model.WorkloadInfo, networkAddress]
61
}
62

63
type waypointsCollection struct {
64
	krt.Collection[Waypoint]
65
}
66

67
type servicesCollection struct {
68
	krt.Collection[model.ServiceInfo]
69
	ByAddress        *krt.Index[model.ServiceInfo, networkAddress]
70
	ByOwningWaypoint *krt.Index[model.ServiceInfo, networkAddress]
71
}
72

73
// index maintains an index of ambient WorkloadInfo objects by various keys.
74
// These are intentionally pre-computed based on events such that lookups are efficient.
75
type index struct {
76
	services  servicesCollection
77
	workloads workloadsCollection
78
	waypoints waypointsCollection
79

80
	authorizationPolicies krt.Collection[model.WorkloadAuthorization]
81
	networkUpdateTrigger  *krt.RecomputeTrigger
82

83
	SystemNamespace string
84
	DomainSuffix    string
85
	ClusterID       cluster.ID
86
	XDSUpdater      model.XDSUpdater
87
	Network         LookupNetwork
88
}
89

90
type Options struct {
91
	Client kubeclient.Client
92

93
	Revision        string
94
	SystemNamespace string
95
	DomainSuffix    string
96
	ClusterID       cluster.ID
97
	XDSUpdater      model.XDSUpdater
98
	LookupNetwork   LookupNetwork
99
}
100

101
func New(options Options) Index {
102
	a := &index{
103
		networkUpdateTrigger: krt.NewRecomputeTrigger(),
104

105
		SystemNamespace: options.SystemNamespace,
106
		DomainSuffix:    options.DomainSuffix,
107
		ClusterID:       options.ClusterID,
108
		XDSUpdater:      options.XDSUpdater,
109
		Network:         options.LookupNetwork,
110
	}
111

112
	filter := kclient.Filter{
113
		ObjectFilter: options.Client.ObjectFilter(),
114
	}
115
	ConfigMaps := krt.NewInformerFiltered[*v1.ConfigMap](options.Client, filter, krt.WithName("ConfigMaps"))
116

117
	authzPolicies := kclient.NewDelayedInformer[*securityclient.AuthorizationPolicy](options.Client,
118
		gvr.AuthorizationPolicy, kubetypes.StandardInformer, filter)
119
	AuthzPolicies := krt.WrapClient[*securityclient.AuthorizationPolicy](authzPolicies, krt.WithName("AuthorizationPolicies"))
120

121
	peerAuths := kclient.NewDelayedInformer[*securityclient.PeerAuthentication](options.Client,
122
		gvr.PeerAuthentication, kubetypes.StandardInformer, filter)
123
	PeerAuths := krt.WrapClient[*securityclient.PeerAuthentication](peerAuths, krt.WithName("PeerAuthentications"))
124

125
	serviceEntries := kclient.NewDelayedInformer[*networkingclient.ServiceEntry](options.Client,
126
		gvr.ServiceEntry, kubetypes.StandardInformer, filter)
127
	ServiceEntries := krt.WrapClient[*networkingclient.ServiceEntry](serviceEntries, krt.WithName("ServiceEntries"))
128

129
	workloadEntries := kclient.NewDelayedInformer[*networkingclient.WorkloadEntry](options.Client,
130
		gvr.WorkloadEntry, kubetypes.StandardInformer, filter)
131
	WorkloadEntries := krt.WrapClient[*networkingclient.WorkloadEntry](workloadEntries, krt.WithName("WorkloadEntries"))
132

133
	gatewayClient := kclient.NewDelayedInformer[*v1beta1.Gateway](options.Client, gvr.KubernetesGateway, kubetypes.StandardInformer, filter)
134
	Gateways := krt.WrapClient[*v1beta1.Gateway](gatewayClient, krt.WithName("Gateways"))
135

136
	Services := krt.NewInformerFiltered[*v1.Service](options.Client, filter, krt.WithName("Services"))
137
	Pods := krt.NewInformerFiltered[*v1.Pod](options.Client, kclient.Filter{
138
		ObjectFilter:    options.Client.ObjectFilter(),
139
		ObjectTransform: kubeclient.StripPodUnusedFields,
140
	}, krt.WithName("Pods"))
141

142
	// TODO: Should this go ahead and transform the full ns into some intermediary with just the details we care about?
143
	Namespaces := krt.NewInformer[*v1.Namespace](options.Client, krt.WithName("Namespaces"))
144

145
	MeshConfig := MeshConfigCollection(ConfigMaps, options)
146
	Waypoints := WaypointsCollection(Gateways)
147

148
	// AllPolicies includes peer-authentication converted policies
149
	AuthorizationPolicies, AllPolicies := PolicyCollections(AuthzPolicies, PeerAuths, MeshConfig)
150
	AllPolicies.RegisterBatch(PushXds(a.XDSUpdater, func(i model.WorkloadAuthorization) model.ConfigKey {
151
		return model.ConfigKey{Kind: kind.AuthorizationPolicy, Name: i.Authorization.Name, Namespace: i.Authorization.Namespace}
152
	}), false)
153

154
	// these are workloadapi-style services combined from kube services and service entries
155
	WorkloadServices := a.ServicesCollection(Services, ServiceEntries, Waypoints, Namespaces)
156
	ServiceAddressIndex := krt.CreateIndex[model.ServiceInfo, networkAddress](WorkloadServices, networkAddressFromService)
157
	ServiceInfosByOwningWaypoint := krt.CreateIndex[model.ServiceInfo, networkAddress](WorkloadServices, func(s model.ServiceInfo) []networkAddress {
158
		// Filter out waypoint services
159
		if s.Labels[constants.ManagedGatewayLabel] == constants.ManagedGatewayMeshControllerLabel {
160
			return nil
161
		}
162
		waypoint := s.Waypoint
163
		if waypoint == nil {
164
			return nil
165
		}
166
		waypointAddress := waypoint.GetAddress()
167
		if waypointAddress == nil {
168
			return nil
169
		}
170

171
		ip := waypointAddress.GetAddress()
172
		netip, _ := netip.AddrFromSlice(ip)
173
		netaddr := networkAddress{
174
			network: waypointAddress.GetNetwork(),
175
			ip:      netip.String(),
176
		}
177
		return append(make([]networkAddress, 1), netaddr)
178
	})
179
	WorkloadServices.RegisterBatch(krt.BatchedEventFilter(
180
		func(a model.ServiceInfo) *workloadapi.Service {
181
			// Only trigger push if the XDS object changed; the rest is just for computation of others
182
			return a.Service
183
		},
184
		PushXds(a.XDSUpdater, func(i model.ServiceInfo) model.ConfigKey {
185
			return model.ConfigKey{Kind: kind.Address, Name: i.ResourceName()}
186
		})), false)
187

188
	Workloads := a.WorkloadsCollection(
189
		Pods,
190
		MeshConfig,
191
		AuthorizationPolicies,
192
		PeerAuths,
193
		Waypoints,
194
		WorkloadServices,
195
		WorkloadEntries,
196
		ServiceEntries,
197
		AllPolicies,
198
		Namespaces,
199
	)
200
	WorkloadAddressIndex := krt.CreateIndex[model.WorkloadInfo, networkAddress](Workloads, networkAddressFromWorkload)
201
	WorkloadServiceIndex := krt.CreateIndex[model.WorkloadInfo, string](Workloads, func(o model.WorkloadInfo) []string {
202
		return maps.Keys(o.Services)
203
	})
204
	WorkloadWaypointIndex := krt.CreateIndex[model.WorkloadInfo, networkAddress](Workloads, func(w model.WorkloadInfo) []networkAddress {
205
		// Filter out waypoints.
206
		if w.Labels[constants.ManagedGatewayLabel] == constants.ManagedGatewayMeshControllerLabel {
207
			return nil
208
		}
209
		waypoint := w.Waypoint
210
		if waypoint == nil {
211
			return nil
212
		}
213
		waypointAddress := waypoint.GetAddress()
214
		if waypointAddress == nil {
215
			return nil
216
		}
217

218
		ip := waypointAddress.GetAddress()
219
		netip, _ := netip.AddrFromSlice(ip)
220
		netaddr := networkAddress{
221
			network: waypointAddress.GetNetwork(),
222
			ip:      netip.String(),
223
		}
224
		return append(make([]networkAddress, 1), netaddr)
225
	})
226
	// Subtle: make sure we register the event after the Index are created. This ensures when we get the event, the index is populated.
227
	Workloads.RegisterBatch(krt.BatchedEventFilter(
228
		func(a model.WorkloadInfo) *workloadapi.Workload {
229
			// Only trigger push if the XDS object changed; the rest is just for computation of others
230
			return a.Workload
231
		},
232
		PushXds(a.XDSUpdater, func(i model.WorkloadInfo) model.ConfigKey {
233
			return model.ConfigKey{Kind: kind.Address, Name: i.ResourceName()}
234
		})), false)
235

236
	a.workloads = workloadsCollection{
237
		Collection:       Workloads,
238
		ByAddress:        WorkloadAddressIndex,
239
		ByServiceKey:     WorkloadServiceIndex,
240
		ByOwningWaypoint: WorkloadWaypointIndex,
241
	}
242
	a.services = servicesCollection{
243
		Collection:       WorkloadServices,
244
		ByAddress:        ServiceAddressIndex,
245
		ByOwningWaypoint: ServiceInfosByOwningWaypoint,
246
	}
247
	a.waypoints = waypointsCollection{
248
		Collection: Waypoints,
249
	}
250
	a.authorizationPolicies = AllPolicies
251

252
	return a
253
}
254

255
// Lookup finds all addresses associated with a given key. Many different key formats are supported; see inline comments.
256
func (a *index) Lookup(key string) []model.AddressInfo {
257
	// 1. Workload UID
258
	if w := a.workloads.GetKey(krt.Key[model.WorkloadInfo](key)); w != nil {
259
		return []model.AddressInfo{workloadToAddressInfo(w.Workload)}
260
	}
261

262
	network, ip, found := strings.Cut(key, "/")
263
	if !found {
264
		log.Warnf(`key (%v) did not contain the expected "/" character`, key)
265
		return nil
266
	}
267
	networkAddr := networkAddress{network: network, ip: ip}
268

269
	// 2. Workload by IP
270
	if wls := a.workloads.ByAddress.Lookup(networkAddr); len(wls) > 0 {
271
		// If there is just one, return it
272
		if len(wls) == 1 {
273
			return []model.AddressInfo{modelWorkloadToAddressInfo(wls[0])}
274
		}
275
		// Otherwise, try to find a pod - pods have precedence
276
		pod := slices.FindFunc(wls, func(info model.WorkloadInfo) bool {
277
			return info.Source == kind.Pod
278
		})
279
		if pod != nil {
280
			return []model.AddressInfo{modelWorkloadToAddressInfo(*pod)}
281
		}
282
		// Otherwise just return the first one; all WorkloadEntry have the same weight
283
		return []model.AddressInfo{modelWorkloadToAddressInfo(wls[0])}
284
	}
285

286
	// 3. Service
287
	if svc := a.lookupService(key); svc != nil {
288
		res := []model.AddressInfo{serviceToAddressInfo(svc.Service)}
289
		for _, w := range a.workloads.ByServiceKey.Lookup(svc.ResourceName()) {
290
			res = append(res, workloadToAddressInfo(w.Workload))
291
		}
292
		return res
293
	}
294
	return nil
295
}
296

297
func (a *index) lookupService(key string) *model.ServiceInfo {
298
	// 1. namespace/hostname format
299
	s := a.services.GetKey(krt.Key[model.ServiceInfo](key))
300
	if s != nil {
301
		return s
302
	}
303

304
	// 2. network/ip format
305
	network, ip, _ := strings.Cut(key, "/")
306
	services := a.services.ByAddress.Lookup(networkAddress{
307
		network: network,
308
		ip:      ip,
309
	})
310
	return slices.First(services)
311
}
312

313
// All return all known workloads. Result is un-ordered
314
func (a *index) All() []model.AddressInfo {
315
	res := []model.AddressInfo{}
316
	type kindindex struct {
317
		k     kind.Kind
318
		index int
319
	}
320
	addrm := map[netip.Addr]kindindex{}
321
	for _, wl := range a.workloads.List("") {
322
		overwrite := -1
323
		write := true
324
		for _, addr := range wl.Addresses {
325
			a := byteIPToAddr(addr)
326
			if existing, f := addrm[a]; f {
327
				// This address was already found. We want unique addresses in the result.
328
				// Pod > WorkloadEntry
329
				if wl.Source == kind.Pod && existing.k != kind.Pod {
330
					overwrite = existing.index
331
					addrm[a] = kindindex{
332
						k:     wl.Source,
333
						index: overwrite,
334
					}
335
				} else {
336
					write = false
337
				}
338
			}
339
		}
340
		if overwrite >= 0 {
341
			res[overwrite] = workloadToAddressInfo(wl.Workload)
342
		} else if write {
343
			res = append(res, workloadToAddressInfo(wl.Workload))
344
			for _, addr := range wl.Addresses {
345
				a := byteIPToAddr(addr)
346
				addrm[a] = kindindex{
347
					k:     wl.Source,
348
					index: overwrite,
349
				}
350
			}
351
		}
352
	}
353
	for _, s := range a.services.List("") {
354
		res = append(res, serviceToAddressInfo(s.Service))
355
	}
356
	return res
357
}
358

359
// AddressInformation returns all AddressInfo's in the cluster.
360
// This may be scoped to specific subsets by specifying a non-empty addresses field
361
func (a *index) AddressInformation(addresses sets.String) ([]model.AddressInfo, sets.String) {
362
	if len(addresses) == 0 {
363
		// Full update
364
		return a.All(), nil
365
	}
366
	var res []model.AddressInfo
367
	var removed []string
368
	got := sets.New[string]()
369
	for wname := range addresses {
370
		wl := a.Lookup(wname)
371
		if len(wl) == 0 {
372
			removed = append(removed, wname)
373
		} else {
374
			for _, addr := range wl {
375
				if !got.InsertContains(addr.ResourceName()) {
376
					res = append(res, addr)
377
				}
378
			}
379
		}
380
	}
381
	return res, sets.New(removed...)
382
}
383

384
func (a *index) ServicesForWaypoint(key model.WaypointKey) []model.ServiceInfo {
385
	return a.services.ByOwningWaypoint.Lookup(networkAddress{
386
		network: key.Network,
387
		ip:      key.Addresses[0],
388
	})
389
}
390

391
func (a *index) WorkloadsForWaypoint(key model.WaypointKey) []model.WorkloadInfo {
392
	// TODO: we should be able to handle multiple IPs or a hostname
393
	if len(key.Addresses) == 0 {
394
		return nil
395
	}
396
	workloads := a.workloads.ByOwningWaypoint.Lookup(networkAddress{
397
		network: key.Network,
398
		ip:      key.Addresses[0],
399
	})
400
	workloads = model.SortWorkloadsByCreationTime(workloads)
401
	return workloads
402
}
403

404
func (a *index) Waypoint(network, address string) []netip.Addr {
405
	res := sets.Set[netip.Addr]{}
406
	networkAddr := networkAddress{
407
		network: network,
408
		ip:      address,
409
	}
410
	addressInfos := a.Lookup(networkAddr.String())
411
	for _, addressInfo := range addressInfos {
412
		waypointAddress := addressInfo.GetService().GetWaypoint().GetAddress().GetAddress()
413
		if a, ok := netip.AddrFromSlice(waypointAddress); ok {
414
			res.Insert(a)
415
			// This was a service, therefore it is not a workload and we can just move on
416
			continue
417
		}
418

419
		waypointAddress = addressInfo.GetWorkload().GetWaypoint().GetAddress().GetAddress()
420
		if a, ok := netip.AddrFromSlice(waypointAddress); ok {
421
			res.Insert(a)
422
		}
423
	}
424
	return res.UnsortedList()
425
}
426

427
func (a *index) AdditionalPodSubscriptions(
428
	proxy *model.Proxy,
429
	allAddresses sets.String,
430
	currentSubs sets.String,
431
) sets.String {
432
	shouldSubscribe := sets.New[string]()
433

434
	// First, we want to handle VIP subscriptions. Example:
435
	// Client subscribes to VIP1. Pod1, part of VIP1, is sent.
436
	// The client wouldn't be explicitly subscribed to Pod1, so it would normally ignore it.
437
	// Since it is a part of VIP1 which we are subscribe to, add it to the subscriptions
438
	for addr := range allAddresses {
439
		for _, wl := range model.ExtractWorkloadsFromAddresses(a.Lookup(addr)) {
440
			// We may have gotten an update for Pod, but are subscribed to a Service.
441
			// We need to force a subscription on the Pod as well
442
			for namespacedHostname := range wl.Services {
443
				if currentSubs.Contains(namespacedHostname) {
444
					shouldSubscribe.Insert(wl.ResourceName())
445
					break
446
				}
447
			}
448
		}
449
	}
450

451
	// Next, as an optimization, we will send all node-local endpoints
452
	if nodeName := proxy.Metadata.NodeName; nodeName != "" {
453
		for _, wl := range model.ExtractWorkloadsFromAddresses(a.All()) {
454
			if wl.Node == nodeName {
455
				n := wl.ResourceName()
456
				if currentSubs.Contains(n) {
457
					continue
458
				}
459
				shouldSubscribe.Insert(n)
460
			}
461
		}
462
	}
463

464
	return shouldSubscribe
465
}
466

467
// SyncAll fires the network update trigger, requesting recomputation of
// whatever depends on it.
func (a *index) SyncAll() {
	trigger := a.networkUpdateTrigger
	trigger.TriggerRecomputation()
}
470

471
// LookupNetwork maps an endpoint IP and its labels to a network ID.
type LookupNetwork func(endpointIP string, labels labels.Instance) network.ID
472

473
func PushXds[T any](xds model.XDSUpdater, f func(T) model.ConfigKey) func(events []krt.Event[T]) {
474
	return func(events []krt.Event[T]) {
475
		cu := sets.New[model.ConfigKey]()
476
		for _, e := range events {
477
			for _, i := range e.Items() {
478
				cu.Insert(f(i))
479
			}
480
		}
481
		xds.ConfigUpdate(&model.PushRequest{
482
			Full:           false,
483
			ConfigsUpdated: cu,
484
			Reason:         model.NewReasonStats(model.AmbientUpdate),
485
		})
486
	}
487
}
488

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.