22
corev1 "k8s.io/api/core/v1"
23
knetworking "k8s.io/api/networking/v1"
24
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
25
"k8s.io/apimachinery/pkg/labels"
26
"k8s.io/apimachinery/pkg/types"
28
istiolabels "istio.io/istio/pkg/config/labels"
29
"istio.io/istio/pkg/config/mesh"
30
kubelib "istio.io/istio/pkg/kube"
31
"istio.io/istio/pkg/kube/controllers"
32
"istio.io/istio/pkg/kube/kclient"
33
"istio.io/istio/pkg/log"
34
netutil "istio.io/istio/pkg/util/net"
37
// statusLog is the logging scope registered under the name "ingress status";
// Reconcile derives its per-ingress logger from it.
var statusLog = log.RegisterScope("ingress status", "")
40
// StatusSyncer bundles the mesh config watcher, a work queue, and typed
// clients for the Kubernetes objects that are consulted when computing and
// writing the load-balancer addresses in Ingress status.
type StatusSyncer struct {
41
// meshConfig supplies the IngressService and IngressSelector settings.
meshConfig mesh.Watcher
43
// queue is created in NewStatusSyncer with Reconcile as its reconciler.
queue controllers.Queue
44
// ingresses reads Ingress objects and is used to update their status.
ingresses kclient.Client[*knetworking.Ingress]
45
// ingressClasses resolves the IngressClass named by an Ingress spec.
ingressClasses kclient.Client[*knetworking.IngressClass]
46
// pods lists ingress gateway pods by label selector.
pods kclient.Client[*corev1.Pod]
47
// services looks up the ingress Service named in the mesh config.
services kclient.Client[*corev1.Service]
48
// nodes looks up the node a gateway pod is scheduled on.
nodes kclient.Client[*corev1.Node]
52
// Run takes a stop channel and, in the portion visible here, shuts down the
// service, node, pod, ingress-class and ingress clients.
// NOTE(review): the statement preceding the shutdown is not visible in this
// excerpt; presumably it runs the queue until stopCh closes — confirm.
func (s *StatusSyncer) Run(stopCh <-chan struct{}) {
54
controllers.ShutdownAll(s.services, s.nodes, s.pods, s.ingressClasses, s.ingresses)
58
// NewStatusSyncer builds a StatusSyncer from a mesh config watcher and a kube
// client. It creates the typed clients, a work queue whose reconciler is
// Reconcile, and registers event handlers on ingresses, ingress classes,
// services, pods and mesh config changes.
func NewStatusSyncer(meshHolder mesh.Watcher, kc kubelib.Client) *StatusSyncer {
60
meshConfig: meshHolder,
61
// Ingress and Service clients are restricted by the kube client's object filter.
ingresses: kclient.NewFiltered[*knetworking.Ingress](kc, kclient.Filter{ObjectFilter: kc.ObjectFilter()}),
62
ingressClasses: kclient.New[*knetworking.IngressClass](kc),
63
// Pods use the same object filter plus a transform that strips unused fields.
pods: kclient.NewFiltered[*corev1.Pod](kc, kclient.Filter{
64
ObjectFilter: kc.ObjectFilter(),
65
ObjectTransform: kubelib.StripPodUnusedFields,
67
services: kclient.NewFiltered[*corev1.Service](kc, kclient.Filter{ObjectFilter: kc.ObjectFilter()}),
68
// Nodes are not given an object filter here, only a field-stripping transform.
nodes: kclient.NewFiltered[*corev1.Node](kc, kclient.Filter{
69
ObjectTransform: kubelib.StripNodeUnusedFields,
72
// The queue is named "ingress status" and is capped at 5 attempts per item.
c.queue = controllers.NewQueue("ingress status",
73
controllers.WithReconciler(c.Reconcile),
74
controllers.WithMaxAttempts(5))
77
// Ingress events enqueue the affected object directly.
c.ingresses.AddEventHandler(controllers.ObjectHandler(c.queue.AddObject))
79
// NOTE(review): the IngressClass handler body is not visible in this excerpt.
c.ingressClasses.AddEventHandler(controllers.ObjectHandler(func(o controllers.Object) {
84
c.services.AddEventHandler(controllers.ObjectHandler(func(o controllers.Object) {
85
// Only the Service matching the configured IngressService name in
// IngressNamespace is of interest; the action taken is not visible here.
if o.GetName() == c.meshConfig.Mesh().IngressService && o.GetNamespace() == IngressNamespace {
90
c.pods.AddEventHandler(controllers.ObjectHandler(func(o controllers.Object) {
91
// NOTE(review): body not visible; presumably pod events are ignored when an
// IngressService is configured — confirm.
if c.meshConfig.Mesh().IngressService != "" {
95
ingressSelector := c.meshConfig.Mesh().IngressSelector
98
// An empty service name is passed when deriving the gateway selector here.
igSelector := getIngressGatewaySelector(ingressSelector, "")
99
// The pod is relevant when the gateway selector is a subset of its labels.
if istiolabels.Instance(igSelector).SubsetOf(o.GetLabels()) {
105
// Mesh config changes re-enqueue every ingress.
c.meshConfig.AddMeshHandler(c.enqueueAll)
111
// runningAddresses collects, as strings, the addresses associated with the
// ingress gateway. Two sources are visible: the Service named by the mesh
// config's IngressService (ExternalName, load-balancer ingress hostnames/IPs,
// ExternalIPs), and the external IPs of nodes running gateway pods selected by
// label in IngressNamespace.
// NOTE(review): the exact branching and return statements between these paths
// are not fully visible in this excerpt.
func (s *StatusSyncer) runningAddresses() []string {
112
addrs := make([]string, 0)
113
ingressService := s.meshConfig.Mesh().IngressService
114
ingressSelector := s.meshConfig.Mesh().IngressSelector
116
if ingressService != "" {
117
// NOTE(review): handling of a missing Service is not visible here — confirm
// svc is nil-checked before the dereferences below.
svc := s.services.Get(ingressService, IngressNamespace)
122
// An ExternalName service contributes its external name.
if svc.Spec.Type == corev1.ServiceTypeExternalName {
123
addrs = append(addrs, svc.Spec.ExternalName)
128
// Each load-balancer ingress entry contributes its Hostname or its IP; the
// condition choosing between them is not visible in this excerpt.
for _, ip := range svc.Status.LoadBalancer.Ingress {
130
addrs = append(addrs, ip.Hostname)
132
addrs = append(addrs, ip.IP)
136
addrs = append(addrs, svc.Spec.ExternalIPs...)
141
// Pod-based path: select gateway pods by label within IngressNamespace.
igSelector := getIngressGatewaySelector(ingressSelector, ingressService)
142
igPods := s.pods.List(IngressNamespace, labels.SelectorFromSet(igSelector))
144
for _, pod := range igPods {
146
// NOTE(review): branch body not visible; presumably non-running pods are skipped.
if pod.Status.Phase != corev1.PodRunning {
151
// Nodes are looked up by name with an empty namespace argument.
node := s.nodes.Get(pod.Spec.NodeName, "")
156
for _, address := range node.Status.Addresses {
157
if address.Type == corev1.NodeExternalIP {
158
// Append only non-empty external IPs that are not already in addrs.
if address.Address != "" && !addressInSlice(address.Address, addrs) {
159
addrs = append(addrs, address.Address)
168
// addressInSlice takes an address and a list of strings and returns a bool,
// iterating over list.
// NOTE(review): the loop body is not visible in this excerpt; presumably it
// reports whether addr equals any element of list — confirm.
func addressInSlice(addr string, list []string) bool {
169
for _, v := range list {
179
// sliceToStatus converts endpoint strings into IngressLoadBalancerIngress
// entries and stable-sorts them with lessLoadBalancerIngress.
func sliceToStatus(endpoints []string) []knetworking.IngressLoadBalancerIngress {
180
// Capacity is pre-sized to the number of endpoints.
lbi := make([]knetworking.IngressLoadBalancerIngress, 0, len(endpoints))
181
for _, ep := range endpoints {
182
// Anything that is not a valid IP address is recorded as a hostname.
if !netutil.IsValidIPAddress(ep) {
183
lbi = append(lbi, knetworking.IngressLoadBalancerIngress{Hostname: ep})
185
// NOTE(review): presumably the else branch — the `else` line is not visible here.
lbi = append(lbi, knetworking.IngressLoadBalancerIngress{IP: ep})
189
sort.SliceStable(lbi, lessLoadBalancerIngress(lbi))
193
// lessLoadBalancerIngress returns an index-based comparison function over
// addrs, suitable for sort.SliceStable. It switches on the comparison of the
// two Hostname fields and otherwise orders by IP.
// NOTE(review): the switch's case arms are not visible in this excerpt.
func lessLoadBalancerIngress(addrs []knetworking.IngressLoadBalancerIngress) func(int, int) bool {
194
return func(a, b int) bool {
195
switch strings.Compare(addrs[a].Hostname, addrs[b].Hostname) {
201
// Reached when the switch does not return: order by IP.
return addrs[a].IP < addrs[b].IP
205
// ingressSliceEqual compares two IngressLoadBalancerIngress slices by length
// and then element by element at the same index, checking IP and Hostname.
// The comparison is positional; Reconcile sorts both slices with
// lessLoadBalancerIngress before calling it.
func ingressSliceEqual(lhs, rhs []knetworking.IngressLoadBalancerIngress) bool {
206
if len(lhs) != len(rhs) {
211
if lhs[i].IP != rhs[i].IP {
214
if lhs[i].Hostname != rhs[i].Hostname {
222
// shouldTargetIngress resolves the ingress's IngressClass, when
// Spec.IngressClassName is set, and delegates the decision to
// shouldProcessIngressWithClass together with the current mesh config.
func (s *StatusSyncer) shouldTargetIngress(ingress *knetworking.Ingress) bool {
223
// Stays nil when the ingress names no class.
var ingressClass *knetworking.IngressClass
224
if ingress.Spec.IngressClassName != nil {
225
// IngressClass is looked up by name with an empty namespace argument.
ingressClass = s.ingressClasses.Get(*ingress.Spec.IngressClassName, "")
227
return shouldProcessIngressWithClass(s.meshConfig.Mesh(), ingress, ingressClass)
230
// enqueueAll adds every ingress, across all namespaces and regardless of
// labels, to the work queue. NewStatusSyncer registers it as the mesh config
// change handler.
func (s *StatusSyncer) enqueueAll() {
231
for _, ing := range s.ingresses.List(metav1.NamespaceAll, labels.Everything()) {
232
s.queue.AddObject(ing)
236
// Reconcile brings the load-balancer status of the Ingress identified by key
// in line with the addresses returned by runningAddresses. The log messages
// show three no-op outcomes (ingress removed, ingress not selected, status
// unchanged); otherwise the status is updated through the ingress client and
// an error from that update is returned with added context.
// NOTE(review): the guard conditions and early returns around the debug
// messages are not visible in this excerpt.
func (s *StatusSyncer) Reconcile(key types.NamespacedName) error {
237
// Shadows the imported log package with a scope labelled by the ingress key.
log := statusLog.WithLabels("ingress", key)
238
ing := s.ingresses.Get(key.Name, key.Namespace)
240
log.Debugf("ingress removed, no action")
243
shouldTarget := s.shouldTargetIngress(ing)
245
log.Debugf("ingress not selected, no action")
249
curIPs := ing.Status.LoadBalancer.Ingress
250
// NOTE(review): curIPs shares its backing array with ing, so this sort
// reorders the fetched object's status in place; if Get returns a shared
// cached object that is a mutation of the cache — confirm.
sort.SliceStable(curIPs, lessLoadBalancerIngress(curIPs))
252
// wantIPs is already sorted by sliceToStatus, so the positional comparison
// below is order-consistent.
wantIPs := sliceToStatus(s.runningAddresses())
254
if ingressSliceEqual(wantIPs, curIPs) {
255
log.Debugf("ingress has no change, no action")
259
log.Infof("updating IPs (%v)", wantIPs)
261
// NOTE(review): whether ing is copied before this assignment is not visible
// in this excerpt — confirm.
ing.Status.LoadBalancer.Ingress = wantIPs
262
_, err := s.ingresses.UpdateStatus(ing)
264
// NOTE(review): %v flattens err into text; %w would keep it available to
// errors.Is / errors.As.
return fmt.Errorf("error updating ingress status: %v", err)