istio
470 строк · 15.4 Кб
1// Copyright Istio Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package controller
16
17import (
18"net"
19"strconv"
20"sync"
21
22"github.com/yl2chen/cidranger"
23v1 "k8s.io/api/core/v1"
24metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
25"k8s.io/apimachinery/pkg/types"
26"sigs.k8s.io/gateway-api/apis/v1beta1"
27
28"istio.io/api/label"
29"istio.io/istio/pilot/pkg/features"
30"istio.io/istio/pilot/pkg/model"
31"istio.io/istio/pilot/pkg/serviceregistry/kube"
32"istio.io/istio/pkg/cluster"
33"istio.io/istio/pkg/config/constants"
34"istio.io/istio/pkg/config/host"
35"istio.io/istio/pkg/config/mesh"
36"istio.io/istio/pkg/config/schema/gvr"
37"istio.io/istio/pkg/kube/kclient"
38"istio.io/istio/pkg/kube/kubetypes"
39"istio.io/istio/pkg/network"
40"istio.io/istio/pkg/slices"
41)
42
43type networkManager struct {
44sync.RWMutex
45// CIDR ranger based on path-compressed prefix trie
46ranger cidranger.Ranger
47clusterID cluster.ID
48
49gatewayResourceClient kclient.Informer[*v1beta1.Gateway]
50meshNetworksWatcher mesh.NetworksWatcher
51
52// Network name for to be used when the meshNetworks fromRegistry nor network label on pod is specified
53// This is defined by a topology.istio.io/network label on the system namespace.
54network network.ID
55// Network name for the registry as specified by the MeshNetworks configmap
56networkFromMeshConfig network.ID
57// map of svc fqdn to partially built network gateways; the actual gateways will be built from these into networkGatewaysBySvc
58// this map just enumerates which networks/ports each Service is a gateway for
59registryServiceNameGateways map[host.Name][]model.NetworkGateway
60// gateways for each service
61networkGatewaysBySvc map[host.Name]model.NetworkGatewaySet
62// gateways from kubernetes Gateway resources
63gatewaysFromResource map[types.UID]model.NetworkGatewaySet
64// we don't want to discover gateways with class "istio-remote" from outside cluster's API servers.
65discoverRemoteGatewayResources bool
66
67// implements NetworkGatewaysWatcher; we need to call c.NotifyGatewayHandlers when our gateways change
68model.NetworkGatewaysHandler
69}
70
71func initNetworkManager(c *Controller, options Options) *networkManager {
72n := &networkManager{
73clusterID: options.ClusterID,
74meshNetworksWatcher: options.MeshNetworksWatcher,
75// zero values are a workaround structcheck issue: https://github.com/golangci/golangci-lint/issues/826
76ranger: nil,
77network: "",
78networkFromMeshConfig: "",
79registryServiceNameGateways: make(map[host.Name][]model.NetworkGateway),
80networkGatewaysBySvc: make(map[host.Name]model.NetworkGatewaySet),
81gatewaysFromResource: make(map[types.UID]model.NetworkGatewaySet),
82discoverRemoteGatewayResources: options.ConfigCluster,
83}
84// initialize the gateway resource client when any feature that uses it is enabled
85if features.MultiNetworkGatewayAPI {
86n.gatewayResourceClient = kclient.NewDelayedInformer[*v1beta1.Gateway](c.client, gvr.KubernetesGateway, kubetypes.StandardInformer, kubetypes.Filter{})
87}
88if features.MultiNetworkGatewayAPI {
89// conditionally register this handler
90registerHandlers(c, n.gatewayResourceClient, "Gateways", n.handleGatewayResource, nil)
91}
92return n
93}
94
95// setNetworkFromNamespace sets network got from system namespace, returns whether it has changed
96func (n *networkManager) setNetworkFromNamespace(ns *v1.Namespace) bool {
97nw := ns.Labels[label.TopologyNetwork.Name]
98n.Lock()
99defer n.Unlock()
100oldDefaultNetwork := n.network
101n.network = network.ID(nw)
102return oldDefaultNetwork != n.network
103}
104
105func (n *networkManager) networkFromSystemNamespace() network.ID {
106n.RLock()
107defer n.RUnlock()
108return n.network
109}
110
111func (n *networkManager) networkFromMeshNetworks(endpointIP string) network.ID {
112n.RLock()
113defer n.RUnlock()
114if n.networkFromMeshConfig != "" {
115return n.networkFromMeshConfig
116}
117
118if n.ranger != nil {
119ip := net.ParseIP(endpointIP)
120if ip == nil {
121return ""
122}
123entries, err := n.ranger.ContainingNetworks(ip)
124if err != nil {
125log.Errorf("error getting cidr ranger entry from endpoint ip %s", endpointIP)
126return ""
127}
128if len(entries) > 1 {
129log.Warnf("Found multiple networks CIDRs matching the endpoint IP: %s. Using the first match.", endpointIP)
130}
131if len(entries) > 0 {
132return (entries[0].(namedRangerEntry)).name
133}
134}
135return ""
136}
137
138// namedRangerEntry for holding network's CIDR and name
139type namedRangerEntry struct {
140name network.ID
141network net.IPNet
142}
143
144// Network returns the IPNet for the network
145func (n namedRangerEntry) Network() net.IPNet {
146return n.network
147}
148
149// onNetworkChange is fired if the default network is changed either via the namespace label or mesh-networks
150func (c *Controller) onNetworkChange() {
151// the network for endpoints are computed when we process the events; this will fix the cache
152// NOTE: this must run before the other network watcher handler that creates a force push
153if err := c.syncPods(); err != nil {
154log.Errorf("one or more errors force-syncing pods: %v", err)
155}
156if err := c.endpoints.initializeNamespace(metav1.NamespaceAll, true); err != nil {
157log.Errorf("one or more errors force-syncing endpoints: %v", err)
158}
159c.reloadNetworkGateways()
160// This is to ensure the ambient workloads are updated dynamically, aligning them with the current network settings.
161// With this, the pod do not need to restart when the network configuration changes.
162if c.ambientIndex != nil {
163c.ambientIndex.SyncAll()
164}
165}
166
167// reloadMeshNetworks will read the mesh networks configuration to setup
168// fromRegistry and cidr based network lookups for this registry
169func (n *networkManager) reloadMeshNetworks() {
170n.Lock()
171defer n.Unlock()
172ranger := cidranger.NewPCTrieRanger()
173
174n.networkFromMeshConfig = ""
175n.registryServiceNameGateways = make(map[host.Name][]model.NetworkGateway)
176
177meshNetworks := n.meshNetworksWatcher.Networks()
178if meshNetworks == nil || len(meshNetworks.Networks) == 0 {
179return
180}
181for id, v := range meshNetworks.Networks {
182// track endpoints items from this registry are a part of this network
183fromRegistry := false
184for _, ep := range v.Endpoints {
185if ep.GetFromCidr() != "" {
186_, nw, err := net.ParseCIDR(ep.GetFromCidr())
187if err != nil {
188log.Warnf("unable to parse CIDR %q for network %s", ep.GetFromCidr(), id)
189continue
190}
191rangerEntry := namedRangerEntry{
192name: network.ID(id),
193network: *nw,
194}
195_ = ranger.Insert(rangerEntry)
196}
197if ep.GetFromRegistry() != "" && cluster.ID(ep.GetFromRegistry()) == n.clusterID {
198fromRegistry = true
199}
200}
201
202// fromRegistry field specified this cluster
203if fromRegistry {
204// treat endpoints in this cluster as part of this network
205if n.networkFromMeshConfig != "" {
206log.Warnf("multiple networks specify %s in fromRegistry; endpoints from %s will continue to be treated as part of %s",
207n.clusterID, n.clusterID, n.networkFromMeshConfig)
208} else {
209n.networkFromMeshConfig = network.ID(id)
210}
211
212// services in this registry matching the registryServiceName and port are part of this network
213for _, gw := range v.Gateways {
214if gwSvcName := gw.GetRegistryServiceName(); gwSvcName != "" {
215svc := host.Name(gwSvcName)
216n.registryServiceNameGateways[svc] = append(n.registryServiceNameGateways[svc], model.NetworkGateway{
217Network: network.ID(id),
218Cluster: n.clusterID,
219Port: gw.GetPort(),
220})
221}
222}
223}
224
225}
226n.ranger = ranger
227}
228
229func (c *Controller) NetworkGateways() []model.NetworkGateway {
230c.networkManager.RLock()
231defer c.networkManager.RUnlock()
232
233// Merge all the gateways into a single set to eliminate duplicates.
234out := make(model.NetworkGatewaySet)
235for _, gateways := range c.networkGatewaysBySvc {
236out.Merge(gateways)
237}
238for _, gateways := range c.gatewaysFromResource {
239out.Merge(gateways)
240}
241
242unsorted := out.UnsortedList()
243return model.SortGateways(unsorted)
244}
245
246// extractGatewaysFromService checks if the service is a cross-network gateway
247// and if it is, updates the controller's gateways.
248func (c *Controller) extractGatewaysFromService(svc *model.Service) bool {
249changed := c.extractGatewaysInner(svc)
250if changed {
251c.NotifyGatewayHandlers()
252}
253return changed
254}
255
256// reloadNetworkGateways performs extractGatewaysFromService for all services registered with the controller.
257// It is called only by `onNetworkChange`.
258// It iterates over all services, because mesh networks can be set with a service name.
259func (c *Controller) reloadNetworkGateways() {
260c.Lock()
261gwsChanged := false
262for _, svc := range c.servicesMap {
263if c.extractGatewaysInner(svc) {
264gwsChanged = true
265break
266}
267}
268c.Unlock()
269if gwsChanged {
270c.NotifyGatewayHandlers()
271// TODO ConfigUpdate via gateway handler
272c.opts.XDSUpdater.ConfigUpdate(&model.PushRequest{Full: true, Reason: model.NewReasonStats(model.NetworksTrigger)})
273}
274}
275
276// extractGatewaysInner performs the logic for extractGatewaysFromService without locking the controller.
277// Returns true if any gateways changed.
278func (n *networkManager) extractGatewaysInner(svc *model.Service) bool {
279n.Lock()
280defer n.Unlock()
281previousGateways := n.networkGatewaysBySvc[svc.Hostname]
282gateways := n.getGatewayDetails(svc)
283// short circuit for most services.
284if len(previousGateways) == 0 && len(gateways) == 0 {
285return false
286}
287
288newGateways := make(model.NetworkGatewaySet)
289// check if we have node port mappings
290nodePortMap := make(map[uint32]uint32)
291if svc.Attributes.ClusterExternalPorts != nil {
292if npm, exists := svc.Attributes.ClusterExternalPorts[n.clusterID]; exists {
293nodePortMap = npm
294}
295}
296
297for _, addr := range svc.Attributes.ClusterExternalAddresses.GetAddressesFor(n.clusterID) {
298for _, gw := range gateways {
299// what we now have is a service port. If there is a mapping for cluster external ports,
300// look it up and get the node port for the remote port
301if nodePort, exists := nodePortMap[gw.Port]; exists {
302gw.Port = nodePort
303}
304
305gw.Cluster = n.clusterID
306gw.Addr = addr
307newGateways.Insert(gw)
308}
309}
310
311gatewaysChanged := !newGateways.Equals(previousGateways)
312if len(newGateways) > 0 {
313n.networkGatewaysBySvc[svc.Hostname] = newGateways
314} else {
315delete(n.networkGatewaysBySvc, svc.Hostname)
316}
317
318return gatewaysChanged
319}
320
321// getGatewayDetails returns gateways without the address populated, only the network and (unmapped) port for a given service.
322func (n *networkManager) getGatewayDetails(svc *model.Service) []model.NetworkGateway {
323// TODO should we start checking if svc's Ports contain the gateway port?
324
325// label based gateways
326// TODO label based gateways could support being the gateway for multiple networks
327if nw := svc.Attributes.Labels[label.TopologyNetwork.Name]; nw != "" {
328if gwPortStr := svc.Attributes.Labels[label.NetworkingGatewayPort.Name]; gwPortStr != "" {
329if gwPort, err := strconv.Atoi(gwPortStr); err == nil {
330return []model.NetworkGateway{{Port: uint32(gwPort), Network: network.ID(nw)}}
331}
332log.Warnf("could not parse %q for %s on %s/%s; defaulting to %d",
333gwPortStr, label.NetworkingGatewayPort.Name, svc.Attributes.Namespace, svc.Attributes.Name, DefaultNetworkGatewayPort)
334}
335return []model.NetworkGateway{{Port: DefaultNetworkGatewayPort, Network: network.ID(nw)}}
336}
337
338// meshNetworks registryServiceName+fromRegistry
339if gws, ok := n.registryServiceNameGateways[svc.Hostname]; ok {
340out := append(make([]model.NetworkGateway, 0, len(gws)), gws...)
341return out
342}
343
344return nil
345}
346
347// handleGateway resource adds a NetworkGateway for each combination of address and auto-passthrough listener
348// discovering duplicates from the generated Service is not a huge concern as we de-duplicate in NetworkGateways
349// which returns a set, although it's not totally efficient.
350func (n *networkManager) handleGatewayResource(_ *v1beta1.Gateway, gw *v1beta1.Gateway, event model.Event) error {
351if nw := gw.GetLabels()[label.TopologyNetwork.Name]; nw == "" {
352return nil
353}
354
355// Gateway with istio-remote: only discover this from the config cluster
356// this is a way to reference a gateway that lives in a place that this control plane
357// won't have API server access. Nothing will be deployed for these Gateway resources.
358if !n.discoverRemoteGatewayResources && gw.Spec.GatewayClassName == constants.RemoteGatewayClassName {
359return nil
360}
361
362gatewaysChanged := false
363n.Lock()
364defer func() {
365n.Unlock()
366if gatewaysChanged {
367n.NotifyGatewayHandlers()
368}
369}()
370
371previousGateways := n.gatewaysFromResource[gw.UID]
372
373if event == model.EventDelete {
374gatewaysChanged = len(previousGateways) > 0
375delete(n.gatewaysFromResource, gw.UID)
376return nil
377}
378
379autoPassthrough := func(l v1beta1.Listener) bool {
380return kube.IsAutoPassthrough(gw.GetLabels(), l)
381}
382
383base := model.NetworkGateway{
384Network: network.ID(gw.GetLabels()[label.TopologyNetwork.Name]),
385Cluster: n.clusterID,
386}
387newGateways := model.NetworkGatewaySet{}
388for _, addr := range gw.Spec.Addresses {
389if addr.Type == nil {
390continue
391}
392if addrType := *addr.Type; addrType != v1beta1.IPAddressType && addrType != v1beta1.HostnameAddressType {
393continue
394}
395for _, l := range slices.Filter(gw.Spec.Listeners, autoPassthrough) {
396networkGateway := base
397networkGateway.Addr = addr.Value
398networkGateway.Port = uint32(l.Port)
399newGateways.Insert(networkGateway)
400}
401}
402n.gatewaysFromResource[gw.UID] = newGateways
403
404if len(previousGateways) != len(newGateways) {
405gatewaysChanged = true
406return nil
407}
408
409gatewaysChanged = !newGateways.Equals(previousGateways)
410if len(newGateways) > 0 {
411n.gatewaysFromResource[gw.UID] = newGateways
412} else {
413delete(n.gatewaysFromResource, gw.UID)
414}
415
416return nil
417}
418
419func (n *networkManager) HasSynced() bool {
420if n.gatewayResourceClient == nil {
421return true
422}
423return n.gatewayResourceClient.HasSynced()
424}
425
426// updateServiceNodePortAddresses updates ClusterExternalAddresses for Services of nodePort type
427func (c *Controller) updateServiceNodePortAddresses(svcs ...*model.Service) bool {
428// node event, update all nodePort gateway services
429if len(svcs) == 0 {
430svcs = c.getNodePortGatewayServices()
431}
432// no nodePort gateway service found, no update
433if len(svcs) == 0 {
434return false
435}
436for _, svc := range svcs {
437c.RLock()
438nodeSelector := c.nodeSelectorsForServices[svc.Hostname]
439c.RUnlock()
440// update external address
441var nodeAddresses []string
442for _, n := range c.nodeInfoMap {
443if nodeSelector.SubsetOf(n.labels) {
444nodeAddresses = append(nodeAddresses, n.address)
445}
446}
447if svc.Attributes.ClusterExternalAddresses == nil {
448svc.Attributes.ClusterExternalAddresses = &model.AddressMap{}
449}
450svc.Attributes.ClusterExternalAddresses.SetAddressesFor(c.Cluster(), nodeAddresses)
451// update gateways that use the service
452c.extractGatewaysFromService(svc)
453}
454return true
455}
456
457// getNodePortServices returns nodePort type gateway service
458func (c *Controller) getNodePortGatewayServices() []*model.Service {
459c.RLock()
460defer c.RUnlock()
461out := make([]*model.Service, 0, len(c.nodeSelectorsForServices))
462for hostname := range c.nodeSelectorsForServices {
463svc := c.servicesMap[hostname]
464if svc != nil {
465out = append(out, svc)
466}
467}
468
469return out
470}
471