istio
420 lines · 17.4 KB
1// Copyright Istio Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15// nolint: gocritic
16package ambient17
18import (19"net/netip"20
21v1 "k8s.io/api/core/v1"22metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"23
24networkingv1alpha3 "istio.io/api/networking/v1alpha3"25networkingclient "istio.io/client-go/pkg/apis/networking/v1alpha3"26securityclient "istio.io/client-go/pkg/apis/security/v1beta1"27"istio.io/istio/pilot/pkg/features"28"istio.io/istio/pilot/pkg/model"29"istio.io/istio/pkg/config/constants"30"istio.io/istio/pkg/config/labels"31"istio.io/istio/pkg/config/schema/kind"32kubeutil "istio.io/istio/pkg/kube"33"istio.io/istio/pkg/kube/krt"34kubelabels "istio.io/istio/pkg/kube/labels"35"istio.io/istio/pkg/log"36"istio.io/istio/pkg/slices"37"istio.io/istio/pkg/spiffe"38"istio.io/istio/pkg/workloadapi"39)
40
41func (a *index) WorkloadsCollection(42Pods krt.Collection[*v1.Pod],43MeshConfig krt.Singleton[MeshConfig],44AuthorizationPolicies krt.Collection[model.WorkloadAuthorization],45PeerAuths krt.Collection[*securityclient.PeerAuthentication],46Waypoints krt.Collection[Waypoint],47WorkloadServices krt.Collection[model.ServiceInfo],48WorkloadEntries krt.Collection[*networkingclient.WorkloadEntry],49ServiceEntries krt.Collection[*networkingclient.ServiceEntry],50AllPolicies krt.Collection[model.WorkloadAuthorization],51Namespaces krt.Collection[*v1.Namespace],52) krt.Collection[model.WorkloadInfo] {53PodWorkloads := krt.NewCollection(54Pods,55a.podWorkloadBuilder(MeshConfig, AuthorizationPolicies, PeerAuths, Waypoints, WorkloadServices, Namespaces),56krt.WithName("PodWorkloads"),57)58WorkloadEntryWorkloads := krt.NewCollection(59WorkloadEntries,60a.workloadEntryWorkloadBuilder(MeshConfig, AuthorizationPolicies, PeerAuths, Waypoints, WorkloadServices, Namespaces),61krt.WithName("WorkloadEntryWorkloads"),62)63ServiceEntryWorkloads := krt.NewManyCollection(ServiceEntries, func(ctx krt.HandlerContext, se *networkingclient.ServiceEntry) []model.WorkloadInfo {64if len(se.Spec.Endpoints) == 0 {65return nil66}67res := make([]model.WorkloadInfo, 0, len(se.Spec.Endpoints))68
69wp := fetchWaypoint(ctx, Waypoints, Namespaces, se.ObjectMeta)70
71// this is some partial object meta we can pass through so that WL found in the Endpoints72// may inherit the namespace scope waypoint from the SE... the Endpoints do not have real object meta73// and therefore they can't be annotated with wl scope waypoints right now74someObjectMeta := metav1.ObjectMeta{75Namespace: se.Namespace,76}77
78svc := slices.First(a.serviceEntriesInfo(se, wp))79if svc == nil {80// Not ready yet81return nil82}83for _, p := range se.Spec.Endpoints {84meshCfg := krt.FetchOne(ctx, MeshConfig.AsCollection())85// We need to filter from the policies that are present, which apply to us.86// We only want label selector ones; global ones are not attached to the final WorkloadInfo87// In general we just take all of the policies88basePolicies := krt.Fetch(ctx, AllPolicies, krt.FilterSelects(se.Labels), krt.FilterGeneric(func(a any) bool {89return a.(model.WorkloadAuthorization).GetLabelSelector() != nil90}))91policies := slices.Sort(slices.Map(basePolicies, func(t model.WorkloadAuthorization) string {92return t.ResourceName()93}))94// We could do a non-FilterGeneric but krt currently blows up if we depend on the same collection twice95auths := fetchPeerAuthentications(ctx, PeerAuths, meshCfg, se.Namespace, p.Labels)96policies = append(policies, convertedSelectorPeerAuthentications(meshCfg.GetRootNamespace(), auths)...)97var waypoint *Waypoint98if p.Labels[constants.ManagedGatewayLabel] != constants.ManagedGatewayMeshControllerLabel {99// Waypoints do not have waypoints, but anything else does100
101// this is using object meta which simply defines the namespace since the endpoint doesn't have it's own object meta102waypoint = fetchWaypoint(ctx, Waypoints, Namespaces, someObjectMeta)103}104var waypointAddress *workloadapi.GatewayAddress105if waypoint != nil {106waypointAddress = a.getWaypointAddress(waypoint)107}108a.networkUpdateTrigger.MarkDependant(ctx) // Mark we depend on out of band a.Network109network := a.Network(p.Address, p.Labels).String()110if p.Network != "" {111network = p.Network112}113w := &workloadapi.Workload{114Uid: a.generateServiceEntryUID(se.Namespace, se.Name, p.Address),115Name: se.Name,116Namespace: se.Namespace,117Network: network,118ClusterId: string(a.ClusterID),119ServiceAccount: p.ServiceAccount,120Services: constructServicesFromWorkloadEntry(p, []model.ServiceInfo{*svc}),121AuthorizationPolicies: policies,122Status: workloadapi.WorkloadStatus_HEALTHY, // TODO: WE can be unhealthy123Waypoint: waypointAddress,124TrustDomain: pickTrustDomain(),125}126
127if addr, err := netip.ParseAddr(p.Address); err == nil {128w.Addresses = [][]byte{addr.AsSlice()}129} else {130log.Warnf("skipping workload entry %s/%s; DNS Address resolution is not yet implemented", se.Namespace, se.Name)131}132
133w.WorkloadName, w.WorkloadType = se.Name, workloadapi.WorkloadType_POD // XXX(shashankram): HACK to impersonate pod134w.CanonicalName, w.CanonicalRevision = kubelabels.CanonicalService(se.Labels, w.WorkloadName)135
136setTunnelProtocol(se.Labels, se.Annotations, w)137res = append(res, model.WorkloadInfo{Workload: w, Labels: se.Labels, Source: kind.WorkloadEntry, CreationTime: se.CreationTimestamp.Time})138}139return res140}, krt.WithName("ServiceEntryWorkloads"))141Workloads := krt.JoinCollection([]krt.Collection[model.WorkloadInfo]{PodWorkloads, WorkloadEntryWorkloads, ServiceEntryWorkloads}, krt.WithName("Workloads"))142return Workloads143}
144
// workloadEntryWorkloadBuilder returns the krt transformation function that
// converts a single WorkloadEntry into a model.WorkloadInfo, resolving its
// authorization policies, peer authentications, waypoint, services, and
// network along the way.
func (a *index) workloadEntryWorkloadBuilder(
	MeshConfig krt.Singleton[MeshConfig],
	AuthorizationPolicies krt.Collection[model.WorkloadAuthorization],
	PeerAuths krt.Collection[*securityclient.PeerAuthentication],
	Waypoints krt.Collection[Waypoint],
	WorkloadServices krt.Collection[model.ServiceInfo],
	Namespaces krt.Collection[*v1.Namespace],
) func(ctx krt.HandlerContext, p *networkingclient.WorkloadEntry) *model.WorkloadInfo {
	return func(ctx krt.HandlerContext, p *networkingclient.WorkloadEntry) *model.WorkloadInfo {
		meshCfg := krt.FetchOne(ctx, MeshConfig.AsCollection())
		// We need to filter from the policies that are present, which apply to us.
		// We only want label selector ones; global ones are not attached to the final WorkloadInfo
		// In general we just take all of the policies
		basePolicies := krt.Fetch(ctx, AuthorizationPolicies, krt.FilterSelects(p.Labels), krt.FilterGeneric(func(a any) bool {
			return a.(model.WorkloadAuthorization).GetLabelSelector() != nil
		}))
		policies := slices.Sort(slices.Map(basePolicies, func(t model.WorkloadAuthorization) string {
			return t.ResourceName()
		}))
		// We could do a non-FilterGeneric but krt currently blows up if we depend on the same collection twice
		auths := fetchPeerAuthentications(ctx, PeerAuths, meshCfg, p.Namespace, p.Labels)
		policies = append(policies, convertedSelectorPeerAuthentications(meshCfg.GetRootNamespace(), auths)...)
		var waypoint *Waypoint
		// Waypoints themselves never get a waypoint attached.
		if p.Labels[constants.ManagedGatewayLabel] != constants.ManagedGatewayMeshControllerLabel {
			waypoint = fetchWaypoint(ctx, Waypoints, Namespaces, p.ObjectMeta)
		}
		var waypointAddress *workloadapi.GatewayAddress
		if waypoint != nil {
			waypointAddress = a.getWaypointAddress(waypoint)
		}
		// Find the services selecting this WorkloadEntry. Unless the feature
		// flag allows it, Kubernetes Services are excluded and only
		// ServiceEntry-sourced services may select WorkloadEntries.
		fo := []krt.FetchOption{krt.FilterNamespace(p.Namespace), krt.FilterSelectsNonEmpty(p.GetLabels())}
		if !features.EnableK8SServiceSelectWorkloadEntries {
			fo = append(fo, krt.FilterGeneric(func(a any) bool {
				return a.(model.ServiceInfo).Source == kind.ServiceEntry
			}))
		}
		services := krt.Fetch(ctx, WorkloadServices, fo...)
		a.networkUpdateTrigger.MarkDependant(ctx) // Mark we depend on out of band a.Network
		// Spec-level Network overrides the computed network, if set.
		network := a.Network(p.Spec.Address, p.Labels).String()
		if p.Spec.Network != "" {
			network = p.Spec.Network
		}
		w := &workloadapi.Workload{
			Uid:                   a.generateWorkloadEntryUID(p.Namespace, p.Name),
			Name:                  p.Name,
			Namespace:             p.Namespace,
			Network:               network,
			ClusterId:             string(a.ClusterID),
			ServiceAccount:        p.Spec.ServiceAccount,
			Services:              constructServicesFromWorkloadEntry(&p.Spec, services),
			AuthorizationPolicies: policies,
			Status:                workloadapi.WorkloadStatus_HEALTHY, // TODO: WE can be unhealthy
			Waypoint:              waypointAddress,
			TrustDomain:           pickTrustDomain(),
		}

		// Only literal IP addresses are supported; hostname addresses are skipped.
		if addr, err := netip.ParseAddr(p.Spec.Address); err == nil {
			w.Addresses = [][]byte{addr.AsSlice()}
		} else {
			log.Warnf("skipping workload entry %s/%s; DNS Address resolution is not yet implemented", p.Namespace, p.Name)
		}

		w.WorkloadName, w.WorkloadType = p.Name, workloadapi.WorkloadType_POD // XXX(shashankram): HACK to impersonate pod
		w.CanonicalName, w.CanonicalRevision = kubelabels.CanonicalService(p.Labels, w.WorkloadName)

		setTunnelProtocol(p.Labels, p.Annotations, w)
		return &model.WorkloadInfo{Workload: w, Labels: p.Labels, Source: kind.WorkloadEntry, CreationTime: p.CreationTimestamp.Time}
	}
}
214
// podWorkloadBuilder returns the krt transformation function that converts a
// single Pod into a model.WorkloadInfo, resolving its authorization policies,
// peer authentications, waypoint, services, health status, and network.
// Returns nil for pods that are not (yet) valid workloads.
func (a *index) podWorkloadBuilder(
	MeshConfig krt.Singleton[MeshConfig],
	AuthorizationPolicies krt.Collection[model.WorkloadAuthorization],
	PeerAuths krt.Collection[*securityclient.PeerAuthentication],
	Waypoints krt.Collection[Waypoint],
	WorkloadServices krt.Collection[model.ServiceInfo],
	Namespaces krt.Collection[*v1.Namespace],
) func(ctx krt.HandlerContext, p *v1.Pod) *model.WorkloadInfo {
	return func(ctx krt.HandlerContext, p *v1.Pod) *model.WorkloadInfo {
		// A Pending pod that already has a pod IP is still a valid workload
		// (e.g. an initContainer is currently initializing), so we build it.
		// Host-network pods are never ambient workloads.
		// See https://github.com/istio/istio/issues/48854
		if (!IsPodRunning(p) && !IsPodPending(p)) || p.Spec.HostNetwork {
			return nil
		}
		podIP, err := netip.ParseAddr(p.Status.PodIP)
		if err != nil {
			// Is this possible? Probably not in typical case, but anyone could put garbage there.
			return nil
		}
		meshCfg := krt.FetchOne(ctx, MeshConfig.AsCollection())
		// We need to filter from the policies that are present, which apply to us.
		// We only want label selector ones; global ones are not attached to the final WorkloadInfo
		// In general we just take all of the policies
		basePolicies := krt.Fetch(ctx, AuthorizationPolicies, krt.FilterSelects(p.Labels), krt.FilterGeneric(func(a any) bool {
			return a.(model.WorkloadAuthorization).GetLabelSelector() != nil
		}))
		policies := slices.Sort(slices.Map(basePolicies, func(t model.WorkloadAuthorization) string {
			return t.ResourceName()
		}))
		// We could do a non-FilterGeneric but krt currently blows up if we depend on the same collection twice
		auths := fetchPeerAuthentications(ctx, PeerAuths, meshCfg, p.Namespace, p.Labels)
		policies = append(policies, convertedSelectorPeerAuthentications(meshCfg.GetRootNamespace(), auths)...)
		var waypoint *Waypoint
		if p.Labels[constants.ManagedGatewayLabel] != constants.ManagedGatewayMeshControllerLabel {
			// Waypoints do not have waypoints, but anything else does
			waypoint = fetchWaypoint(ctx, Waypoints, Namespaces, p.ObjectMeta)
		}
		var waypointAddress *workloadapi.GatewayAddress
		if waypoint != nil {
			waypointAddress = a.getWaypointAddress(waypoint)
		}
		// Find the services selecting this pod. Unless the feature flag allows
		// it, ServiceEntry-sourced services are excluded and only Kubernetes
		// Services may select pods.
		fo := []krt.FetchOption{krt.FilterNamespace(p.Namespace), krt.FilterSelectsNonEmpty(p.GetLabels())}
		if !features.EnableServiceEntrySelectPods {
			fo = append(fo, krt.FilterGeneric(func(a any) bool {
				return a.(model.ServiceInfo).Source == kind.Service
			}))
		}
		services := krt.Fetch(ctx, WorkloadServices, fo...)
		// Health is derived from pod readiness.
		status := workloadapi.WorkloadStatus_HEALTHY
		if !IsPodReady(p) {
			status = workloadapi.WorkloadStatus_UNHEALTHY
		}
		a.networkUpdateTrigger.MarkDependant(ctx) // Mark we depend on out of band a.Network
		network := a.Network(p.Status.PodIP, p.Labels).String()
		w := &workloadapi.Workload{
			Uid:                   a.generatePodUID(p),
			Name:                  p.Name,
			Namespace:             p.Namespace,
			Network:               network,
			ClusterId:             string(a.ClusterID),
			Addresses:             [][]byte{podIP.AsSlice()},
			ServiceAccount:        p.Spec.ServiceAccountName,
			Node:                  p.Spec.NodeName,
			Services:              constructServices(p, services),
			AuthorizationPolicies: policies,
			Status:                status,
			Waypoint:              waypointAddress,
			TrustDomain:           pickTrustDomain(),
		}

		w.WorkloadName, w.WorkloadType = workloadNameAndType(p)
		w.CanonicalName, w.CanonicalRevision = kubelabels.CanonicalService(p.Labels, w.WorkloadName)

		setTunnelProtocol(p.Labels, p.Annotations, w)
		return &model.WorkloadInfo{Workload: w, Labels: p.Labels, Source: kind.Pod, CreationTime: p.CreationTimestamp.Time}
	}
}
293
294func setTunnelProtocol(labels, annotations map[string]string, w *workloadapi.Workload) {295if annotations[constants.AmbientRedirection] == constants.AmbientRedirectionEnabled {296// Configured for override297w.TunnelProtocol = workloadapi.TunnelProtocol_HBONE298}299// Otherwise supports tunnel directly300if model.SupportsTunnel(labels, model.TunnelHTTP) {301w.TunnelProtocol = workloadapi.TunnelProtocol_HBONE302w.NativeTunnel = true303}304}
305
306func pickTrustDomain() string {307if td := spiffe.GetTrustDomain(); td != "cluster.local" {308return td309}310return ""311}
312
313func fetchPeerAuthentications(314ctx krt.HandlerContext,315PeerAuths krt.Collection[*securityclient.PeerAuthentication],316meshCfg *MeshConfig,317ns string,318matchLabels map[string]string,319) []*securityclient.PeerAuthentication {320return krt.Fetch(ctx, PeerAuths, krt.FilterGeneric(func(a any) bool {321pol := a.(*securityclient.PeerAuthentication)322if pol.Namespace == meshCfg.GetRootNamespace() && pol.Spec.Selector == nil {323return true324}325if pol.Namespace != ns {326return false327}328sel := pol.Spec.Selector329if sel == nil {330return true // No selector matches everything331}332return labels.Instance(sel.MatchLabels).SubsetOf(matchLabels)333}))334}
335
336func constructServicesFromWorkloadEntry(p *networkingv1alpha3.WorkloadEntry, services []model.ServiceInfo) map[string]*workloadapi.PortList {337res := map[string]*workloadapi.PortList{}338for _, svc := range services {339n := namespacedHostname(svc.Namespace, svc.Hostname)340pl := &workloadapi.PortList{}341res[n] = pl342for _, port := range svc.Ports {343targetPort := port.TargetPort344// Named targetPort has different semantics from Service vs ServiceEntry345if svc.Source == kind.Service {346// Service has explicit named targetPorts.347if named, f := svc.PortNames[int32(port.ServicePort)]; f && named.TargetPortName != "" {348// This port is a named target port, look it up349tv, ok := p.Ports[named.TargetPortName]350if !ok {351// We needed an explicit port, but didn't find one - skip this port352continue353}354targetPort = tv355}356} else {357// ServiceEntry has no explicit named targetPorts; targetPort only allows a number358// Instead, there is name matching between the port names359if named, f := svc.PortNames[int32(port.ServicePort)]; f {360// get port name or target port361tv, ok := p.Ports[named.PortName]362if ok {363// if we match one, override it. Otherwise, use the service port364targetPort = tv365} else if targetPort == 0 {366targetPort = port.ServicePort367}368}369}370pl.Ports = append(pl.Ports, &workloadapi.Port{371ServicePort: port.ServicePort,372TargetPort: targetPort,373})374}375}376return res377}
378
379func workloadNameAndType(pod *v1.Pod) (string, workloadapi.WorkloadType) {380objMeta, typeMeta := kubeutil.GetDeployMetaFromPod(pod)381switch typeMeta.Kind {382case "Deployment":383return objMeta.Name, workloadapi.WorkloadType_DEPLOYMENT384case "Job":385return objMeta.Name, workloadapi.WorkloadType_JOB386case "CronJob":387return objMeta.Name, workloadapi.WorkloadType_CRONJOB388default:389return pod.Name, workloadapi.WorkloadType_POD390}391}
392
393func constructServices(p *v1.Pod, services []model.ServiceInfo) map[string]*workloadapi.PortList {394res := map[string]*workloadapi.PortList{}395for _, svc := range services {396n := namespacedHostname(svc.Namespace, svc.Hostname)397pl := &workloadapi.PortList{}398res[n] = pl399for _, port := range svc.Ports {400targetPort := port.TargetPort401// The svc.Ports represents the workloadapi.Service, which drops the port name info and just has numeric target Port.402// TargetPort can be 0 which indicates its a named port. Check if its a named port and replace with the real targetPort if so.403if named, f := svc.PortNames[int32(port.ServicePort)]; f && named.TargetPortName != "" {404// Pods only match on TargetPort names405tp, ok := FindPortName(p, named.TargetPortName)406if !ok {407// Port not present for this workload. Exclude the port entirely408continue409}410targetPort = uint32(tp)411}412
413pl.Ports = append(pl.Ports, &workloadapi.Port{414ServicePort: port.ServicePort,415TargetPort: targetPort,416})417}418}419return res420}
421