istio

Форк
0
/
controller.go 
402 строки · 13.5 Кб
1
// Copyright Istio Authors
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14

15
package gateway
16

17
import (
18
	"fmt"
19
	"sync"
20
	"time"
21

22
	"go.uber.org/atomic"
23
	corev1 "k8s.io/api/core/v1"
24
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
25
	klabels "k8s.io/apimachinery/pkg/labels"
26
	"k8s.io/apimachinery/pkg/runtime/schema"
27

28
	"istio.io/istio/pilot/pkg/credentials"
29
	"istio.io/istio/pilot/pkg/features"
30
	"istio.io/istio/pilot/pkg/model"
31
	"istio.io/istio/pilot/pkg/model/kstatus"
32
	"istio.io/istio/pilot/pkg/serviceregistry/kube/controller"
33
	"istio.io/istio/pilot/pkg/status"
34
	"istio.io/istio/pkg/cluster"
35
	"istio.io/istio/pkg/config"
36
	"istio.io/istio/pkg/config/labels"
37
	"istio.io/istio/pkg/config/schema/collection"
38
	"istio.io/istio/pkg/config/schema/collections"
39
	"istio.io/istio/pkg/config/schema/gvk"
40
	"istio.io/istio/pkg/config/schema/gvr"
41
	"istio.io/istio/pkg/config/schema/kind"
42
	"istio.io/istio/pkg/kube"
43
	"istio.io/istio/pkg/kube/controllers"
44
	"istio.io/istio/pkg/kube/kclient"
45
	"istio.io/istio/pkg/kube/kubetypes"
46
	istiolog "istio.io/istio/pkg/log"
47
	"istio.io/istio/pkg/maps"
48
	"istio.io/istio/pkg/slices"
49
	"istio.io/istio/pkg/util/sets"
50
)
51

52
var log = istiolog.RegisterScope("gateway", "gateway-api controller")
53

54
var errUnsupportedOp = fmt.Errorf("unsupported operation: the gateway config store is a read-only view")
55

56
// Controller defines the controller for the gateway-api. The controller acts a bit different from most.
57
// Rather than watching the CRs directly, we depend on the existing model.ConfigStoreController which
58
// already watches all CRs. When there are updates, a new PushContext will be computed, which will eventually
59
// call Controller.Reconcile(). Once this happens, we will inspect the current state of the world, and transform
60
// gateway-api types into Istio types (Gateway/VirtualService). Future calls to Get/List will return these
61
// Istio types. These are not stored in the cluster at all, and are purely internal; they can be seen on /debug/configz.
62
// During Reconcile(), the status on all gateway-api types is also tracked. Once completed, if the status
63
// has changed at all, it is queued to asynchronously update the status of the object in Kubernetes.
64
type Controller struct {
65
	// client for accessing Kubernetes
66
	client kube.Client
67
	// cache provides access to the underlying gateway-configs
68
	cache model.ConfigStoreController
69

70
	// Gateway-api types reference namespace labels directly, so we need access to these
71
	namespaces       kclient.Client[*corev1.Namespace]
72
	namespaceHandler model.EventHandler
73

74
	// Gateway-api types reference secrets directly, so we need access to these
75
	credentialsController credentials.MulticlusterController
76
	secretHandler         model.EventHandler
77

78
	// the cluster where the gateway-api controller runs
79
	cluster cluster.ID
80
	// domain stores the cluster domain, typically cluster.local
81
	domain string
82

83
	// state is our computed Istio resources. Access is guarded by stateMu. This is updated from Reconcile().
84
	state   IstioResources
85
	stateMu sync.RWMutex
86

87
	// statusController controls the status working queue. Status will only be written if statusEnabled is true, which
88
	// is only the case when we are the leader.
89
	statusController *atomic.Pointer[status.Controller]
90

91
	waitForCRD func(class schema.GroupVersionResource, stop <-chan struct{}) bool
92
}
93

94
// Compile-time check that *Controller implements model.GatewayController.
var _ model.GatewayController = (*Controller)(nil)
95

96
func NewController(
97
	kc kube.Client,
98
	c model.ConfigStoreController,
99
	waitForCRD func(class schema.GroupVersionResource, stop <-chan struct{}) bool,
100
	credsController credentials.MulticlusterController,
101
	options controller.Options,
102
) *Controller {
103
	var ctl *status.Controller
104

105
	namespaces := kclient.NewFiltered[*corev1.Namespace](kc, kubetypes.Filter{ObjectFilter: kc.ObjectFilter()})
106
	gatewayController := &Controller{
107
		client:                kc,
108
		cache:                 c,
109
		namespaces:            namespaces,
110
		credentialsController: credsController,
111
		cluster:               options.ClusterID,
112
		domain:                options.DomainSuffix,
113
		statusController:      atomic.NewPointer(ctl),
114
		waitForCRD:            waitForCRD,
115
	}
116

117
	namespaces.AddEventHandler(controllers.EventHandler[*corev1.Namespace]{
118
		UpdateFunc: func(oldNs, newNs *corev1.Namespace) {
119
			if !labels.Instance(oldNs.Labels).Equals(newNs.Labels) {
120
				gatewayController.namespaceEvent(oldNs, newNs)
121
			}
122
		},
123
	})
124

125
	if credsController != nil {
126
		credsController.AddSecretHandler(gatewayController.secretEvent)
127
	}
128

129
	return gatewayController
130
}
131

132
func (c *Controller) Schemas() collection.Schemas {
133
	return collection.SchemasFor(
134
		collections.VirtualService,
135
		collections.Gateway,
136
	)
137
}
138

139
// Get always returns nil: single-object lookups are not served by this controller.
// Use List to read the computed resources.
func (c *Controller) Get(typ config.GroupVersionKind, name, namespace string) *config.Config {
	return nil
}
142

143
func (c *Controller) List(typ config.GroupVersionKind, namespace string) []config.Config {
144
	if typ != gvk.Gateway && typ != gvk.VirtualService {
145
		return nil
146
	}
147

148
	c.stateMu.RLock()
149
	defer c.stateMu.RUnlock()
150
	switch typ {
151
	case gvk.Gateway:
152
		return filterNamespace(c.state.Gateway, namespace)
153
	case gvk.VirtualService:
154
		return filterNamespace(c.state.VirtualService, namespace)
155
	default:
156
		return nil
157
	}
158
}
159

160
func (c *Controller) SetStatusWrite(enabled bool, statusManager *status.Manager) {
161
	if enabled && features.EnableGatewayAPIStatus && statusManager != nil {
162
		c.statusController.Store(
163
			statusManager.CreateGenericController(func(status any, context any) status.GenerationProvider {
164
				return &gatewayGeneration{context}
165
			}),
166
		)
167
	} else {
168
		c.statusController.Store(nil)
169
	}
170
}
171

172
// Reconcile takes in a current snapshot of the gateway-api configs, and regenerates our internal state.
173
// Any status updates required will be enqueued as well.
174
func (c *Controller) Reconcile(ps *model.PushContext) error {
175
	t0 := time.Now()
176
	defer func() {
177
		log.Debugf("reconcile complete in %v", time.Since(t0))
178
	}()
179
	gatewayClass := c.cache.List(gvk.GatewayClass, metav1.NamespaceAll)
180
	gateway := c.cache.List(gvk.KubernetesGateway, metav1.NamespaceAll)
181
	httpRoute := c.cache.List(gvk.HTTPRoute, metav1.NamespaceAll)
182
	grpcRoute := c.cache.List(gvk.GRPCRoute, metav1.NamespaceAll)
183
	tcpRoute := c.cache.List(gvk.TCPRoute, metav1.NamespaceAll)
184
	tlsRoute := c.cache.List(gvk.TLSRoute, metav1.NamespaceAll)
185
	referenceGrant := c.cache.List(gvk.ReferenceGrant, metav1.NamespaceAll)
186
	serviceEntry := c.cache.List(gvk.ServiceEntry, metav1.NamespaceAll) // TODO lazy load only referenced SEs?
187

188
	input := GatewayResources{
189
		GatewayClass:   deepCopyStatus(gatewayClass),
190
		Gateway:        deepCopyStatus(gateway),
191
		HTTPRoute:      deepCopyStatus(httpRoute),
192
		GRPCRoute:      deepCopyStatus(grpcRoute),
193
		TCPRoute:       deepCopyStatus(tcpRoute),
194
		TLSRoute:       deepCopyStatus(tlsRoute),
195
		ReferenceGrant: referenceGrant,
196
		ServiceEntry:   serviceEntry,
197
		Domain:         c.domain,
198
		Context:        NewGatewayContext(ps, c.cluster),
199
	}
200

201
	if !input.hasResources() {
202
		// Early exit for common case of no gateway-api used.
203
		c.stateMu.Lock()
204
		defer c.stateMu.Unlock()
205
		// make sure we clear out the state, to handle the last gateway-api resource being removed
206
		c.state = IstioResources{}
207
		return nil
208
	}
209

210
	nsl := c.namespaces.List("", klabels.Everything())
211
	namespaces := make(map[string]*corev1.Namespace, len(nsl))
212
	for _, ns := range nsl {
213
		namespaces[ns.Name] = ns
214
	}
215
	input.Namespaces = namespaces
216

217
	if c.credentialsController != nil {
218
		credentials, err := c.credentialsController.ForCluster(c.cluster)
219
		if err != nil {
220
			return fmt.Errorf("failed to get credentials: %v", err)
221
		}
222
		input.Credentials = credentials
223
	}
224

225
	output := convertResources(input)
226

227
	// Handle all status updates
228
	c.QueueStatusUpdates(input)
229

230
	c.stateMu.Lock()
231
	defer c.stateMu.Unlock()
232
	c.state = output
233
	return nil
234
}
235

236
// QueueStatusUpdates enqueues pending status updates for every status-bearing resource list in r:
// GatewayClass, Gateway, HTTPRoute, GRPCRoute, TCPRoute and TLSRoute, in that order.
func (c *Controller) QueueStatusUpdates(r GatewayResources) {
	statusBearing := [][]config.Config{
		r.GatewayClass,
		r.Gateway,
		r.HTTPRoute,
		r.GRPCRoute,
		r.TCPRoute,
		r.TLSRoute,
	}
	for _, configs := range statusBearing {
		c.handleStatusUpdates(configs)
	}
}
244

245
// handleStatusUpdates enqueues a status update for each config whose wrapped status is marked
// dirty. It is a no-op when no status controller is currently stored (status writing disabled).
// Every config's Status must be a *kstatus.WrappedStatus, as produced by deepCopyStatus.
func (c *Controller) handleStatusUpdates(configs []config.Config) {
	queue := c.statusController.Load()
	if queue == nil {
		return
	}
	for _, cfg := range configs {
		wrapped := cfg.Status.(*kstatus.WrappedStatus)
		if !wrapped.Dirty {
			continue
		}
		resource := status.ResourceFromModelConfig(cfg)
		queue.EnqueueStatusUpdateResource(wrapped.Unwrap(), resource)
	}
}
258

259
// Create is not supported: the gateway config store is read-only.
// It always returns an empty revision and errUnsupportedOp.
func (c *Controller) Create(config config.Config) (revision string, err error) {
	return "", errUnsupportedOp
}
262

263
// Update is not supported: the gateway config store is read-only.
// It always returns an empty revision and errUnsupportedOp.
func (c *Controller) Update(config config.Config) (newRevision string, err error) {
	return "", errUnsupportedOp
}
266

267
// UpdateStatus is not supported: the gateway config store is read-only.
// It always returns an empty revision and errUnsupportedOp.
func (c *Controller) UpdateStatus(config config.Config) (newRevision string, err error) {
	return "", errUnsupportedOp
}
270

271
// Patch is not supported: the gateway config store is read-only.
// It always returns an empty revision and errUnsupportedOp; patchFn is never invoked.
func (c *Controller) Patch(orig config.Config, patchFn config.PatchFunc) (string, error) {
	return "", errUnsupportedOp
}
274

275
// Delete is not supported: the gateway config store is read-only.
// It always returns errUnsupportedOp.
func (c *Controller) Delete(typ config.GroupVersionKind, name, namespace string, _ *string) error {
	return errUnsupportedOp
}
278

279
func (c *Controller) RegisterEventHandler(typ config.GroupVersionKind, handler model.EventHandler) {
280
	switch typ {
281
	case gvk.Namespace:
282
		c.namespaceHandler = handler
283
	case gvk.Secret:
284
		c.secretHandler = handler
285
	}
286
	// For all other types, do nothing as c.cache has been registered
287
}
288

289
// Run starts the GatewayClass controller in a background goroutine when
// features.EnableGatewayAPIGatewayClassController is set; otherwise it does nothing.
// Run itself returns immediately. The goroutine first calls waitForCRD for the GatewayClass
// resource and only proceeds when that call returns true.
func (c *Controller) Run(stop <-chan struct{}) {
	if !features.EnableGatewayAPIGatewayClassController {
		return
	}
	go func() {
		if !c.waitForCRD(gvr.GatewayClass, stop) {
			return
		}
		classController := NewClassController(c.client)
		c.client.RunAndWait(stop)
		classController.Run(stop)
	}()
}
300

301
// HasSynced reports whether both the underlying config cache and the namespace client
// have synced. The namespace client is only consulted once the cache reports synced.
func (c *Controller) HasSynced() bool {
	if !c.cache.HasSynced() {
		return false
	}
	return c.namespaces.HasSynced()
}
304

305
// SecretAllowed answers, from the most recently reconciled state, whether the allowed
// references permit the secret resourceName for the given namespace. The state is read
// under the read lock.
func (c *Controller) SecretAllowed(resourceName string, namespace string) bool {
	c.stateMu.RLock()
	defer c.stateMu.RUnlock()

	return c.state.AllowedReferences.SecretAllowed(resourceName, namespace)
}
310

311
// namespaceEvent handles a namespace add/update. Gateway's can select routes by label, so we need to handle
312
// when the labels change.
313
// Note: we don't handle delete as a delete would also clean up any relevant gateway-api types which will
314
// trigger its own event.
315
func (c *Controller) namespaceEvent(oldNs, newNs *corev1.Namespace) {
316
	// First, find all the label keys on the old/new namespace. We include NamespaceNameLabel
317
	// since we have special logic to always allow this on namespace.
318
	touchedNamespaceLabels := sets.New(NamespaceNameLabel)
319
	touchedNamespaceLabels.InsertAll(getLabelKeys(oldNs)...)
320
	touchedNamespaceLabels.InsertAll(getLabelKeys(newNs)...)
321

322
	// Next, we find all keys our Gateways actually reference.
323
	c.stateMu.RLock()
324
	intersection := touchedNamespaceLabels.Intersection(c.state.ReferencedNamespaceKeys)
325
	c.stateMu.RUnlock()
326

327
	// If there was any overlap, then a relevant namespace label may have changed, and we trigger a
328
	// push. A more exact check could actually determine if the label selection result actually changed.
329
	// However, this is a much simpler approach that is likely to scale well enough for now.
330
	if !intersection.IsEmpty() && c.namespaceHandler != nil {
331
		log.Debugf("namespace labels changed, triggering namespace handler: %v", intersection.UnsortedList())
332
		c.namespaceHandler(config.Config{}, config.Config{}, model.EventUpdate)
333
	}
334
}
335

336
// getLabelKeys extracts all label keys from a namespace object.
337
func getLabelKeys(ns *corev1.Namespace) []string {
338
	if ns == nil {
339
		return nil
340
	}
341
	return maps.Keys(ns.Labels)
342
}
343

344
func (c *Controller) secretEvent(name, namespace string) {
345
	var impactedConfigs []model.ConfigKey
346
	c.stateMu.RLock()
347
	impactedConfigs = c.state.ResourceReferences[model.ConfigKey{
348
		Kind:      kind.Secret,
349
		Namespace: namespace,
350
		Name:      name,
351
	}]
352
	c.stateMu.RUnlock()
353
	if len(impactedConfigs) > 0 {
354
		log.Debugf("secret %s/%s changed, triggering secret handler", namespace, name)
355
		for _, cfg := range impactedConfigs {
356
			gw := config.Config{
357
				Meta: config.Meta{
358
					GroupVersionKind: gvk.KubernetesGateway,
359
					Namespace:        cfg.Namespace,
360
					Name:             cfg.Name,
361
				},
362
			}
363
			c.secretHandler(gw, gw, model.EventUpdate)
364
		}
365
	}
366
}
367

368
// deepCopyStatus creates a copy of all configs, with a copy of the status field that we can mutate.
369
// This allows our functions to call Status.Mutate, and then we can later persist all changes into the
370
// API server.
371
func deepCopyStatus(configs []config.Config) []config.Config {
372
	return slices.Map(configs, func(c config.Config) config.Config {
373
		return config.Config{
374
			Meta:   c.Meta,
375
			Spec:   c.Spec,
376
			Status: kstatus.Wrap(c.Status),
377
		}
378
	})
379
}
380

381
// filterNamespace allows filtering out configs to only a specific namespace. This allows implementing the
382
// List call which can specify a specific namespace.
383
func filterNamespace(cfgs []config.Config, namespace string) []config.Config {
384
	if namespace == metav1.NamespaceAll {
385
		return cfgs
386
	}
387
	return slices.Filter(cfgs, func(c config.Config) bool {
388
		return c.Namespace == namespace
389
	})
390
}
391

392
// hasResources determines if there are any gateway-api resources created at all.
393
// If not, we can short circuit all processing to avoid excessive work.
394
func (kr GatewayResources) hasResources() bool {
395
	return len(kr.GatewayClass) > 0 ||
396
		len(kr.Gateway) > 0 ||
397
		len(kr.HTTPRoute) > 0 ||
398
		len(kr.GRPCRoute) > 0 ||
399
		len(kr.TCPRoute) > 0 ||
400
		len(kr.TLSRoute) > 0 ||
401
		len(kr.ReferenceGrant) > 0
402
}
403

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.