istio

Форк
0
420 строк · 17.4 Кб
1
// Copyright Istio Authors
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14

15
// nolint: gocritic
16
package ambient
17

18
import (
19
	"net/netip"
20

21
	v1 "k8s.io/api/core/v1"
22
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
23

24
	networkingv1alpha3 "istio.io/api/networking/v1alpha3"
25
	networkingclient "istio.io/client-go/pkg/apis/networking/v1alpha3"
26
	securityclient "istio.io/client-go/pkg/apis/security/v1beta1"
27
	"istio.io/istio/pilot/pkg/features"
28
	"istio.io/istio/pilot/pkg/model"
29
	"istio.io/istio/pkg/config/constants"
30
	"istio.io/istio/pkg/config/labels"
31
	"istio.io/istio/pkg/config/schema/kind"
32
	kubeutil "istio.io/istio/pkg/kube"
33
	"istio.io/istio/pkg/kube/krt"
34
	kubelabels "istio.io/istio/pkg/kube/labels"
35
	"istio.io/istio/pkg/log"
36
	"istio.io/istio/pkg/slices"
37
	"istio.io/istio/pkg/spiffe"
38
	"istio.io/istio/pkg/workloadapi"
39
)
40

41
func (a *index) WorkloadsCollection(
42
	Pods krt.Collection[*v1.Pod],
43
	MeshConfig krt.Singleton[MeshConfig],
44
	AuthorizationPolicies krt.Collection[model.WorkloadAuthorization],
45
	PeerAuths krt.Collection[*securityclient.PeerAuthentication],
46
	Waypoints krt.Collection[Waypoint],
47
	WorkloadServices krt.Collection[model.ServiceInfo],
48
	WorkloadEntries krt.Collection[*networkingclient.WorkloadEntry],
49
	ServiceEntries krt.Collection[*networkingclient.ServiceEntry],
50
	AllPolicies krt.Collection[model.WorkloadAuthorization],
51
	Namespaces krt.Collection[*v1.Namespace],
52
) krt.Collection[model.WorkloadInfo] {
53
	PodWorkloads := krt.NewCollection(
54
		Pods,
55
		a.podWorkloadBuilder(MeshConfig, AuthorizationPolicies, PeerAuths, Waypoints, WorkloadServices, Namespaces),
56
		krt.WithName("PodWorkloads"),
57
	)
58
	WorkloadEntryWorkloads := krt.NewCollection(
59
		WorkloadEntries,
60
		a.workloadEntryWorkloadBuilder(MeshConfig, AuthorizationPolicies, PeerAuths, Waypoints, WorkloadServices, Namespaces),
61
		krt.WithName("WorkloadEntryWorkloads"),
62
	)
63
	ServiceEntryWorkloads := krt.NewManyCollection(ServiceEntries, func(ctx krt.HandlerContext, se *networkingclient.ServiceEntry) []model.WorkloadInfo {
64
		if len(se.Spec.Endpoints) == 0 {
65
			return nil
66
		}
67
		res := make([]model.WorkloadInfo, 0, len(se.Spec.Endpoints))
68

69
		wp := fetchWaypoint(ctx, Waypoints, Namespaces, se.ObjectMeta)
70

71
		// this is some partial object meta we can pass through so that WL found in the Endpoints
72
		// may inherit the namespace scope waypoint from the SE... the Endpoints do not have real object meta
73
		// and therefore they can't be annotated with wl scope waypoints right now
74
		someObjectMeta := metav1.ObjectMeta{
75
			Namespace: se.Namespace,
76
		}
77

78
		svc := slices.First(a.serviceEntriesInfo(se, wp))
79
		if svc == nil {
80
			// Not ready yet
81
			return nil
82
		}
83
		for _, p := range se.Spec.Endpoints {
84
			meshCfg := krt.FetchOne(ctx, MeshConfig.AsCollection())
85
			// We need to filter from the policies that are present, which apply to us.
86
			// We only want label selector ones; global ones are not attached to the final WorkloadInfo
87
			// In general we just take all of the policies
88
			basePolicies := krt.Fetch(ctx, AllPolicies, krt.FilterSelects(se.Labels), krt.FilterGeneric(func(a any) bool {
89
				return a.(model.WorkloadAuthorization).GetLabelSelector() != nil
90
			}))
91
			policies := slices.Sort(slices.Map(basePolicies, func(t model.WorkloadAuthorization) string {
92
				return t.ResourceName()
93
			}))
94
			// We could do a non-FilterGeneric but krt currently blows up if we depend on the same collection twice
95
			auths := fetchPeerAuthentications(ctx, PeerAuths, meshCfg, se.Namespace, p.Labels)
96
			policies = append(policies, convertedSelectorPeerAuthentications(meshCfg.GetRootNamespace(), auths)...)
97
			var waypoint *Waypoint
98
			if p.Labels[constants.ManagedGatewayLabel] != constants.ManagedGatewayMeshControllerLabel {
99
				// Waypoints do not have waypoints, but anything else does
100

101
				// this is using object meta which simply defines the namespace since the endpoint doesn't have it's own object meta
102
				waypoint = fetchWaypoint(ctx, Waypoints, Namespaces, someObjectMeta)
103
			}
104
			var waypointAddress *workloadapi.GatewayAddress
105
			if waypoint != nil {
106
				waypointAddress = a.getWaypointAddress(waypoint)
107
			}
108
			a.networkUpdateTrigger.MarkDependant(ctx) // Mark we depend on out of band a.Network
109
			network := a.Network(p.Address, p.Labels).String()
110
			if p.Network != "" {
111
				network = p.Network
112
			}
113
			w := &workloadapi.Workload{
114
				Uid:                   a.generateServiceEntryUID(se.Namespace, se.Name, p.Address),
115
				Name:                  se.Name,
116
				Namespace:             se.Namespace,
117
				Network:               network,
118
				ClusterId:             string(a.ClusterID),
119
				ServiceAccount:        p.ServiceAccount,
120
				Services:              constructServicesFromWorkloadEntry(p, []model.ServiceInfo{*svc}),
121
				AuthorizationPolicies: policies,
122
				Status:                workloadapi.WorkloadStatus_HEALTHY, // TODO: WE can be unhealthy
123
				Waypoint:              waypointAddress,
124
				TrustDomain:           pickTrustDomain(),
125
			}
126

127
			if addr, err := netip.ParseAddr(p.Address); err == nil {
128
				w.Addresses = [][]byte{addr.AsSlice()}
129
			} else {
130
				log.Warnf("skipping workload entry %s/%s; DNS Address resolution is not yet implemented", se.Namespace, se.Name)
131
			}
132

133
			w.WorkloadName, w.WorkloadType = se.Name, workloadapi.WorkloadType_POD // XXX(shashankram): HACK to impersonate pod
134
			w.CanonicalName, w.CanonicalRevision = kubelabels.CanonicalService(se.Labels, w.WorkloadName)
135

136
			setTunnelProtocol(se.Labels, se.Annotations, w)
137
			res = append(res, model.WorkloadInfo{Workload: w, Labels: se.Labels, Source: kind.WorkloadEntry, CreationTime: se.CreationTimestamp.Time})
138
		}
139
		return res
140
	}, krt.WithName("ServiceEntryWorkloads"))
141
	Workloads := krt.JoinCollection([]krt.Collection[model.WorkloadInfo]{PodWorkloads, WorkloadEntryWorkloads, ServiceEntryWorkloads}, krt.WithName("Workloads"))
142
	return Workloads
143
}
144

145
// workloadEntryWorkloadBuilder returns the krt transformation that converts a
// WorkloadEntry into its ambient WorkloadInfo representation, attaching the
// label-selected authorization policies, converted PeerAuthentications, the
// resolved waypoint, and the services selecting this workload.
func (a *index) workloadEntryWorkloadBuilder(
	MeshConfig krt.Singleton[MeshConfig],
	AuthorizationPolicies krt.Collection[model.WorkloadAuthorization],
	PeerAuths krt.Collection[*securityclient.PeerAuthentication],
	Waypoints krt.Collection[Waypoint],
	WorkloadServices krt.Collection[model.ServiceInfo],
	Namespaces krt.Collection[*v1.Namespace],
) func(ctx krt.HandlerContext, p *networkingclient.WorkloadEntry) *model.WorkloadInfo {
	return func(ctx krt.HandlerContext, p *networkingclient.WorkloadEntry) *model.WorkloadInfo {
		meshCfg := krt.FetchOne(ctx, MeshConfig.AsCollection())
		// We need to filter from the policies that are present, which apply to us.
		// We only want label selector ones; global ones are not attached to the final WorkloadInfo
		// In general we just take all of the policies
		basePolicies := krt.Fetch(ctx, AuthorizationPolicies, krt.FilterSelects(p.Labels), krt.FilterGeneric(func(a any) bool {
			return a.(model.WorkloadAuthorization).GetLabelSelector() != nil
		}))
		// Sorted so the resulting policy list is stable across rebuilds.
		policies := slices.Sort(slices.Map(basePolicies, func(t model.WorkloadAuthorization) string {
			return t.ResourceName()
		}))
		// We could do a non-FilterGeneric but krt currently blows up if we depend on the same collection twice
		auths := fetchPeerAuthentications(ctx, PeerAuths, meshCfg, p.Namespace, p.Labels)
		policies = append(policies, convertedSelectorPeerAuthentications(meshCfg.GetRootNamespace(), auths)...)
		var waypoint *Waypoint
		// Waypoints themselves are not assigned a waypoint; anything else may be.
		if p.Labels[constants.ManagedGatewayLabel] != constants.ManagedGatewayMeshControllerLabel {
			waypoint = fetchWaypoint(ctx, Waypoints, Namespaces, p.ObjectMeta)
		}
		var waypointAddress *workloadapi.GatewayAddress
		if waypoint != nil {
			waypointAddress = a.getWaypointAddress(waypoint)
		}
		// Find services selecting this workload entry. Unless the feature flag is
		// enabled, only ServiceEntry-sourced services are considered.
		fo := []krt.FetchOption{krt.FilterNamespace(p.Namespace), krt.FilterSelectsNonEmpty(p.GetLabels())}
		if !features.EnableK8SServiceSelectWorkloadEntries {
			fo = append(fo, krt.FilterGeneric(func(a any) bool {
				return a.(model.ServiceInfo).Source == kind.ServiceEntry
			}))
		}
		services := krt.Fetch(ctx, WorkloadServices, fo...)
		a.networkUpdateTrigger.MarkDependant(ctx) // Mark we depend on out of band a.Network
		// An explicit spec.network overrides the network inferred from address/labels.
		network := a.Network(p.Spec.Address, p.Labels).String()
		if p.Spec.Network != "" {
			network = p.Spec.Network
		}
		w := &workloadapi.Workload{
			Uid:                   a.generateWorkloadEntryUID(p.Namespace, p.Name),
			Name:                  p.Name,
			Namespace:             p.Namespace,
			Network:               network,
			ClusterId:             string(a.ClusterID),
			ServiceAccount:        p.Spec.ServiceAccount,
			Services:              constructServicesFromWorkloadEntry(&p.Spec, services),
			AuthorizationPolicies: policies,
			Status:                workloadapi.WorkloadStatus_HEALTHY, // TODO: WE can be unhealthy
			Waypoint:              waypointAddress,
			TrustDomain:           pickTrustDomain(),
		}

		// Only literal IP addresses are supported; hostname entries are skipped
		// (the workload is still emitted, just with no addresses).
		if addr, err := netip.ParseAddr(p.Spec.Address); err == nil {
			w.Addresses = [][]byte{addr.AsSlice()}
		} else {
			log.Warnf("skipping workload entry %s/%s; DNS Address resolution is not yet implemented", p.Namespace, p.Name)
		}

		w.WorkloadName, w.WorkloadType = p.Name, workloadapi.WorkloadType_POD // XXX(shashankram): HACK to impersonate pod
		w.CanonicalName, w.CanonicalRevision = kubelabels.CanonicalService(p.Labels, w.WorkloadName)

		setTunnelProtocol(p.Labels, p.Annotations, w)
		return &model.WorkloadInfo{Workload: w, Labels: p.Labels, Source: kind.WorkloadEntry, CreationTime: p.CreationTimestamp.Time}
	}
}
214

215
// podWorkloadBuilder returns the krt transformation that converts a Pod into
// its ambient WorkloadInfo representation, or nil when the pod should not be
// indexed (not Running/Pending, host-network, or an unparseable pod IP).
func (a *index) podWorkloadBuilder(
	MeshConfig krt.Singleton[MeshConfig],
	AuthorizationPolicies krt.Collection[model.WorkloadAuthorization],
	PeerAuths krt.Collection[*securityclient.PeerAuthentication],
	Waypoints krt.Collection[Waypoint],
	WorkloadServices krt.Collection[model.ServiceInfo],
	Namespaces krt.Collection[*v1.Namespace],
) func(ctx krt.HandlerContext, p *v1.Pod) *model.WorkloadInfo {
	return func(ctx krt.HandlerContext, p *v1.Pod) *model.WorkloadInfo {
		// Pod Is Pending but have a pod IP should be a valid workload, we should build it ,
		// Such as the pod have initContainer which is initialing.
		// See https://github.com/istio/istio/issues/48854
		if (!IsPodRunning(p) && !IsPodPending(p)) || p.Spec.HostNetwork {
			return nil
		}
		podIP, err := netip.ParseAddr(p.Status.PodIP)
		if err != nil {
			// Is this possible? Probably not in typical case, but anyone could put garbage there.
			return nil
		}
		meshCfg := krt.FetchOne(ctx, MeshConfig.AsCollection())
		// We need to filter from the policies that are present, which apply to us.
		// We only want label selector ones; global ones are not attached to the final WorkloadInfo
		// In general we just take all of the policies
		basePolicies := krt.Fetch(ctx, AuthorizationPolicies, krt.FilterSelects(p.Labels), krt.FilterGeneric(func(a any) bool {
			return a.(model.WorkloadAuthorization).GetLabelSelector() != nil
		}))
		// Sorted so the resulting policy list is stable across rebuilds.
		policies := slices.Sort(slices.Map(basePolicies, func(t model.WorkloadAuthorization) string {
			return t.ResourceName()
		}))
		// We could do a non-FilterGeneric but krt currently blows up if we depend on the same collection twice
		auths := fetchPeerAuthentications(ctx, PeerAuths, meshCfg, p.Namespace, p.Labels)
		policies = append(policies, convertedSelectorPeerAuthentications(meshCfg.GetRootNamespace(), auths)...)
		var waypoint *Waypoint
		if p.Labels[constants.ManagedGatewayLabel] != constants.ManagedGatewayMeshControllerLabel {
			// Waypoints do not have waypoints, but anything else does
			waypoint = fetchWaypoint(ctx, Waypoints, Namespaces, p.ObjectMeta)
		}
		var waypointAddress *workloadapi.GatewayAddress
		if waypoint != nil {
			waypointAddress = a.getWaypointAddress(waypoint)
		}
		// Find services selecting this pod. Unless the feature flag is enabled,
		// only Kubernetes-Service-sourced services are considered.
		fo := []krt.FetchOption{krt.FilterNamespace(p.Namespace), krt.FilterSelectsNonEmpty(p.GetLabels())}
		if !features.EnableServiceEntrySelectPods {
			fo = append(fo, krt.FilterGeneric(func(a any) bool {
				return a.(model.ServiceInfo).Source == kind.Service
			}))
		}
		services := krt.Fetch(ctx, WorkloadServices, fo...)
		// Readiness maps directly onto the workload health status.
		status := workloadapi.WorkloadStatus_HEALTHY
		if !IsPodReady(p) {
			status = workloadapi.WorkloadStatus_UNHEALTHY
		}
		a.networkUpdateTrigger.MarkDependant(ctx) // Mark we depend on out of band a.Network
		network := a.Network(p.Status.PodIP, p.Labels).String()
		w := &workloadapi.Workload{
			Uid:                   a.generatePodUID(p),
			Name:                  p.Name,
			Namespace:             p.Namespace,
			Network:               network,
			ClusterId:             string(a.ClusterID),
			Addresses:             [][]byte{podIP.AsSlice()},
			ServiceAccount:        p.Spec.ServiceAccountName,
			Node:                  p.Spec.NodeName,
			Services:              constructServices(p, services),
			AuthorizationPolicies: policies,
			Status:                status,
			Waypoint:              waypointAddress,
			TrustDomain:           pickTrustDomain(),
		}

		w.WorkloadName, w.WorkloadType = workloadNameAndType(p)
		w.CanonicalName, w.CanonicalRevision = kubelabels.CanonicalService(p.Labels, w.WorkloadName)

		setTunnelProtocol(p.Labels, p.Annotations, w)
		return &model.WorkloadInfo{Workload: w, Labels: p.Labels, Source: kind.Pod, CreationTime: p.CreationTimestamp.Time}
	}
}
293

294
// setTunnelProtocol marks w as speaking HBONE when ambient redirection is
// enabled via annotation, and additionally as a native tunnel when the
// workload's labels indicate direct HTTP tunnel support.
func setTunnelProtocol(labels, annotations map[string]string, w *workloadapi.Workload) {
	// Explicit opt-in through the ambient redirection annotation.
	if annotations[constants.AmbientRedirection] == constants.AmbientRedirectionEnabled {
		w.TunnelProtocol = workloadapi.TunnelProtocol_HBONE
	}
	// The workload itself supports the tunnel protocol directly.
	if model.SupportsTunnel(labels, model.TunnelHTTP) {
		w.NativeTunnel = true
		w.TunnelProtocol = workloadapi.TunnelProtocol_HBONE
	}
}
305

306
func pickTrustDomain() string {
307
	if td := spiffe.GetTrustDomain(); td != "cluster.local" {
308
		return td
309
	}
310
	return ""
311
}
312

313
func fetchPeerAuthentications(
314
	ctx krt.HandlerContext,
315
	PeerAuths krt.Collection[*securityclient.PeerAuthentication],
316
	meshCfg *MeshConfig,
317
	ns string,
318
	matchLabels map[string]string,
319
) []*securityclient.PeerAuthentication {
320
	return krt.Fetch(ctx, PeerAuths, krt.FilterGeneric(func(a any) bool {
321
		pol := a.(*securityclient.PeerAuthentication)
322
		if pol.Namespace == meshCfg.GetRootNamespace() && pol.Spec.Selector == nil {
323
			return true
324
		}
325
		if pol.Namespace != ns {
326
			return false
327
		}
328
		sel := pol.Spec.Selector
329
		if sel == nil {
330
			return true // No selector matches everything
331
		}
332
		return labels.Instance(sel.MatchLabels).SubsetOf(matchLabels)
333
	}))
334
}
335

336
func constructServicesFromWorkloadEntry(p *networkingv1alpha3.WorkloadEntry, services []model.ServiceInfo) map[string]*workloadapi.PortList {
337
	res := map[string]*workloadapi.PortList{}
338
	for _, svc := range services {
339
		n := namespacedHostname(svc.Namespace, svc.Hostname)
340
		pl := &workloadapi.PortList{}
341
		res[n] = pl
342
		for _, port := range svc.Ports {
343
			targetPort := port.TargetPort
344
			// Named targetPort has different semantics from Service vs ServiceEntry
345
			if svc.Source == kind.Service {
346
				// Service has explicit named targetPorts.
347
				if named, f := svc.PortNames[int32(port.ServicePort)]; f && named.TargetPortName != "" {
348
					// This port is a named target port, look it up
349
					tv, ok := p.Ports[named.TargetPortName]
350
					if !ok {
351
						// We needed an explicit port, but didn't find one - skip this port
352
						continue
353
					}
354
					targetPort = tv
355
				}
356
			} else {
357
				// ServiceEntry has no explicit named targetPorts; targetPort only allows a number
358
				// Instead, there is name matching between the port names
359
				if named, f := svc.PortNames[int32(port.ServicePort)]; f {
360
					// get port name or target port
361
					tv, ok := p.Ports[named.PortName]
362
					if ok {
363
						// if we match one, override it. Otherwise, use the service port
364
						targetPort = tv
365
					} else if targetPort == 0 {
366
						targetPort = port.ServicePort
367
					}
368
				}
369
			}
370
			pl.Ports = append(pl.Ports, &workloadapi.Port{
371
				ServicePort: port.ServicePort,
372
				TargetPort:  targetPort,
373
			})
374
		}
375
	}
376
	return res
377
}
378

379
func workloadNameAndType(pod *v1.Pod) (string, workloadapi.WorkloadType) {
380
	objMeta, typeMeta := kubeutil.GetDeployMetaFromPod(pod)
381
	switch typeMeta.Kind {
382
	case "Deployment":
383
		return objMeta.Name, workloadapi.WorkloadType_DEPLOYMENT
384
	case "Job":
385
		return objMeta.Name, workloadapi.WorkloadType_JOB
386
	case "CronJob":
387
		return objMeta.Name, workloadapi.WorkloadType_CRONJOB
388
	default:
389
		return pod.Name, workloadapi.WorkloadType_POD
390
	}
391
}
392

393
func constructServices(p *v1.Pod, services []model.ServiceInfo) map[string]*workloadapi.PortList {
394
	res := map[string]*workloadapi.PortList{}
395
	for _, svc := range services {
396
		n := namespacedHostname(svc.Namespace, svc.Hostname)
397
		pl := &workloadapi.PortList{}
398
		res[n] = pl
399
		for _, port := range svc.Ports {
400
			targetPort := port.TargetPort
401
			// The svc.Ports represents the workloadapi.Service, which drops the port name info and just has numeric target Port.
402
			// TargetPort can be 0 which indicates its a named port. Check if its a named port and replace with the real targetPort if so.
403
			if named, f := svc.PortNames[int32(port.ServicePort)]; f && named.TargetPortName != "" {
404
				// Pods only match on TargetPort names
405
				tp, ok := FindPortName(p, named.TargetPortName)
406
				if !ok {
407
					// Port not present for this workload. Exclude the port entirely
408
					continue
409
				}
410
				targetPort = uint32(tp)
411
			}
412

413
			pl.Ports = append(pl.Ports, &workloadapi.Port{
414
				ServicePort: port.ServicePort,
415
				TargetPort:  targetPort,
416
			})
417
		}
418
	}
419
	return res
420
}
421

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.