istio

Форк
0
/
controller.go 
369 строк · 11.8 Кб
1
// Copyright Istio Authors
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14

15
// Package ingress provides a read-only view of Kubernetes ingress resources
16
// as an ingress rule configuration type store
17
package ingress
18

19
import (
20
	"errors"
21
	"fmt"
22
	"sort"
23
	"sync"
24

25
	corev1 "k8s.io/api/core/v1"
26
	knetworking "k8s.io/api/networking/v1"
27
	klabels "k8s.io/apimachinery/pkg/labels"
28
	"k8s.io/apimachinery/pkg/types"
29

30
	meshconfig "istio.io/api/mesh/v1alpha1"
31
	"istio.io/istio/pilot/pkg/model"
32
	kubecontroller "istio.io/istio/pilot/pkg/serviceregistry/kube/controller"
33
	"istio.io/istio/pkg/config"
34
	"istio.io/istio/pkg/config/constants"
35
	"istio.io/istio/pkg/config/mesh"
36
	"istio.io/istio/pkg/config/schema/collection"
37
	"istio.io/istio/pkg/config/schema/collections"
38
	"istio.io/istio/pkg/config/schema/gvk"
39
	"istio.io/istio/pkg/env"
40
	"istio.io/istio/pkg/kube"
41
	"istio.io/istio/pkg/kube/controllers"
42
	"istio.io/istio/pkg/kube/kclient"
43
	"istio.io/istio/pkg/util/sets"
44
)
45

46
// In 1.0, the Gateway is defined in the namespace where the actual controller runs, and needs to be managed by
// the user.
// The gateway is named by appending "-istio-autogenerated-k8s-ingress" to the name of the ingress.
//
// Currently the gateway namespace is hardcoded to istio-system (model.IstioIngressNamespace).
//
// VirtualServices are also auto-generated in the model.IstioIngressNamespace.
//
// The sync of Ingress objects to IP is done by status.go.
// The 'ingress service' name is used to get the IP of the Service.
// If the ingress service is empty, it falls back to the NodeExternalIP list, selected using the labels.
// This uses the 'namespace' of pilot - but seems to be broken (never worked), since it uses Pilot's pod labels
// instead of the ingress labels.
59

60
// Follows mesh.IngressControllerMode setting to enable - OFF|STRICT|DEFAULT.
61
// STRICT requires "kubernetes.io/ingress.class" == mesh.IngressClass
62
// DEFAULT allows Ingress without explicit class.
63

64
// In 1.1:
65
// - K8S_INGRESS_NS - namespace of the Gateway that will act as ingress.
66
// - labels of the gateway set to "app=ingressgateway" for node_port, service set to 'ingressgateway' (matching default install)
67
//   If we need more flexibility - we can add it (but likely we'll deprecate ingress support first)
68
// -
69

70
var schemas = collection.SchemasFor(
71
	collections.VirtualService,
72
	collections.Gateway)
73

74
// Control needs RBAC permissions to write to Pods.
75

76
type controller struct {
77
	meshWatcher  mesh.Holder
78
	domainSuffix string
79

80
	queue                  controllers.Queue
81
	virtualServiceHandlers []model.EventHandler
82
	gatewayHandlers        []model.EventHandler
83

84
	mutex sync.RWMutex
85
	// processed ingresses
86
	ingresses map[types.NamespacedName]*knetworking.Ingress
87

88
	classes  kclient.Client[*knetworking.IngressClass]
89
	ingress  kclient.Client[*knetworking.Ingress]
90
	services kclient.Client[*corev1.Service]
91
}
92

93
var IngressNamespace = env.Register("K8S_INGRESS_NS", constants.IstioIngressNamespace, "").Get()
94

95
var errUnsupportedOp = errors.New("unsupported operation: the ingress config store is a read-only view")
96

97
// NewController creates a new Kubernetes controller
98
func NewController(client kube.Client, meshWatcher mesh.Holder,
99
	options kubecontroller.Options,
100
) model.ConfigStoreController {
101
	ingress := kclient.NewFiltered[*knetworking.Ingress](client, kclient.Filter{ObjectFilter: client.ObjectFilter()})
102
	classes := kclient.New[*knetworking.IngressClass](client)
103
	services := kclient.NewFiltered[*corev1.Service](client, kclient.Filter{ObjectFilter: client.ObjectFilter()})
104

105
	c := &controller{
106
		meshWatcher:  meshWatcher,
107
		domainSuffix: options.DomainSuffix,
108
		ingresses:    make(map[types.NamespacedName]*knetworking.Ingress),
109
		ingress:      ingress,
110
		classes:      classes,
111
		services:     services,
112
	}
113
	c.queue = controllers.NewQueue("ingress",
114
		controllers.WithReconciler(c.onEvent),
115
		controllers.WithMaxAttempts(5))
116
	c.ingress.AddEventHandler(controllers.ObjectHandler(c.queue.AddObject))
117

118
	// We watch service changes to detect service port number change to trigger
119
	// re-convert ingress to new-vs.
120
	c.services.AddEventHandler(controllers.FromEventHandler(func(o controllers.Event) {
121
		c.onServiceEvent(o)
122
	}))
123

124
	return c
125
}
126

127
// Run waits for the ingress, service and ingress-class caches to sync, runs
// the queue with the given stop channel and, once the queue returns, shuts
// down the three clients.
func (c *controller) Run(stop <-chan struct{}) {
	kube.WaitForCacheSync(
		"ingress",
		stop,
		c.ingress.HasSynced,
		c.services.HasSynced,
		c.classes.HasSynced,
	)
	c.queue.Run(stop)
	controllers.ShutdownAll(c.ingress, c.services, c.classes)
}
132

133
// shouldProcessIngress resolves the IngressClass named by the ingress (if
// any) and defers the decision to shouldProcessIngressWithClass.
//
// An ingress that names a class which cannot be found is never processed.
// An ingress without a class name is evaluated with a nil class.
func (c *controller) shouldProcessIngress(mesh *meshconfig.MeshConfig, i *knetworking.Ingress) bool {
	var ingressClass *knetworking.IngressClass
	if className := i.Spec.IngressClassName; className != nil {
		ingressClass = c.classes.Get(*className, "")
		if ingressClass == nil {
			return false
		}
	}
	return shouldProcessIngressWithClass(mesh, i, ingressClass)
}
144

145
// shouldProcessIngressUpdate checks whether we should renotify registered handlers about an update event
146
func (c *controller) shouldProcessIngressUpdate(ing *knetworking.Ingress) bool {
147
	// ingress add/update
148
	shouldProcess := c.shouldProcessIngress(c.meshWatcher.Mesh(), ing)
149
	item := config.NamespacedName(ing)
150
	if shouldProcess {
151
		// record processed ingress
152
		c.mutex.Lock()
153
		c.ingresses[item] = ing
154
		c.mutex.Unlock()
155
		return true
156
	}
157

158
	c.mutex.Lock()
159
	_, preProcessed := c.ingresses[item]
160
	// previous processed but should not currently, delete it
161
	if preProcessed && !shouldProcess {
162
		delete(c.ingresses, item)
163
	} else {
164
		c.ingresses[item] = ing
165
	}
166
	c.mutex.Unlock()
167

168
	return preProcessed
169
}
170

171
func (c *controller) onEvent(item types.NamespacedName) error {
172
	event := model.EventUpdate
173
	ing := c.ingress.Get(item.Name, item.Namespace)
174
	if ing == nil {
175
		event = model.EventDelete
176
		c.mutex.Lock()
177
		ing = c.ingresses[item]
178
		delete(c.ingresses, item)
179
		c.mutex.Unlock()
180
		if ing == nil {
181
			// It was a delete and we didn't have an existing known ingress, no action
182
			return nil
183
		}
184
	}
185

186
	// we should check need process only when event is not delete,
187
	// if it is delete event, and previously processed, we need to process too.
188
	if event != model.EventDelete {
189
		shouldProcess := c.shouldProcessIngressUpdate(ing)
190
		if !shouldProcess {
191
			return nil
192
		}
193
	}
194

195
	vsmetadata := config.Meta{
196
		Name:             item.Name + "-" + "virtualservice",
197
		Namespace:        item.Namespace,
198
		GroupVersionKind: gvk.VirtualService,
199
		// Set this label so that we do not compare configs and just push.
200
		Labels: map[string]string{constants.AlwaysPushLabel: "true"},
201
	}
202
	gatewaymetadata := config.Meta{
203
		Name:             item.Name + "-" + "gateway",
204
		Namespace:        item.Namespace,
205
		GroupVersionKind: gvk.Gateway,
206
		// Set this label so that we do not compare configs and just push.
207
		Labels: map[string]string{constants.AlwaysPushLabel: "true"},
208
	}
209

210
	// Trigger updates for Gateway and VirtualService
211
	// TODO: we could be smarter here and only trigger when real changes were found
212
	for _, f := range c.virtualServiceHandlers {
213
		f(config.Config{Meta: vsmetadata}, config.Config{Meta: vsmetadata}, event)
214
	}
215
	for _, f := range c.gatewayHandlers {
216
		f(config.Config{Meta: gatewaymetadata}, config.Config{Meta: gatewaymetadata}, event)
217
	}
218

219
	return nil
220
}
221

222
// onServiceEvent re-queues ingresses affected by a Service event.
//
// input must be a controllers.Event carrying *corev1.Service objects; the
// type assertions are unchecked. Update events whose port set (number and
// name) is unchanged are ignored. Otherwise every ingress in the service's
// namespace that refers to the service by port name is added to the queue.
func (c *controller) onServiceEvent(input any) {
	ev := input.(controllers.Event)
	svc := ev.Latest().(*corev1.Service)

	// This is shortcut. We only care about the port number change if we receive service update event.
	if ev.Event == controllers.EventUpdate {
		previous := ev.Old.(*corev1.Service)
		// If the ports don't change, we do nothing.
		if extractPorts(previous.Spec.Ports).Equals(extractPorts(svc.Spec.Ports)) {
			return
		}
	}

	// We care about add, delete and ports changed event of services that are referred
	// by ingress using port name.
	svcKey := config.NamespacedName(svc).String()
	for _, ing := range c.ingress.List(svc.GetNamespace(), klabels.Everything()) {
		if !extractServicesByPortNameType(ing).Contains(svcKey) {
			continue
		}
		c.queue.AddObject(ing)
	}
}
247

248
func (c *controller) RegisterEventHandler(kind config.GroupVersionKind, f model.EventHandler) {
249
	switch kind {
250
	case gvk.VirtualService:
251
		c.virtualServiceHandlers = append(c.virtualServiceHandlers, f)
252
	case gvk.Gateway:
253
		c.gatewayHandlers = append(c.gatewayHandlers, f)
254
	}
255
}
256

257
// HasSynced reports the sync state of the controller's queue.
func (c *controller) HasSynced() bool {
	synced := c.queue.HasSynced()
	return synced
}
260

261
// Schemas returns the package-level schemas collection.
//
// TODO: are these two config descriptors right?
func (c *controller) Schemas() collection.Schemas {
	return schemas
}
265

266
// Get always returns nil, whatever type, name and namespace are requested.
func (c *controller) Get(typ config.GroupVersionKind, name, namespace string) *config.Config {
	var none *config.Config
	return none
}
269

270
// sortIngressByCreationTime sorts the list of config objects in ascending order by their creation time (if available).
271
func sortIngressByCreationTime(ingr []*knetworking.Ingress) []*knetworking.Ingress {
272
	sort.Slice(ingr, func(i, j int) bool {
273
		// If creation time is the same, then behavior is nondeterministic. In this case, we can
274
		// pick an arbitrary but consistent ordering based on name and namespace, which is unique.
275
		// CreationTimestamp is stored in seconds, so this is not uncommon.
276
		if ingr[i].CreationTimestamp == ingr[j].CreationTimestamp {
277
			in := ingr[i].Name + "." + ingr[i].Namespace
278
			jn := ingr[j].Name + "." + ingr[j].Namespace
279
			return in < jn
280
		}
281
		return ingr[i].CreationTimestamp.Before(&ingr[j].CreationTimestamp)
282
	})
283
	return ingr
284
}
285

286
func (c *controller) List(typ config.GroupVersionKind, namespace string) []config.Config {
287
	if typ != gvk.Gateway &&
288
		typ != gvk.VirtualService {
289
		return nil
290
	}
291

292
	out := make([]config.Config, 0)
293
	ingressByHost := map[string]*config.Config{}
294
	for _, ingress := range sortIngressByCreationTime(c.ingress.List(namespace, klabels.Everything())) {
295
		process := c.shouldProcessIngress(c.meshWatcher.Mesh(), ingress)
296
		if !process {
297
			continue
298
		}
299

300
		switch typ {
301
		case gvk.VirtualService:
302
			ConvertIngressVirtualService(*ingress, c.domainSuffix, ingressByHost, c.services)
303
		case gvk.Gateway:
304
			gateways := ConvertIngressV1alpha3(*ingress, c.meshWatcher.Mesh(), c.domainSuffix)
305
			out = append(out, gateways)
306
		}
307
	}
308

309
	if typ == gvk.VirtualService {
310
		for _, obj := range ingressByHost {
311
			out = append(out, *obj)
312
		}
313
	}
314

315
	return out
316
}
317

318
// extractServicesByPortNameType extract services that are of port name type in the specified ingress resource.
319
func extractServicesByPortNameType(ingress *knetworking.Ingress) sets.String {
320
	services := sets.String{}
321
	for _, rule := range ingress.Spec.Rules {
322
		if rule.HTTP == nil {
323
			continue
324
		}
325

326
		for _, route := range rule.HTTP.Paths {
327
			if route.Backend.Service == nil {
328
				continue
329
			}
330

331
			if route.Backend.Service.Port.Name != "" {
332
				services.Insert(types.NamespacedName{
333
					Namespace: ingress.GetNamespace(),
334
					Name:      route.Backend.Service.Name,
335
				}.String())
336
			}
337
		}
338
	}
339
	return services
340
}
341

342
// extractPorts returns the set of "port number|port name" strings for the
// given service ports.
func extractPorts(ports []corev1.ServicePort) sets.String {
	keys := sets.String{}
	for idx := range ports {
		p := &ports[idx]
		// the format is port number|port name.
		keys.Insert(fmt.Sprintf("%d|%s", p.Port, p.Name))
	}
	return keys
}
350

351
// Create is not supported: the store is a read-only view. It always returns
// an empty string and errUnsupportedOp.
func (c *controller) Create(_ config.Config) (string, error) {
	const noRevision = ""
	return noRevision, errUnsupportedOp
}
354

355
// Update is not supported: the store is a read-only view. It always returns
// an empty string and errUnsupportedOp.
func (c *controller) Update(_ config.Config) (string, error) {
	const noRevision = ""
	return noRevision, errUnsupportedOp
}
358

359
// UpdateStatus is not supported: the store is a read-only view. It always
// returns an empty string and errUnsupportedOp.
func (c *controller) UpdateStatus(_ config.Config) (string, error) {
	const noRevision = ""
	return noRevision, errUnsupportedOp
}
362

363
// Patch is not supported: the store is a read-only view. It always returns
// an empty string and errUnsupportedOp.
func (c *controller) Patch(_ config.Config, _ config.PatchFunc) (string, error) {
	const noRevision = ""
	return noRevision, errUnsupportedOp
}
366

367
// Delete is not supported: the store is a read-only view. It always returns
// errUnsupportedOp.
func (c *controller) Delete(_ config.GroupVersionKind, _, _ string, _ *string) error {
	err := errUnsupportedOp
	return err
}
370

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.