istio

Форк
0
267 строк · 7.9 Кб
1
// Copyright Istio Authors
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14

15
package ingress
16

17
import (
18
	"fmt"
19
	"sort"
20
	"strings"
21

22
	corev1 "k8s.io/api/core/v1"
23
	knetworking "k8s.io/api/networking/v1"
24
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
25
	"k8s.io/apimachinery/pkg/labels"
26
	"k8s.io/apimachinery/pkg/types"
27

28
	istiolabels "istio.io/istio/pkg/config/labels"
29
	"istio.io/istio/pkg/config/mesh"
30
	kubelib "istio.io/istio/pkg/kube"
31
	"istio.io/istio/pkg/kube/controllers"
32
	"istio.io/istio/pkg/kube/kclient"
33
	"istio.io/istio/pkg/log"
34
	netutil "istio.io/istio/pkg/util/net"
35
)
36

37
// statusLog is the log scope used by the ingress status syncer; it is
// registered as "ingress status".
var statusLog = log.RegisterScope("ingress status", "")
38

39
// StatusSyncer keeps the status IP in each Ingress resource updated
40
type StatusSyncer struct {
41
	meshConfig mesh.Watcher
42

43
	queue          controllers.Queue
44
	ingresses      kclient.Client[*knetworking.Ingress]
45
	ingressClasses kclient.Client[*knetworking.IngressClass]
46
	pods           kclient.Client[*corev1.Pod]
47
	services       kclient.Client[*corev1.Service]
48
	nodes          kclient.Client[*corev1.Node]
49
}
50

51
// Run the syncer until stopCh is closed
52
func (s *StatusSyncer) Run(stopCh <-chan struct{}) {
53
	s.queue.Run(stopCh)
54
	controllers.ShutdownAll(s.services, s.nodes, s.pods, s.ingressClasses, s.ingresses)
55
}
56

57
// NewStatusSyncer creates a new instance
58
func NewStatusSyncer(meshHolder mesh.Watcher, kc kubelib.Client) *StatusSyncer {
59
	c := &StatusSyncer{
60
		meshConfig:     meshHolder,
61
		ingresses:      kclient.NewFiltered[*knetworking.Ingress](kc, kclient.Filter{ObjectFilter: kc.ObjectFilter()}),
62
		ingressClasses: kclient.New[*knetworking.IngressClass](kc),
63
		pods: kclient.NewFiltered[*corev1.Pod](kc, kclient.Filter{
64
			ObjectFilter:    kc.ObjectFilter(),
65
			ObjectTransform: kubelib.StripPodUnusedFields,
66
		}),
67
		services: kclient.NewFiltered[*corev1.Service](kc, kclient.Filter{ObjectFilter: kc.ObjectFilter()}),
68
		nodes: kclient.NewFiltered[*corev1.Node](kc, kclient.Filter{
69
			ObjectTransform: kubelib.StripNodeUnusedFields,
70
		}),
71
	}
72
	c.queue = controllers.NewQueue("ingress status",
73
		controllers.WithReconciler(c.Reconcile),
74
		controllers.WithMaxAttempts(5))
75

76
	// For any ingress change, enqueue it - we may need to update the status.
77
	c.ingresses.AddEventHandler(controllers.ObjectHandler(c.queue.AddObject))
78
	// For any class change, sync all ingress; the handler will filter non-matching ones already
79
	c.ingressClasses.AddEventHandler(controllers.ObjectHandler(func(o controllers.Object) {
80
		// Just sync them all
81
		c.enqueueAll()
82
	}))
83
	// For services, we queue all Ingress if its the ingress service
84
	c.services.AddEventHandler(controllers.ObjectHandler(func(o controllers.Object) {
85
		if o.GetName() == c.meshConfig.Mesh().IngressService && o.GetNamespace() == IngressNamespace {
86
			c.enqueueAll()
87
		}
88
	}))
89
	// For pods, we enqueue all Ingress if its part of the ingress service
90
	c.pods.AddEventHandler(controllers.ObjectHandler(func(o controllers.Object) {
91
		if c.meshConfig.Mesh().IngressService != "" {
92
			// Ingress Service takes precedence
93
			return
94
		}
95
		ingressSelector := c.meshConfig.Mesh().IngressSelector
96

97
		// get all pods acting as ingress gateways
98
		igSelector := getIngressGatewaySelector(ingressSelector, "")
99
		if istiolabels.Instance(igSelector).SubsetOf(o.GetLabels()) {
100
			// Ingress selector matches this pod, enqueue everything
101
			c.enqueueAll()
102
		}
103
	}))
104
	// Mesh may have changed ingress fields, enqueue everything
105
	c.meshConfig.AddMeshHandler(c.enqueueAll)
106
	return c
107
}
108

109
// runningAddresses returns a list of IP addresses and/or FQDN in the namespace
110
// where the ingress controller is currently running
111
func (s *StatusSyncer) runningAddresses() []string {
112
	addrs := make([]string, 0)
113
	ingressService := s.meshConfig.Mesh().IngressService
114
	ingressSelector := s.meshConfig.Mesh().IngressSelector
115

116
	if ingressService != "" {
117
		svc := s.services.Get(ingressService, IngressNamespace)
118
		if svc == nil {
119
			return nil
120
		}
121

122
		if svc.Spec.Type == corev1.ServiceTypeExternalName {
123
			addrs = append(addrs, svc.Spec.ExternalName)
124

125
			return addrs
126
		}
127

128
		for _, ip := range svc.Status.LoadBalancer.Ingress {
129
			if ip.IP == "" {
130
				addrs = append(addrs, ip.Hostname)
131
			} else {
132
				addrs = append(addrs, ip.IP)
133
			}
134
		}
135

136
		addrs = append(addrs, svc.Spec.ExternalIPs...)
137
		return addrs
138
	}
139

140
	// get all pods acting as ingress gateways
141
	igSelector := getIngressGatewaySelector(ingressSelector, ingressService)
142
	igPods := s.pods.List(IngressNamespace, labels.SelectorFromSet(igSelector))
143

144
	for _, pod := range igPods {
145
		// only Running pods are valid
146
		if pod.Status.Phase != corev1.PodRunning {
147
			continue
148
		}
149

150
		// Find node external IP
151
		node := s.nodes.Get(pod.Spec.NodeName, "")
152
		if node == nil {
153
			continue
154
		}
155

156
		for _, address := range node.Status.Addresses {
157
			if address.Type == corev1.NodeExternalIP {
158
				if address.Address != "" && !addressInSlice(address.Address, addrs) {
159
					addrs = append(addrs, address.Address)
160
				}
161
			}
162
		}
163
	}
164

165
	return addrs
166
}
167

168
// addressInSlice reports whether addr is one of the elements of list.
func addressInSlice(addr string, list []string) bool {
	for i := 0; i < len(list); i++ {
		if list[i] == addr {
			return true
		}
	}
	return false
}
177

178
// sliceToStatus converts a slice of IP and/or hostnames to LoadBalancerIngress
179
func sliceToStatus(endpoints []string) []knetworking.IngressLoadBalancerIngress {
180
	lbi := make([]knetworking.IngressLoadBalancerIngress, 0, len(endpoints))
181
	for _, ep := range endpoints {
182
		if !netutil.IsValidIPAddress(ep) {
183
			lbi = append(lbi, knetworking.IngressLoadBalancerIngress{Hostname: ep})
184
		} else {
185
			lbi = append(lbi, knetworking.IngressLoadBalancerIngress{IP: ep})
186
		}
187
	}
188

189
	sort.SliceStable(lbi, lessLoadBalancerIngress(lbi))
190
	return lbi
191
}
192

193
func lessLoadBalancerIngress(addrs []knetworking.IngressLoadBalancerIngress) func(int, int) bool {
194
	return func(a, b int) bool {
195
		switch strings.Compare(addrs[a].Hostname, addrs[b].Hostname) {
196
		case -1:
197
			return true
198
		case 1:
199
			return false
200
		}
201
		return addrs[a].IP < addrs[b].IP
202
	}
203
}
204

205
// ingressSliceEqual reports whether lhs and rhs have the same length and the
// same IP and Hostname at every position. Other fields of the entries are not
// compared.
func ingressSliceEqual(lhs, rhs []knetworking.IngressLoadBalancerIngress) bool {
	if len(lhs) != len(rhs) {
		return false
	}

	for i := range lhs {
		if lhs[i].IP != rhs[i].IP || lhs[i].Hostname != rhs[i].Hostname {
			return false
		}
	}
	return true
}
220

221
// shouldTargetIngress determines whether the status watcher should target a given ingress resource
222
func (s *StatusSyncer) shouldTargetIngress(ingress *knetworking.Ingress) bool {
223
	var ingressClass *knetworking.IngressClass
224
	if ingress.Spec.IngressClassName != nil {
225
		ingressClass = s.ingressClasses.Get(*ingress.Spec.IngressClassName, "")
226
	}
227
	return shouldProcessIngressWithClass(s.meshConfig.Mesh(), ingress, ingressClass)
228
}
229

230
// enqueueAll adds every ingress known to the client, across all namespaces,
// to the reconcile queue.
func (s *StatusSyncer) enqueueAll() {
	all := s.ingresses.List(metav1.NamespaceAll, labels.Everything())
	for i := range all {
		s.queue.AddObject(all[i])
	}
}
235

236
// Reconcile brings the load balancer status of the Ingress identified by key
// in line with the addresses reported by runningAddresses.
//
// It returns nil without writing anything when the Ingress no longer exists,
// when shouldTargetIngress rejects it, or when its status already lists the
// wanted addresses. Otherwise the status is replaced on a deep copy and sent
// with UpdateStatus; a failure of that call is returned, wrapping the
// underlying error.
func (s *StatusSyncer) Reconcile(key types.NamespacedName) error {
	logger := statusLog.WithLabels("ingress", key)
	ing := s.ingresses.Get(key.Name, key.Namespace)
	if ing == nil {
		logger.Debugf("ingress removed, no action")
		return nil
	}
	if !s.shouldTargetIngress(ing) {
		logger.Debugf("ingress not selected, no action")
		return nil
	}

	// ing is the object handed out by the client and is only deep-copied
	// further down, right before it is modified. Sorting its status slice in
	// place would mutate that shared object, so sort a private copy instead.
	curIPs := append([]knetworking.IngressLoadBalancerIngress(nil), ing.Status.LoadBalancer.Ingress...)
	sort.SliceStable(curIPs, lessLoadBalancerIngress(curIPs))

	// sliceToStatus already returns its result sorted the same way.
	wantIPs := sliceToStatus(s.runningAddresses())

	if ingressSliceEqual(wantIPs, curIPs) {
		logger.Debugf("ingress has no change, no action")
		return nil
	}

	logger.Infof("updating IPs (%v)", wantIPs)
	ing = ing.DeepCopy()
	ing.Status.LoadBalancer.Ingress = wantIPs
	if _, err := s.ingresses.UpdateStatus(ing); err != nil {
		return fmt.Errorf("error updating ingress status: %w", err)
	}
	return nil
}
268

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.