1
// Copyright Istio Authors
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
7
// http://www.apache.org/licenses/LICENSE-2.0
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
15
// Package ingress provides a read-only view of Kubernetes ingress resources
16
// as an ingress rule configuration type store
25
corev1 "k8s.io/api/core/v1"
26
knetworking "k8s.io/api/networking/v1"
27
klabels "k8s.io/apimachinery/pkg/labels"
28
"k8s.io/apimachinery/pkg/types"
30
meshconfig "istio.io/api/mesh/v1alpha1"
31
"istio.io/istio/pilot/pkg/model"
32
kubecontroller "istio.io/istio/pilot/pkg/serviceregistry/kube/controller"
33
"istio.io/istio/pkg/config"
34
"istio.io/istio/pkg/config/constants"
35
"istio.io/istio/pkg/config/mesh"
36
"istio.io/istio/pkg/config/schema/collection"
37
"istio.io/istio/pkg/config/schema/collections"
38
"istio.io/istio/pkg/config/schema/gvk"
39
"istio.io/istio/pkg/env"
40
"istio.io/istio/pkg/kube"
41
"istio.io/istio/pkg/kube/controllers"
42
"istio.io/istio/pkg/kube/kclient"
43
"istio.io/istio/pkg/util/sets"
46
// In 1.0, the Gateway is defined in the namespace where the actual controller runs, and needs to be managed by
48
// The gateway is named by appending "-istio-autogenerated-k8s-ingress" to the name of the ingress.
50
// Currently the gateway namespace is hardcoded to istio-system (model.IstioIngressNamespace)
52
// VirtualServices are also auto-generated in the model.IstioIngressNamespace.
54
// The sync of Ingress objects to IP is done by status.go
55
// the 'ingress service' name is used to get the IP of the Service
56
// If ingress service is empty, it falls back to NodeExternalIP list, selected using the labels.
57
// This is using 'namespace' of pilot - but seems to be broken (never worked), since it uses Pilot's pod labels
58
// instead of the ingress labels.
60
// Follows mesh.IngressControllerMode setting to enable - OFF|STRICT|DEFAULT.
61
// STRICT requires "kubernetes.io/ingress.class" == mesh.IngressClass
62
// DEFAULT allows Ingress without explicit class.
65
// - K8S_INGRESS_NS - namespace of the Gateway that will act as ingress.
66
// - labels of the gateway set to "app=ingressgateway" for node_port, service set to 'ingressgateway' (matching default install)
67
// If we need more flexibility - we can add it (but likely we'll deprecate ingress support first)
70
// schemas is the schema set built with collection.SchemasFor for this store;
// collections.VirtualService is one of its members.
var schemas = collection.SchemasFor(
71
collections.VirtualService,
74
// Control needs RBAC permissions to write to Pods.
76
// controller holds the state used to expose Kubernetes Ingress resources as
// Istio configuration: the mesh config source, a work queue, the registered
// event handlers, a record of processed ingresses, and the Kubernetes clients.
type controller struct {
77
// meshWatcher supplies the current MeshConfig (read through Mesh()).
meshWatcher mesh.Holder
80
// queue carries ingress keys; NewController wires it to onEvent.
queue controllers.Queue
81
// virtualServiceHandlers are the callbacks registered for gvk.VirtualService.
virtualServiceHandlers []model.EventHandler
82
// gatewayHandlers are the callbacks onEvent invokes with Gateway metadata.
gatewayHandlers []model.EventHandler
85
// ingresses records the ingresses that have been processed, keyed by
// namespaced name.
ingresses map[types.NamespacedName]*knetworking.Ingress
88
// classes, ingress and services are the clients for IngressClass, Ingress
// and Service objects respectively.
classes kclient.Client[*knetworking.IngressClass]
89
ingress kclient.Client[*knetworking.Ingress]
90
services kclient.Client[*corev1.Service]
93
// IngressNamespace is the value of the K8S_INGRESS_NS environment variable,
// registered with constants.IstioIngressNamespace as its default.
var IngressNamespace = env.Register("K8S_INGRESS_NS", constants.IstioIngressNamespace, "").Get()
95
// errUnsupportedOp is returned by the mutating store methods (Create, Update,
// UpdateStatus, Patch, Delete), none of which this read-only view supports.
var errUnsupportedOp = errors.New("unsupported operation: the ingress config store is a read-only view")
97
// NewController creates the ingress controller and returns it as a
// model.ConfigStoreController.
//
// It builds clients for Ingress, IngressClass and Service objects (the Ingress
// and Service clients are filtered with client.ObjectFilter()), takes the
// domain suffix from options.DomainSuffix, creates the "ingress" work queue
// with onEvent as its reconciler and WithMaxAttempts(5), and registers event
// handlers on the ingress and service clients.
98
func NewController(client kube.Client, meshWatcher mesh.Holder,
99
options kubecontroller.Options,
100
) model.ConfigStoreController {
101
ingress := kclient.NewFiltered[*knetworking.Ingress](client, kclient.Filter{ObjectFilter: client.ObjectFilter()})
102
classes := kclient.New[*knetworking.IngressClass](client)
103
services := kclient.NewFiltered[*corev1.Service](client, kclient.Filter{ObjectFilter: client.ObjectFilter()})
106
meshWatcher: meshWatcher,
107
domainSuffix: options.DomainSuffix,
108
ingresses: make(map[types.NamespacedName]*knetworking.Ingress),
113
c.queue = controllers.NewQueue("ingress",
114
controllers.WithReconciler(c.onEvent),
115
controllers.WithMaxAttempts(5))
116
// Every ingress event enqueues the affected object for reconciliation.
c.ingress.AddEventHandler(controllers.ObjectHandler(c.queue.AddObject))
118
// We watch service changes to detect service port number changes, which
119
// trigger re-conversion of the ingress into a new VirtualService.
120
c.services.AddEventHandler(controllers.FromEventHandler(func(o controllers.Event) {
127
// Run waits, via kube.WaitForCacheSync, for the ingress, service and
// ingress-class clients to report synced (or for stop), and shuts the three
// clients down with controllers.ShutdownAll.
func (c *controller) Run(stop <-chan struct{}) {
128
kube.WaitForCacheSync("ingress", stop, c.ingress.HasSynced, c.services.HasSynced, c.classes.HasSynced)
130
controllers.ShutdownAll(c.ingress, c.services, c.classes)
133
// shouldProcessIngress reports whether ingress i should be handled under the
// given mesh config. When i names an IngressClass, that class is looked up
// through the classes client; the decision itself is delegated to
// shouldProcessIngressWithClass.
func (c *controller) shouldProcessIngress(mesh *meshconfig.MeshConfig, i *knetworking.Ingress) bool {
134
var class *knetworking.IngressClass
135
if i.Spec.IngressClassName != nil {
136
// The class is fetched with an empty namespace argument.
// NOTE(review): the local c shadows the receiver inside this branch.
c := c.classes.Get(*i.Spec.IngressClassName, "")
142
return shouldProcessIngressWithClass(mesh, i, class)
145
// shouldProcessIngressUpdate reports whether registered handlers should be
// re-notified about an add/update of ing. It evaluates shouldProcessIngress
// against the current mesh config and maintains c.ingresses, keyed by the
// ingress's namespaced name, as a side effect.
146
func (c *controller) shouldProcessIngressUpdate(ing *knetworking.Ingress) bool {
147
// ingress add/update
148
shouldProcess := c.shouldProcessIngress(c.meshWatcher.Mesh(), ing)
149
item := config.NamespacedName(ing)
151
// record the processed ingress
153
c.ingresses[item] = ing
159
// preProcessed tells whether this ingress was already recorded.
_, preProcessed := c.ingresses[item]
160
// previously processed but should no longer be: delete it
161
if preProcessed && !shouldProcess {
162
delete(c.ingresses, item)
164
c.ingresses[item] = ing
171
// onEvent reconciles the ingress identified by item; it is the reconciler
// NewController installs on the work queue.
//
// The event defaults to model.EventUpdate. On the delete path it becomes
// model.EventDelete and the last recorded copy is taken from c.ingresses and
// removed from it (presumably when the client no longer has the object —
// confirm). Registered VirtualService and Gateway handlers are then called
// with synthetic metadata named "<ingress name>-virtualservice" and
// "<ingress name>-gateway" in the ingress's namespace.
func (c *controller) onEvent(item types.NamespacedName) error {
172
event := model.EventUpdate
173
ing := c.ingress.Get(item.Name, item.Namespace)
175
event = model.EventDelete
177
ing = c.ingresses[item]
178
delete(c.ingresses, item)
181
// It was a delete and we didn't have an existing known ingress, no action
186
// Only non-delete events are checked with shouldProcessIngressUpdate;
187
// a delete of a previously processed ingress must be processed as well.
188
if event != model.EventDelete {
189
shouldProcess := c.shouldProcessIngressUpdate(ing)
195
vsmetadata := config.Meta{
196
Name: item.Name + "-" + "virtualservice",
197
Namespace: item.Namespace,
198
GroupVersionKind: gvk.VirtualService,
199
// Set this label so that we do not compare configs and just push.
200
Labels: map[string]string{constants.AlwaysPushLabel: "true"},
202
gatewaymetadata := config.Meta{
203
Name: item.Name + "-" + "gateway",
204
Namespace: item.Namespace,
205
GroupVersionKind: gvk.Gateway,
206
// Set this label so that we do not compare configs and just push.
207
Labels: map[string]string{constants.AlwaysPushLabel: "true"},
210
// Trigger updates for Gateway and VirtualService. The same metadata-only
// config is passed as both the old and the new object.
211
// TODO: we could be smarter here and only trigger when real changes were found
212
for _, f := range c.virtualServiceHandlers {
213
f(config.Config{Meta: vsmetadata}, config.Config{Meta: vsmetadata}, event)
215
for _, f := range c.gatewayHandlers {
216
f(config.Config{Meta: gatewaymetadata}, config.Config{Meta: gatewaymetadata}, event)
222
// onServiceEvent reacts to a Service event by re-queueing the ingresses that
// depend on that service's named ports.
//
// input must be a controllers.Event whose objects are *corev1.Service; the
// type assertions below are unchecked. For update events the old and current
// port sets (see extractPorts) are compared first. Every ingress in the
// service's namespace that refers to the service by port name is then added
// to the work queue.
func (c *controller) onServiceEvent(input any) {
223
event := input.(controllers.Event)
224
curSvc := event.Latest().(*corev1.Service)
226
// This is a shortcut. We only care about port changes when we receive a service update event.
227
if event.Event == controllers.EventUpdate {
228
oldSvc := event.Old.(*corev1.Service)
229
oldPorts := extractPorts(oldSvc.Spec.Ports)
230
curPorts := extractPorts(curSvc.Spec.Ports)
231
// If the ports don't change, we do nothing.
232
if oldPorts.Equals(curPorts) {
237
// We care about add, delete and port-change events of services that are
238
// referred to by an ingress using a port name.
239
namespacedName := config.NamespacedName(curSvc).String()
240
for _, ingress := range c.ingress.List(curSvc.GetNamespace(), klabels.Everything()) {
241
referredSvcSet := extractServicesByPortNameType(ingress)
242
if referredSvcSet.Contains(namespacedName) {
243
c.queue.AddObject(ingress)
248
// RegisterEventHandler records f as an event handler for the given kind:
// handlers for gvk.VirtualService are appended to virtualServiceHandlers, and
// the other visible branch appends to gatewayHandlers.
func (c *controller) RegisterEventHandler(kind config.GroupVersionKind, f model.EventHandler) {
250
case gvk.VirtualService:
251
c.virtualServiceHandlers = append(c.virtualServiceHandlers, f)
253
c.gatewayHandlers = append(c.gatewayHandlers, f)
257
// HasSynced reports whether the controller's work queue has synced; it
// delegates to c.queue.HasSynced.
func (c *controller) HasSynced() bool {
258
return c.queue.HasSynced()
261
// Schemas reports the config schemas this store serves, as a
// collection.Schemas value.
func (c *controller) Schemas() collection.Schemas {
262
// TODO: are these two config descriptors right?
266
// Get is the single-object lookup of the config store, addressed by kind,
// name and namespace.
func (c *controller) Get(typ config.GroupVersionKind, name, namespace string) *config.Config {
270
// sortIngressByCreationTime sorts the given ingresses in ascending order by
// their creation time. The slice is sorted in place with sort.Slice, so the
// caller's slice is reordered.
271
func sortIngressByCreationTime(ingr []*knetworking.Ingress) []*knetworking.Ingress {
272
sort.Slice(ingr, func(i, j int) bool {
273
// If creation time is the same, then behavior is nondeterministic. In this case, we can
274
// pick an arbitrary but consistent ordering based on name and namespace, which is unique.
275
// CreationTimestamp is stored in seconds, so this is not uncommon.
// NOTE(review): == compares the timestamp values structurally rather than
// as instants; confirm this is intended instead of an Equal-style comparison.
276
if ingr[i].CreationTimestamp == ingr[j].CreationTimestamp {
277
// Tie-break keys have the form "<name>.<namespace>".
in := ingr[i].Name + "." + ingr[i].Namespace
278
jn := ingr[j].Name + "." + ingr[j].Namespace
281
return ingr[i].CreationTimestamp.Before(&ingr[j].CreationTimestamp)
286
// List returns the configs of kind typ derived from the ingresses in
// namespace. The leading guard singles out kinds other than gvk.Gateway and
// gvk.VirtualService.
//
// Ingresses are visited in creation-time order. VirtualServices are
// accumulated per host in ingressByHost and appended after the loop; Gateways
// are appended as each ingress is converted.
func (c *controller) List(typ config.GroupVersionKind, namespace string) []config.Config {
287
if typ != gvk.Gateway &&
288
typ != gvk.VirtualService {
292
out := make([]config.Config, 0)
293
ingressByHost := map[string]*config.Config{}
294
for _, ingress := range sortIngressByCreationTime(c.ingress.List(namespace, klabels.Everything())) {
295
// Each ingress is checked against the current mesh config.
process := c.shouldProcessIngress(c.meshWatcher.Mesh(), ingress)
301
case gvk.VirtualService:
302
// Conversion writes its results into ingressByHost rather than returning them.
ConvertIngressVirtualService(*ingress, c.domainSuffix, ingressByHost, c.services)
304
gateways := ConvertIngressV1alpha3(*ingress, c.meshWatcher.Mesh(), c.domainSuffix)
305
out = append(out, gateways)
309
if typ == gvk.VirtualService {
310
// Map iteration order is unspecified, so the order of the appended
// VirtualServices is not deterministic.
for _, obj := range ingressByHost {
311
out = append(out, *obj)
318
// extractServicesByPortNameType returns the set of services that the given
// ingress references by port name (non-empty Backend.Service.Port.Name) in
// its HTTP paths. Entries are built from the ingress's namespace and the
// backend service name; rules without an HTTP section and backends without a
// Service are guarded by nil checks.
319
func extractServicesByPortNameType(ingress *knetworking.Ingress) sets.String {
320
services := sets.String{}
321
for _, rule := range ingress.Spec.Rules {
322
if rule.HTTP == nil {
326
for _, route := range rule.HTTP.Paths {
327
if route.Backend.Service == nil {
331
// Only backends addressed by port name are of interest.
if route.Backend.Service.Port.Name != "" {
332
services.Insert(types.NamespacedName{
333
Namespace: ingress.GetNamespace(),
334
Name: route.Backend.Service.Name,
342
// extractPorts builds a string set with one entry per service port, each
// formatted as "<port number>|<port name>".
func extractPorts(ports []corev1.ServicePort) sets.String {
343
result := sets.String{}
344
for _, port := range ports {
345
// the format is port number|port name.
346
result.Insert(fmt.Sprintf("%d|%s", port.Port, port.Name))
351
// Create is not supported by this read-only store; it always returns an empty
// string and errUnsupportedOp.
func (c *controller) Create(_ config.Config) (string, error) {
352
return "", errUnsupportedOp
355
// Update is not supported by this read-only store; it always returns an empty
// string and errUnsupportedOp.
func (c *controller) Update(_ config.Config) (string, error) {
356
return "", errUnsupportedOp
359
// UpdateStatus is not supported by this read-only store; it always returns an
// empty string and errUnsupportedOp.
func (c *controller) UpdateStatus(config.Config) (string, error) {
360
return "", errUnsupportedOp
363
// Patch is not supported by this read-only store; it always returns an empty
// string and errUnsupportedOp.
func (c *controller) Patch(_ config.Config, _ config.PatchFunc) (string, error) {
364
return "", errUnsupportedOp
367
// Delete is not supported by this read-only store; it always returns
// errUnsupportedOp.
func (c *controller) Delete(_ config.GroupVersionKind, _, _ string, _ *string) error {
368
return errUnsupportedOp