istio
161 строка · 5.5 Кб
1// Copyright Istio Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15// nolint: gocritic
16package ambient
17
18import (
19v1 "k8s.io/api/core/v1"
20
21networkingclient "istio.io/client-go/pkg/apis/networking/v1alpha3"
22"istio.io/istio/pilot/pkg/model"
23"istio.io/istio/pilot/pkg/serviceregistry/kube"
24"istio.io/istio/pkg/config"
25"istio.io/istio/pkg/config/schema/kind"
26"istio.io/istio/pkg/kube/krt"
27"istio.io/istio/pkg/log"
28"istio.io/istio/pkg/slices"
29"istio.io/istio/pkg/workloadapi"
30)
31
32func (a *index) ServicesCollection(
33Services krt.Collection[*v1.Service],
34ServiceEntries krt.Collection[*networkingclient.ServiceEntry],
35Waypoints krt.Collection[Waypoint],
36Namespaces krt.Collection[*v1.Namespace],
37) krt.Collection[model.ServiceInfo] {
38ServicesInfo := krt.NewCollection(Services, func(ctx krt.HandlerContext, s *v1.Service) *model.ServiceInfo {
39portNames := map[int32]model.ServicePortName{}
40for _, p := range s.Spec.Ports {
41portNames[p.Port] = model.ServicePortName{
42PortName: p.Name,
43TargetPortName: p.TargetPort.StrVal,
44}
45}
46waypoint := fetchWaypoint(ctx, Waypoints, Namespaces, s.ObjectMeta)
47a.networkUpdateTrigger.MarkDependant(ctx) // Mark we depend on out of band a.Network
48return &model.ServiceInfo{
49Service: a.constructService(s, waypoint),
50PortNames: portNames,
51LabelSelector: model.NewSelector(s.Spec.Selector),
52Source: kind.Service,
53}
54}, krt.WithName("ServicesInfo"))
55ServiceEntriesInfo := krt.NewManyCollection(ServiceEntries, func(ctx krt.HandlerContext, s *networkingclient.ServiceEntry) []model.ServiceInfo {
56waypoint := fetchWaypoint(ctx, Waypoints, Namespaces, s.ObjectMeta)
57a.networkUpdateTrigger.MarkDependant(ctx) // Mark we depend on out of band a.Network
58return a.serviceEntriesInfo(s, waypoint)
59}, krt.WithName("ServiceEntriesInfo"))
60WorkloadServices := krt.JoinCollection([]krt.Collection[model.ServiceInfo]{ServicesInfo, ServiceEntriesInfo}, krt.WithName("WorkloadServices"))
61// workloadapi services NOT workloads x services somehow
62return WorkloadServices
63}
64
65func (a *index) serviceEntriesInfo(s *networkingclient.ServiceEntry, w *Waypoint) []model.ServiceInfo {
66sel := model.NewSelector(s.Spec.GetWorkloadSelector().GetLabels())
67portNames := map[int32]model.ServicePortName{}
68for _, p := range s.Spec.Ports {
69portNames[int32(p.Number)] = model.ServicePortName{
70PortName: p.Name,
71}
72}
73return slices.Map(a.constructServiceEntries(s, w), func(e *workloadapi.Service) model.ServiceInfo {
74return model.ServiceInfo{
75Service: e,
76PortNames: portNames,
77LabelSelector: sel,
78Source: kind.ServiceEntry,
79}
80})
81}
82
83func (a *index) constructServiceEntries(svc *networkingclient.ServiceEntry, w *Waypoint) []*workloadapi.Service {
84addresses, err := slices.MapErr(svc.Spec.Addresses, a.toNetworkAddressFromCidr)
85if err != nil {
86// TODO: perhaps we should support CIDR in the future?
87return nil
88}
89ports := make([]*workloadapi.Port, 0, len(svc.Spec.Ports))
90for _, p := range svc.Spec.Ports {
91ports = append(ports, &workloadapi.Port{
92ServicePort: p.Number,
93TargetPort: p.TargetPort,
94})
95}
96
97// handle svc waypoint scenario
98var waypointAddress *workloadapi.GatewayAddress
99if w != nil {
100waypointAddress = a.getWaypointAddress(w)
101}
102
103// TODO this is only checking one controller - we may be missing service vips for instances in another cluster
104res := make([]*workloadapi.Service, 0, len(svc.Spec.Hosts))
105for _, h := range svc.Spec.Hosts {
106res = append(res, &workloadapi.Service{
107Name: svc.Name,
108Namespace: svc.Namespace,
109Hostname: h,
110Addresses: addresses,
111Ports: ports,
112Waypoint: waypointAddress,
113})
114}
115return res
116}
117
118func (a *index) constructService(svc *v1.Service, w *Waypoint) *workloadapi.Service {
119ports := make([]*workloadapi.Port, 0, len(svc.Spec.Ports))
120for _, p := range svc.Spec.Ports {
121ports = append(ports, &workloadapi.Port{
122ServicePort: uint32(p.Port),
123TargetPort: uint32(p.TargetPort.IntVal),
124})
125}
126
127addresses, err := slices.MapErr(getVIPs(svc), a.toNetworkAddress)
128if err != nil {
129log.Warnf("fail to parse service %v: %v", config.NamespacedName(svc), err)
130return nil
131}
132// handle svc waypoint scenario
133var waypointAddress *workloadapi.GatewayAddress
134if w != nil {
135waypointAddress = a.getWaypointAddress(w)
136}
137
138// TODO this is only checking one controller - we may be missing service vips for instances in another cluster
139return &workloadapi.Service{
140Name: svc.Name,
141Namespace: svc.Namespace,
142Hostname: string(kube.ServiceHostname(svc.Name, svc.Namespace, a.DomainSuffix)),
143Addresses: addresses,
144Ports: ports,
145Waypoint: waypointAddress,
146}
147}
148
149func getVIPs(svc *v1.Service) []string {
150res := []string{}
151if svc.Spec.ClusterIP != "" && svc.Spec.ClusterIP != v1.ClusterIPNone {
152res = append(res, svc.Spec.ClusterIP)
153}
154for _, ing := range svc.Status.LoadBalancer.Ingress {
155// IPs are strictly optional for loadbalancers - they may just have a hostname.
156if ing.IP != "" {
157res = append(res, ing.IP)
158}
159}
160return res
161}
162