istio

Форк
0
470 строк · 15.4 Кб
1
// Copyright Istio Authors
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14

15
package controller
16

17
import (
18
	"net"
19
	"strconv"
20
	"sync"
21

22
	"github.com/yl2chen/cidranger"
23
	v1 "k8s.io/api/core/v1"
24
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
25
	"k8s.io/apimachinery/pkg/types"
26
	"sigs.k8s.io/gateway-api/apis/v1beta1"
27

28
	"istio.io/api/label"
29
	"istio.io/istio/pilot/pkg/features"
30
	"istio.io/istio/pilot/pkg/model"
31
	"istio.io/istio/pilot/pkg/serviceregistry/kube"
32
	"istio.io/istio/pkg/cluster"
33
	"istio.io/istio/pkg/config/constants"
34
	"istio.io/istio/pkg/config/host"
35
	"istio.io/istio/pkg/config/mesh"
36
	"istio.io/istio/pkg/config/schema/gvr"
37
	"istio.io/istio/pkg/kube/kclient"
38
	"istio.io/istio/pkg/kube/kubetypes"
39
	"istio.io/istio/pkg/network"
40
	"istio.io/istio/pkg/slices"
41
)
42

43
type networkManager struct {
44
	sync.RWMutex
45
	// CIDR ranger based on path-compressed prefix trie
46
	ranger    cidranger.Ranger
47
	clusterID cluster.ID
48

49
	gatewayResourceClient kclient.Informer[*v1beta1.Gateway]
50
	meshNetworksWatcher   mesh.NetworksWatcher
51

52
	// Network name for to be used when the meshNetworks fromRegistry nor network label on pod is specified
53
	// This is defined by a topology.istio.io/network label on the system namespace.
54
	network network.ID
55
	// Network name for the registry as specified by the MeshNetworks configmap
56
	networkFromMeshConfig network.ID
57
	// map of svc fqdn to partially built network gateways; the actual gateways will be built from these into networkGatewaysBySvc
58
	// this map just enumerates which networks/ports each Service is a gateway for
59
	registryServiceNameGateways map[host.Name][]model.NetworkGateway
60
	// gateways for each service
61
	networkGatewaysBySvc map[host.Name]model.NetworkGatewaySet
62
	// gateways from kubernetes Gateway resources
63
	gatewaysFromResource map[types.UID]model.NetworkGatewaySet
64
	// we don't want to discover gateways with class "istio-remote" from outside cluster's API servers.
65
	discoverRemoteGatewayResources bool
66

67
	// implements NetworkGatewaysWatcher; we need to call c.NotifyGatewayHandlers when our gateways change
68
	model.NetworkGatewaysHandler
69
}
70

71
func initNetworkManager(c *Controller, options Options) *networkManager {
72
	n := &networkManager{
73
		clusterID:           options.ClusterID,
74
		meshNetworksWatcher: options.MeshNetworksWatcher,
75
		// zero values are a workaround structcheck issue: https://github.com/golangci/golangci-lint/issues/826
76
		ranger:                         nil,
77
		network:                        "",
78
		networkFromMeshConfig:          "",
79
		registryServiceNameGateways:    make(map[host.Name][]model.NetworkGateway),
80
		networkGatewaysBySvc:           make(map[host.Name]model.NetworkGatewaySet),
81
		gatewaysFromResource:           make(map[types.UID]model.NetworkGatewaySet),
82
		discoverRemoteGatewayResources: options.ConfigCluster,
83
	}
84
	// initialize the gateway resource client when any feature that uses it is enabled
85
	if features.MultiNetworkGatewayAPI {
86
		n.gatewayResourceClient = kclient.NewDelayedInformer[*v1beta1.Gateway](c.client, gvr.KubernetesGateway, kubetypes.StandardInformer, kubetypes.Filter{})
87
	}
88
	if features.MultiNetworkGatewayAPI {
89
		// conditionally register this handler
90
		registerHandlers(c, n.gatewayResourceClient, "Gateways", n.handleGatewayResource, nil)
91
	}
92
	return n
93
}
94

95
// setNetworkFromNamespace sets network got from system namespace, returns whether it has changed
96
func (n *networkManager) setNetworkFromNamespace(ns *v1.Namespace) bool {
97
	nw := ns.Labels[label.TopologyNetwork.Name]
98
	n.Lock()
99
	defer n.Unlock()
100
	oldDefaultNetwork := n.network
101
	n.network = network.ID(nw)
102
	return oldDefaultNetwork != n.network
103
}
104

105
// networkFromSystemNamespace returns the network most recently stored by
// setNetworkFromNamespace, read under the read lock.
func (n *networkManager) networkFromSystemNamespace() network.ID {
	n.RLock()
	current := n.network
	n.RUnlock()
	return current
}
110

111
// networkFromMeshNetworks resolves the network of an endpoint from the
// MeshNetworks configuration.
//
// If a network was assigned to this registry via fromRegistry, it is returned
// regardless of endpointIP. Otherwise endpointIP is matched against the
// configured CIDRs and the name of the first containing network is returned.
// An empty ID is returned when no ranger is configured, the IP cannot be
// parsed, the lookup fails, or no CIDR contains the IP.
func (n *networkManager) networkFromMeshNetworks(endpointIP string) network.ID {
	n.RLock()
	defer n.RUnlock()
	if n.networkFromMeshConfig != "" {
		return n.networkFromMeshConfig
	}
	if n.ranger == nil {
		return ""
	}

	ip := net.ParseIP(endpointIP)
	if ip == nil {
		return ""
	}
	entries, err := n.ranger.ContainingNetworks(ip)
	if err != nil {
		// Include the cause; without it the failure cannot be diagnosed from logs.
		log.Errorf("error getting cidr ranger entry from endpoint ip %s: %v", endpointIP, err)
		return ""
	}
	if len(entries) == 0 {
		return ""
	}
	if len(entries) > 1 {
		log.Warnf("Found multiple networks CIDRs matching the endpoint IP: %s. Using the first match.", endpointIP)
	}
	// Only namedRangerEntry values are inserted by reloadMeshNetworks; the checked
	// assertion merely avoids a panic should that ever change.
	named, ok := entries[0].(namedRangerEntry)
	if !ok {
		return ""
	}
	return named.name
}
137

138
// namedRangerEntry for holding network's CIDR and name
139
type namedRangerEntry struct {
140
	name    network.ID
141
	network net.IPNet
142
}
143

144
// Network returns the IPNet for the network
145
func (n namedRangerEntry) Network() net.IPNet {
146
	return n.network
147
}
148

149
// onNetworkChange is fired if the default network is changed either via the namespace label or mesh-networks
150
func (c *Controller) onNetworkChange() {
151
	// the network for endpoints are computed when we process the events; this will fix the cache
152
	// NOTE: this must run before the other network watcher handler that creates a force push
153
	if err := c.syncPods(); err != nil {
154
		log.Errorf("one or more errors force-syncing pods: %v", err)
155
	}
156
	if err := c.endpoints.initializeNamespace(metav1.NamespaceAll, true); err != nil {
157
		log.Errorf("one or more errors force-syncing endpoints: %v", err)
158
	}
159
	c.reloadNetworkGateways()
160
	// This is to ensure the ambient workloads are updated dynamically, aligning them with the current network settings.
161
	// With this, the pod do not need to restart when the network configuration changes.
162
	if c.ambientIndex != nil {
163
		c.ambientIndex.SyncAll()
164
	}
165
}
166

167
// reloadMeshNetworks will read the mesh networks configuration to setup
168
// fromRegistry and cidr based network lookups for this registry
169
func (n *networkManager) reloadMeshNetworks() {
170
	n.Lock()
171
	defer n.Unlock()
172
	ranger := cidranger.NewPCTrieRanger()
173

174
	n.networkFromMeshConfig = ""
175
	n.registryServiceNameGateways = make(map[host.Name][]model.NetworkGateway)
176

177
	meshNetworks := n.meshNetworksWatcher.Networks()
178
	if meshNetworks == nil || len(meshNetworks.Networks) == 0 {
179
		return
180
	}
181
	for id, v := range meshNetworks.Networks {
182
		// track endpoints items from this registry are a part of this network
183
		fromRegistry := false
184
		for _, ep := range v.Endpoints {
185
			if ep.GetFromCidr() != "" {
186
				_, nw, err := net.ParseCIDR(ep.GetFromCidr())
187
				if err != nil {
188
					log.Warnf("unable to parse CIDR %q for network %s", ep.GetFromCidr(), id)
189
					continue
190
				}
191
				rangerEntry := namedRangerEntry{
192
					name:    network.ID(id),
193
					network: *nw,
194
				}
195
				_ = ranger.Insert(rangerEntry)
196
			}
197
			if ep.GetFromRegistry() != "" && cluster.ID(ep.GetFromRegistry()) == n.clusterID {
198
				fromRegistry = true
199
			}
200
		}
201

202
		// fromRegistry field specified this cluster
203
		if fromRegistry {
204
			// treat endpoints in this cluster as part of this network
205
			if n.networkFromMeshConfig != "" {
206
				log.Warnf("multiple networks specify %s in fromRegistry; endpoints from %s will continue to be treated as part of %s",
207
					n.clusterID, n.clusterID, n.networkFromMeshConfig)
208
			} else {
209
				n.networkFromMeshConfig = network.ID(id)
210
			}
211

212
			// services in this registry matching the registryServiceName and port are part of this network
213
			for _, gw := range v.Gateways {
214
				if gwSvcName := gw.GetRegistryServiceName(); gwSvcName != "" {
215
					svc := host.Name(gwSvcName)
216
					n.registryServiceNameGateways[svc] = append(n.registryServiceNameGateways[svc], model.NetworkGateway{
217
						Network: network.ID(id),
218
						Cluster: n.clusterID,
219
						Port:    gw.GetPort(),
220
					})
221
				}
222
			}
223
		}
224

225
	}
226
	n.ranger = ranger
227
}
228

229
// NetworkGateways returns every gateway known to the controller, whether it was
// derived from a Service or from a Gateway resource, de-duplicated and ordered
// by model.SortGateways.
func (c *Controller) NetworkGateways() []model.NetworkGateway {
	c.networkManager.RLock()
	defer c.networkManager.RUnlock()

	// Collecting into one set removes gateways that appear in both sources.
	merged := make(model.NetworkGatewaySet)
	for _, fromService := range c.networkGatewaysBySvc {
		merged.Merge(fromService)
	}
	for _, fromResource := range c.gatewaysFromResource {
		merged.Merge(fromResource)
	}

	return model.SortGateways(merged.UnsortedList())
}
245

246
// extractGatewaysFromService checks if the service is a cross-network gateway
247
// and if it is, updates the controller's gateways.
248
func (c *Controller) extractGatewaysFromService(svc *model.Service) bool {
249
	changed := c.extractGatewaysInner(svc)
250
	if changed {
251
		c.NotifyGatewayHandlers()
252
	}
253
	return changed
254
}
255

256
// reloadNetworkGateways performs extractGatewaysFromService for all services registered with the controller.
257
// It is called only by `onNetworkChange`.
258
// It iterates over all services, because mesh networks can be set with a service name.
259
func (c *Controller) reloadNetworkGateways() {
260
	c.Lock()
261
	gwsChanged := false
262
	for _, svc := range c.servicesMap {
263
		if c.extractGatewaysInner(svc) {
264
			gwsChanged = true
265
			break
266
		}
267
	}
268
	c.Unlock()
269
	if gwsChanged {
270
		c.NotifyGatewayHandlers()
271
		// TODO ConfigUpdate via gateway handler
272
		c.opts.XDSUpdater.ConfigUpdate(&model.PushRequest{Full: true, Reason: model.NewReasonStats(model.NetworksTrigger)})
273
	}
274
}
275

276
// extractGatewaysInner performs the logic for extractGatewaysFromService without locking the controller.
277
// Returns true if any gateways changed.
278
func (n *networkManager) extractGatewaysInner(svc *model.Service) bool {
279
	n.Lock()
280
	defer n.Unlock()
281
	previousGateways := n.networkGatewaysBySvc[svc.Hostname]
282
	gateways := n.getGatewayDetails(svc)
283
	// short circuit for most services.
284
	if len(previousGateways) == 0 && len(gateways) == 0 {
285
		return false
286
	}
287

288
	newGateways := make(model.NetworkGatewaySet)
289
	// check if we have node port mappings
290
	nodePortMap := make(map[uint32]uint32)
291
	if svc.Attributes.ClusterExternalPorts != nil {
292
		if npm, exists := svc.Attributes.ClusterExternalPorts[n.clusterID]; exists {
293
			nodePortMap = npm
294
		}
295
	}
296

297
	for _, addr := range svc.Attributes.ClusterExternalAddresses.GetAddressesFor(n.clusterID) {
298
		for _, gw := range gateways {
299
			// what we now have is a service port. If there is a mapping for cluster external ports,
300
			// look it up and get the node port for the remote port
301
			if nodePort, exists := nodePortMap[gw.Port]; exists {
302
				gw.Port = nodePort
303
			}
304

305
			gw.Cluster = n.clusterID
306
			gw.Addr = addr
307
			newGateways.Insert(gw)
308
		}
309
	}
310

311
	gatewaysChanged := !newGateways.Equals(previousGateways)
312
	if len(newGateways) > 0 {
313
		n.networkGatewaysBySvc[svc.Hostname] = newGateways
314
	} else {
315
		delete(n.networkGatewaysBySvc, svc.Hostname)
316
	}
317

318
	return gatewaysChanged
319
}
320

321
// getGatewayDetails returns gateways without the address populated, only the network and (unmapped) port for a given service.
322
func (n *networkManager) getGatewayDetails(svc *model.Service) []model.NetworkGateway {
323
	// TODO should we start checking if svc's Ports contain the gateway port?
324

325
	// label based gateways
326
	// TODO label based gateways could support being the gateway for multiple networks
327
	if nw := svc.Attributes.Labels[label.TopologyNetwork.Name]; nw != "" {
328
		if gwPortStr := svc.Attributes.Labels[label.NetworkingGatewayPort.Name]; gwPortStr != "" {
329
			if gwPort, err := strconv.Atoi(gwPortStr); err == nil {
330
				return []model.NetworkGateway{{Port: uint32(gwPort), Network: network.ID(nw)}}
331
			}
332
			log.Warnf("could not parse %q for %s on %s/%s; defaulting to %d",
333
				gwPortStr, label.NetworkingGatewayPort.Name, svc.Attributes.Namespace, svc.Attributes.Name, DefaultNetworkGatewayPort)
334
		}
335
		return []model.NetworkGateway{{Port: DefaultNetworkGatewayPort, Network: network.ID(nw)}}
336
	}
337

338
	// meshNetworks registryServiceName+fromRegistry
339
	if gws, ok := n.registryServiceNameGateways[svc.Hostname]; ok {
340
		out := append(make([]model.NetworkGateway, 0, len(gws)), gws...)
341
		return out
342
	}
343

344
	return nil
345
}
346

347
// handleGateway resource adds a NetworkGateway for each combination of address and auto-passthrough listener
348
// discovering duplicates from the generated Service is not a huge concern as we de-duplicate in NetworkGateways
349
// which returns a set, although it's not totally efficient.
350
func (n *networkManager) handleGatewayResource(_ *v1beta1.Gateway, gw *v1beta1.Gateway, event model.Event) error {
351
	if nw := gw.GetLabels()[label.TopologyNetwork.Name]; nw == "" {
352
		return nil
353
	}
354

355
	// Gateway with istio-remote: only discover this from the config cluster
356
	// this is a way to reference a gateway that lives in a place that this control plane
357
	// won't have API server access. Nothing will be deployed for these Gateway resources.
358
	if !n.discoverRemoteGatewayResources && gw.Spec.GatewayClassName == constants.RemoteGatewayClassName {
359
		return nil
360
	}
361

362
	gatewaysChanged := false
363
	n.Lock()
364
	defer func() {
365
		n.Unlock()
366
		if gatewaysChanged {
367
			n.NotifyGatewayHandlers()
368
		}
369
	}()
370

371
	previousGateways := n.gatewaysFromResource[gw.UID]
372

373
	if event == model.EventDelete {
374
		gatewaysChanged = len(previousGateways) > 0
375
		delete(n.gatewaysFromResource, gw.UID)
376
		return nil
377
	}
378

379
	autoPassthrough := func(l v1beta1.Listener) bool {
380
		return kube.IsAutoPassthrough(gw.GetLabels(), l)
381
	}
382

383
	base := model.NetworkGateway{
384
		Network: network.ID(gw.GetLabels()[label.TopologyNetwork.Name]),
385
		Cluster: n.clusterID,
386
	}
387
	newGateways := model.NetworkGatewaySet{}
388
	for _, addr := range gw.Spec.Addresses {
389
		if addr.Type == nil {
390
			continue
391
		}
392
		if addrType := *addr.Type; addrType != v1beta1.IPAddressType && addrType != v1beta1.HostnameAddressType {
393
			continue
394
		}
395
		for _, l := range slices.Filter(gw.Spec.Listeners, autoPassthrough) {
396
			networkGateway := base
397
			networkGateway.Addr = addr.Value
398
			networkGateway.Port = uint32(l.Port)
399
			newGateways.Insert(networkGateway)
400
		}
401
	}
402
	n.gatewaysFromResource[gw.UID] = newGateways
403

404
	if len(previousGateways) != len(newGateways) {
405
		gatewaysChanged = true
406
		return nil
407
	}
408

409
	gatewaysChanged = !newGateways.Equals(previousGateways)
410
	if len(newGateways) > 0 {
411
		n.gatewaysFromResource[gw.UID] = newGateways
412
	} else {
413
		delete(n.gatewaysFromResource, gw.UID)
414
	}
415

416
	return nil
417
}
418

419
// HasSynced reports whether the Gateway informer has synced. When no informer
// was created there is nothing to wait for and true is returned.
func (n *networkManager) HasSynced() bool {
	return n.gatewayResourceClient == nil || n.gatewayResourceClient.HasSynced()
}
425

426
// updateServiceNodePortAddresses updates ClusterExternalAddresses for Services of nodePort type
427
func (c *Controller) updateServiceNodePortAddresses(svcs ...*model.Service) bool {
428
	// node event, update all nodePort gateway services
429
	if len(svcs) == 0 {
430
		svcs = c.getNodePortGatewayServices()
431
	}
432
	// no nodePort gateway service found, no update
433
	if len(svcs) == 0 {
434
		return false
435
	}
436
	for _, svc := range svcs {
437
		c.RLock()
438
		nodeSelector := c.nodeSelectorsForServices[svc.Hostname]
439
		c.RUnlock()
440
		// update external address
441
		var nodeAddresses []string
442
		for _, n := range c.nodeInfoMap {
443
			if nodeSelector.SubsetOf(n.labels) {
444
				nodeAddresses = append(nodeAddresses, n.address)
445
			}
446
		}
447
		if svc.Attributes.ClusterExternalAddresses == nil {
448
			svc.Attributes.ClusterExternalAddresses = &model.AddressMap{}
449
		}
450
		svc.Attributes.ClusterExternalAddresses.SetAddressesFor(c.Cluster(), nodeAddresses)
451
		// update gateways that use the service
452
		c.extractGatewaysFromService(svc)
453
	}
454
	return true
455
}
456

457
// getNodePortServices returns nodePort type gateway service
458
func (c *Controller) getNodePortGatewayServices() []*model.Service {
459
	c.RLock()
460
	defer c.RUnlock()
461
	out := make([]*model.Service, 0, len(c.nodeSelectorsForServices))
462
	for hostname := range c.nodeSelectorsForServices {
463
		svc := c.servicesMap[hostname]
464
		if svc != nil {
465
			out = append(out, svc)
466
		}
467
	}
468

469
	return out
470
}
471

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.