istio
487 строк · 15.7 Кб
1// Copyright Istio Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package ambient16
17import (18"net/netip"19"strings"20
21v1 "k8s.io/api/core/v1"22"sigs.k8s.io/gateway-api/apis/v1beta1"23
24networkingclient "istio.io/client-go/pkg/apis/networking/v1alpha3"25securityclient "istio.io/client-go/pkg/apis/security/v1beta1"26"istio.io/istio/pilot/pkg/model"27"istio.io/istio/pkg/cluster"28"istio.io/istio/pkg/config/constants"29"istio.io/istio/pkg/config/labels"30"istio.io/istio/pkg/config/schema/gvr"31"istio.io/istio/pkg/config/schema/kind"32kubeclient "istio.io/istio/pkg/kube"33"istio.io/istio/pkg/kube/kclient"34"istio.io/istio/pkg/kube/krt"35"istio.io/istio/pkg/kube/kubetypes"36"istio.io/istio/pkg/log"37"istio.io/istio/pkg/maps"38"istio.io/istio/pkg/network"39"istio.io/istio/pkg/slices"40"istio.io/istio/pkg/util/sets"41"istio.io/istio/pkg/workloadapi"42)
43
44type Index interface {45Lookup(key string) []model.AddressInfo46All() []model.AddressInfo47WorkloadsForWaypoint(key model.WaypointKey) []model.WorkloadInfo48ServicesForWaypoint(key model.WaypointKey) []model.ServiceInfo49Waypoint(network, address string) []netip.Addr50SyncAll()51model.AmbientIndexes52}
53
54var _ Index = &index{}55
56type workloadsCollection struct {57krt.Collection[model.WorkloadInfo]58ByAddress *krt.Index[model.WorkloadInfo, networkAddress]59ByServiceKey *krt.Index[model.WorkloadInfo, string]60ByOwningWaypoint *krt.Index[model.WorkloadInfo, networkAddress]61}
62
63type waypointsCollection struct {64krt.Collection[Waypoint]65}
66
67type servicesCollection struct {68krt.Collection[model.ServiceInfo]69ByAddress *krt.Index[model.ServiceInfo, networkAddress]70ByOwningWaypoint *krt.Index[model.ServiceInfo, networkAddress]71}
72
73// index maintains an index of ambient WorkloadInfo objects by various keys.
74// These are intentionally pre-computed based on events such that lookups are efficient.
75type index struct {76services servicesCollection
77workloads workloadsCollection
78waypoints waypointsCollection
79
80authorizationPolicies krt.Collection[model.WorkloadAuthorization]81networkUpdateTrigger *krt.RecomputeTrigger82
83SystemNamespace string84DomainSuffix string85ClusterID cluster.ID86XDSUpdater model.XDSUpdater87Network LookupNetwork
88}
89
90type Options struct {91Client kubeclient.Client92
93Revision string94SystemNamespace string95DomainSuffix string96ClusterID cluster.ID97XDSUpdater model.XDSUpdater98LookupNetwork LookupNetwork
99}
100
101func New(options Options) Index {102a := &index{103networkUpdateTrigger: krt.NewRecomputeTrigger(),104
105SystemNamespace: options.SystemNamespace,106DomainSuffix: options.DomainSuffix,107ClusterID: options.ClusterID,108XDSUpdater: options.XDSUpdater,109Network: options.LookupNetwork,110}111
112filter := kclient.Filter{113ObjectFilter: options.Client.ObjectFilter(),114}115ConfigMaps := krt.NewInformerFiltered[*v1.ConfigMap](options.Client, filter, krt.WithName("ConfigMaps"))116
117authzPolicies := kclient.NewDelayedInformer[*securityclient.AuthorizationPolicy](options.Client,118gvr.AuthorizationPolicy, kubetypes.StandardInformer, filter)119AuthzPolicies := krt.WrapClient[*securityclient.AuthorizationPolicy](authzPolicies, krt.WithName("AuthorizationPolicies"))120
121peerAuths := kclient.NewDelayedInformer[*securityclient.PeerAuthentication](options.Client,122gvr.PeerAuthentication, kubetypes.StandardInformer, filter)123PeerAuths := krt.WrapClient[*securityclient.PeerAuthentication](peerAuths, krt.WithName("PeerAuthentications"))124
125serviceEntries := kclient.NewDelayedInformer[*networkingclient.ServiceEntry](options.Client,126gvr.ServiceEntry, kubetypes.StandardInformer, filter)127ServiceEntries := krt.WrapClient[*networkingclient.ServiceEntry](serviceEntries, krt.WithName("ServiceEntries"))128
129workloadEntries := kclient.NewDelayedInformer[*networkingclient.WorkloadEntry](options.Client,130gvr.WorkloadEntry, kubetypes.StandardInformer, filter)131WorkloadEntries := krt.WrapClient[*networkingclient.WorkloadEntry](workloadEntries, krt.WithName("WorkloadEntries"))132
133gatewayClient := kclient.NewDelayedInformer[*v1beta1.Gateway](options.Client, gvr.KubernetesGateway, kubetypes.StandardInformer, filter)134Gateways := krt.WrapClient[*v1beta1.Gateway](gatewayClient, krt.WithName("Gateways"))135
136Services := krt.NewInformerFiltered[*v1.Service](options.Client, filter, krt.WithName("Services"))137Pods := krt.NewInformerFiltered[*v1.Pod](options.Client, kclient.Filter{138ObjectFilter: options.Client.ObjectFilter(),139ObjectTransform: kubeclient.StripPodUnusedFields,140}, krt.WithName("Pods"))141
142// TODO: Should this go ahead and transform the full ns into some intermediary with just the details we care about?143Namespaces := krt.NewInformer[*v1.Namespace](options.Client, krt.WithName("Namespaces"))144
145MeshConfig := MeshConfigCollection(ConfigMaps, options)146Waypoints := WaypointsCollection(Gateways)147
148// AllPolicies includes peer-authentication converted policies149AuthorizationPolicies, AllPolicies := PolicyCollections(AuthzPolicies, PeerAuths, MeshConfig)150AllPolicies.RegisterBatch(PushXds(a.XDSUpdater, func(i model.WorkloadAuthorization) model.ConfigKey {151return model.ConfigKey{Kind: kind.AuthorizationPolicy, Name: i.Authorization.Name, Namespace: i.Authorization.Namespace}152}), false)153
154// these are workloadapi-style services combined from kube services and service entries155WorkloadServices := a.ServicesCollection(Services, ServiceEntries, Waypoints, Namespaces)156ServiceAddressIndex := krt.CreateIndex[model.ServiceInfo, networkAddress](WorkloadServices, networkAddressFromService)157ServiceInfosByOwningWaypoint := krt.CreateIndex[model.ServiceInfo, networkAddress](WorkloadServices, func(s model.ServiceInfo) []networkAddress {158// Filter out waypoint services159if s.Labels[constants.ManagedGatewayLabel] == constants.ManagedGatewayMeshControllerLabel {160return nil161}162waypoint := s.Waypoint163if waypoint == nil {164return nil165}166waypointAddress := waypoint.GetAddress()167if waypointAddress == nil {168return nil169}170
171ip := waypointAddress.GetAddress()172netip, _ := netip.AddrFromSlice(ip)173netaddr := networkAddress{174network: waypointAddress.GetNetwork(),175ip: netip.String(),176}177return append(make([]networkAddress, 1), netaddr)178})179WorkloadServices.RegisterBatch(krt.BatchedEventFilter(180func(a model.ServiceInfo) *workloadapi.Service {181// Only trigger push if the XDS object changed; the rest is just for computation of others182return a.Service183},184PushXds(a.XDSUpdater, func(i model.ServiceInfo) model.ConfigKey {185return model.ConfigKey{Kind: kind.Address, Name: i.ResourceName()}186})), false)187
188Workloads := a.WorkloadsCollection(189Pods,190MeshConfig,191AuthorizationPolicies,192PeerAuths,193Waypoints,194WorkloadServices,195WorkloadEntries,196ServiceEntries,197AllPolicies,198Namespaces,199)200WorkloadAddressIndex := krt.CreateIndex[model.WorkloadInfo, networkAddress](Workloads, networkAddressFromWorkload)201WorkloadServiceIndex := krt.CreateIndex[model.WorkloadInfo, string](Workloads, func(o model.WorkloadInfo) []string {202return maps.Keys(o.Services)203})204WorkloadWaypointIndex := krt.CreateIndex[model.WorkloadInfo, networkAddress](Workloads, func(w model.WorkloadInfo) []networkAddress {205// Filter out waypoints.206if w.Labels[constants.ManagedGatewayLabel] == constants.ManagedGatewayMeshControllerLabel {207return nil208}209waypoint := w.Waypoint210if waypoint == nil {211return nil212}213waypointAddress := waypoint.GetAddress()214if waypointAddress == nil {215return nil216}217
218ip := waypointAddress.GetAddress()219netip, _ := netip.AddrFromSlice(ip)220netaddr := networkAddress{221network: waypointAddress.GetNetwork(),222ip: netip.String(),223}224return append(make([]networkAddress, 1), netaddr)225})226// Subtle: make sure we register the event after the Index are created. This ensures when we get the event, the index is populated.227Workloads.RegisterBatch(krt.BatchedEventFilter(228func(a model.WorkloadInfo) *workloadapi.Workload {229// Only trigger push if the XDS object changed; the rest is just for computation of others230return a.Workload231},232PushXds(a.XDSUpdater, func(i model.WorkloadInfo) model.ConfigKey {233return model.ConfigKey{Kind: kind.Address, Name: i.ResourceName()}234})), false)235
236a.workloads = workloadsCollection{237Collection: Workloads,238ByAddress: WorkloadAddressIndex,239ByServiceKey: WorkloadServiceIndex,240ByOwningWaypoint: WorkloadWaypointIndex,241}242a.services = servicesCollection{243Collection: WorkloadServices,244ByAddress: ServiceAddressIndex,245ByOwningWaypoint: ServiceInfosByOwningWaypoint,246}247a.waypoints = waypointsCollection{248Collection: Waypoints,249}250a.authorizationPolicies = AllPolicies251
252return a253}
254
255// Lookup finds all addresses associated with a given key. Many different key formats are supported; see inline comments.
256func (a *index) Lookup(key string) []model.AddressInfo {257// 1. Workload UID258if w := a.workloads.GetKey(krt.Key[model.WorkloadInfo](key)); w != nil {259return []model.AddressInfo{workloadToAddressInfo(w.Workload)}260}261
262network, ip, found := strings.Cut(key, "/")263if !found {264log.Warnf(`key (%v) did not contain the expected "/" character`, key)265return nil266}267networkAddr := networkAddress{network: network, ip: ip}268
269// 2. Workload by IP270if wls := a.workloads.ByAddress.Lookup(networkAddr); len(wls) > 0 {271// If there is just one, return it272if len(wls) == 1 {273return []model.AddressInfo{modelWorkloadToAddressInfo(wls[0])}274}275// Otherwise, try to find a pod - pods have precedence276pod := slices.FindFunc(wls, func(info model.WorkloadInfo) bool {277return info.Source == kind.Pod278})279if pod != nil {280return []model.AddressInfo{modelWorkloadToAddressInfo(*pod)}281}282// Otherwise just return the first one; all WorkloadEntry have the same weight283return []model.AddressInfo{modelWorkloadToAddressInfo(wls[0])}284}285
286// 3. Service287if svc := a.lookupService(key); svc != nil {288res := []model.AddressInfo{serviceToAddressInfo(svc.Service)}289for _, w := range a.workloads.ByServiceKey.Lookup(svc.ResourceName()) {290res = append(res, workloadToAddressInfo(w.Workload))291}292return res293}294return nil295}
296
297func (a *index) lookupService(key string) *model.ServiceInfo {298// 1. namespace/hostname format299s := a.services.GetKey(krt.Key[model.ServiceInfo](key))300if s != nil {301return s302}303
304// 2. network/ip format305network, ip, _ := strings.Cut(key, "/")306services := a.services.ByAddress.Lookup(networkAddress{307network: network,308ip: ip,309})310return slices.First(services)311}
312
313// All return all known workloads. Result is un-ordered
314func (a *index) All() []model.AddressInfo {315res := []model.AddressInfo{}316type kindindex struct {317k kind.Kind318index int319}320addrm := map[netip.Addr]kindindex{}321for _, wl := range a.workloads.List("") {322overwrite := -1323write := true324for _, addr := range wl.Addresses {325a := byteIPToAddr(addr)326if existing, f := addrm[a]; f {327// This address was already found. We want unique addresses in the result.328// Pod > WorkloadEntry329if wl.Source == kind.Pod && existing.k != kind.Pod {330overwrite = existing.index331addrm[a] = kindindex{332k: wl.Source,333index: overwrite,334}335} else {336write = false337}338}339}340if overwrite >= 0 {341res[overwrite] = workloadToAddressInfo(wl.Workload)342} else if write {343res = append(res, workloadToAddressInfo(wl.Workload))344for _, addr := range wl.Addresses {345a := byteIPToAddr(addr)346addrm[a] = kindindex{347k: wl.Source,348index: overwrite,349}350}351}352}353for _, s := range a.services.List("") {354res = append(res, serviceToAddressInfo(s.Service))355}356return res357}
358
359// AddressInformation returns all AddressInfo's in the cluster.
360// This may be scoped to specific subsets by specifying a non-empty addresses field
361func (a *index) AddressInformation(addresses sets.String) ([]model.AddressInfo, sets.String) {362if len(addresses) == 0 {363// Full update364return a.All(), nil365}366var res []model.AddressInfo367var removed []string368got := sets.New[string]()369for wname := range addresses {370wl := a.Lookup(wname)371if len(wl) == 0 {372removed = append(removed, wname)373} else {374for _, addr := range wl {375if !got.InsertContains(addr.ResourceName()) {376res = append(res, addr)377}378}379}380}381return res, sets.New(removed...)382}
383
384func (a *index) ServicesForWaypoint(key model.WaypointKey) []model.ServiceInfo {385return a.services.ByOwningWaypoint.Lookup(networkAddress{386network: key.Network,387ip: key.Addresses[0],388})389}
390
391func (a *index) WorkloadsForWaypoint(key model.WaypointKey) []model.WorkloadInfo {392// TODO: we should be able to handle multiple IPs or a hostname393if len(key.Addresses) == 0 {394return nil395}396workloads := a.workloads.ByOwningWaypoint.Lookup(networkAddress{397network: key.Network,398ip: key.Addresses[0],399})400workloads = model.SortWorkloadsByCreationTime(workloads)401return workloads402}
403
404func (a *index) Waypoint(network, address string) []netip.Addr {405res := sets.Set[netip.Addr]{}406networkAddr := networkAddress{407network: network,408ip: address,409}410addressInfos := a.Lookup(networkAddr.String())411for _, addressInfo := range addressInfos {412waypointAddress := addressInfo.GetService().GetWaypoint().GetAddress().GetAddress()413if a, ok := netip.AddrFromSlice(waypointAddress); ok {414res.Insert(a)415// This was a service, therefore it is not a workload and we can just move on416continue417}418
419waypointAddress = addressInfo.GetWorkload().GetWaypoint().GetAddress().GetAddress()420if a, ok := netip.AddrFromSlice(waypointAddress); ok {421res.Insert(a)422}423}424return res.UnsortedList()425}
426
427func (a *index) AdditionalPodSubscriptions(428proxy *model.Proxy,429allAddresses sets.String,430currentSubs sets.String,431) sets.String {432shouldSubscribe := sets.New[string]()433
434// First, we want to handle VIP subscriptions. Example:435// Client subscribes to VIP1. Pod1, part of VIP1, is sent.436// The client wouldn't be explicitly subscribed to Pod1, so it would normally ignore it.437// Since it is a part of VIP1 which we are subscribe to, add it to the subscriptions438for addr := range allAddresses {439for _, wl := range model.ExtractWorkloadsFromAddresses(a.Lookup(addr)) {440// We may have gotten an update for Pod, but are subscribed to a Service.441// We need to force a subscription on the Pod as well442for namespacedHostname := range wl.Services {443if currentSubs.Contains(namespacedHostname) {444shouldSubscribe.Insert(wl.ResourceName())445break446}447}448}449}450
451// Next, as an optimization, we will send all node-local endpoints452if nodeName := proxy.Metadata.NodeName; nodeName != "" {453for _, wl := range model.ExtractWorkloadsFromAddresses(a.All()) {454if wl.Node == nodeName {455n := wl.ResourceName()456if currentSubs.Contains(n) {457continue458}459shouldSubscribe.Insert(n)460}461}462}463
464return shouldSubscribe465}
466
467func (a *index) SyncAll() {468a.networkUpdateTrigger.TriggerRecomputation()469}
470
471type LookupNetwork func(endpointIP string, labels labels.Instance) network.ID472
473func PushXds[T any](xds model.XDSUpdater, f func(T) model.ConfigKey) func(events []krt.Event[T]) {474return func(events []krt.Event[T]) {475cu := sets.New[model.ConfigKey]()476for _, e := range events {477for _, i := range e.Items() {478cu.Insert(f(i))479}480}481xds.ConfigUpdate(&model.PushRequest{482Full: false,483ConfigsUpdated: cu,484Reason: model.NewReasonStats(model.AmbientUpdate),485})486}487}
488