// Copyright Istio Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package controller

import (
	"context"
	"fmt"
	"net"
	"reflect"
	"sort"
	"strconv"
	"sync"
	"testing"
	"time"

	core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
	"github.com/google/go-cmp/cmp"
	corev1 "k8s.io/api/core/v1"
	discovery "k8s.io/api/discovery/v1"
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/intstr"

	"istio.io/api/annotation"
	"istio.io/api/label"
	meshconfig "istio.io/api/mesh/v1alpha1"
	"istio.io/client-go/pkg/apis/networking/v1alpha3"
	"istio.io/istio/pilot/pkg/features"
	"istio.io/istio/pilot/pkg/model"
	"istio.io/istio/pilot/pkg/serviceregistry/kube"
	"istio.io/istio/pilot/pkg/serviceregistry/provider"
	labelutil "istio.io/istio/pilot/pkg/serviceregistry/util/label"
	"istio.io/istio/pilot/pkg/serviceregistry/util/xdsfake"
	"istio.io/istio/pkg/cluster"
	"istio.io/istio/pkg/config"
	"istio.io/istio/pkg/config/constants"
	"istio.io/istio/pkg/config/labels"
	"istio.io/istio/pkg/config/mesh"
	"istio.io/istio/pkg/config/protocol"
	"istio.io/istio/pkg/config/visibility"
	kubelib "istio.io/istio/pkg/kube"
	"istio.io/istio/pkg/kube/kclient/clienttest"
	"istio.io/istio/pkg/network"
	"istio.io/istio/pkg/test"
	"istio.io/istio/pkg/test/util/assert"
	"istio.io/istio/pkg/test/util/retry"
	"istio.io/istio/pkg/util/sets"
)

const (
	testService = "test"
)

// eventually polls cond until it completes (returns true) or times out (resulting in a test failure).
func eventually(t test.Failer, cond func() bool) {
	t.Helper()
	retry.UntilSuccessOrFail(t, func() error {
		if !cond() {
			return fmt.Errorf("failed to get positive condition")
		}
		return nil
	}, retry.Timeout(time.Second), retry.Delay(time.Millisecond*10))
}

func TestServices(t *testing.T) {
	networksWatcher := mesh.NewFixedNetworksWatcher(&meshconfig.MeshNetworks{
		Networks: map[string]*meshconfig.Network{
			"network1": {
				Endpoints: []*meshconfig.Network_NetworkEndpoints{
					{
						Ne: &meshconfig.Network_NetworkEndpoints_FromCidr{
							FromCidr: "10.10.1.1/24",
						},
					},
				},
			},
			"network2": {
				Endpoints: []*meshconfig.Network_NetworkEndpoints{
					{
						Ne: &meshconfig.Network_NetworkEndpoints_FromCidr{
							FromCidr: "10.11.1.1/24",
						},
					},
				},
			},
		},
	})

	ctl, _ := NewFakeControllerWithOptions(t, FakeControllerOptions{NetworksWatcher: networksWatcher})
	t.Parallel()
	ns := "ns-test"

	hostname := kube.ServiceHostname(testService, ns, defaultFakeDomainSuffix)

	var sds model.ServiceDiscovery = ctl
	// "test", ports: http-example on 80
	makeService(testService, ns, ctl, t)

	eventually(t, func() bool {
		out := sds.Services()

		// The original test checked for 'protocolTCP', which is incorrect (the
		// port name is 'http'). It passed because the Service was created with
		// an invalid protocol, which the code ignored (not TCP/UDP).
		for _, item := range out {
			if item.Hostname == hostname &&
				len(item.Ports) == 1 &&
				item.Ports[0].Protocol == protocol.HTTP {
				return true
			}
		}
		return false
	})

	// 2 ports 1001, 2 IPs
	createEndpoints(t, ctl, testService, ns, []string{"http-example", "foo"}, []string{"10.10.1.1", "10.11.1.2"}, nil, nil)

	svc := sds.GetService(hostname)
	if svc == nil {
		t.Fatalf("GetService(%q) => should exist", hostname)
	}
	if svc.Hostname != hostname {
		t.Fatalf("GetService(%q) => %q", hostname, svc.Hostname)
	}
	assert.EventuallyEqual(t, func() int {
		ep := GetEndpointsForPort(svc, ctl.Endpoints, 80)
		return len(ep)
	}, 2)

	ep := GetEndpointsForPort(svc, ctl.Endpoints, 80)
	if len(ep) != 2 {
		t.Fatalf("Invalid response for GetEndpoints %v", ep)
	}

	if ep[0].Address == "10.10.1.1" && ep[0].Network != "network1" {
		t.Fatalf("Endpoint with IP 10.10.1.1 is expected to be in network1 but got: %s", ep[0].Network)
	}

	if ep[1].Address == "10.11.1.2" && ep[1].Network != "network2" {
		t.Fatalf("Endpoint with IP 10.11.1.2 is expected to be in network2 but got: %s", ep[1].Network)
	}

	missing := kube.ServiceHostname("does-not-exist", ns, defaultFakeDomainSuffix)
	svc = sds.GetService(missing)
	if svc != nil {
		t.Fatalf("GetService(%q) => %s, should not exist", missing, svc.Hostname)
	}
}
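
// makeService creates a Service named n in namespace ns with a single
// "http-example" port on 80/TCP and waits for the resulting XDS service event.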
func makeService(n, ns string, cl *FakeController, t *testing.T) {
	clienttest.Wrap(t, cl.services).Create(&corev1.Service{
		ObjectMeta: metav1.ObjectMeta{Name: n, Namespace: ns},
		Spec: corev1.ServiceSpec{
			Ports: []corev1.ServicePort{
				{
					Port:     80,
					Name:     "http-example",
					Protocol: corev1.ProtocolTCP, // Not added automatically by fake
				},
			},
		},
	})
	log.Infof("Created service %s", n)
	cl.opts.XDSUpdater.(*xdsfake.Updater).WaitOrFail(t, "service")
}

func TestController_GetPodLocality(t *testing.T) {
	pod1 := generatePod("128.0.1.1", "pod1", "nsA", "", "node1", map[string]string{"app": "prod-app"}, map[string]string{})
	pod2 := generatePod("128.0.1.2", "pod2", "nsB", "", "node2", map[string]string{"app": "prod-app"}, map[string]string{})
	podOverride := generatePod("128.0.1.2", "pod2", "nsB", "",
		"node1", map[string]string{"app": "prod-app", model.LocalityLabel: "regionOverride.zoneOverride.subzoneOverride"}, map[string]string{})
	testCases := []struct {
		name   string
		pods   []*corev1.Pod
		nodes  []*corev1.Node
		wantAZ map[*corev1.Pod]string
	}{
		{
			name: "should return correct az for given address",
			pods: []*corev1.Pod{pod1, pod2},
			nodes: []*corev1.Node{
				generateNode("node1", map[string]string{NodeZoneLabel: "zone1", NodeRegionLabel: "region1", label.TopologySubzone.Name: "subzone1"}),
				generateNode("node2", map[string]string{NodeZoneLabel: "zone2", NodeRegionLabel: "region2", label.TopologySubzone.Name: "subzone2"}),
			},
			wantAZ: map[*corev1.Pod]string{
				pod1: "region1/zone1/subzone1",
				pod2: "region2/zone2/subzone2",
			},
		},
		{
			name: "should return correct az for given address",
			pods: []*corev1.Pod{pod1, pod2},
			nodes: []*corev1.Node{
				generateNode("node1", map[string]string{NodeZoneLabel: "zone1", NodeRegionLabel: "region1"}),
				generateNode("node2", map[string]string{NodeZoneLabel: "zone2", NodeRegionLabel: "region2"}),
			},
			wantAZ: map[*corev1.Pod]string{
				pod1: "region1/zone1/",
				pod2: "region2/zone2/",
			},
		},
		{
			name: "should return false if pod isn't in the cache",
			wantAZ: map[*corev1.Pod]string{
				pod1: "",
				pod2: "",
			},
		},
		{
			name: "should return false if node isn't in the cache",
			pods: []*corev1.Pod{pod1, pod2},
			wantAZ: map[*corev1.Pod]string{
				pod1: "",
				pod2: "",
			},
		},
		{
			name: "should return correct az if node has only region label",
			pods: []*corev1.Pod{pod1, pod2},
			nodes: []*corev1.Node{
				generateNode("node1", map[string]string{NodeRegionLabel: "region1"}),
				generateNode("node2", map[string]string{NodeRegionLabel: "region2"}),
			},
			wantAZ: map[*corev1.Pod]string{
				pod1: "region1//",
				pod2: "region2//",
			},
		},
		{
			name: "should return correct az if node has only zone label",
			pods: []*corev1.Pod{pod1, pod2},
			nodes: []*corev1.Node{
				generateNode("node1", map[string]string{NodeZoneLabel: "zone1"}),
				generateNode("node2", map[string]string{NodeZoneLabel: "zone2"}),
			},
			wantAZ: map[*corev1.Pod]string{
				pod1: "/zone1/",
				pod2: "/zone2/",
			},
		},
		{
			name: "should return correct az if node has only subzone label",
			pods: []*corev1.Pod{pod1, pod2},
			nodes: []*corev1.Node{
				generateNode("node1", map[string]string{label.TopologySubzone.Name: "subzone1"}),
				generateNode("node2", map[string]string{label.TopologySubzone.Name: "subzone2"}),
			},
			wantAZ: map[*corev1.Pod]string{
				pod1: "//subzone1",
				pod2: "//subzone2",
			},
		},
		{
			name: "should return correct az for given address",
			pods: []*corev1.Pod{podOverride},
			nodes: []*corev1.Node{
				generateNode("node1", map[string]string{NodeZoneLabel: "zone1", NodeRegionLabel: "region1", label.TopologySubzone.Name: "subzone1"}),
			},
			wantAZ: map[*corev1.Pod]string{
				podOverride: "regionOverride/zoneOverride/subzoneOverride",
			},
		},
	}

	for _, tc := range testCases {
		// If using t.Parallel() you must copy the iteration to a new local variable
		// https://github.com/golang/go/wiki/CommonMistakes#using-goroutines-on-loop-iterator-variables
		tc := tc
		t.Run(tc.name, func(t *testing.T) {
			t.Parallel()
			// Setup kube caches
			controller, fx := NewFakeControllerWithOptions(t, FakeControllerOptions{})

			addNodes(t, controller, tc.nodes...)
			addPods(t, controller, fx, tc.pods...)

			// Verify expected existing pod AZs
			for pod, wantAZ := range tc.wantAZ {
				az := controller.getPodLocality(pod)
				if wantAZ != "" {
					if !reflect.DeepEqual(az, wantAZ) {
						t.Fatalf("Wanted az: %s, got: %s", wantAZ, az)
					}
				} else {
					if az != "" {
						t.Fatalf("Unexpectedly found az: %s for pod: %s", az, pod.ObjectMeta.Name)
					}
				}
			}
		})
	}
}
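
// TestProxyK8sHostnameLabel verifies that a proxy's workload labels include the
// node hostname label populated from the pod's node name.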
func TestProxyK8sHostnameLabel(t *testing.T) {
	clusterID := cluster.ID("fakeCluster")
	controller, fx := NewFakeControllerWithOptions(t, FakeControllerOptions{
		ClusterID: clusterID,
	})

	pod := generatePod("128.0.0.1", "pod1", "nsa", "foo", "node1", map[string]string{"app": "test-app"}, map[string]string{})
	addPods(t, controller, fx, pod)

	proxy := &model.Proxy{
		Type:        model.Router,
		IPAddresses: []string{"128.0.0.1"},
		ID:          "pod1.nsa",
		DNSDomain:   "nsa.svc.cluster.local",
		Metadata:    &model.NodeMetadata{Namespace: "nsa", ClusterID: clusterID},
	}
	got := controller.GetProxyWorkloadLabels(proxy)
	if pod.Spec.NodeName != got[labelutil.LabelHostname] {
		t.Fatalf("expected node name %v, got %v", pod.Spec.NodeName, got[labelutil.LabelHostname])
	}
}
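
// TestGetProxyServiceTargets exercises service-target lookup for a proxy: by pod
// IP, by proxy metadata alone, and with locality coming from either node labels
// or the pod's `istio-locality` label.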
func TestGetProxyServiceTargets(t *testing.T) {
	clusterID := cluster.ID("fakeCluster")
	networkID := network.ID("fakeNetwork")
	controller, fx := NewFakeControllerWithOptions(t, FakeControllerOptions{
		ClusterID: clusterID,
	})
	// add a network ID to test endpoints include topology.istio.io/network label
	controller.network = networkID

	p := generatePod("128.0.0.1", "pod1", "nsa", "foo", "node1", map[string]string{"app": "test-app"}, map[string]string{})
	addPods(t, controller, fx, p)

	k8sSaOnVM := "acct4"
	canonicalSaOnVM := "acctvm2@gserviceaccount2.com"

	createServiceWait(controller, "svc1", "nsa", nil,
		map[string]string{
			annotation.AlphaKubernetesServiceAccounts.Name: k8sSaOnVM,
			annotation.AlphaCanonicalServiceAccounts.Name:  canonicalSaOnVM,
		},
		[]int32{8080}, map[string]string{"app": "prod-app"}, t)

	// Endpoints are generated by Kubernetes from pod labels and service selectors.
	// Here we manually create them for mocking purposes.
	svc1Ips := []string{"128.0.0.1"}
	portNames := []string{"tcp-port"}
	// Create 1 endpoint that refers to a pod in the same namespace.
	createEndpoints(t, controller, "svc1", "nsA", portNames, svc1Ips, nil, nil)

	// Create 100 endpoints that refer to a pod in a different namespace.
	fakeSvcCounts := 100
	for i := 0; i < fakeSvcCounts; i++ {
		svcName := fmt.Sprintf("svc-fake-%d", i)
		createServiceWait(controller, svcName, "nsfake", nil,
			map[string]string{
				annotation.AlphaKubernetesServiceAccounts.Name: k8sSaOnVM,
				annotation.AlphaCanonicalServiceAccounts.Name:  canonicalSaOnVM,
			},
			[]int32{8080}, map[string]string{"app": "prod-app"}, t)

		createEndpoints(t, controller, svcName, "nsfake", portNames, svc1Ips, nil, nil)
		fx.WaitOrFail(t, "eds")
	}

	// Create 1 endpoint that refers to a pod in the same namespace.
	createEndpoints(t, controller, "svc1", "nsa", portNames, svc1Ips, nil, nil)
	fx.WaitOrFail(t, "eds")

	// Test that we can get the pod by proxy ID.
	svcNode := &model.Proxy{
		Type:        model.Router,
		IPAddresses: []string{"128.0.0.1"},
		ID:          "pod1.nsa",
		DNSDomain:   "nsa.svc.cluster.local",
		Metadata:    &model.NodeMetadata{Namespace: "nsa", ClusterID: clusterID},
	}
	serviceInstances := controller.GetProxyServiceTargets(svcNode)

	if len(serviceInstances) != 1 {
		t.Fatalf("GetProxyServiceTargets() expected 1 instance, got %d", len(serviceInstances))
	}

	hostname := kube.ServiceHostname("svc1", "nsa", defaultFakeDomainSuffix)
	if serviceInstances[0].Service.Hostname != hostname {
		t.Fatalf("GetProxyServiceTargets() wrong service instance returned => hostname %q, want %q",
			serviceInstances[0].Service.Hostname, hostname)
	}

	// Test that we can look up instances just by Proxy metadata
	metaServices := controller.GetProxyServiceTargets(&model.Proxy{
		Type:            "sidecar",
		IPAddresses:     []string{"1.1.1.1"},
		Locality:        &core.Locality{Region: "r", Zone: "z"},
		ConfigNamespace: "nsa",
		Labels: map[string]string{
			"app":                      "prod-app",
			label.SecurityTlsMode.Name: "mutual",
		},
		Metadata: &model.NodeMetadata{
			ServiceAccount: "account",
			ClusterID:      clusterID,
			Labels: map[string]string{
				"app":                      "prod-app",
				label.SecurityTlsMode.Name: "mutual",
			},
		},
	})

	expected := model.ServiceTarget{
		Service: &model.Service{
			Hostname: "svc1.nsa.svc.company.com",
			ClusterVIPs: model.AddressMap{
				Addresses: map[cluster.ID][]string{clusterID: {"10.0.0.1"}},
			},
			DefaultAddress:  "10.0.0.1",
			Ports:           []*model.Port{{Name: "tcp-port", Port: 8080, Protocol: protocol.TCP}},
			ServiceAccounts: []string{"acctvm2@gserviceaccount2.com", "spiffe://cluster.local/ns/nsa/sa/acct4"},
			Attributes: model.ServiceAttributes{
				ServiceRegistry: provider.Kubernetes,
				Name:            "svc1",
				Namespace:       "nsa",
				LabelSelectors:  map[string]string{"app": "prod-app"},
				K8sAttributes: model.K8sAttributes{
					Type: string(corev1.ServiceTypeClusterIP),
				},
			},
		},
		Port: model.ServiceInstancePort{
			ServicePort: &model.Port{Name: "tcp-port", Port: 8080, Protocol: protocol.TCP},
			TargetPort:  0,
		},
	}

	if len(metaServices) != 1 {
		t.Fatalf("expected 1 instance, got %v", len(metaServices))
	}
	if !reflect.DeepEqual(expected, metaServices[0]) {
		t.Fatalf("expected instance %v, got %v", expected, metaServices[0])
	}

	// Test that we first look up instances by Proxy pod

	node := generateNode("node1", map[string]string{NodeZoneLabel: "zone1", NodeRegionLabel: "region1", label.TopologySubzone.Name: "subzone1"})
	addNodes(t, controller, node)

	// 1. pod without `istio-locality` label, get locality from node label.
	p = generatePod("129.0.0.1", "pod2", "nsa", "svcaccount", "node1",
		map[string]string{"app": "prod-app"}, nil)
	addPods(t, controller, fx, p)

	// Test that we can get the pod by proxy IP address.
	podServices := controller.GetProxyServiceTargets(&model.Proxy{
		Type:            "sidecar",
		IPAddresses:     []string{"129.0.0.1"},
		Locality:        &core.Locality{Region: "r", Zone: "z"},
		ConfigNamespace: "nsa",
		Labels: map[string]string{
			"app": "prod-app",
		},
		Metadata: &model.NodeMetadata{
			ServiceAccount: "account",
			ClusterID:      clusterID,
			Labels: map[string]string{
				"app": "prod-app",
			},
		},
	})

	expected = model.ServiceTarget{
		Service: &model.Service{
			Hostname: "svc1.nsa.svc.company.com",
			ClusterVIPs: model.AddressMap{
				Addresses: map[cluster.ID][]string{clusterID: {"10.0.0.1"}},
			},
			DefaultAddress:  "10.0.0.1",
			Ports:           []*model.Port{{Name: "tcp-port", Port: 8080, Protocol: protocol.TCP}},
			ServiceAccounts: []string{"acctvm2@gserviceaccount2.com", "spiffe://cluster.local/ns/nsa/sa/acct4"},
			Attributes: model.ServiceAttributes{
				ServiceRegistry: provider.Kubernetes,
				Name:            "svc1",
				Namespace:       "nsa",
				LabelSelectors:  map[string]string{"app": "prod-app"},
				K8sAttributes: model.K8sAttributes{
					Type: string(corev1.ServiceTypeClusterIP),
				},
			},
		},
		Port: model.ServiceInstancePort{
			ServicePort: &model.Port{Name: "tcp-port", Port: 8080, Protocol: protocol.TCP},
			TargetPort:  0,
		},
	}
	if len(podServices) != 1 {
		t.Fatalf("expected 1 instance, got %v", len(podServices))
	}
	if !reflect.DeepEqual(expected, podServices[0]) {
		t.Fatalf("expected instance %v, got %v", expected, podServices[0])
	}

	// 2. pod with `istio-locality` label, ignore node label.
	p = generatePod("129.0.0.2", "pod3", "nsa", "svcaccount", "node1",
		map[string]string{"app": "prod-app", "istio-locality": "region.zone"}, nil)
	addPods(t, controller, fx, p)

	// Test that we can get the pod by proxy IP address.
	podServices = controller.GetProxyServiceTargets(&model.Proxy{
		Type:            "sidecar",
		IPAddresses:     []string{"129.0.0.2"},
		Locality:        &core.Locality{Region: "r", Zone: "z"},
		ConfigNamespace: "nsa",
		Labels: map[string]string{
			"app": "prod-app",
		},
		Metadata: &model.NodeMetadata{
			ServiceAccount: "account",
			ClusterID:      clusterID,
			Labels: map[string]string{
				"app": "prod-app",
			},
		},
	})

	expected = model.ServiceTarget{
		Service: &model.Service{
			Hostname: "svc1.nsa.svc.company.com",
			ClusterVIPs: model.AddressMap{
				Addresses: map[cluster.ID][]string{clusterID: {"10.0.0.1"}},
			},
			DefaultAddress:  "10.0.0.1",
			Ports:           []*model.Port{{Name: "tcp-port", Port: 8080, Protocol: protocol.TCP}},
			ServiceAccounts: []string{"acctvm2@gserviceaccount2.com", "spiffe://cluster.local/ns/nsa/sa/acct4"},
			Attributes: model.ServiceAttributes{
				ServiceRegistry: provider.Kubernetes,
				Name:            "svc1",
				Namespace:       "nsa",
				LabelSelectors:  map[string]string{"app": "prod-app"},
				K8sAttributes: model.K8sAttributes{
					Type: string(corev1.ServiceTypeClusterIP),
				},
			},
		},
		Port: model.ServiceInstancePort{
			ServicePort: &model.Port{Name: "tcp-port", Port: 8080, Protocol: protocol.TCP},
		},
	}
	if len(podServices) != 1 {
		t.Fatalf("expected 1 instance, got %v", len(podServices))
	}
	if !reflect.DeepEqual(expected, podServices[0]) {
		t.Fatalf("expected instance %v, got %v", expected, podServices[0])
	}

	// pod with no services should return no service targets
	p = generatePod("130.0.0.1", "pod4", "nsa", "foo", "node1", map[string]string{"app": "no-service-app"}, map[string]string{})
	addPods(t, controller, fx, p)

	podServices = controller.GetProxyServiceTargets(&model.Proxy{
		Type:            "sidecar",
		IPAddresses:     []string{"130.0.0.1"},
		Locality:        &core.Locality{Region: "r", Zone: "z"},
		ConfigNamespace: "nsa",
		Labels: map[string]string{
			"app": "no-service-app",
		},
		Metadata: &model.NodeMetadata{
			ServiceAccount: "account",
			ClusterID:      clusterID,
			Labels: map[string]string{
				"app": "no-service-app",
			},
		},
	})
	if len(podServices) != 0 {
		t.Fatalf("expected 0 instances, got %v", len(podServices))
	}
}
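
// TestGetProxyServiceTargetsWithMultiIPsAndTargetPorts checks the service and
// target ports resolved for proxies with one or more IP addresses across several
// Service port/targetPort/protocol combinations.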
func TestGetProxyServiceTargetsWithMultiIPsAndTargetPorts(t *testing.T) {
	pod1 := generatePod("128.0.0.1", "pod1", "nsa", "foo", "node1", map[string]string{"app": "test-app"}, map[string]string{})
	testCases := []struct {
		name      string
		pods      []*corev1.Pod
		ips       []string
		ports     []corev1.ServicePort
		wantPorts []model.ServiceInstancePort
	}{
		{
			name: "multiple proxy ips single port",
			pods: []*corev1.Pod{pod1},
			ips:  []string{"128.0.0.1", "192.168.2.6"},
			ports: []corev1.ServicePort{
				{
					Name:       "tcp-port",
					Port:       8080,
					Protocol:   "http",
					TargetPort: intstr.IntOrString{Type: intstr.Int, IntVal: 8080},
				},
			},
			wantPorts: []model.ServiceInstancePort{
				{
					ServicePort: &model.Port{
						Name:     "tcp-port",
						Port:     8080,
						Protocol: "TCP",
					},
					TargetPort: 8080,
				},
			},
		},
		{
			name: "single proxy ip single port",
			pods: []*corev1.Pod{pod1},
			ips:  []string{"128.0.0.1"},
			ports: []corev1.ServicePort{
				{
					Name:       "tcp-port",
					Port:       8080,
					Protocol:   "TCP",
					TargetPort: intstr.IntOrString{Type: intstr.Int, IntVal: 8080},
				},
			},
			wantPorts: []model.ServiceInstancePort{
				{
					ServicePort: &model.Port{
						Name:     "tcp-port",
						Port:     8080,
						Protocol: "TCP",
					},
					TargetPort: 8080,
				},
			},
		},
		{
			name: "multiple proxy ips multiple ports",
			pods: []*corev1.Pod{pod1},
			ips:  []string{"128.0.0.1", "192.168.2.6"},
			ports: []corev1.ServicePort{
				{
					Name:       "tcp-port-1",
					Port:       8080,
					Protocol:   "http",
					TargetPort: intstr.IntOrString{Type: intstr.Int, IntVal: 8080},
				},
				{
					Name:       "tcp-port-2",
					Port:       9090,
					Protocol:   "http",
					TargetPort: intstr.IntOrString{Type: intstr.Int, IntVal: 9090},
				},
			},
			wantPorts: []model.ServiceInstancePort{
				{
					ServicePort: &model.Port{
						Name:     "tcp-port-1",
						Port:     8080,
						Protocol: "TCP",
					},
					TargetPort: 8080,
				},
				{
					ServicePort: &model.Port{
						Name:     "tcp-port-2",
						Port:     9090,
						Protocol: "TCP",
					},
					TargetPort: 9090,
				},
				{
					ServicePort: &model.Port{
						Name:     "tcp-port-1",
						Port:     7442,
						Protocol: "TCP",
					},
					TargetPort: 7442,
				},
			},
		},
		{
			name: "single proxy ip multiple ports same target port with different protocols",
			pods: []*corev1.Pod{pod1},
			ips:  []string{"128.0.0.1"},
			ports: []corev1.ServicePort{
				{
					Name:       "tcp-port",
					Port:       8080,
					Protocol:   "TCP",
					TargetPort: intstr.IntOrString{Type: intstr.Int, IntVal: 8080},
				},
				{
					Name:       "http-port",
					Port:       9090,
					Protocol:   "TCP",
					TargetPort: intstr.IntOrString{Type: intstr.Int, IntVal: 8080},
				},
			},
			wantPorts: []model.ServiceInstancePort{
				{
					ServicePort: &model.Port{
						Name:     "tcp-port",
						Port:     8080,
						Protocol: "TCP",
					},
					TargetPort: 8080,
				},
				{
					ServicePort: &model.Port{
						Name:     "http-port",
						Port:     9090,
						Protocol: "HTTP",
					},
					TargetPort: 8080,
				},
			},
		},
		{
			name: "single proxy ip multiple ports same target port with overlapping protocols",
			pods: []*corev1.Pod{pod1},
			ips:  []string{"128.0.0.1"},
			ports: []corev1.ServicePort{
				{
					Name:       "http-7442",
					Port:       7442,
					Protocol:   "TCP",
					TargetPort: intstr.IntOrString{Type: intstr.Int, IntVal: 7442},
				},
				{
					Name:       "tcp-8443",
					Port:       8443,
					Protocol:   "TCP",
					TargetPort: intstr.IntOrString{Type: intstr.Int, IntVal: 7442},
				},
				{
					Name:       "http-7557",
					Port:       7557,
					Protocol:   "TCP",
					TargetPort: intstr.IntOrString{Type: intstr.Int, IntVal: 7442},
				},
			},
			wantPorts: []model.ServiceInstancePort{
				{
					ServicePort: &model.Port{
						Name:     "http-7442",
						Port:     7442,
						Protocol: "HTTP",
					},
					TargetPort: 7442,
				},
				{
					ServicePort: &model.Port{
						Name:     "tcp-8443",
						Port:     8443,
						Protocol: "TCP",
					},
					TargetPort: 7442,
				},
			},
		},
		{
			name: "single proxy ip multiple ports",
			pods: []*corev1.Pod{pod1},
			ips:  []string{"128.0.0.1"},
			ports: []corev1.ServicePort{
				{
					Name:       "tcp-port",
					Port:       8080,
					Protocol:   "TCP",
					TargetPort: intstr.IntOrString{Type: intstr.Int, IntVal: 8080},
				},
				{
					Name:       "http-port",
					Port:       9090,
					Protocol:   "TCP",
					TargetPort: intstr.IntOrString{Type: intstr.Int, IntVal: 9090},
				},
			},
			wantPorts: []model.ServiceInstancePort{
				{
					ServicePort: &model.Port{
						Name:     "tcp-port",
						Port:     8080,
						Protocol: "TCP",
					},
					TargetPort: 8080,
				},
				{
					ServicePort: &model.Port{
						Name:     "http-port",
						Port:     9090,
						Protocol: "HTTP",
					},
					TargetPort: 9090,
				},
			},
		},
	}

	for _, c := range testCases {
		t.Run(c.name, func(t *testing.T) {
			// Setup kube caches
			controller, fx := NewFakeControllerWithOptions(t, FakeControllerOptions{})

			addPods(t, controller, fx, c.pods...)

			createServiceWithTargetPorts(controller, "svc1", "nsa",
				map[string]string{
					annotation.AlphaKubernetesServiceAccounts.Name: "acct4",
					annotation.AlphaCanonicalServiceAccounts.Name:  "acctvm2@gserviceaccount2.com",
				},
				c.ports, map[string]string{"app": "test-app"}, t)

			serviceInstances := controller.GetProxyServiceTargets(&model.Proxy{Metadata: &model.NodeMetadata{}, IPAddresses: c.ips})

			for i, svc := range serviceInstances {
				assert.Equal(t, svc.Port, c.wantPorts[i])
			}
		})
	}
}
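
// TestGetProxyServiceTargets_WorkloadInstance verifies service-target resolution
// for proxies backed by workload entries, including disambiguation by proxy ID
// and namespace when several workload instances share the same IP address.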
func TestGetProxyServiceTargets_WorkloadInstance(t *testing.T) {
	ctl, _ := NewFakeControllerWithOptions(t, FakeControllerOptions{})

	createServiceWait(ctl, "ratings", "bookinfo-ratings",
		map[string]string{},
		map[string]string{
			annotation.AlphaKubernetesServiceAccounts.Name: "ratings",
			annotation.AlphaCanonicalServiceAccounts.Name:  "ratings@gserviceaccount2.com",
		},
		[]int32{8080}, map[string]string{"app": "ratings"}, t)

	createServiceWait(ctl, "details", "bookinfo-details",
		map[string]string{},
		map[string]string{
			annotation.AlphaKubernetesServiceAccounts.Name: "details",
			annotation.AlphaCanonicalServiceAccounts.Name:  "details@gserviceaccount2.com",
		},
		[]int32{9090}, map[string]string{"app": "details"}, t)

	createServiceWait(ctl, "reviews", "bookinfo-reviews",
		map[string]string{},
		map[string]string{
			annotation.AlphaKubernetesServiceAccounts.Name: "reviews",
			annotation.AlphaCanonicalServiceAccounts.Name:  "reviews@gserviceaccount2.com",
		},
		[]int32{7070}, map[string]string{"app": "reviews"}, t)

	wiRatings1 := &model.WorkloadInstance{
		Name:      "ratings-1",
		Namespace: "bookinfo-ratings",
		Endpoint: &model.IstioEndpoint{
			Labels:       labels.Instance{"app": "ratings"},
			Address:      "2.2.2.21",
			EndpointPort: 8080,
		},
	}

	wiDetails1 := &model.WorkloadInstance{
		Name:      "details-1",
		Namespace: "bookinfo-details",
		Endpoint: &model.IstioEndpoint{
			Labels:       labels.Instance{"app": "details"},
			Address:      "2.2.2.21",
			EndpointPort: 9090,
		},
	}

	wiReviews1 := &model.WorkloadInstance{
		Name:      "reviews-1",
		Namespace: "bookinfo-reviews",
		Endpoint: &model.IstioEndpoint{
			Labels:       labels.Instance{"app": "reviews"},
			Address:      "3.3.3.31",
			EndpointPort: 7070,
		},
	}

	wiReviews2 := &model.WorkloadInstance{
		Name:      "reviews-2",
		Namespace: "bookinfo-reviews",
		Endpoint: &model.IstioEndpoint{
			Labels:       labels.Instance{"app": "reviews"},
			Address:      "3.3.3.32",
			EndpointPort: 7071,
		},
	}

	wiProduct1 := &model.WorkloadInstance{
		Name:      "productpage-1",
		Namespace: "bookinfo-productpage",
		Endpoint: &model.IstioEndpoint{
			Labels:       labels.Instance{"app": "productpage"},
			Address:      "4.4.4.41",
			EndpointPort: 6060,
		},
	}

	for _, wi := range []*model.WorkloadInstance{wiRatings1, wiDetails1, wiReviews1, wiReviews2, wiProduct1} {
		ctl.workloadInstanceHandler(wi, model.EventAdd) // simulate adding a workload entry
	}

	cases := []struct {
		name  string
		proxy *model.Proxy
		want  []model.ServiceTarget
	}{
		{
			name:  "proxy with unspecified IP",
			proxy: &model.Proxy{Metadata: &model.NodeMetadata{}, IPAddresses: nil},
			want:  nil,
		},
		{
			name:  "proxy with IP not in the registry",
			proxy: &model.Proxy{Metadata: &model.NodeMetadata{}, IPAddresses: []string{"1.1.1.1"}},
			want:  nil,
		},
		{
			name:  "proxy with IP from the registry, 1 matching WE, but no matching Service",
			proxy: &model.Proxy{Metadata: &model.NodeMetadata{}, IPAddresses: []string{"4.4.4.41"}},
			want:  nil,
		},
		{
			name:  "proxy with IP from the registry, 1 matching WE, and matching Service",
			proxy: &model.Proxy{Metadata: &model.NodeMetadata{}, IPAddresses: []string{"3.3.3.31"}},
			want: []model.ServiceTarget{{
				Service: &model.Service{
					Hostname: "reviews.bookinfo-reviews.svc.company.com",
				},
				Port: model.ServiceInstancePort{
					ServicePort: nil,
					TargetPort:  7070,
				},
			}},
		},
		{
			name:  "proxy with IP from the registry, 2 matching WE, and matching Service",
			proxy: &model.Proxy{Metadata: &model.NodeMetadata{}, IPAddresses: []string{"2.2.2.21"}},
			want: []model.ServiceTarget{{
				Service: &model.Service{
					Hostname: "details.bookinfo-details.svc.company.com",
				},
				Port: model.ServiceInstancePort{
					ServicePort: nil,
					TargetPort:  9090,
				},
			}},
		},
		{
			name: "proxy with IP from the registry, 2 matching WE, and matching Service, and proxy ID equal to WE with a different address",
			proxy: &model.Proxy{
				Metadata: &model.NodeMetadata{}, IPAddresses: []string{"2.2.2.21"},
				ID: "reviews-1.bookinfo-reviews", ConfigNamespace: "bookinfo-reviews",
			},
			want: []model.ServiceTarget{{
				Service: &model.Service{
					Hostname: "details.bookinfo-details.svc.company.com",
				},
				Port: model.ServiceInstancePort{
					ServicePort: nil,
					TargetPort:  9090,
				},
			}},
		},
		{
			name: "proxy with IP from the registry, 2 matching WE, and matching Service, and proxy ID equal to WE name, but proxy.ID != proxy.ConfigNamespace",
			proxy: &model.Proxy{
				Metadata: &model.NodeMetadata{}, IPAddresses: []string{"2.2.2.21"},
				ID: "ratings-1.bookinfo-ratings", ConfigNamespace: "wrong-namespace",
			},
			want: []model.ServiceTarget{{
				Service: &model.Service{
					Hostname: "details.bookinfo-details.svc.company.com",
				},
				Port: model.ServiceInstancePort{
					ServicePort: nil,
					TargetPort:  9090,
				},
			}},
		},
		{
			name: "proxy with IP from the registry, 2 matching WE, and matching Service, and proxy.ID == WE name",
			proxy: &model.Proxy{
				Metadata: &model.NodeMetadata{}, IPAddresses: []string{"2.2.2.21"},
				ID: "ratings-1.bookinfo-ratings", ConfigNamespace: "bookinfo-ratings",
			},
			want: []model.ServiceTarget{{
				Service: &model.Service{
					Hostname: "ratings.bookinfo-ratings.svc.company.com",
				},
				Port: model.ServiceInstancePort{
					ServicePort: nil,
					TargetPort:  8080,
				},
			}},
		},
		{
			name: "proxy with IP from the registry, 2 matching WE, and matching Service, and proxy.ID != WE name, but proxy.ConfigNamespace == WE namespace",
			proxy: &model.Proxy{
				Metadata: &model.NodeMetadata{}, IPAddresses: []string{"2.2.2.21"},
				ID: "wrong-name.bookinfo-ratings", ConfigNamespace: "bookinfo-ratings",
			},
			want: []model.ServiceTarget{{
				Service: &model.Service{
					Hostname: "ratings.bookinfo-ratings.svc.company.com",
				},
				Port: model.ServiceInstancePort{
					ServicePort: nil,
					TargetPort:  8080,
				},
			}},
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			got := ctl.GetProxyServiceTargets(tc.proxy)

			if diff := cmp.Diff(len(tc.want), len(got)); diff != "" {
				t.Fatalf("GetProxyServiceTargets() returned unexpected number of service instances (--want/++got): %v", diff)
			}

			for i := range tc.want {
				assert.Equal(t, tc.want[i].Service.Hostname, got[i].Service.Hostname)
				assert.Equal(t, tc.want[i].Port.TargetPort, got[i].Port.TargetPort)
			}
		})
	}
}
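
// TestController_Service verifies that created services show up in the
// controller's service list with the expected hostnames and ports.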
func TestController_Service(t *testing.T) {
	controller, _ := NewFakeControllerWithOptions(t, FakeControllerOptions{})

	createServiceWait(controller, "svc1", "nsA",
		map[string]string{}, map[string]string{},
		[]int32{8080}, map[string]string{"test-app": "test-app-1"}, t)
	createServiceWait(controller, "svc2", "nsA",
		map[string]string{}, map[string]string{},
		[]int32{8081}, map[string]string{"test-app": "test-app-2"}, t)
	createServiceWait(controller, "svc3", "nsA",
		map[string]string{}, map[string]string{},
		[]int32{8082}, map[string]string{"test-app": "test-app-3"}, t)
	createServiceWait(controller, "svc4", "nsA",
		map[string]string{}, map[string]string{},
		[]int32{8083}, map[string]string{"test-app": "test-app-4"}, t)

	expectedSvcList := []*model.Service{
		{
			Hostname:       kube.ServiceHostname("svc1", "nsA", defaultFakeDomainSuffix),
			DefaultAddress: "10.0.0.1",
			Ports: model.PortList{
				&model.Port{
					Name:     "tcp-port",
					Port:     8080,
					Protocol: protocol.TCP,
				},
			},
		},
		{
			Hostname:       kube.ServiceHostname("svc2", "nsA", defaultFakeDomainSuffix),
			DefaultAddress: "10.0.0.1",
			Ports: model.PortList{
				&model.Port{
					Name:     "tcp-port",
					Port:     8081,
					Protocol: protocol.TCP,
				},
			},
		},
		{
			Hostname:       kube.ServiceHostname("svc3", "nsA", defaultFakeDomainSuffix),
			DefaultAddress: "10.0.0.1",
			Ports: model.PortList{
				&model.Port{
					Name:     "tcp-port",
					Port:     8082,
					Protocol: protocol.TCP,
				},
			},
		},
		{
			Hostname:       kube.ServiceHostname("svc4", "nsA", defaultFakeDomainSuffix),
			DefaultAddress: "10.0.0.1",
			Ports: model.PortList{
				&model.Port{
					Name:     "tcp-port",
					Port:     8083,
					Protocol: protocol.TCP,
				},
			},
		},
	}

	// Use a timeout to keep the test from hanging.
	eventually(t, func() bool {
		svcList := controller.Services()
		return servicesEqual(svcList, expectedSvcList)
	})
}
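
// TestController_ServiceWithFixedDiscoveryNamespaces verifies that only services
// in namespaces matched by the discovery selectors are discovered, and that
// relabeling a namespace to (de)select it triggers the expected service events.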
func TestController_ServiceWithFixedDiscoveryNamespaces(t *testing.T) {
	meshWatcher := mesh.NewFixedWatcher(&meshconfig.MeshConfig{
		DiscoverySelectors: []*metav1.LabelSelector{
			{
				MatchLabels: map[string]string{
					"pilot-discovery": "enabled",
				},
			},
			{
				MatchExpressions: []metav1.LabelSelectorRequirement{
					{
						Key:      "env",
						Operator: metav1.LabelSelectorOpIn,
						Values:   []string{"test", "dev"},
					},
				},
			},
		},
	})

	svc1 := &model.Service{
		Hostname:       kube.ServiceHostname("svc1", "nsA", defaultFakeDomainSuffix),
		DefaultAddress: "10.0.0.1",
		Ports: model.PortList{
			&model.Port{
				Name:     "tcp-port",
				Port:     8080,
				Protocol: protocol.TCP,
			},
		},
	}
	svc2 := &model.Service{
		Hostname:       kube.ServiceHostname("svc2", "nsA", defaultFakeDomainSuffix),
		DefaultAddress: "10.0.0.1",
		Ports: model.PortList{
			&model.Port{
				Name:     "tcp-port",
				Port:     8081,
				Protocol: protocol.TCP,
			},
		},
	}
	svc3 := &model.Service{
		Hostname:       kube.ServiceHostname("svc3", "nsB", defaultFakeDomainSuffix),
		DefaultAddress: "10.0.0.1",
		Ports: model.PortList{
			&model.Port{
				Name:     "tcp-port",
				Port:     8082,
				Protocol: protocol.TCP,
			},
		},
	}
	svc4 := &model.Service{
		Hostname:       kube.ServiceHostname("svc4", "nsB", defaultFakeDomainSuffix),
		DefaultAddress: "10.0.0.1",
		Ports: model.PortList{
			&model.Port{
				Name:     "tcp-port",
				Port:     8083,
				Protocol: protocol.TCP,
			},
		},
	}

	controller, fx := NewFakeControllerWithOptions(t, FakeControllerOptions{
		MeshWatcher: meshWatcher,
	})

	nsA := "nsA"
	nsB := "nsB"

	// event handlers should only be triggered for services in namespaces selected for discovery
	createNamespace(t, controller.client.Kube(), nsA, map[string]string{"pilot-discovery": "enabled"})
	createNamespace(t, controller.client.Kube(), nsB, map[string]string{})

	// service event handlers should trigger for svc1 and svc2
	createServiceWait(controller, "svc1", nsA,
		map[string]string{}, map[string]string{},
		[]int32{8080}, map[string]string{"test-app": "test-app-1"}, t)
	createServiceWait(controller, "svc2", nsA,
		map[string]string{}, map[string]string{},
		[]int32{8081}, map[string]string{"test-app": "test-app-2"}, t)
	// service event handlers should not trigger for svc3 and svc4
	createService(controller, "svc3", nsB,
		map[string]string{}, map[string]string{},
		[]int32{8082}, map[string]string{"test-app": "test-app-3"}, t)
	createService(controller, "svc4", nsB,
		map[string]string{}, map[string]string{},
		[]int32{8083}, map[string]string{"test-app": "test-app-4"}, t)

	expectedSvcList := []*model.Service{svc1, svc2}
	eventually(t, func() bool {
		svcList := controller.Services()
		return servicesEqual(svcList, expectedSvcList)
	})

	// test updating a namespace by adding the discovery label
	updateNamespace(t, controller.client.Kube(), nsB, map[string]string{"env": "test"})
	// service event handlers should trigger for svc3 and svc4
	fx.WaitOrFail(t, "service")
	fx.WaitOrFail(t, "service")
	expectedSvcList = []*model.Service{svc1, svc2, svc3, svc4}
	eventually(t, func() bool {
		svcList := controller.Services()
		return servicesEqual(svcList, expectedSvcList)
	})

	// test updating a namespace by removing the discovery label
	updateNamespace(t, controller.client.Kube(), nsA, map[string]string{"pilot-discovery": "disabled"})
	// service event handlers should trigger for svc1 and svc2
	fx.WaitOrFail(t, "service")
	fx.WaitOrFail(t, "service")
	expectedSvcList = []*model.Service{svc3, svc4}
	eventually(t, func() bool {
		svcList := controller.Services()
		return servicesEqual(svcList, expectedSvcList)
	})
}
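
// TestController_ServiceWithChangingDiscoveryNamespaces verifies that updating
// the discovery selectors in mesh config adds and removes services as their
// namespaces are selected and deselected.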
func TestController_ServiceWithChangingDiscoveryNamespaces(t *testing.T) {
	svc1 := &model.Service{
		Hostname:       kube.ServiceHostname("svc1", "nsA", defaultFakeDomainSuffix),
		DefaultAddress: "10.0.0.1",
		Ports: model.PortList{
			&model.Port{
				Name:     "tcp-port",
				Port:     8080,
				Protocol: protocol.TCP,
			},
		},
	}
	svc2 := &model.Service{
		Hostname:       kube.ServiceHostname("svc2", "nsA", defaultFakeDomainSuffix),
		DefaultAddress: "10.0.0.1",
		Ports: model.PortList{
			&model.Port{
				Name:     "tcp-port",
				Port:     8081,
				Protocol: protocol.TCP,
			},
		},
	}
	svc3 := &model.Service{
		Hostname:       kube.ServiceHostname("svc3", "nsB", defaultFakeDomainSuffix),
		DefaultAddress: "10.0.0.1",
		Ports: model.PortList{
			&model.Port{
				Name:     "tcp-port",
				Port:     8082,
				Protocol: protocol.TCP,
			},
		},
	}
	svc4 := &model.Service{
		Hostname:       kube.ServiceHostname("svc4", "nsC", defaultFakeDomainSuffix),
		DefaultAddress: "10.0.0.1",
		Ports: model.PortList{
			&model.Port{
				Name:     "tcp-port",
				Port:     8083,
				Protocol: protocol.TCP,
			},
		},
	}

	updateMeshConfig := func(
		meshConfig *meshconfig.MeshConfig,
		expectedSvcList []*model.Service,
		expectedNumSvcEvents int,
		testMeshWatcher *mesh.TestWatcher,
		fx *xdsfake.Updater,
		controller *FakeController,
	) {
		// update meshConfig
		if err := testMeshWatcher.Update(meshConfig, time.Second*5); err != nil {
			t.Fatalf("%v", err)
		}

		// assert firing of service events
		for i := 0; i < expectedNumSvcEvents; i++ {
			fx.WaitOrFail(t, "service")
		}

		eventually(t, func() bool {
			svcList := controller.Services()
			return servicesEqual(svcList, expectedSvcList)
		})
	}

	meshWatcher := mesh.NewTestWatcher(&meshconfig.MeshConfig{})

	controller, fx := NewFakeControllerWithOptions(t, FakeControllerOptions{
		MeshWatcher: meshWatcher,
	})

	nsA := "nsA"
	nsB := "nsB"
	nsC := "nsC"

	createNamespace(t, controller.client.Kube(), nsA, map[string]string{"app": "foo"})
	createNamespace(t, controller.client.Kube(), nsB, map[string]string{"app": "bar"})
	createNamespace(t, controller.client.Kube(), nsC, map[string]string{"app": "baz"})

	// service event handlers should trigger for all svcs
	createServiceWait(controller, "svc1", nsA,
		map[string]string{}, map[string]string{},
		[]int32{8080}, map[string]string{"test-app": "test-app-1"}, t)
	createServiceWait(controller, "svc2", nsA,
		map[string]string{}, map[string]string{},
		[]int32{8081}, map[string]string{"test-app": "test-app-2"}, t)
	createServiceWait(controller, "svc3", nsB,
		map[string]string{}, map[string]string{},
		[]int32{8082}, map[string]string{"test-app": "test-app-3"}, t)
	createServiceWait(controller, "svc4", nsC,
		map[string]string{}, map[string]string{},
		[]int32{8083}, map[string]string{"test-app": "test-app-4"}, t)

	expectedSvcList := []*model.Service{svc1, svc2, svc3, svc4}
	eventually(t, func() bool {
		svcList := controller.Services()
		return servicesEqual(svcList, expectedSvcList)
	})

	// restrict namespaces to nsA (expect 2 delete events for svc3 and svc4)
	updateMeshConfig(
		&meshconfig.MeshConfig{
			DiscoverySelectors: []*metav1.LabelSelector{
				{
					MatchLabels: map[string]string{
						"app": "foo",
					},
				},
			},
		},
		[]*model.Service{svc1, svc2},
		2,
		meshWatcher,
		fx,
		controller,
	)

	// restrict namespaces to nsB (1 create event should trigger for nsB service and 2 delete events for nsA services)
	updateMeshConfig(
		&meshconfig.MeshConfig{
			DiscoverySelectors: []*metav1.LabelSelector{
				{
					MatchLabels: map[string]string{
						"app": "bar",
					},
				},
			},
		},
		[]*model.Service{svc3},
		3,
		meshWatcher,
		fx,
		controller,
	)

	// expand namespaces to nsA and nsB with selectors (2 create events should trigger for nsA services)
	updateMeshConfig(
		&meshconfig.MeshConfig{
			DiscoverySelectors: []*metav1.LabelSelector{
				{
					MatchExpressions: []metav1.LabelSelectorRequirement{
						{
							Key:      "app",
							Operator: metav1.LabelSelectorOpIn,
							Values:   []string{"foo", "bar"},
						},
					},
				},
			},
		},
		[]*model.Service{svc1, svc2, svc3},
		2,
		meshWatcher,
		fx,
		controller,
	)

	// permit all discovery namespaces by omitting discovery selectors (1 create event should trigger for the nsC service)
	updateMeshConfig(
		&meshconfig.MeshConfig{
			DiscoverySelectors: []*metav1.LabelSelector{},
		},
		[]*model.Service{svc1, svc2, svc3, svc4},
		1,
		meshWatcher,
		fx,
		controller,
	)
}
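
// TestControllerResourceScoping verifies event scoping as discovery selectors
// change: deselected namespaces produce no extra events, while newly selected
// namespaces trigger the expected service events without duplicates.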
func TestControllerResourceScoping(t *testing.T) {
	svc1 := &model.Service{
		Hostname:       kube.ServiceHostname("svc1", "nsA", defaultFakeDomainSuffix),
		DefaultAddress: "10.0.0.1",
		Ports: model.PortList{
			&model.Port{
				Name:     "tcp-port",
				Port:     8080,
				Protocol: protocol.TCP,
			},
		},
	}
	svc2 := &model.Service{
		Hostname:       kube.ServiceHostname("svc2", "nsA", defaultFakeDomainSuffix),
		DefaultAddress: "10.0.0.1",
		Ports: model.PortList{
			&model.Port{
				Name:     "tcp-port",
				Port:     8081,
				Protocol: protocol.TCP,
			},
		},
	}
	svc3 := &model.Service{
		Hostname:       kube.ServiceHostname("svc3", "nsB", defaultFakeDomainSuffix),
		DefaultAddress: "10.0.0.1",
		Ports: model.PortList{
			&model.Port{
				Name:     "tcp-port",
				Port:     8082,
				Protocol: protocol.TCP,
			},
		},
	}
	svc4 := &model.Service{
		Hostname:       kube.ServiceHostname("svc4", "nsC", defaultFakeDomainSuffix),
		DefaultAddress: "10.0.0.1",
		Ports: model.PortList{
			&model.Port{
				Name:     "tcp-port",
				Port:     8083,
				Protocol: protocol.TCP,
			},
		},
	}

	updateMeshConfig := func(
		meshConfig *meshconfig.MeshConfig,
		expectedSvcList []*model.Service,
		expectedNumSvcEvents int,
		testMeshWatcher *mesh.TestWatcher,
		fx *xdsfake.Updater,
		controller *FakeController,
	) {
		t.Helper()
		// update meshConfig
		if err := testMeshWatcher.Update(meshConfig, time.Second*5); err != nil {
			t.Fatalf("%v", err)
		}

		// assert firing of service events
		for i := 0; i < expectedNumSvcEvents; i++ {
			fx.WaitOrFail(t, "service")
		}

		eventually(t, func() bool {
			svcList := controller.Services()
			return servicesEqual(svcList, expectedSvcList)
		})
	}

	client := kubelib.NewFakeClient()
	t.Cleanup(client.Shutdown)
	meshWatcher := mesh.NewTestWatcher(&meshconfig.MeshConfig{})

	nsA := "nsA"
	nsB := "nsB"
	nsC := "nsC"

	createNamespace(t, client.Kube(), nsA, map[string]string{"app": "foo"})
	createNamespace(t, client.Kube(), nsB, map[string]string{"app": "bar"})
	createNamespace(t, client.Kube(), nsC, map[string]string{"app": "baz"})

	controller, fx := NewFakeControllerWithOptions(t, FakeControllerOptions{
		Client:      client,
		MeshWatcher: meshWatcher,
	})

	// service event handlers should trigger for all svcs
	createServiceWait(controller, "svc1", nsA,
		map[string]string{},
		map[string]string{},
		[]int32{8080}, map[string]string{"test-app": "test-app-1"}, t)

	createServiceWait(controller, "svc2", nsA,
		map[string]string{},
		map[string]string{},
		[]int32{8081}, map[string]string{"test-app": "test-app-2"}, t)

	createServiceWait(controller, "svc3", nsB,
		map[string]string{},
		map[string]string{},
		[]int32{8082}, map[string]string{"test-app": "test-app-3"}, t)

	createServiceWait(controller, "svc4", nsC,
		map[string]string{},
		map[string]string{},
		[]int32{8083}, map[string]string{"test-app": "test-app-4"}, t)

	expectedSvcList := []*model.Service{svc1, svc2, svc3, svc4}
	eventually(t, func() bool {
		svcList := controller.Services()
		return servicesEqual(svcList, expectedSvcList)
	})

	fx.Clear()

	// restrict namespaces to nsA (expect 2 delete events for svc3 and svc4)
	updateMeshConfig(
		&meshconfig.MeshConfig{
			DiscoverySelectors: []*metav1.LabelSelector{
				{
					MatchLabels: map[string]string{
						"app": "foo",
					},
				},
			},
		},
		[]*model.Service{svc1, svc2},
		2,
		meshWatcher,
		fx,
		controller,
	)

	// namespaces nsB and nsC deselected
	fx.AssertEmpty(t, 0)

	// create vs1 in nsA
	createVirtualService(controller, "vs1", nsA, map[string]string{}, t)

	// create vs2 in nsB
	createVirtualService(controller, "vs2", nsB, map[string]string{}, t)

	// expand namespaces to nsA and nsB with selectors (expect a create event for
	// svc3 and a full push once nsB is selected)
	updateMeshConfig(
		&meshconfig.MeshConfig{
			DiscoverySelectors: []*metav1.LabelSelector{
				{
					MatchExpressions: []metav1.LabelSelectorRequirement{
						{
							Key:      "app",
							Operator: metav1.LabelSelectorOpIn,
							Values:   []string{"foo", "bar"},
						},
					},
				},
			},
		},
		[]*model.Service{svc1, svc2, svc3},
		1,
		meshWatcher,
		fx,
		controller,
	)

	// namespace nsB selected
	fx.AssertEmpty(t, 0)
}
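
// TestEndpoints_WorkloadInstances verifies that workload-entry endpoints are
// merged into a service's endpoints, using each instance's PortMap to resolve
// the named target port (instances without a PortMap entry are skipped).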
func TestEndpoints_WorkloadInstances(t *testing.T) {
	ctl, _ := NewFakeControllerWithOptions(t, FakeControllerOptions{})

	createServiceWithTargetPorts(ctl, "ratings", "bookinfo-ratings",
		map[string]string{
			annotation.AlphaKubernetesServiceAccounts.Name: "ratings",
			annotation.AlphaCanonicalServiceAccounts.Name:  "ratings@gserviceaccount2.com",
		},
		[]corev1.ServicePort{
			{
				Name:       "http-port",
				Port:       8080,
				Protocol:   "TCP",
				TargetPort: intstr.IntOrString{Type: intstr.String, StrVal: "http"},
			},
		},
		map[string]string{"app": "ratings"}, t)

	wiRatings1 := &model.WorkloadInstance{
		Name:      "ratings-1",
		Namespace: "bookinfo-ratings",
		Endpoint: &model.IstioEndpoint{
			Labels:       labels.Instance{"app": "ratings"},
			Address:      "2.2.2.2",
			EndpointPort: 8081, // should be ignored since it doesn't define PortMap
		},
	}

	wiRatings2 := &model.WorkloadInstance{
		Name:      "ratings-2",
		Namespace: "bookinfo-ratings",
		Endpoint: &model.IstioEndpoint{
			Labels:  labels.Instance{"app": "ratings"},
			Address: "2.2.2.2",
		},
		PortMap: map[string]uint32{
			"http": 8082, // should be used
		},
	}

	wiRatings3 := &model.WorkloadInstance{
		Name:      "ratings-3",
		Namespace: "bookinfo-ratings",
		Endpoint: &model.IstioEndpoint{
			Labels:  labels.Instance{"app": "ratings"},
			Address: "2.2.2.2",
		},
		PortMap: map[string]uint32{
			"http": 8083, // should be used
		},
	}

	for _, wi := range []*model.WorkloadInstance{wiRatings1, wiRatings2, wiRatings3} {
		ctl.workloadInstanceHandler(wi, model.EventAdd) // simulate adding a workload entry
	}

	// get service object
	svcs := ctl.Services()
	if len(svcs) != 1 {
		t.Fatalf("failed to get services (%v)", svcs)
	}

	endpoints := GetEndpoints(svcs[0], ctl.Endpoints)

	want := []string{"2.2.2.2:8082", "2.2.2.2:8083"} // expect both WorkloadEntries even though they have the same IP

	got := make([]string, 0, len(endpoints))
	for _, instance := range endpoints {
		got = append(got, net.JoinHostPort(instance.Address, strconv.Itoa(int(instance.EndpointPort))))
	}
	sort.Strings(got)

	assert.Equal(t, want, got)
}
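
// TestExternalNameServiceInstances verifies ExternalName Service conversion with
// the EnableExternalNameAlias feature enabled (alias, no endpoints) and disabled
// (a single DNS endpoint pointing at the external name).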
func TestExternalNameServiceInstances(t *testing.T) {
	t.Run("alias", func(t *testing.T) {
		test.SetForTest(t, &features.EnableExternalNameAlias, true)
		controller, fx := NewFakeControllerWithOptions(t, FakeControllerOptions{})
		createExternalNameService(controller, "svc5", "nsA",
			[]int32{1, 2, 3}, "foo.co", t, fx)

		converted := controller.Services()
		assert.Equal(t, len(converted), 1)

		eps := GetEndpointsForPort(converted[0], controller.Endpoints, 1)
		assert.Equal(t, len(eps), 0)
		assert.Equal(t, converted[0].Attributes, model.ServiceAttributes{
			ServiceRegistry:          "Kubernetes",
			Name:                     "svc5",
			Namespace:                "nsA",
			Labels:                   nil,
			ExportTo:                 nil,
			LabelSelectors:           nil,
			Aliases:                  nil,
			ClusterExternalAddresses: nil,
			ClusterExternalPorts:     nil,
			K8sAttributes: model.K8sAttributes{
				Type:         string(corev1.ServiceTypeExternalName),
				ExternalName: "foo.co",
			},
		})
	})
	t.Run("no alias", func(t *testing.T) {
		test.SetForTest(t, &features.EnableExternalNameAlias, false)
		controller, fx := NewFakeControllerWithOptions(t, FakeControllerOptions{})
		createExternalNameService(controller, "svc5", "nsA",
			[]int32{1, 2, 3}, "foo.co", t, fx)

		converted := controller.Services()
		assert.Equal(t, len(converted), 1)
		eps := GetEndpointsForPort(converted[0], controller.Endpoints, 1)
		assert.Equal(t, len(eps), 1)
		assert.Equal(t, eps[0], &model.IstioEndpoint{
			Address:               "foo.co",
			ServicePortName:       "tcp-port-1",
			EndpointPort:          1,
			DiscoverabilityPolicy: model.AlwaysDiscoverable,
		})
	})
}
1691

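// TestController_ExternalNameService verifies that ExternalName services (with aliasing
// disabled) convert to MeshExternal DNSLB services whose endpoints point at the external
// hostname, and that services and endpoints are cleaned up on deletion.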
func TestController_ExternalNameService(t *testing.T) {
	test.SetForTest(t, &features.EnableExternalNameAlias, false)
	deleteWg := sync.WaitGroup{}
	controller, fx := NewFakeControllerWithOptions(t, FakeControllerOptions{
		ServiceHandler: func(_, _ *model.Service, e model.Event) {
			if e == model.EventDelete {
				deleteWg.Done()
			}
		},
	})

	k8sSvcs := []*corev1.Service{
		createExternalNameService(controller, "svc1", "nsA",
			[]int32{8080}, "test-app-1.test.svc."+defaultFakeDomainSuffix, t, fx),
		createExternalNameService(controller, "svc2", "nsA",
			[]int32{8081}, "test-app-2.test.svc."+defaultFakeDomainSuffix, t, fx),
		createExternalNameService(controller, "svc3", "nsA",
			[]int32{8082}, "test-app-3.test.pod."+defaultFakeDomainSuffix, t, fx),
		createExternalNameService(controller, "svc4", "nsA",
			[]int32{8083}, "g.co", t, fx),
	}

	expectedSvcList := []*model.Service{
		{
			Hostname: kube.ServiceHostname("svc1", "nsA", defaultFakeDomainSuffix),
			Ports: model.PortList{
				&model.Port{
					Name:     "tcp-port-8080",
					Port:     8080,
					Protocol: protocol.TCP,
				},
			},
			MeshExternal: true,
			Resolution:   model.DNSLB,
		},
		{
			Hostname: kube.ServiceHostname("svc2", "nsA", defaultFakeDomainSuffix),
			Ports: model.PortList{
				&model.Port{
					Name:     "tcp-port-8081",
					Port:     8081,
					Protocol: protocol.TCP,
				},
			},
			MeshExternal: true,
			Resolution:   model.DNSLB,
		},
		{
			Hostname: kube.ServiceHostname("svc3", "nsA", defaultFakeDomainSuffix),
			Ports: model.PortList{
				&model.Port{
					Name:     "tcp-port-8082",
					Port:     8082,
					Protocol: protocol.TCP,
				},
			},
			MeshExternal: true,
			Resolution:   model.DNSLB,
		},
		{
			Hostname: kube.ServiceHostname("svc4", "nsA", defaultFakeDomainSuffix),
			Ports: model.PortList{
				&model.Port{
					Name:     "tcp-port-8083",
					Port:     8083,
					Protocol: protocol.TCP,
				},
			},
			MeshExternal: true,
			Resolution:   model.DNSLB,
		},
	}

	svcList := controller.Services()
	if len(svcList) != len(expectedSvcList) {
		t.Fatalf("Expecting %d service but got %d\r\n", len(expectedSvcList), len(svcList))
1768
	}
1769
	for i, exp := range expectedSvcList {
1770
		if exp.Hostname != svcList[i].Hostname {
1771
			t.Fatalf("got hostname of %dst service, got:\n%#v\nwanted:\n%#v\n", i+1, svcList[i].Hostname, exp.Hostname)
1772
		}
1773
		if !reflect.DeepEqual(exp.Ports, svcList[i].Ports) {
1774
			t.Fatalf("got ports of %dst service, got:\n%#v\nwanted:\n%#v\n", i+1, svcList[i].Ports, exp.Ports)
1775
		}
1776
		if svcList[i].MeshExternal != exp.MeshExternal {
1777
			t.Fatalf("i=%v, MeshExternal==%v, should be %v: externalName='%s'", i+1, exp.MeshExternal, svcList[i].MeshExternal, k8sSvcs[i].Spec.ExternalName)
1778
		}
		if svcList[i].Resolution != exp.Resolution {
			t.Fatalf("i=%v, Resolution=='%v', should be '%v'", i+1, svcList[i].Resolution, exp.Resolution)
		}
		endpoints := GetEndpoints(svcList[i], controller.Endpoints)
		assert.Equal(t, len(endpoints), 1)
		assert.Equal(t, endpoints[0].Address, k8sSvcs[i].Spec.ExternalName)
	}

	deleteWg.Add(len(k8sSvcs))
	for _, s := range k8sSvcs {
		deleteExternalNameService(controller, s.Name, s.Namespace, t, fx)
	}
	deleteWg.Wait()

	svcList = controller.Services()
	if len(svcList) != 0 {
		t.Fatalf("Should have 0 services at this point")
	}
	for _, exp := range expectedSvcList {
		endpoints := GetEndpoints(exp, controller.Endpoints)
		assert.Equal(t, len(endpoints), 0)
	}
}

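// createEndpoints writes both a core/v1 Endpoints object and the equivalent discovery/v1
// EndpointSlice, so tests exercise both endpoint code paths. All ports share port number
// 1001, and a nil refs slice produces addresses without a TargetRef.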
func createEndpoints(t *testing.T, controller *FakeController, name, namespace string,
	portNames, ips []string, refs []*corev1.ObjectReference, labels map[string]string,
) {
	if labels == nil {
		labels = make(map[string]string)
	}
	// Add the reference to the service. Used by EndpointSlice logic only.
	labels[discovery.LabelServiceName] = name

	if refs == nil {
		refs = make([]*corev1.ObjectReference, len(ips))
	}
	var portNum int32 = 1001
	eas := make([]corev1.EndpointAddress, 0)
	for i, ip := range ips {
		eas = append(eas, corev1.EndpointAddress{IP: ip, TargetRef: refs[i]})
	}

	eps := make([]corev1.EndpointPort, 0)
	for _, name := range portNames {
		eps = append(eps, corev1.EndpointPort{Name: name, Port: portNum})
	}

	endpoint := &corev1.Endpoints{
		ObjectMeta: metav1.ObjectMeta{
			Name:      name,
			Namespace: namespace,
			Labels:    labels,
		},
		Subsets: []corev1.EndpointSubset{{
			Addresses: eas,
			Ports:     eps,
		}},
	}
	clienttest.NewWriter[*corev1.Endpoints](t, controller.client).CreateOrUpdate(endpoint)

	// Create endpoint slice as well
	esps := make([]discovery.EndpointPort, 0)
	for _, name := range portNames {
		n := name // Create a stable reference to take the pointer from
		esps = append(esps, discovery.EndpointPort{Name: &n, Port: &portNum})
	}

	sliceEndpoint := make([]discovery.Endpoint, 0, len(ips))
	for i, ip := range ips {
		sliceEndpoint = append(sliceEndpoint, discovery.Endpoint{
			Addresses: []string{ip},
			TargetRef: refs[i],
		})
	}
	endpointSlice := &discovery.EndpointSlice{
		ObjectMeta: metav1.ObjectMeta{
			Name:      name,
			Namespace: namespace,
			Labels:    labels,
		},
		Endpoints: sliceEndpoint,
		Ports:     esps,
	}
	clienttest.NewWriter[*discovery.EndpointSlice](t, controller.client).CreateOrUpdate(endpointSlice)
}

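// updateEndpoints mirrors createEndpoints, but issues Update calls against existing
// Endpoints and EndpointSlice objects instead of creating them.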
func updateEndpoints(controller *FakeController, name, namespace string, portNames, ips []string, t *testing.T) {
	var portNum int32 = 1001
	eas := make([]corev1.EndpointAddress, 0)
	for _, ip := range ips {
		eas = append(eas, corev1.EndpointAddress{IP: ip})
	}

	eps := make([]corev1.EndpointPort, 0)
	for _, name := range portNames {
		eps = append(eps, corev1.EndpointPort{Name: name, Port: portNum})
	}

	endpoint := &corev1.Endpoints{
		ObjectMeta: metav1.ObjectMeta{
			Name:      name,
			Namespace: namespace,
		},
		Subsets: []corev1.EndpointSubset{{
			Addresses: eas,
			Ports:     eps,
		}},
	}
	if _, err := controller.client.Kube().CoreV1().Endpoints(namespace).Update(context.TODO(), endpoint, metav1.UpdateOptions{}); err != nil {
		t.Fatalf("failed to update endpoints %s in namespace %s (error %v)", name, namespace, err)
	}

	// Update endpoint slice as well
	esps := make([]discovery.EndpointPort, 0)
	for i := range portNames {
		esps = append(esps, discovery.EndpointPort{Name: &portNames[i], Port: &portNum})
	}
	endpointSlice := &discovery.EndpointSlice{
		ObjectMeta: metav1.ObjectMeta{
			Name:      name,
			Namespace: namespace,
			Labels: map[string]string{
				discovery.LabelServiceName: name,
			},
		},
		Endpoints: []discovery.Endpoint{
			{
				Addresses: ips,
			},
		},
		Ports: esps,
	}
	if _, err := controller.client.Kube().DiscoveryV1().EndpointSlices(namespace).Update(context.TODO(), endpointSlice, metav1.UpdateOptions{}); err != nil {
		t.Errorf("failed to create endpoint slice %s in namespace %s (error %v)", name, namespace, err)
1913
	}
}

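// createServiceWithTargetPorts creates a ClusterIP service with the given (possibly named)
// target ports and waits for the resulting XDS "service" event.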
func createServiceWithTargetPorts(controller *FakeController, name, namespace string, annotations map[string]string,
	svcPorts []corev1.ServicePort, selector map[string]string, t *testing.T,
) {
	service := &corev1.Service{
		ObjectMeta: metav1.ObjectMeta{
			Name:        name,
			Namespace:   namespace,
			Annotations: annotations,
		},
		Spec: corev1.ServiceSpec{
			ClusterIP: "10.0.0.1", // FIXME: generate?
			Ports:     svcPorts,
			Selector:  selector,
			Type:      corev1.ServiceTypeClusterIP,
		},
	}

	clienttest.Wrap(t, controller.services).Create(service)
	controller.opts.XDSUpdater.(*xdsfake.Updater).WaitOrFail(t, "service")
}

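// createServiceWait creates a service and blocks until the fake XDS updater observes the
// corresponding "service" event.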
func createServiceWait(controller *FakeController, name, namespace string, labels, annotations map[string]string,
	ports []int32, selector map[string]string, t *testing.T,
) {
	t.Helper()
	createService(controller, name, namespace, labels, annotations, ports, selector, t)
	controller.opts.XDSUpdater.(*xdsfake.Updater).WaitOrFail(t, "service")
}

func createService(controller *FakeController, name, namespace string, labels, annotations map[string]string,
	ports []int32, selector map[string]string, t *testing.T,
) {
	service := generateService(name, namespace, labels, annotations, ports, selector, "10.0.0.1")
	clienttest.Wrap(t, controller.services).CreateOrUpdate(service)
}

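// generateService builds a ClusterIP service fixture. Note the intentionally invalid "http"
// protocol; per the comment in TestDiscoverySelector, the conversion code tolerates it.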
func generateService(name, namespace string, labels, annotations map[string]string,
	ports []int32, selector map[string]string, ip string,
) *corev1.Service {
	svcPorts := make([]corev1.ServicePort, 0)
	for _, p := range ports {
		svcPorts = append(svcPorts, corev1.ServicePort{
			Name:     "tcp-port",
			Port:     p,
			Protocol: "http",
		})
	}

	return &corev1.Service{
		ObjectMeta: metav1.ObjectMeta{
			Name:        name,
			Namespace:   namespace,
			Annotations: annotations,
			Labels:      labels,
		},
		Spec: corev1.ServiceSpec{
			ClusterIP: ip,
			Ports:     svcPorts,
			Selector:  selector,
			Type:      corev1.ServiceTypeClusterIP,
		},
	}
}

func createVirtualService(controller *FakeController, name, namespace string,
	annotations map[string]string,
	t *testing.T,
) {
	vs := &v1alpha3.VirtualService{
		ObjectMeta: metav1.ObjectMeta{
			Name:        name,
			Namespace:   namespace,
			Annotations: annotations,
		},
	}

	clienttest.NewWriter[*v1alpha3.VirtualService](t, controller.client).Create(vs)
}

func getService(controller *FakeController, name, namespace string, t *testing.T) *corev1.Service {
	svc, err := controller.client.Kube().CoreV1().Services(namespace).Get(context.TODO(), name, metav1.GetOptions{})
	if err != nil {
		t.Fatalf("Cannot get service %s in namespace %s (error: %v)", name, namespace, err)
	}
	return svc
}

func updateService(controller *FakeController, svc *corev1.Service, t *testing.T) *corev1.Service {
	svcUpdated, err := controller.client.Kube().CoreV1().Services(svc.Namespace).Update(context.TODO(), svc, metav1.UpdateOptions{})
	if err != nil {
		t.Fatalf("Cannot update service %s in namespace %s (error: %v)", svc.Name, svc.Namespace, err)
	}
	return svcUpdated
}

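// createServiceWithoutClusterIP creates a headless service (ClusterIP "None") with the
// given ports and selector.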
func createServiceWithoutClusterIP(controller *FakeController, name, namespace string, annotations map[string]string,
	ports []int32, selector map[string]string, t *testing.T,
) {
	svcPorts := make([]corev1.ServicePort, 0)
	for _, p := range ports {
		svcPorts = append(svcPorts, corev1.ServicePort{
			Name:     "tcp-port",
			Port:     p,
			Protocol: "http",
		})
	}
	service := &corev1.Service{
		ObjectMeta: metav1.ObjectMeta{
			Name:        name,
			Namespace:   namespace,
			Annotations: annotations,
		},
		Spec: corev1.ServiceSpec{
			ClusterIP: corev1.ClusterIPNone,
			Ports:     svcPorts,
			Selector:  selector,
			Type:      corev1.ServiceTypeClusterIP,
		},
	}

	clienttest.Wrap(t, controller.services).Create(service)
}

// nolint: unparam
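// createExternalNameService creates an ExternalName service and waits for the XDS events
// implied by the current EnableExternalNameAlias setting.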
func createExternalNameService(controller *FakeController, name, namespace string,
	ports []int32, externalName string, t *testing.T, xdsEvents *xdsfake.Updater,
) *corev1.Service {
	svcPorts := make([]corev1.ServicePort, 0)
	for _, p := range ports {
		svcPorts = append(svcPorts, corev1.ServicePort{
			Name:     fmt.Sprintf("tcp-port-%d", p),
			Port:     p,
			Protocol: "http",
		})
	}
	service := &corev1.Service{
		ObjectMeta: metav1.ObjectMeta{
			Name:      name,
			Namespace: namespace,
		},
		Spec: corev1.ServiceSpec{
			Ports:        svcPorts,
			Type:         corev1.ServiceTypeExternalName,
			ExternalName: externalName,
		},
	}

	clienttest.Wrap(t, controller.services).Create(service)
	if features.EnableExternalNameAlias {
		xdsEvents.MatchOrFail(t, xdsfake.Event{Type: "service"})
	} else {
		xdsEvents.MatchOrFail(t, xdsfake.Event{Type: "service"}, xdsfake.Event{Type: "eds cache"})
	}
	return service
}

func deleteExternalNameService(controller *FakeController, name, namespace string, t *testing.T, xdsEvents *xdsfake.Updater) {
	clienttest.Wrap(t, controller.services).Delete(name, namespace)
	xdsEvents.WaitOrFail(t, "service")
}

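// servicesEqual compares two service lists by hostname, default address, and ports only.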
func servicesEqual(svcList, expectedSvcList []*model.Service) bool {
	if len(svcList) != len(expectedSvcList) {
		return false
	}
	for i, exp := range expectedSvcList {
		if exp.Hostname != svcList[i].Hostname {
			return false
		}
		if exp.DefaultAddress != svcList[i].DefaultAddress {
			return false
		}
		if !reflect.DeepEqual(exp.Ports, svcList[i].Ports) {
			return false
		}
	}
	return true
}

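// addPods creates (or updates) each pod, then patches its status to ready/running via
// UpdateStatus and waits for the resulting proxy push.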
func addPods(t *testing.T, controller *FakeController, fx *xdsfake.Updater, pods ...*corev1.Pod) {
	pc := clienttest.Wrap(t, controller.podsClient)
	for _, pod := range pods {
		newPod := pc.CreateOrUpdate(pod)
		setPodReady(newPod)
		// The apiserver doesn't allow Create/Update to modify the pod status. Creating alone
		// doesn't result in events, since PodIP will be "".
		newPod.Status.PodIP = pod.Status.PodIP
		newPod.Status.Phase = corev1.PodRunning
		pc.UpdateStatus(newPod)
		waitForPod(t, controller, pod.Status.PodIP)
		// the first occurrence of a pod triggers a proxy push
		fx.WaitOrFail(t, "proxy")
	}
}

func setPodReady(pod *corev1.Pod) {
	pod.Status.Conditions = []corev1.PodCondition{
		{
			Type:               corev1.PodReady,
			Status:             corev1.ConditionTrue,
			LastTransitionTime: metav1.Now(),
		},
	}
}

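// generatePod returns a ready, running pod fixture; the status is pre-populated because
// the pod cache keys pods by IP.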
func generatePod(ip, name, namespace, saName, node string, labels map[string]string, annotations map[string]string) *corev1.Pod {
	automount := false
	return &corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name:        name,
			Labels:      labels,
			Annotations: annotations,
			Namespace:   namespace,
		},
		Spec: corev1.PodSpec{
			ServiceAccountName:           saName,
			NodeName:                     node,
			AutomountServiceAccountToken: &automount,
			// Validation requires this
			Containers: []corev1.Container{
				{
					Name:  "test",
					Image: "ununtu",
2139
				},
			},
		},
		// The cache controller uses this as key, required by our impl.
		Status: corev1.PodStatus{
			Conditions: []corev1.PodCondition{
				{
					Type:               corev1.PodReady,
					Status:             corev1.ConditionTrue,
					LastTransitionTime: metav1.Now(),
				},
			},
			PodIP:  ip,
			HostIP: ip,
			PodIPs: []corev1.PodIP{
				{
					IP: ip,
				},
			},
			Phase: corev1.PodRunning,
		},
	}
}

func generateNode(name string, labels map[string]string) *corev1.Node {
	return &corev1.Node{
		TypeMeta: metav1.TypeMeta{
			Kind:       "Node",
			APIVersion: "v1",
		},
		ObjectMeta: metav1.ObjectMeta{
			Name:   name,
			Labels: labels,
		},
	}
}

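// addNodes registers the given nodes and waits until each one is visible to the controller.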
func addNodes(t *testing.T, controller *FakeController, nodes ...*corev1.Node) {
	for _, node := range nodes {
		clienttest.Wrap(t, controller.nodes).CreateOrUpdate(node)
		waitForNode(t, controller, node.Name)
	}
}

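// TestEndpointUpdate verifies that endpoint updates for a regular service trigger
// incremental EDS, while endpoint updates for a headless service trigger a full XDS push.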
func TestEndpointUpdate(t *testing.T) {
	controller, fx := NewFakeControllerWithOptions(t, FakeControllerOptions{})

	pod1 := generatePod("128.0.0.1", "pod1", "nsA", "", "node1", map[string]string{"app": "prod-app"}, map[string]string{})
	pods := []*corev1.Pod{pod1}
	addPods(t, controller, fx, pods...)

	// 1. incremental eds for normal service endpoint update
	createServiceWait(controller, "svc1", "nsa", nil, nil,
		[]int32{8080}, map[string]string{"app": "prod-app"}, t)

	// Endpoints are generated by Kubernetes from pod labels and service selectors.
	// Here we create them manually for mocking purposes.
	svc1Ips := []string{"128.0.0.1"}
	portNames := []string{"tcp-port"}
	// Create 1 endpoint that refers to a pod in the same namespace.
	createEndpoints(t, controller, "svc1", "nsa", portNames, svc1Ips, nil, nil)
	fx.WaitOrFail(t, "eds")

	// delete normal service
	clienttest.Wrap(t, controller.services).Delete("svc1", "nsa")
	fx.WaitOrFail(t, "service")

	// 2. full xds push request for headless service endpoint update

	// create a headless service
	createServiceWithoutClusterIP(controller, "svc1", "nsa", nil,
		[]int32{8080}, map[string]string{"app": "prod-app"}, t)
	fx.WaitOrFail(t, "service")

	// Create 1 endpoint that refers to a pod in the same namespace.
	svc1Ips = append(svc1Ips, "128.0.0.2")
	updateEndpoints(controller, "svc1", "nsa", portNames, svc1Ips, t)
	host := string(kube.ServiceHostname("svc1", "nsa", controller.opts.DomainSuffix))
	fx.MatchOrFail(t, xdsfake.Event{Type: "xds full", ID: host})
}

// Validates that when Pilot sees an Endpoint before the corresponding Pod, it triggers an
// endpoint event once the pod event arrives.
func TestEndpointUpdateBeforePodUpdate(t *testing.T) {
	controller, fx := NewFakeControllerWithOptions(t, FakeControllerOptions{})

	addNodes(t, controller, generateNode("node1", map[string]string{NodeZoneLabel: "zone1", NodeRegionLabel: "region1", label.TopologySubzone.Name: "subzone1"}))
	// Set up helper functions to make the test more explicit
	addPod := func(name, ip string) {
		pod := generatePod(ip, name, "nsA", name, "node1", map[string]string{"app": "prod-app"}, map[string]string{})
		addPods(t, controller, fx, pod)
	}
	deletePod := func(name, ip string) {
		if err := controller.client.Kube().CoreV1().Pods("nsA").Delete(context.TODO(), name, metav1.DeleteOptions{}); err != nil {
			t.Fatal(err)
		}
		retry.UntilSuccessOrFail(t, func() error {
			controller.pods.RLock()
			defer controller.pods.RUnlock()
			if _, ok := controller.pods.podsByIP[ip]; ok {
				return fmt.Errorf("pod still present")
			}
			return nil
		}, retry.Timeout(time.Second))
	}
	addService := func(name string) {
		// create service
		createServiceWait(controller, name, "nsA", nil, nil,
			[]int32{8080}, map[string]string{"app": "prod-app"}, t)
	}
	addEndpoint := func(svcName string, ips []string, pods []string) {
		var refs []*corev1.ObjectReference
		for _, pod := range pods {
			if pod == "" {
				refs = append(refs, nil)
			} else {
				refs = append(refs, &corev1.ObjectReference{
					Kind:      "Pod",
					Namespace: "nsA",
					Name:      pod,
				})
			}
		}
		createEndpoints(t, controller, svcName, "nsA", []string{"tcp-port"}, ips, refs, nil)
	}
	assertEndpointsEvent := func(ips []string, pods []string) {
		t.Helper()
		ev := fx.WaitOrFail(t, "eds")
		var gotIps []string
		for _, e := range ev.Endpoints {
			gotIps = append(gotIps, e.Address)
		}
		var gotSA []string
		var expectedSa []string
		for _, e := range pods {
			if e == "" {
				expectedSa = append(expectedSa, "")
			} else {
				expectedSa = append(expectedSa, "spiffe://cluster.local/ns/nsA/sa/"+e)
			}
		}

		for _, e := range ev.Endpoints {
			gotSA = append(gotSA, e.ServiceAccount)
		}
		if !reflect.DeepEqual(gotIps, ips) {
			t.Fatalf("expected ips %v, got %v", ips, gotIps)
		}
		if !reflect.DeepEqual(gotSA, expectedSa) {
			t.Fatalf("expected SAs %v, got %v", expectedSa, gotSA)
		}
	}
	assertPendingResync := func(expected int) {
		t.Helper()
		retry.UntilSuccessOrFail(t, func() error {
			controller.pods.RLock()
			defer controller.pods.RUnlock()
			if len(controller.pods.needResync) != expected {
				return fmt.Errorf("expected %d pods needing resync, got %d", expected, len(controller.pods.needResync))
			}
			return nil
		}, retry.Timeout(time.Second))
	}

	// standard ordering
	addService("svc")
	addPod("pod1", "172.0.1.1")
	addEndpoint("svc", []string{"172.0.1.1"}, []string{"pod1"})
	assertEndpointsEvent([]string{"172.0.1.1"}, []string{"pod1"})
	fx.Clear()

	// Create the endpoint, then later add the pod. Should eventually get an update for the endpoint
	addEndpoint("svc", []string{"172.0.1.1", "172.0.1.2"}, []string{"pod1", "pod2"})
	assertEndpointsEvent([]string{"172.0.1.1"}, []string{"pod1"})
	fx.Clear()
	addPod("pod2", "172.0.1.2")
	assertEndpointsEvent([]string{"172.0.1.1", "172.0.1.2"}, []string{"pod1", "pod2"})
	fx.Clear()

	// Create the endpoint without a pod reference. We should see it immediately
	addEndpoint("svc", []string{"172.0.1.1", "172.0.1.2", "172.0.1.3"}, []string{"pod1", "pod2", ""})
	assertEndpointsEvent([]string{"172.0.1.1", "172.0.1.2", "172.0.1.3"}, []string{"pod1", "pod2", ""})
	fx.Clear()

	// Delete a pod before the endpoint
	addEndpoint("svc", []string{"172.0.1.1"}, []string{"pod1"})
	deletePod("pod2", "172.0.1.2")
	assertEndpointsEvent([]string{"172.0.1.1"}, []string{"pod1"})
	fx.Clear()

	// add another service
	addService("other")
	// Add endpoints for the new service, and the old one. Both should be missing the last IP
	addEndpoint("other", []string{"172.0.1.1", "172.0.1.2"}, []string{"pod1", "pod2"})
	addEndpoint("svc", []string{"172.0.1.1", "172.0.1.2"}, []string{"pod1", "pod2"})
	assertEndpointsEvent([]string{"172.0.1.1"}, []string{"pod1"})
	assertEndpointsEvent([]string{"172.0.1.1"}, []string{"pod1"})
	fx.Clear()
	// Add the pod, expect the endpoints update for both
	addPod("pod2", "172.0.1.2")
	assertEndpointsEvent([]string{"172.0.1.1", "172.0.1.2"}, []string{"pod1", "pod2"})
	assertEndpointsEvent([]string{"172.0.1.1", "172.0.1.2"}, []string{"pod1", "pod2"})

	// Check for memory leaks
	assertPendingResync(0)
	addEndpoint("svc", []string{"172.0.1.1", "172.0.1.2", "172.0.1.3"}, []string{"pod1", "pod2", "pod3"})
	// This is really an implementation detail here, but we check it to sanity-check our test
	assertPendingResync(1)
	// Remove the endpoint again, with no pod events in between. Should have no memory leaks
	addEndpoint("svc", []string{"172.0.1.1", "172.0.1.2"}, []string{"pod1", "pod2"})
	// TODO this case would leak
	// assertPendingResync(0)

	// completely remove the endpoint
	addEndpoint("svc", []string{"172.0.1.1", "172.0.1.2", "172.0.1.3"}, []string{"pod1", "pod2", "pod3"})
	assertPendingResync(1)
	if err := controller.client.Kube().CoreV1().Endpoints("nsA").Delete(context.TODO(), "svc", metav1.DeleteOptions{}); err != nil {
		t.Fatal(err)
	}
	if err := controller.client.Kube().DiscoveryV1().EndpointSlices("nsA").Delete(context.TODO(), "svc", metav1.DeleteOptions{}); err != nil {
		t.Fatal(err)
	}
	assertPendingResync(0)
}

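// TestWorkloadInstanceHandlerMultipleEndpoints verifies that WorkloadEntry endpoints are
// merged with pod endpoints in EDS updates for a service that selects both.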
func TestWorkloadInstanceHandlerMultipleEndpoints(t *testing.T) {
	controller, fx := NewFakeControllerWithOptions(t, FakeControllerOptions{})

	// Create an initial pod with a service, and endpoint.
	pod1 := generatePod("172.0.1.1", "pod1", "nsA", "", "node1", map[string]string{"app": "prod-app"}, map[string]string{})
	pod2 := generatePod("172.0.1.2", "pod2", "nsA", "", "node1", map[string]string{"app": "prod-app"}, map[string]string{})
	pods := []*corev1.Pod{pod1, pod2}
	nodes := []*corev1.Node{
		generateNode("node1", map[string]string{NodeZoneLabel: "zone1", NodeRegionLabel: "region1", label.TopologySubzone.Name: "subzone1"}),
	}
	addNodes(t, controller, nodes...)
	addPods(t, controller, fx, pods...)
	createServiceWait(controller, "svc1", "nsA", nil, nil,
		[]int32{8080}, map[string]string{"app": "prod-app"}, t)
	pod1Ips := []string{"172.0.1.1"}
	portNames := []string{"tcp-port"}
	createEndpoints(t, controller, "svc1", "nsA", portNames, pod1Ips, nil, nil)
	fx.WaitOrFail(t, "eds")

	// Simulate adding a workload entry (fired through invocation of WorkloadInstanceHandler)
	controller.workloadInstanceHandler(&model.WorkloadInstance{
		Namespace: "nsA",
		Endpoint: &model.IstioEndpoint{
			Labels:         labels.Instance{"app": "prod-app"},
			ServiceAccount: "account",
			Address:        "2.2.2.2",
			EndpointPort:   8080,
		},
	}, model.EventAdd)

	expectedEndpointIPs := []string{"172.0.1.1", "2.2.2.2"}
	// Check if an EDS event is fired
	ev := fx.WaitOrFail(t, "eds")
	// check if the hostname matches that of k8s service svc1.nsA
	if ev.ID != "svc1.nsA.svc.company.com" {
		t.Fatalf("eds event for workload entry addition did not match the expected service. got %s, want %s",
			ev.ID, "svc1.nsA.svc.company.com")
	}
	// We should have both the pod IP and the workload entry's IP in the endpoints:
	// the first endpoint is the k8s pod's and the second is the workload entry's.

	gotEndpointIPs := make([]string, 0, len(ev.Endpoints))
	for _, ep := range ev.Endpoints {
		gotEndpointIPs = append(gotEndpointIPs, ep.Address)
	}
	if !reflect.DeepEqual(gotEndpointIPs, expectedEndpointIPs) {
		t.Fatalf("eds update after adding workload entry did not match expected list. got %v, want %v",
			gotEndpointIPs, expectedEndpointIPs)
	}

	// Check if InstancesByPort returns the same list
	converted := controller.Services()
	if len(converted) != 1 {
		t.Fatalf("failed to get services (%v), converted", converted)
2417
	}
	endpoints := GetEndpoints(converted[0], controller.Endpoints)
	gotEndpointIPs = []string{}
	for _, instance := range endpoints {
		gotEndpointIPs = append(gotEndpointIPs, instance.Address)
	}
	if !reflect.DeepEqual(gotEndpointIPs, expectedEndpointIPs) {
		t.Fatalf("InstancesByPort after adding workload entry did not match expected list. got %v, want %v",
			gotEndpointIPs, expectedEndpointIPs)
	}

	// Now add a k8s pod to the service and ensure that eds updates contain both pod IPs and workload entry IPs.
	updateEndpoints(controller, "svc1", "nsA", portNames, []string{"172.0.1.1", "172.0.1.2"}, t)
	ev = fx.WaitOrFail(t, "eds")
	gotEndpointIPs = []string{}
	for _, ep := range ev.Endpoints {
		gotEndpointIPs = append(gotEndpointIPs, ep.Address)
	}
	expectedEndpointIPs = []string{"172.0.1.1", "172.0.1.2", "2.2.2.2"}
	if !reflect.DeepEqual(gotEndpointIPs, expectedEndpointIPs) {
		t.Fatalf("eds update after adding pod did not match expected list. got %v, want %v",
			gotEndpointIPs, expectedEndpointIPs)
	}
}

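// TestWorkloadInstanceHandler_WorkloadInstanceIndex verifies the by-IP workload instance
// index across add, update (IP change), and delete events.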
func TestWorkloadInstanceHandler_WorkloadInstanceIndex(t *testing.T) {
	ctl, _ := NewFakeControllerWithOptions(t, FakeControllerOptions{})

	verifyGetByIP := func(address string, want []*model.WorkloadInstance) {
		t.Helper()
		got := ctl.workloadInstancesIndex.GetByIP(address)

		assert.Equal(t, want, got)
	}

	wi1 := &model.WorkloadInstance{
		Name:      "ratings-1",
		Namespace: "bookinfo",
		Endpoint: &model.IstioEndpoint{
			Labels:       labels.Instance{"app": "ratings"},
			Address:      "2.2.2.2",
			EndpointPort: 8080,
		},
	}

	// simulate adding a workload entry
	ctl.workloadInstanceHandler(wi1, model.EventAdd)

	verifyGetByIP("2.2.2.2", []*model.WorkloadInstance{wi1})

	wi2 := &model.WorkloadInstance{
		Name:      "details-1",
		Namespace: "bookinfo",
		Endpoint: &model.IstioEndpoint{
			Labels:       labels.Instance{"app": "details"},
			Address:      "3.3.3.3",
			EndpointPort: 9090,
		},
	}

	// simulate adding a workload entry
	ctl.workloadInstanceHandler(wi2, model.EventAdd)

	verifyGetByIP("2.2.2.2", []*model.WorkloadInstance{wi1})
	verifyGetByIP("3.3.3.3", []*model.WorkloadInstance{wi2})

	wi3 := &model.WorkloadInstance{
		Name:      "details-1",
		Namespace: "bookinfo",
		Endpoint: &model.IstioEndpoint{
			Labels:       labels.Instance{"app": "details"},
			Address:      "2.2.2.2", // update IP
			EndpointPort: 9090,
		},
	}

	// simulate updating a workload entry
	ctl.workloadInstanceHandler(wi3, model.EventUpdate)

	verifyGetByIP("3.3.3.3", nil)
	verifyGetByIP("2.2.2.2", []*model.WorkloadInstance{wi3, wi1})

	// simulate deleting a workload entry
	ctl.workloadInstanceHandler(wi3, model.EventDelete)

	verifyGetByIP("2.2.2.2", []*model.WorkloadInstance{wi1})

	// simulate deleting a workload entry
	ctl.workloadInstanceHandler(wi1, model.EventDelete)

	verifyGetByIP("2.2.2.2", nil)
}

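// TestUpdateEdsCacheOnServiceUpdate verifies that a service selector change refreshes the
// EDS cache only when EnableK8SServiceSelectWorkloadEntries is set.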
func TestUpdateEdsCacheOnServiceUpdate(t *testing.T) {
	controller, fx := NewFakeControllerWithOptions(t, FakeControllerOptions{})

	// Create an initial pod with a service, and endpoint.
	pod1 := generatePod("172.0.1.1", "pod1", "nsA", "", "node1", map[string]string{"app": "prod-app"}, map[string]string{})
	pod2 := generatePod("172.0.1.2", "pod2", "nsA", "", "node1", map[string]string{"app": "prod-app"}, map[string]string{})
	pods := []*corev1.Pod{pod1, pod2}
	nodes := []*corev1.Node{
		generateNode("node1", map[string]string{NodeZoneLabel: "zone1", NodeRegionLabel: "region1", label.TopologySubzone.Name: "subzone1"}),
	}
	addNodes(t, controller, nodes...)
	addPods(t, controller, fx, pods...)
	createServiceWait(controller, "svc1", "nsA", nil, nil,
		[]int32{8080}, map[string]string{"app": "prod-app"}, t)

	pod1Ips := []string{"172.0.1.1"}
	portNames := []string{"tcp-port"}
	createEndpoints(t, controller, "svc1", "nsA", portNames, pod1Ips, nil, nil)
	fx.WaitOrFail(t, "eds")

	// update service selector
	svc := getService(controller, "svc1", "nsA", t)
	svc.Spec.Selector = map[string]string{
		"app": "prod-app",
		"foo": "bar",
	}
	// set `K8SServiceSelectWorkloadEntries` to false temporarily
	tmp := features.EnableK8SServiceSelectWorkloadEntries
	features.EnableK8SServiceSelectWorkloadEntries = false
	defer func() {
		features.EnableK8SServiceSelectWorkloadEntries = tmp
	}()
	svc = updateService(controller, svc, t)
	// don't update eds cache if `K8S_SELECT_WORKLOAD_ENTRIES` is disabled
	fx.WaitOrFail(t, "service")
	fx.AssertEmpty(t, 0)

	features.EnableK8SServiceSelectWorkloadEntries = true
	svc.Spec.Selector = map[string]string{
		"app": "prod-app",
	}
	updateService(controller, svc, t)
	// update eds cache if `K8S_SELECT_WORKLOAD_ENTRIES` is enabled
	fx.WaitOrFail(t, "eds cache")
}

func TestDiscoverySelector(t *testing.T) {
	networksWatcher := mesh.NewFixedNetworksWatcher(&meshconfig.MeshNetworks{
		Networks: map[string]*meshconfig.Network{
			"network1": {
				Endpoints: []*meshconfig.Network_NetworkEndpoints{
					{
						Ne: &meshconfig.Network_NetworkEndpoints_FromCidr{
							FromCidr: "10.10.1.1/24",
						},
					},
				},
			},
			"network2": {
				Endpoints: []*meshconfig.Network_NetworkEndpoints{
					{
						Ne: &meshconfig.Network_NetworkEndpoints_FromCidr{
							FromCidr: "10.11.1.1/24",
						},
					},
				},
			},
		},
	})
	ctl, _ := NewFakeControllerWithOptions(t, FakeControllerOptions{NetworksWatcher: networksWatcher})
	t.Parallel()
	ns := "ns-test"

	hostname := kube.ServiceHostname(testService, ns, defaultFakeDomainSuffix)

	var sds model.ServiceDiscovery = ctl
	// "test", ports: http-example on 80
	makeService(testService, ns, ctl, t)

	eventually(t, func() bool {
		out := sds.Services()

		// The original test was checking for 'protocolTCP', which is incorrect (the
		// port name is 'http'). It was working because the Service was created with
		// an invalid protocol (not TCP/UDP), and the code was ignoring that.
		for _, item := range out {
			if item.Hostname == hostname &&
				len(item.Ports) == 1 &&
				item.Ports[0].Protocol == protocol.HTTP {
				return true
			}
		}
		return false
	})

	svc := sds.GetService(hostname)
	if svc == nil {
		t.Fatalf("GetService(%q) => should exists", hostname)
2608
	}
	if svc.Hostname != hostname {
		t.Fatalf("GetService(%q) => %q", hostname, svc.Hostname)
	}

	missing := kube.ServiceHostname("does-not-exist", ns, defaultFakeDomainSuffix)
	svc = sds.GetService(missing)
	if svc != nil {
		t.Fatalf("GetService(%q) => %s, should not exist", missing, svc.Hostname)
	}
}

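// TestStripNodeUnusedFields verifies that cached nodes retain only the fields the
// controller uses, such as the topology labels.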
func TestStripNodeUnusedFields(t *testing.T) {
	inputNode := &corev1.Node{
		TypeMeta: metav1.TypeMeta{
			Kind:       "Node",
			APIVersion: "v1",
		},
		ObjectMeta: metav1.ObjectMeta{
			Name: "test",
			Labels: map[string]string{
				NodeZoneLabel:              "zone1",
				NodeRegionLabel:            "region1",
				label.TopologySubzone.Name: "subzone1",
			},
			Annotations: map[string]string{
				"annotation1": "foo",
				"annotation2": "bar",
			},
			ManagedFields: []metav1.ManagedFieldsEntry{
				{
					Manager: "test",
				},
			},
			OwnerReferences: []metav1.OwnerReference{
				{
					Name: "test",
				},
			},
		},
		Status: corev1.NodeStatus{
			Allocatable: map[corev1.ResourceName]resource.Quantity{
				"cpu": {
					Format: "500m",
				},
			},
			Capacity: map[corev1.ResourceName]resource.Quantity{
				"cpu": {
					Format: "500m",
				},
			},
			Images: []corev1.ContainerImage{
				{
					Names: []string{"test"},
				},
			},
			Conditions: []corev1.NodeCondition{
				{
					Type: corev1.NodeMemoryPressure,
				},
			},
		},
	}

	expectNode := &corev1.Node{
		TypeMeta: metav1.TypeMeta{
			Kind:       "Node",
			APIVersion: "v1",
		},
		ObjectMeta: metav1.ObjectMeta{
			Name: "test",
			Labels: map[string]string{
				NodeZoneLabel:              "zone1",
				NodeRegionLabel:            "region1",
				label.TopologySubzone.Name: "subzone1",
			},
		},
	}

	controller, _ := NewFakeControllerWithOptions(t, FakeControllerOptions{})
	addNodes(t, controller, inputNode)

	assert.Equal(t, expectNode, controller.nodes.Get(inputNode.Name, ""))
}

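// TestStripPodUnusedFields verifies that cached pods are stripped of unused fields such as
// managed fields, init containers, volumes, and container statuses.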
func TestStripPodUnusedFields(t *testing.T) {
	inputPod := &corev1.Pod{
		TypeMeta: metav1.TypeMeta{
			Kind:       "Pod",
			APIVersion: "v1",
		},
		ObjectMeta: metav1.ObjectMeta{
			Name:      "test",
			Namespace: "default",
			Labels: map[string]string{
				"app": "test",
			},
			Annotations: map[string]string{
				"annotation1": "foo",
				"annotation2": "bar",
			},
			ManagedFields: []metav1.ManagedFieldsEntry{
				{
					Manager: "test",
				},
			},
		},
		Spec: corev1.PodSpec{
			InitContainers: []corev1.Container{
				{
					Name: "init-container",
				},
			},
			Containers: []corev1.Container{
				{
					Name: "container-1",
					Ports: []corev1.ContainerPort{
						{
							Name: "http",
						},
					},
				},
				{
					Name: "container-2",
				},
			},
			Volumes: []corev1.Volume{
				{
					Name: "test",
				},
			},
		},
		Status: corev1.PodStatus{
			InitContainerStatuses: []corev1.ContainerStatus{
				{
					Name: "init-container",
				},
			},
			ContainerStatuses: []corev1.ContainerStatus{
				{
					Name: "container-1",
				},
				{
					Name: "container-2",
				},
			},
			PodIP:  "1.1.1.1",
			HostIP: "1.1.1.1",
			Phase:  corev1.PodRunning,
		},
	}

	expectPod := &corev1.Pod{
		TypeMeta: metav1.TypeMeta{
			Kind:       "Pod",
			APIVersion: "v1",
		},
		ObjectMeta: metav1.ObjectMeta{
			Name:      "test",
			Namespace: "default",
			Labels: map[string]string{
				"app": "test",
			},
			Annotations: map[string]string{
				"annotation1": "foo",
				"annotation2": "bar",
			},
		},
		Spec: corev1.PodSpec{
			Containers: []corev1.Container{
				{
					Ports: []corev1.ContainerPort{
						{
							Name: "http",
						},
					},
				},
			},
		},
		Status: corev1.PodStatus{
			PodIP:  "1.1.1.1",
			HostIP: "1.1.1.1",
			Phase:  corev1.PodRunning,
		},
	}

	controller, fx := NewFakeControllerWithOptions(t, FakeControllerOptions{})
	addPods(t, controller, fx, inputPod)

	output := controller.pods.getPodByKey(config.NamespacedName(expectPod))
	// The final pod status conditions are determined by addPods,
	// so we copy them onto the expected pod.
	expectPod.Status.Conditions = output.Status.Conditions
	if !reflect.DeepEqual(expectPod, output) {
		t.Fatalf("Wanted: %v\nGot: %v", expectPod, output)
	}
}

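// TestServiceUpdateNeedsPush covers the push decision for service updates: visibility,
// spec (port) changes, and target-port changes.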
func TestServiceUpdateNeedsPush(t *testing.T) {
	newService := func(exportTo visibility.Instance, ports []int) *model.Service {
		s := &model.Service{
			Attributes: model.ServiceAttributes{
				ExportTo: sets.New(exportTo),
			},
		}
		for _, port := range ports {
			s.Ports = append(s.Ports, &model.Port{
				Port: port,
			})
		}
		return s
	}

	type testcase struct {
		name     string
		prev     *corev1.Service
		curr     *corev1.Service
		prevConv *model.Service
		currConv *model.Service
		expect   bool
	}

	tests := []testcase{
		{
			name:     "no change",
			prevConv: newService(visibility.Public, []int{80}),
			currConv: newService(visibility.Public, []int{80}),
			expect:   false,
		},
		{
			name:     "new service",
			prevConv: nil,
			currConv: newService(visibility.Public, []int{80}),
			expect:   true,
		},
		{
			name:     "new service with none visibility",
			prevConv: nil,
			currConv: newService(visibility.None, []int{80}),
			expect:   false,
		},
		{
			name:     "public visibility, spec change",
			prevConv: newService(visibility.Public, []int{80}),
			currConv: newService(visibility.Public, []int{80, 443}),
			expect:   true,
		},
		{
			name:     "none visibility, spec change",
			prevConv: newService(visibility.None, []int{80}),
			currConv: newService(visibility.None, []int{80, 443}),
			expect:   false,
		},
	}

	svc := corev1.Service{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "foo",
			Namespace: "bar",
		},
		Spec: corev1.ServiceSpec{
			Ports: []corev1.ServicePort{{Port: 80, TargetPort: intstr.FromInt32(8080)}},
		},
	}
	updatedSvc := corev1.Service{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "foo",
			Namespace: "bar",
		},
		Spec: corev1.ServiceSpec{
			Ports: []corev1.ServicePort{{Port: 80, TargetPort: intstr.FromInt32(8081)}},
		},
	}
	tests = append(tests,
		testcase{
			name:     "target ports changed",
			prev:     &svc,
			curr:     &updatedSvc,
			prevConv: kube.ConvertService(svc, constants.DefaultClusterLocalDomain, ""),
			currConv: kube.ConvertService(updatedSvc, constants.DefaultClusterLocalDomain, ""),
			expect:   true,
		},
		testcase{
			name:     "target ports unchanged",
			prev:     &svc,
			curr:     &svc,
			prevConv: kube.ConvertService(svc, constants.DefaultClusterLocalDomain, ""),
			currConv: kube.ConvertService(svc, constants.DefaultClusterLocalDomain, ""),
			expect:   false,
		})

	for _, test := range tests {
		actual := serviceUpdateNeedsPush(test.prev, test.curr, test.prevConv, test.currConv)
		if actual != test.expect {
			t.Fatalf("%s: expected %v, got %v", test.name, test.expect, actual)
		}
	}
}
