// istio — serviceregistry kube controller tests (original file: 2905 lines, 84.9 KB).
1// Copyright Istio Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package controller
16
17import (
18"context"
19"fmt"
20"net"
21"reflect"
22"sort"
23"strconv"
24"sync"
25"testing"
26"time"
27
28core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
29"github.com/google/go-cmp/cmp"
30corev1 "k8s.io/api/core/v1"
31discovery "k8s.io/api/discovery/v1"
32"k8s.io/apimachinery/pkg/api/resource"
33metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
34"k8s.io/apimachinery/pkg/util/intstr"
35
36"istio.io/api/annotation"
37"istio.io/api/label"
38meshconfig "istio.io/api/mesh/v1alpha1"
39"istio.io/client-go/pkg/apis/networking/v1alpha3"
40"istio.io/istio/pilot/pkg/features"
41"istio.io/istio/pilot/pkg/model"
42"istio.io/istio/pilot/pkg/serviceregistry/kube"
43"istio.io/istio/pilot/pkg/serviceregistry/provider"
44labelutil "istio.io/istio/pilot/pkg/serviceregistry/util/label"
45"istio.io/istio/pilot/pkg/serviceregistry/util/xdsfake"
46"istio.io/istio/pkg/cluster"
47"istio.io/istio/pkg/config"
48"istio.io/istio/pkg/config/constants"
49"istio.io/istio/pkg/config/labels"
50"istio.io/istio/pkg/config/mesh"
51"istio.io/istio/pkg/config/protocol"
52"istio.io/istio/pkg/config/visibility"
53kubelib "istio.io/istio/pkg/kube"
54"istio.io/istio/pkg/kube/kclient/clienttest"
55"istio.io/istio/pkg/network"
56"istio.io/istio/pkg/test"
57"istio.io/istio/pkg/test/util/assert"
58"istio.io/istio/pkg/test/util/retry"
59"istio.io/istio/pkg/util/sets"
60)
61
const (
	// testService is the name used for the Service that makeService creates
	// and that TestServices looks up by hostname.
	testService = "test"
)
65
66// eventually polls cond until it completes (returns true) or times out (resulting in a test failure).
67func eventually(t test.Failer, cond func() bool) {
68t.Helper()
69retry.UntilSuccessOrFail(t, func() error {
70if !cond() {
71return fmt.Errorf("failed to get positive condition")
72}
73return nil
74}, retry.Timeout(time.Second), retry.Delay(time.Millisecond*10))
75}
76
// TestServices exercises basic service discovery through the controller:
// Services() listing, GetService() lookup, per-port endpoint resolution, and
// assignment of endpoints to mesh networks based on MeshNetworks CIDR ranges.
func TestServices(t *testing.T) {
	// Two networks distinguished by endpoint CIDR:
	// 10.10.1.x -> network1, 10.11.1.x -> network2.
	networksWatcher := mesh.NewFixedNetworksWatcher(&meshconfig.MeshNetworks{
		Networks: map[string]*meshconfig.Network{
			"network1": {
				Endpoints: []*meshconfig.Network_NetworkEndpoints{
					{
						Ne: &meshconfig.Network_NetworkEndpoints_FromCidr{
							FromCidr: "10.10.1.1/24",
						},
					},
				},
			},
			"network2": {
				Endpoints: []*meshconfig.Network_NetworkEndpoints{
					{
						Ne: &meshconfig.Network_NetworkEndpoints_FromCidr{
							FromCidr: "10.11.1.1/24",
						},
					},
				},
			},
		},
	})

	ctl, _ := NewFakeControllerWithOptions(t, FakeControllerOptions{NetworksWatcher: networksWatcher})
	t.Parallel()
	ns := "ns-test"

	hostname := kube.ServiceHostname(testService, ns, defaultFakeDomainSuffix)

	var sds model.ServiceDiscovery = ctl
	// "test", ports: http-example on 80
	makeService(testService, ns, ctl, t)

	// Wait until the created service shows up with exactly one HTTP port.
	eventually(t, func() bool {
		out := sds.Services()

		// Original test was checking for 'protocolTCP' - which is incorrect (the
		// port name is 'http'. It was working because the Service was created with
		// an invalid protocol, and the code was ignoring that ( not TCP/UDP).
		for _, item := range out {
			if item.Hostname == hostname &&
				len(item.Ports) == 1 &&
				item.Ports[0].Protocol == protocol.HTTP {
				return true
			}
		}
		return false
	})

	// 2 ports 1001, 2 IPs
	createEndpoints(t, ctl, testService, ns, []string{"http-example", "foo"}, []string{"10.10.1.1", "10.11.1.2"}, nil, nil)

	svc := sds.GetService(hostname)
	if svc == nil {
		t.Fatalf("GetService(%q) => should exists", hostname)
	}
	if svc.Hostname != hostname {
		t.Fatalf("GetService(%q) => %q", hostname, svc.Hostname)
	}
	// Endpoint propagation is asynchronous; wait for both endpoints on port 80.
	assert.EventuallyEqual(t, func() int {
		ep := GetEndpointsForPort(svc, ctl.Endpoints, 80)
		return len(ep)
	}, 2)

	ep := GetEndpointsForPort(svc, ctl.Endpoints, 80)
	if len(ep) != 2 {
		t.Fatalf("Invalid response for GetEndpoints %v", ep)
	}

	// Each endpoint must land in the network whose CIDR contains its IP.
	if ep[0].Address == "10.10.1.1" && ep[0].Network != "network1" {
		t.Fatalf("Endpoint with IP 10.10.1.1 is expected to be in network1 but get: %s", ep[0].Network)
	}

	if ep[1].Address == "10.11.1.2" && ep[1].Network != "network2" {
		t.Fatalf("Endpoint with IP 10.11.1.2 is expected to be in network2 but get: %s", ep[1].Network)
	}

	// An unknown hostname must resolve to nil rather than an error.
	missing := kube.ServiceHostname("does-not-exist", ns, defaultFakeDomainSuffix)
	svc = sds.GetService(missing)
	if svc != nil {
		t.Fatalf("GetService(%q) => %s, should not exist", missing, svc.Hostname)
	}
}
161
162func makeService(n, ns string, cl *FakeController, t *testing.T) {
163clienttest.Wrap(t, cl.services).Create(&corev1.Service{
164ObjectMeta: metav1.ObjectMeta{Name: n, Namespace: ns},
165Spec: corev1.ServiceSpec{
166Ports: []corev1.ServicePort{
167{
168Port: 80,
169Name: "http-example",
170Protocol: corev1.ProtocolTCP, // Not added automatically by fake
171},
172},
173},
174})
175log.Infof("Created service %s", n)
176cl.opts.XDSUpdater.(*xdsfake.Updater).WaitOrFail(t, "service")
177}
178
179func TestController_GetPodLocality(t *testing.T) {
180pod1 := generatePod("128.0.1.1", "pod1", "nsA", "", "node1", map[string]string{"app": "prod-app"}, map[string]string{})
181pod2 := generatePod("128.0.1.2", "pod2", "nsB", "", "node2", map[string]string{"app": "prod-app"}, map[string]string{})
182podOverride := generatePod("128.0.1.2", "pod2", "nsB", "",
183"node1", map[string]string{"app": "prod-app", model.LocalityLabel: "regionOverride.zoneOverride.subzoneOverride"}, map[string]string{})
184testCases := []struct {
185name string
186pods []*corev1.Pod
187nodes []*corev1.Node
188wantAZ map[*corev1.Pod]string
189}{
190{
191name: "should return correct az for given address",
192pods: []*corev1.Pod{pod1, pod2},
193nodes: []*corev1.Node{
194generateNode("node1", map[string]string{NodeZoneLabel: "zone1", NodeRegionLabel: "region1", label.TopologySubzone.Name: "subzone1"}),
195generateNode("node2", map[string]string{NodeZoneLabel: "zone2", NodeRegionLabel: "region2", label.TopologySubzone.Name: "subzone2"}),
196},
197wantAZ: map[*corev1.Pod]string{
198pod1: "region1/zone1/subzone1",
199pod2: "region2/zone2/subzone2",
200},
201},
202{
203name: "should return correct az for given address",
204pods: []*corev1.Pod{pod1, pod2},
205nodes: []*corev1.Node{
206generateNode("node1", map[string]string{NodeZoneLabel: "zone1", NodeRegionLabel: "region1"}),
207generateNode("node2", map[string]string{NodeZoneLabel: "zone2", NodeRegionLabel: "region2"}),
208},
209wantAZ: map[*corev1.Pod]string{
210pod1: "region1/zone1/",
211pod2: "region2/zone2/",
212},
213},
214{
215name: "should return false if pod isn't in the cache",
216wantAZ: map[*corev1.Pod]string{
217pod1: "",
218pod2: "",
219},
220},
221{
222name: "should return false if node isn't in the cache",
223pods: []*corev1.Pod{pod1, pod2},
224wantAZ: map[*corev1.Pod]string{
225pod1: "",
226pod2: "",
227},
228},
229{
230name: "should return correct az if node has only region label",
231pods: []*corev1.Pod{pod1, pod2},
232nodes: []*corev1.Node{
233generateNode("node1", map[string]string{NodeRegionLabel: "region1"}),
234generateNode("node2", map[string]string{NodeRegionLabel: "region2"}),
235},
236wantAZ: map[*corev1.Pod]string{
237pod1: "region1//",
238pod2: "region2//",
239},
240},
241{
242name: "should return correct az if node has only zone label",
243pods: []*corev1.Pod{pod1, pod2},
244nodes: []*corev1.Node{
245generateNode("node1", map[string]string{NodeZoneLabel: "zone1"}),
246generateNode("node2", map[string]string{NodeZoneLabel: "zone2"}),
247},
248wantAZ: map[*corev1.Pod]string{
249pod1: "/zone1/",
250pod2: "/zone2/",
251},
252},
253{
254name: "should return correct az if node has only subzone label",
255pods: []*corev1.Pod{pod1, pod2},
256nodes: []*corev1.Node{
257generateNode("node1", map[string]string{label.TopologySubzone.Name: "subzone1"}),
258generateNode("node2", map[string]string{label.TopologySubzone.Name: "subzone2"}),
259},
260wantAZ: map[*corev1.Pod]string{
261pod1: "//subzone1",
262pod2: "//subzone2",
263},
264},
265{
266name: "should return correct az for given address",
267pods: []*corev1.Pod{podOverride},
268nodes: []*corev1.Node{
269generateNode("node1", map[string]string{NodeZoneLabel: "zone1", NodeRegionLabel: "region1", label.TopologySubzone.Name: "subzone1"}),
270},
271wantAZ: map[*corev1.Pod]string{
272podOverride: "regionOverride/zoneOverride/subzoneOverride",
273},
274},
275}
276
277for _, tc := range testCases {
278// If using t.Parallel() you must copy the iteration to a new local variable
279// https://github.com/golang/go/wiki/CommonMistakes#using-goroutines-on-loop-iterator-variables
280tc := tc
281t.Run(tc.name, func(t *testing.T) {
282t.Parallel()
283// Setup kube caches
284controller, fx := NewFakeControllerWithOptions(t, FakeControllerOptions{})
285
286addNodes(t, controller, tc.nodes...)
287addPods(t, controller, fx, tc.pods...)
288
289// Verify expected existing pod AZs
290for pod, wantAZ := range tc.wantAZ {
291az := controller.getPodLocality(pod)
292if wantAZ != "" {
293if !reflect.DeepEqual(az, wantAZ) {
294t.Fatalf("Wanted az: %s, got: %s", wantAZ, az)
295}
296} else {
297if az != "" {
298t.Fatalf("Unexpectedly found az: %s for pod: %s", az, pod.ObjectMeta.Name)
299}
300}
301}
302})
303}
304}
305
306func TestProxyK8sHostnameLabel(t *testing.T) {
307clusterID := cluster.ID("fakeCluster")
308controller, fx := NewFakeControllerWithOptions(t, FakeControllerOptions{
309ClusterID: clusterID,
310})
311
312pod := generatePod("128.0.0.1", "pod1", "nsa", "foo", "node1", map[string]string{"app": "test-app"}, map[string]string{})
313addPods(t, controller, fx, pod)
314
315proxy := &model.Proxy{
316Type: model.Router,
317IPAddresses: []string{"128.0.0.1"},
318ID: "pod1.nsa",
319DNSDomain: "nsa.svc.cluster.local",
320Metadata: &model.NodeMetadata{Namespace: "nsa", ClusterID: clusterID},
321}
322got := controller.GetProxyWorkloadLabels(proxy)
323if pod.Spec.NodeName != got[labelutil.LabelHostname] {
324t.Fatalf("expected node name %v, got %v", pod.Spec.NodeName, got[labelutil.LabelHostname])
325}
326}
327
// TestGetProxyServiceTargets verifies proxy-to-service-target resolution:
// lookup by proxy pod IP and proxy ID, lookup by proxy metadata labels only,
// locality from node labels vs the pod `istio-locality` label, and that a pod
// matching no service selector yields no targets.
func TestGetProxyServiceTargets(t *testing.T) {
	clusterID := cluster.ID("fakeCluster")
	networkID := network.ID("fakeNetwork")
	controller, fx := NewFakeControllerWithOptions(t, FakeControllerOptions{
		ClusterID: clusterID,
	})
	// add a network ID to test endpoints include topology.istio.io/network label
	controller.network = networkID

	p := generatePod("128.0.0.1", "pod1", "nsa", "foo", "node1", map[string]string{"app": "test-app"}, map[string]string{})
	addPods(t, controller, fx, p)

	k8sSaOnVM := "acct4"
	canonicalSaOnVM := "acctvm2@gserviceaccount2.com"

	createServiceWait(controller, "svc1", "nsa", nil,
		map[string]string{
			annotation.AlphaKubernetesServiceAccounts.Name: k8sSaOnVM,
			annotation.AlphaCanonicalServiceAccounts.Name:  canonicalSaOnVM,
		},
		[]int32{8080}, map[string]string{"app": "prod-app"}, t)

	// Endpoints are generated by Kubernetes from pod labels and service selectors.
	// Here we manually create them for mocking purpose.
	svc1Ips := []string{"128.0.0.1"}
	portNames := []string{"tcp-port"}
	// Create 1 endpoint that refers to a pod in the same namespace.
	// NOTE(review): the namespace here is "nsA" while svc1 was created in
	// "nsa" — confirm whether this case mismatch is intentional; the
	// same-namespace endpoints are created again below with "nsa".
	createEndpoints(t, controller, "svc1", "nsA", portNames, svc1Ips, nil, nil)

	// Creates 100 endpoints that refers to a pod in a different namespace.
	fakeSvcCounts := 100
	for i := 0; i < fakeSvcCounts; i++ {
		svcName := fmt.Sprintf("svc-fake-%d", i)
		createServiceWait(controller, svcName, "nsfake", nil,
			map[string]string{
				annotation.AlphaKubernetesServiceAccounts.Name: k8sSaOnVM,
				annotation.AlphaCanonicalServiceAccounts.Name:  canonicalSaOnVM,
			},
			[]int32{8080}, map[string]string{"app": "prod-app"}, t)

		createEndpoints(t, controller, svcName, "nsfake", portNames, svc1Ips, nil, nil)
		fx.WaitOrFail(t, "eds")
	}

	// Create 1 endpoint that refers to a pod in the same namespace.
	createEndpoints(t, controller, "svc1", "nsa", portNames, svc1Ips, nil, nil)
	fx.WaitOrFail(t, "eds")

	// this can test get pod by proxy ID
	svcNode := &model.Proxy{
		Type:        model.Router,
		IPAddresses: []string{"128.0.0.1"},
		ID:          "pod1.nsa",
		DNSDomain:   "nsa.svc.cluster.local",
		Metadata:    &model.NodeMetadata{Namespace: "nsa", ClusterID: clusterID},
	}
	serviceInstances := controller.GetProxyServiceTargets(svcNode)

	// Only svc1 selects the pod; the 100 nsfake services must not match.
	if len(serviceInstances) != 1 {
		t.Fatalf("GetProxyServiceTargets() expected 1 instance, got %d", len(serviceInstances))
	}

	hostname := kube.ServiceHostname("svc1", "nsa", defaultFakeDomainSuffix)
	if serviceInstances[0].Service.Hostname != hostname {
		t.Fatalf("GetProxyServiceTargets() wrong service instance returned => hostname %q, want %q",
			serviceInstances[0].Service.Hostname, hostname)
	}

	// Test that we can look up instances just by Proxy metadata
	metaServices := controller.GetProxyServiceTargets(&model.Proxy{
		Type:            "sidecar",
		IPAddresses:     []string{"1.1.1.1"},
		Locality:        &core.Locality{Region: "r", Zone: "z"},
		ConfigNamespace: "nsa",
		Labels: map[string]string{
			"app":                      "prod-app",
			label.SecurityTlsMode.Name: "mutual",
		},
		Metadata: &model.NodeMetadata{
			ServiceAccount: "account",
			ClusterID:      clusterID,
			Labels: map[string]string{
				"app":                      "prod-app",
				label.SecurityTlsMode.Name: "mutual",
			},
		},
	})

	expected := model.ServiceTarget{
		Service: &model.Service{
			Hostname: "svc1.nsa.svc.company.com",
			ClusterVIPs: model.AddressMap{
				Addresses: map[cluster.ID][]string{clusterID: {"10.0.0.1"}},
			},
			DefaultAddress:  "10.0.0.1",
			Ports:           []*model.Port{{Name: "tcp-port", Port: 8080, Protocol: protocol.TCP}},
			ServiceAccounts: []string{"acctvm2@gserviceaccount2.com", "spiffe://cluster.local/ns/nsa/sa/acct4"},
			Attributes: model.ServiceAttributes{
				ServiceRegistry: provider.Kubernetes,
				Name:            "svc1",
				Namespace:       "nsa",
				LabelSelectors:  map[string]string{"app": "prod-app"},
				K8sAttributes: model.K8sAttributes{
					Type: string(corev1.ServiceTypeClusterIP),
				},
			},
		},
		Port: model.ServiceInstancePort{
			ServicePort: &model.Port{Name: "tcp-port", Port: 8080, Protocol: protocol.TCP},
			TargetPort:  0,
		},
	}

	if len(metaServices) != 1 {
		t.Fatalf("expected 1 instance, got %v", len(metaServices))
	}
	if !reflect.DeepEqual(expected, metaServices[0]) {
		t.Fatalf("expected instance %v, got %v", expected, metaServices[0])
	}

	// Test that we first look up instances by Proxy pod

	node := generateNode("node1", map[string]string{NodeZoneLabel: "zone1", NodeRegionLabel: "region1", label.TopologySubzone.Name: "subzone1"})
	addNodes(t, controller, node)

	// 1. pod without `istio-locality` label, get locality from node label.
	p = generatePod("129.0.0.1", "pod2", "nsa", "svcaccount", "node1",
		map[string]string{"app": "prod-app"}, nil)
	addPods(t, controller, fx, p)

	// this can test get pod by proxy ip address
	podServices := controller.GetProxyServiceTargets(&model.Proxy{
		Type:            "sidecar",
		IPAddresses:     []string{"129.0.0.1"},
		Locality:        &core.Locality{Region: "r", Zone: "z"},
		ConfigNamespace: "nsa",
		Labels: map[string]string{
			"app": "prod-app",
		},
		Metadata: &model.NodeMetadata{
			ServiceAccount: "account",
			ClusterID:      clusterID,
			Labels: map[string]string{
				"app": "prod-app",
			},
		},
	})

	expected = model.ServiceTarget{
		Service: &model.Service{
			Hostname: "svc1.nsa.svc.company.com",
			ClusterVIPs: model.AddressMap{
				Addresses: map[cluster.ID][]string{clusterID: {"10.0.0.1"}},
			},
			DefaultAddress:  "10.0.0.1",
			Ports:           []*model.Port{{Name: "tcp-port", Port: 8080, Protocol: protocol.TCP}},
			ServiceAccounts: []string{"acctvm2@gserviceaccount2.com", "spiffe://cluster.local/ns/nsa/sa/acct4"},
			Attributes: model.ServiceAttributes{
				ServiceRegistry: provider.Kubernetes,
				Name:            "svc1",
				Namespace:       "nsa",
				LabelSelectors:  map[string]string{"app": "prod-app"},
				K8sAttributes: model.K8sAttributes{
					Type: string(corev1.ServiceTypeClusterIP),
				},
			},
		},
		Port: model.ServiceInstancePort{
			ServicePort: &model.Port{Name: "tcp-port", Port: 8080, Protocol: protocol.TCP},
			TargetPort:  0,
		},
	}
	if len(podServices) != 1 {
		t.Fatalf("expected 1 instance, got %v", len(podServices))
	}
	if !reflect.DeepEqual(expected, podServices[0]) {
		t.Fatalf("expected instance %v, got %v", expected, podServices[0])
	}

	// 2. pod with `istio-locality` label, ignore node label.
	p = generatePod("129.0.0.2", "pod3", "nsa", "svcaccount", "node1",
		map[string]string{"app": "prod-app", "istio-locality": "region.zone"}, nil)
	addPods(t, controller, fx, p)

	// this can test get pod by proxy ip address
	podServices = controller.GetProxyServiceTargets(&model.Proxy{
		Type:            "sidecar",
		IPAddresses:     []string{"129.0.0.2"},
		Locality:        &core.Locality{Region: "r", Zone: "z"},
		ConfigNamespace: "nsa",
		Labels: map[string]string{
			"app": "prod-app",
		},
		Metadata: &model.NodeMetadata{
			ServiceAccount: "account",
			ClusterID:      clusterID,
			Labels: map[string]string{
				"app": "prod-app",
			},
		},
	})

	expected = model.ServiceTarget{
		Service: &model.Service{
			Hostname: "svc1.nsa.svc.company.com",
			ClusterVIPs: model.AddressMap{
				Addresses: map[cluster.ID][]string{clusterID: {"10.0.0.1"}},
			},
			DefaultAddress:  "10.0.0.1",
			Ports:           []*model.Port{{Name: "tcp-port", Port: 8080, Protocol: protocol.TCP}},
			ServiceAccounts: []string{"acctvm2@gserviceaccount2.com", "spiffe://cluster.local/ns/nsa/sa/acct4"},
			Attributes: model.ServiceAttributes{
				ServiceRegistry: provider.Kubernetes,
				Name:            "svc1",
				Namespace:       "nsa",
				LabelSelectors:  map[string]string{"app": "prod-app"},
				K8sAttributes: model.K8sAttributes{
					Type: string(corev1.ServiceTypeClusterIP),
				},
			},
		},
		Port: model.ServiceInstancePort{
			ServicePort: &model.Port{Name: "tcp-port", Port: 8080, Protocol: protocol.TCP},
		},
	}
	if len(podServices) != 1 {
		t.Fatalf("expected 1 instance, got %v", len(podServices))
	}
	if !reflect.DeepEqual(expected, podServices[0]) {
		t.Fatalf("expected instance %v, got %v", expected, podServices[0])
	}

	// pod with no services should return no service targets
	p = generatePod("130.0.0.1", "pod4", "nsa", "foo", "node1", map[string]string{"app": "no-service-app"}, map[string]string{})
	addPods(t, controller, fx, p)

	podServices = controller.GetProxyServiceTargets(&model.Proxy{
		Type:            "sidecar",
		IPAddresses:     []string{"130.0.0.1"},
		Locality:        &core.Locality{Region: "r", Zone: "z"},
		ConfigNamespace: "nsa",
		Labels: map[string]string{
			"app": "no-service-app",
		},
		Metadata: &model.NodeMetadata{
			ServiceAccount: "account",
			ClusterID:      clusterID,
			Labels: map[string]string{
				"app": "no-service-app",
			},
		},
	})
	if len(podServices) != 0 {
		t.Fatalf("expect 0 instance, got %v", len(podServices))
	}
}
584
// TestGetProxyServiceTargetsWithMultiIPsAndTargetPorts checks the service
// targets returned for proxies with one or multiple IP addresses against
// services declaring one or multiple ports, including ports sharing the same
// target port.
func TestGetProxyServiceTargetsWithMultiIPsAndTargetPorts(t *testing.T) {
	pod1 := generatePod("128.0.0.1", "pod1", "nsa", "foo", "node1", map[string]string{"app": "test-app"}, map[string]string{})
	testCases := []struct {
		name  string
		pods  []*corev1.Pod
		ips   []string
		ports []corev1.ServicePort
		// wantPorts is compared positionally against the returned targets.
		// NOTE(review): the check loop below ranges over the *returned*
		// instances, so extra wantPorts entries are silently ignored and
		// the lengths are never compared — confirm this is intended.
		wantPorts []model.ServiceInstancePort
	}{
		{
			name: "multiple proxy ips single port",
			pods: []*corev1.Pod{pod1},
			ips:  []string{"128.0.0.1", "192.168.2.6"},
			ports: []corev1.ServicePort{
				{
					Name:       "tcp-port",
					Port:       8080,
					Protocol:   "http",
					TargetPort: intstr.IntOrString{Type: intstr.Int, IntVal: 8080},
				},
			},
			wantPorts: []model.ServiceInstancePort{
				{
					ServicePort: &model.Port{
						Name:     "tcp-port",
						Port:     8080,
						Protocol: "TCP",
					},
					TargetPort: 8080,
				},
			},
		},
		{
			name: "single proxy ip single port",
			pods: []*corev1.Pod{pod1},
			ips:  []string{"128.0.0.1"},
			ports: []corev1.ServicePort{
				{
					Name:       "tcp-port",
					Port:       8080,
					Protocol:   "TCP",
					TargetPort: intstr.IntOrString{Type: intstr.Int, IntVal: 8080},
				},
			},
			wantPorts: []model.ServiceInstancePort{
				{
					ServicePort: &model.Port{
						Name:     "tcp-port",
						Port:     8080,
						Protocol: "TCP",
					},
					TargetPort: 8080,
				},
			},
		},
		{
			name: "multiple proxy ips multiple ports",
			pods: []*corev1.Pod{pod1},
			ips:  []string{"128.0.0.1", "192.168.2.6"},
			ports: []corev1.ServicePort{
				{
					Name:       "tcp-port-1",
					Port:       8080,
					Protocol:   "http",
					TargetPort: intstr.IntOrString{Type: intstr.Int, IntVal: 8080},
				},
				{
					Name:       "tcp-port-2",
					Port:       9090,
					Protocol:   "http",
					TargetPort: intstr.IntOrString{Type: intstr.Int, IntVal: 9090},
				},
			},
			wantPorts: []model.ServiceInstancePort{
				{
					ServicePort: &model.Port{
						Name:     "tcp-port-1",
						Port:     8080,
						Protocol: "TCP",
					},
					TargetPort: 8080,
				},
				{
					ServicePort: &model.Port{
						Name:     "tcp-port-2",
						Port:     9090,
						Protocol: "TCP",
					},
					TargetPort: 9090,
				},
				// NOTE(review): no ServicePort with port 7442 is declared in
				// this case; this entry can never be matched and is only
				// unreferenced because the loop ranges over the returned
				// instances. Verify whether it is a leftover.
				{
					ServicePort: &model.Port{
						Name:     "tcp-port-1",
						Port:     7442,
						Protocol: "TCP",
					},
					TargetPort: 7442,
				},
			},
		},
		{
			name: "single proxy ip multiple ports same target port with different protocols",
			pods: []*corev1.Pod{pod1},
			ips:  []string{"128.0.0.1"},
			ports: []corev1.ServicePort{
				{
					Name:       "tcp-port",
					Port:       8080,
					Protocol:   "TCP",
					TargetPort: intstr.IntOrString{Type: intstr.Int, IntVal: 8080},
				},
				{
					Name:       "http-port",
					Port:       9090,
					Protocol:   "TCP",
					TargetPort: intstr.IntOrString{Type: intstr.Int, IntVal: 8080},
				},
			},
			wantPorts: []model.ServiceInstancePort{
				{
					ServicePort: &model.Port{
						Name:     "tcp-port",
						Port:     8080,
						Protocol: "TCP",
					},
					TargetPort: 8080,
				},
				{
					ServicePort: &model.Port{
						Name:     "http-port",
						Port:     9090,
						Protocol: "HTTP",
					},
					TargetPort: 8080,
				},
			},
		},
		{
			name: "single proxy ip multiple ports same target port with overlapping protocols",
			pods: []*corev1.Pod{pod1},
			ips:  []string{"128.0.0.1"},
			ports: []corev1.ServicePort{
				{
					Name:       "http-7442",
					Port:       7442,
					Protocol:   "TCP",
					TargetPort: intstr.IntOrString{Type: intstr.Int, IntVal: 7442},
				},
				{
					Name:       "tcp-8443",
					Port:       8443,
					Protocol:   "TCP",
					TargetPort: intstr.IntOrString{Type: intstr.Int, IntVal: 7442},
				},
				{
					Name:       "http-7557",
					Port:       7557,
					Protocol:   "TCP",
					TargetPort: intstr.IntOrString{Type: intstr.Int, IntVal: 7442},
				},
			},
			wantPorts: []model.ServiceInstancePort{
				{
					ServicePort: &model.Port{
						Name:     "http-7442",
						Port:     7442,
						Protocol: "HTTP",
					},
					TargetPort: 7442,
				},
				{
					ServicePort: &model.Port{
						Name:     "tcp-8443",
						Port:     8443,
						Protocol: "TCP",
					},
					TargetPort: 7442,
				},
			},
		},
		{
			name: "single proxy ip multiple ports",
			pods: []*corev1.Pod{pod1},
			ips:  []string{"128.0.0.1"},
			ports: []corev1.ServicePort{
				{
					Name:       "tcp-port",
					Port:       8080,
					Protocol:   "TCP",
					TargetPort: intstr.IntOrString{Type: intstr.Int, IntVal: 8080},
				},
				{
					Name:       "http-port",
					Port:       9090,
					Protocol:   "TCP",
					TargetPort: intstr.IntOrString{Type: intstr.Int, IntVal: 9090},
				},
			},
			wantPorts: []model.ServiceInstancePort{
				{
					ServicePort: &model.Port{
						Name:     "tcp-port",
						Port:     8080,
						Protocol: "TCP",
					},
					TargetPort: 8080,
				},
				{
					ServicePort: &model.Port{
						Name:     "http-port",
						Port:     9090,
						Protocol: "HTTP",
					},
					TargetPort: 9090,
				},
			},
		},
	}

	for _, c := range testCases {
		t.Run(c.name, func(t *testing.T) {
			// Setup kube caches
			controller, fx := NewFakeControllerWithOptions(t, FakeControllerOptions{})

			addPods(t, controller, fx, c.pods...)

			createServiceWithTargetPorts(controller, "svc1", "nsa",
				map[string]string{
					annotation.AlphaKubernetesServiceAccounts.Name: "acct4",
					annotation.AlphaCanonicalServiceAccounts.Name:  "acctvm2@gserviceaccount2.com",
				},
				c.ports, map[string]string{"app": "test-app"}, t)

			serviceInstances := controller.GetProxyServiceTargets(&model.Proxy{Metadata: &model.NodeMetadata{}, IPAddresses: c.ips})

			// Positional comparison against wantPorts; see NOTE(review) above
			// about surplus want entries never being checked.
			for i, svc := range serviceInstances {
				assert.Equal(t, svc.Port, c.wantPorts[i])
			}
		})
	}
}
826
// TestGetProxyServiceTargets_WorkloadInstance verifies how proxies backed by
// workload instances are matched to Services, including disambiguation via
// proxy ID and ConfigNamespace when multiple workload instances share the
// same IP address.
func TestGetProxyServiceTargets_WorkloadInstance(t *testing.T) {
	ctl, _ := NewFakeControllerWithOptions(t, FakeControllerOptions{})

	createServiceWait(ctl, "ratings", "bookinfo-ratings",
		map[string]string{},
		map[string]string{
			annotation.AlphaKubernetesServiceAccounts.Name: "ratings",
			annotation.AlphaCanonicalServiceAccounts.Name:  "ratings@gserviceaccount2.com",
		},
		[]int32{8080}, map[string]string{"app": "ratings"}, t)

	createServiceWait(ctl, "details", "bookinfo-details",
		map[string]string{},
		map[string]string{
			annotation.AlphaKubernetesServiceAccounts.Name: "details",
			annotation.AlphaCanonicalServiceAccounts.Name:  "details@gserviceaccount2.com",
		},
		[]int32{9090}, map[string]string{"app": "details"}, t)

	createServiceWait(ctl, "reviews", "bookinfo-reviews",
		map[string]string{},
		map[string]string{
			annotation.AlphaKubernetesServiceAccounts.Name: "reviews",
			annotation.AlphaCanonicalServiceAccounts.Name:  "reviews@gserviceaccount2.com",
		},
		[]int32{7070}, map[string]string{"app": "reviews"}, t)

	// wiRatings1 and wiDetails1 deliberately share IP 2.2.2.21 so the cases
	// below can exercise tie-breaking by proxy ID and namespace.
	wiRatings1 := &model.WorkloadInstance{
		Name:      "ratings-1",
		Namespace: "bookinfo-ratings",
		Endpoint: &model.IstioEndpoint{
			Labels:       labels.Instance{"app": "ratings"},
			Address:      "2.2.2.21",
			EndpointPort: 8080,
		},
	}

	wiDetails1 := &model.WorkloadInstance{
		Name:      "details-1",
		Namespace: "bookinfo-details",
		Endpoint: &model.IstioEndpoint{
			Labels:       labels.Instance{"app": "details"},
			Address:      "2.2.2.21",
			EndpointPort: 9090,
		},
	}

	wiReviews1 := &model.WorkloadInstance{
		Name:      "reviews-1",
		Namespace: "bookinfo-reviews",
		Endpoint: &model.IstioEndpoint{
			Labels:       labels.Instance{"app": "reviews"},
			Address:      "3.3.3.31",
			EndpointPort: 7070,
		},
	}

	wiReviews2 := &model.WorkloadInstance{
		Name:      "reviews-2",
		Namespace: "bookinfo-reviews",
		Endpoint: &model.IstioEndpoint{
			Labels:       labels.Instance{"app": "reviews"},
			Address:      "3.3.3.32",
			EndpointPort: 7071,
		},
	}

	// wiProduct1 has no Service created for it above, so it should never
	// produce a service target.
	wiProduct1 := &model.WorkloadInstance{
		Name:      "productpage-1",
		Namespace: "bookinfo-productpage",
		Endpoint: &model.IstioEndpoint{
			Labels:       labels.Instance{"app": "productpage"},
			Address:      "4.4.4.41",
			EndpointPort: 6060,
		},
	}

	for _, wi := range []*model.WorkloadInstance{wiRatings1, wiDetails1, wiReviews1, wiReviews2, wiProduct1} {
		ctl.workloadInstanceHandler(wi, model.EventAdd) // simulate adding a workload entry
	}

	cases := []struct {
		name  string
		proxy *model.Proxy
		want  []model.ServiceTarget
	}{
		{
			name:  "proxy with unspecified IP",
			proxy: &model.Proxy{Metadata: &model.NodeMetadata{}, IPAddresses: nil},
			want:  nil,
		},
		{
			name:  "proxy with IP not in the registry",
			proxy: &model.Proxy{Metadata: &model.NodeMetadata{}, IPAddresses: []string{"1.1.1.1"}},
			want:  nil,
		},
		{
			name:  "proxy with IP from the registry, 1 matching WE, but no matching Service",
			proxy: &model.Proxy{Metadata: &model.NodeMetadata{}, IPAddresses: []string{"4.4.4.41"}},
			want:  nil,
		},
		{
			name:  "proxy with IP from the registry, 1 matching WE, and matching Service",
			proxy: &model.Proxy{Metadata: &model.NodeMetadata{}, IPAddresses: []string{"3.3.3.31"}},
			want: []model.ServiceTarget{{
				Service: &model.Service{
					Hostname: "reviews.bookinfo-reviews.svc.company.com",
				},
				Port: model.ServiceInstancePort{
					ServicePort: nil,
					TargetPort:  7070,
				},
			}},
		},
		{
			name:  "proxy with IP from the registry, 2 matching WE, and matching Service",
			proxy: &model.Proxy{Metadata: &model.NodeMetadata{}, IPAddresses: []string{"2.2.2.21"}},
			want: []model.ServiceTarget{{
				Service: &model.Service{
					Hostname: "details.bookinfo-details.svc.company.com",
				},
				Port: model.ServiceInstancePort{
					ServicePort: nil,
					TargetPort:  9090,
				},
			}},
		},
		{
			name: "proxy with IP from the registry, 2 matching WE, and matching Service, and proxy ID equal to WE with a different address",
			proxy: &model.Proxy{
				Metadata: &model.NodeMetadata{}, IPAddresses: []string{"2.2.2.21"},
				ID: "reviews-1.bookinfo-reviews", ConfigNamespace: "bookinfo-reviews",
			},
			want: []model.ServiceTarget{{
				Service: &model.Service{
					Hostname: "details.bookinfo-details.svc.company.com",
				},
				Port: model.ServiceInstancePort{
					ServicePort: nil,
					TargetPort:  9090,
				},
			}},
		},
		{
			name: "proxy with IP from the registry, 2 matching WE, and matching Service, and proxy ID equal to WE name, but proxy.ID != proxy.ConfigNamespace",
			proxy: &model.Proxy{
				Metadata: &model.NodeMetadata{}, IPAddresses: []string{"2.2.2.21"},
				ID: "ratings-1.bookinfo-ratings", ConfigNamespace: "wrong-namespace",
			},
			want: []model.ServiceTarget{{
				Service: &model.Service{
					Hostname: "details.bookinfo-details.svc.company.com",
				},
				Port: model.ServiceInstancePort{
					ServicePort: nil,
					TargetPort:  9090,
				},
			}},
		},
		{
			name: "proxy with IP from the registry, 2 matching WE, and matching Service, and proxy.ID == WE name",
			proxy: &model.Proxy{
				Metadata: &model.NodeMetadata{}, IPAddresses: []string{"2.2.2.21"},
				ID: "ratings-1.bookinfo-ratings", ConfigNamespace: "bookinfo-ratings",
			},
			want: []model.ServiceTarget{{
				Service: &model.Service{
					Hostname: "ratings.bookinfo-ratings.svc.company.com",
				},
				Port: model.ServiceInstancePort{
					ServicePort: nil,
					TargetPort:  8080,
				},
			}},
		},
		{
			name: "proxy with IP from the registry, 2 matching WE, and matching Service, and proxy.ID != WE name, but proxy.ConfigNamespace == WE namespace",
			proxy: &model.Proxy{
				Metadata: &model.NodeMetadata{}, IPAddresses: []string{"2.2.2.21"},
				ID: "wrong-name.bookinfo-ratings", ConfigNamespace: "bookinfo-ratings",
			},
			want: []model.ServiceTarget{{
				Service: &model.Service{
					Hostname: "ratings.bookinfo-ratings.svc.company.com",
				},
				Port: model.ServiceInstancePort{
					ServicePort: nil,
					TargetPort:  8080,
				},
			}},
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			got := ctl.GetProxyServiceTargets(tc.proxy)

			if diff := cmp.Diff(len(tc.want), len(got)); diff != "" {
				t.Fatalf("GetProxyServiceTargets() returned unexpected number of service instances (--want/++got): %v", diff)
			}

			// Only hostname and target port are asserted; other ServiceTarget
			// fields are intentionally not compared here.
			for i := range tc.want {
				assert.Equal(t, tc.want[i].Service.Hostname, got[i].Service.Hostname)
				assert.Equal(t, tc.want[i].Port.TargetPort, got[i].Port.TargetPort)
			}
		})
	}
}
1035
1036func TestController_Service(t *testing.T) {
1037controller, _ := NewFakeControllerWithOptions(t, FakeControllerOptions{})
1038
1039// Use a timeout to keep the test from hanging.
1040
1041createServiceWait(controller, "svc1", "nsA",
1042map[string]string{}, map[string]string{},
1043[]int32{8080}, map[string]string{"test-app": "test-app-1"}, t)
1044createServiceWait(controller, "svc2", "nsA",
1045map[string]string{}, map[string]string{},
1046[]int32{8081}, map[string]string{"test-app": "test-app-2"}, t)
1047createServiceWait(controller, "svc3", "nsA",
1048map[string]string{}, map[string]string{},
1049[]int32{8082}, map[string]string{"test-app": "test-app-3"}, t)
1050createServiceWait(controller, "svc4", "nsA",
1051map[string]string{}, map[string]string{},
1052[]int32{8083}, map[string]string{"test-app": "test-app-4"}, t)
1053
1054expectedSvcList := []*model.Service{
1055{
1056Hostname: kube.ServiceHostname("svc1", "nsA", defaultFakeDomainSuffix),
1057DefaultAddress: "10.0.0.1",
1058Ports: model.PortList{
1059&model.Port{
1060Name: "tcp-port",
1061Port: 8080,
1062Protocol: protocol.TCP,
1063},
1064},
1065},
1066{
1067Hostname: kube.ServiceHostname("svc2", "nsA", defaultFakeDomainSuffix),
1068DefaultAddress: "10.0.0.1",
1069Ports: model.PortList{
1070&model.Port{
1071Name: "tcp-port",
1072Port: 8081,
1073Protocol: protocol.TCP,
1074},
1075},
1076},
1077{
1078Hostname: kube.ServiceHostname("svc3", "nsA", defaultFakeDomainSuffix),
1079DefaultAddress: "10.0.0.1",
1080Ports: model.PortList{
1081&model.Port{
1082Name: "tcp-port",
1083Port: 8082,
1084Protocol: protocol.TCP,
1085},
1086},
1087},
1088{
1089Hostname: kube.ServiceHostname("svc4", "nsA", defaultFakeDomainSuffix),
1090DefaultAddress: "10.0.0.1",
1091Ports: model.PortList{
1092&model.Port{
1093Name: "tcp-port",
1094Port: 8083,
1095Protocol: protocol.TCP,
1096},
1097},
1098},
1099}
1100
1101svcList := controller.Services()
1102servicesEqual(svcList, expectedSvcList)
1103}
1104
// TestController_ServiceWithFixedDiscoveryNamespaces verifies that with a
// fixed set of mesh DiscoverySelectors only services in matching namespaces
// are discovered, and that re-labeling a namespace moves its services in or
// out of discovery scope at runtime.
func TestController_ServiceWithFixedDiscoveryNamespaces(t *testing.T) {
	// Select namespaces labeled pilot-discovery=enabled OR env in {test, dev}.
	meshWatcher := mesh.NewFixedWatcher(&meshconfig.MeshConfig{
		DiscoverySelectors: []*metav1.LabelSelector{
			{
				MatchLabels: map[string]string{
					"pilot-discovery": "enabled",
				},
			},
			{
				MatchExpressions: []metav1.LabelSelectorRequirement{
					{
						Key:      "env",
						Operator: metav1.LabelSelectorOpIn,
						Values:   []string{"test", "dev"},
					},
				},
			},
		},
	})

	// Expected model.Service conversions for the four k8s services below.
	svc1 := &model.Service{
		Hostname:       kube.ServiceHostname("svc1", "nsA", defaultFakeDomainSuffix),
		DefaultAddress: "10.0.0.1",
		Ports: model.PortList{
			&model.Port{
				Name:     "tcp-port",
				Port:     8080,
				Protocol: protocol.TCP,
			},
		},
	}
	svc2 := &model.Service{
		Hostname:       kube.ServiceHostname("svc2", "nsA", defaultFakeDomainSuffix),
		DefaultAddress: "10.0.0.1",
		Ports: model.PortList{
			&model.Port{
				Name:     "tcp-port",
				Port:     8081,
				Protocol: protocol.TCP,
			},
		},
	}
	svc3 := &model.Service{
		Hostname:       kube.ServiceHostname("svc3", "nsB", defaultFakeDomainSuffix),
		DefaultAddress: "10.0.0.1",
		Ports: model.PortList{
			&model.Port{
				Name:     "tcp-port",
				Port:     8082,
				Protocol: protocol.TCP,
			},
		},
	}
	svc4 := &model.Service{
		Hostname:       kube.ServiceHostname("svc4", "nsB", defaultFakeDomainSuffix),
		DefaultAddress: "10.0.0.1",
		Ports: model.PortList{
			&model.Port{
				Name:     "tcp-port",
				Port:     8083,
				Protocol: protocol.TCP,
			},
		},
	}

	controller, fx := NewFakeControllerWithOptions(t, FakeControllerOptions{
		MeshWatcher: meshWatcher,
	})

	nsA := "nsA"
	nsB := "nsB"

	// event handlers should only be triggered for services in namespaces selected for discovery
	createNamespace(t, controller.client.Kube(), nsA, map[string]string{"pilot-discovery": "enabled"})
	createNamespace(t, controller.client.Kube(), nsB, map[string]string{})

	// service event handlers should trigger for svc1 and svc2
	createServiceWait(controller, "svc1", nsA,
		map[string]string{}, map[string]string{},
		[]int32{8080}, map[string]string{"test-app": "test-app-1"}, t)
	createServiceWait(controller, "svc2", nsA,
		map[string]string{}, map[string]string{},
		[]int32{8081}, map[string]string{"test-app": "test-app-2"}, t)
	// service event handlers should not trigger for svc3 and svc4
	// (plain createService does not wait for a "service" event)
	createService(controller, "svc3", nsB,
		map[string]string{}, map[string]string{},
		[]int32{8082}, map[string]string{"test-app": "test-app-3"}, t)
	createService(controller, "svc4", nsB,
		map[string]string{}, map[string]string{},
		[]int32{8083}, map[string]string{"test-app": "test-app-4"}, t)

	// Only the nsA services should be discovered at this point.
	expectedSvcList := []*model.Service{svc1, svc2}
	eventually(t, func() bool {
		svcList := controller.Services()
		return servicesEqual(svcList, expectedSvcList)
	})

	// test updating namespace with adding discovery label
	updateNamespace(t, controller.client.Kube(), nsB, map[string]string{"env": "test"})
	// service event handlers should trigger for svc3 and svc4
	fx.WaitOrFail(t, "service")
	fx.WaitOrFail(t, "service")
	expectedSvcList = []*model.Service{svc1, svc2, svc3, svc4}
	eventually(t, func() bool {
		svcList := controller.Services()
		return servicesEqual(svcList, expectedSvcList)
	})

	// test updating namespace by removing discovery label
	updateNamespace(t, controller.client.Kube(), nsA, map[string]string{"pilot-discovery": "disabled"})
	// service event handlers should trigger for svc1 and svc2
	fx.WaitOrFail(t, "service")
	fx.WaitOrFail(t, "service")
	expectedSvcList = []*model.Service{svc3, svc4}
	eventually(t, func() bool {
		svcList := controller.Services()
		return servicesEqual(svcList, expectedSvcList)
	})
}
1224
1225func TestController_ServiceWithChangingDiscoveryNamespaces(t *testing.T) {
1226svc1 := &model.Service{
1227Hostname: kube.ServiceHostname("svc1", "nsA", defaultFakeDomainSuffix),
1228DefaultAddress: "10.0.0.1",
1229Ports: model.PortList{
1230&model.Port{
1231Name: "tcp-port",
1232Port: 8080,
1233Protocol: protocol.TCP,
1234},
1235},
1236}
1237svc2 := &model.Service{
1238Hostname: kube.ServiceHostname("svc2", "nsA", defaultFakeDomainSuffix),
1239DefaultAddress: "10.0.0.1",
1240Ports: model.PortList{
1241&model.Port{
1242Name: "tcp-port",
1243Port: 8081,
1244Protocol: protocol.TCP,
1245},
1246},
1247}
1248svc3 := &model.Service{
1249Hostname: kube.ServiceHostname("svc3", "nsB", defaultFakeDomainSuffix),
1250DefaultAddress: "10.0.0.1",
1251Ports: model.PortList{
1252&model.Port{
1253Name: "tcp-port",
1254Port: 8082,
1255Protocol: protocol.TCP,
1256},
1257},
1258}
1259svc4 := &model.Service{
1260Hostname: kube.ServiceHostname("svc4", "nsC", defaultFakeDomainSuffix),
1261DefaultAddress: "10.0.0.1",
1262Ports: model.PortList{
1263&model.Port{
1264Name: "tcp-port",
1265Port: 8083,
1266Protocol: protocol.TCP,
1267},
1268},
1269}
1270
1271updateMeshConfig := func(
1272meshConfig *meshconfig.MeshConfig,
1273expectedSvcList []*model.Service,
1274expectedNumSvcEvents int,
1275testMeshWatcher *mesh.TestWatcher,
1276fx *xdsfake.Updater,
1277controller *FakeController,
1278) {
1279// update meshConfig
1280if err := testMeshWatcher.Update(meshConfig, time.Second*5); err != nil {
1281t.Fatalf("%v", err)
1282}
1283
1284// assert firing of service events
1285for i := 0; i < expectedNumSvcEvents; i++ {
1286fx.WaitOrFail(t, "service")
1287}
1288
1289eventually(t, func() bool {
1290svcList := controller.Services()
1291return servicesEqual(svcList, expectedSvcList)
1292})
1293}
1294
1295meshWatcher := mesh.NewTestWatcher(&meshconfig.MeshConfig{})
1296
1297controller, fx := NewFakeControllerWithOptions(t, FakeControllerOptions{
1298MeshWatcher: meshWatcher,
1299})
1300
1301nsA := "nsA"
1302nsB := "nsB"
1303nsC := "nsC"
1304
1305createNamespace(t, controller.client.Kube(), nsA, map[string]string{"app": "foo"})
1306createNamespace(t, controller.client.Kube(), nsB, map[string]string{"app": "bar"})
1307createNamespace(t, controller.client.Kube(), nsC, map[string]string{"app": "baz"})
1308
1309// service event handlers should trigger for all svcs
1310createServiceWait(controller, "svc1", nsA,
1311map[string]string{}, map[string]string{},
1312[]int32{8080}, map[string]string{"test-app": "test-app-1"}, t)
1313createServiceWait(controller, "svc2", nsA,
1314map[string]string{}, map[string]string{},
1315[]int32{8081}, map[string]string{"test-app": "test-app-2"}, t)
1316createServiceWait(controller, "svc3", nsB,
1317map[string]string{}, map[string]string{},
1318[]int32{8082}, map[string]string{"test-app": "test-app-3"}, t)
1319createServiceWait(controller, "svc4", nsC,
1320map[string]string{}, map[string]string{},
1321[]int32{8083}, map[string]string{"test-app": "test-app-4"}, t)
1322
1323expectedSvcList := []*model.Service{svc1, svc2, svc3, svc4}
1324eventually(t, func() bool {
1325svcList := controller.Services()
1326return servicesEqual(svcList, expectedSvcList)
1327})
1328
1329// restrict namespaces to nsA (expect 2 delete events for svc3 and svc4)
1330updateMeshConfig(
1331&meshconfig.MeshConfig{
1332DiscoverySelectors: []*metav1.LabelSelector{
1333{
1334MatchLabels: map[string]string{
1335"app": "foo",
1336},
1337},
1338},
1339},
1340[]*model.Service{svc1, svc2},
13412,
1342meshWatcher,
1343fx,
1344controller,
1345)
1346
1347// restrict namespaces to nsB (1 create event should trigger for nsB service and 2 delete events for nsA services)
1348updateMeshConfig(
1349&meshconfig.MeshConfig{
1350DiscoverySelectors: []*metav1.LabelSelector{
1351{
1352MatchLabels: map[string]string{
1353"app": "bar",
1354},
1355},
1356},
1357},
1358[]*model.Service{svc3},
13593,
1360meshWatcher,
1361fx,
1362controller,
1363)
1364
1365// expand namespaces to nsA and nsB with selectors (2 create events should trigger for nsA services)
1366updateMeshConfig(
1367&meshconfig.MeshConfig{
1368DiscoverySelectors: []*metav1.LabelSelector{
1369{
1370MatchExpressions: []metav1.LabelSelectorRequirement{
1371{
1372Key: "app",
1373Operator: metav1.LabelSelectorOpIn,
1374Values: []string{"foo", "bar"},
1375},
1376},
1377},
1378},
1379},
1380[]*model.Service{svc1, svc2, svc3},
13812,
1382meshWatcher,
1383fx,
1384controller,
1385)
1386
1387// permit all discovery namespaces by omitting discovery selectors (1 create event should trigger for the nsC service)
1388updateMeshConfig(
1389&meshconfig.MeshConfig{
1390DiscoverySelectors: []*metav1.LabelSelector{},
1391},
1392[]*model.Service{svc1, svc2, svc3, svc4},
13931,
1394meshWatcher,
1395fx,
1396controller,
1397)
1398}
1399
// TestControllerResourceScoping verifies that changing DiscoverySelectors
// scopes resources (services and, via full-push checks, other configs such as
// VirtualServices) to the selected namespaces, and that no extra XDS events
// fire beyond the expected ones.
func TestControllerResourceScoping(t *testing.T) {
	// Expected model.Service conversions for the four k8s services below.
	svc1 := &model.Service{
		Hostname:       kube.ServiceHostname("svc1", "nsA", defaultFakeDomainSuffix),
		DefaultAddress: "10.0.0.1",
		Ports: model.PortList{
			&model.Port{
				Name:     "tcp-port",
				Port:     8080,
				Protocol: protocol.TCP,
			},
		},
	}
	svc2 := &model.Service{
		Hostname:       kube.ServiceHostname("svc2", "nsA", defaultFakeDomainSuffix),
		DefaultAddress: "10.0.0.1",
		Ports: model.PortList{
			&model.Port{
				Name:     "tcp-port",
				Port:     8081,
				Protocol: protocol.TCP,
			},
		},
	}
	svc3 := &model.Service{
		Hostname:       kube.ServiceHostname("svc3", "nsB", defaultFakeDomainSuffix),
		DefaultAddress: "10.0.0.1",
		Ports: model.PortList{
			&model.Port{
				Name:     "tcp-port",
				Port:     8082,
				Protocol: protocol.TCP,
			},
		},
	}
	svc4 := &model.Service{
		Hostname:       kube.ServiceHostname("svc4", "nsC", defaultFakeDomainSuffix),
		DefaultAddress: "10.0.0.1",
		Ports: model.PortList{
			&model.Port{
				Name:     "tcp-port",
				Port:     8083,
				Protocol: protocol.TCP,
			},
		},
	}

	// updateMeshConfig pushes a new mesh config, waits for the expected
	// number of "service" XDS events, then asserts the discovered service
	// list converges to expectedSvcList.
	updateMeshConfig := func(
		meshConfig *meshconfig.MeshConfig,
		expectedSvcList []*model.Service,
		expectedNumSvcEvents int,
		testMeshWatcher *mesh.TestWatcher,
		fx *xdsfake.Updater,
		controller *FakeController,
	) {
		t.Helper()
		// update meshConfig
		if err := testMeshWatcher.Update(meshConfig, time.Second*5); err != nil {
			t.Fatalf("%v", err)
		}

		// assert firing of service events
		for i := 0; i < expectedNumSvcEvents; i++ {
			fx.WaitOrFail(t, "service")
		}

		eventually(t, func() bool {
			svcList := controller.Services()
			return servicesEqual(svcList, expectedSvcList)
		})
	}

	client := kubelib.NewFakeClient()
	t.Cleanup(client.Shutdown)
	meshWatcher := mesh.NewTestWatcher(&meshconfig.MeshConfig{})

	nsA := "nsA"
	nsB := "nsB"
	nsC := "nsC"

	// Namespaces are created before the controller so their labels are
	// visible at startup.
	createNamespace(t, client.Kube(), nsA, map[string]string{"app": "foo"})
	createNamespace(t, client.Kube(), nsB, map[string]string{"app": "bar"})
	createNamespace(t, client.Kube(), nsC, map[string]string{"app": "baz"})

	controller, fx := NewFakeControllerWithOptions(t, FakeControllerOptions{
		Client:      client,
		MeshWatcher: meshWatcher,
	})

	// service event handlers should trigger for all svcs
	createServiceWait(controller, "svc1", nsA,
		map[string]string{},
		map[string]string{},
		[]int32{8080}, map[string]string{"test-app": "test-app-1"}, t)

	createServiceWait(controller, "svc2", nsA,
		map[string]string{},
		map[string]string{},
		[]int32{8081}, map[string]string{"test-app": "test-app-2"}, t)

	createServiceWait(controller, "svc3", nsB,
		map[string]string{},
		map[string]string{},
		[]int32{8082}, map[string]string{"test-app": "test-app-3"}, t)

	createServiceWait(controller, "svc4", nsC,
		map[string]string{},
		map[string]string{},
		[]int32{8083}, map[string]string{"test-app": "test-app-4"}, t)

	expectedSvcList := []*model.Service{svc1, svc2, svc3, svc4}
	eventually(t, func() bool {
		svcList := controller.Services()
		return servicesEqual(svcList, expectedSvcList)
	})

	// Drop any already-recorded events so the assertions below only see
	// events caused by the mesh-config updates.
	fx.Clear()

	// restrict namespaces to nsA (expect 2 delete events for svc3 and svc4)
	updateMeshConfig(
		&meshconfig.MeshConfig{
			DiscoverySelectors: []*metav1.LabelSelector{
				{
					MatchLabels: map[string]string{
						"app": "foo",
					},
				},
			},
		},
		[]*model.Service{svc1, svc2},
		2,
		meshWatcher,
		fx,
		controller,
	)

	// namespace nsB, nsC deselected
	fx.AssertEmpty(t, 0)

	// create vs1 in nsA
	createVirtualService(controller, "vs1", nsA, map[string]string{}, t)

	// create vs1 in nsB
	createVirtualService(controller, "vs2", nsB, map[string]string{}, t)

	// expand namespaces to nsA and nsB with selectors (expect events svc3 and a full push event for nsB selected)
	updateMeshConfig(
		&meshconfig.MeshConfig{
			DiscoverySelectors: []*metav1.LabelSelector{
				{
					MatchExpressions: []metav1.LabelSelectorRequirement{
						{
							Key:      "app",
							Operator: metav1.LabelSelectorOpIn,
							Values:   []string{"foo", "bar"},
						},
					},
				},
			},
		},
		[]*model.Service{svc1, svc2, svc3},
		1,
		meshWatcher,
		fx,
		controller,
	)

	// namespace nsB selected
	fx.AssertEmpty(t, 0)
}
1569
// TestEndpoints_WorkloadInstances verifies that WorkloadInstances (workload
// entries) sharing the same IP each produce an endpoint, and that the
// endpoint port is resolved from the instance's PortMap when present,
// falling back to ignoring the raw EndpointPort when no PortMap is defined.
func TestEndpoints_WorkloadInstances(t *testing.T) {
	ctl, _ := NewFakeControllerWithOptions(t, FakeControllerOptions{})

	// Service with a named target port ("http"), so instance ports must be
	// looked up through each workload instance's PortMap.
	createServiceWithTargetPorts(ctl, "ratings", "bookinfo-ratings",
		map[string]string{
			annotation.AlphaKubernetesServiceAccounts.Name: "ratings",
			annotation.AlphaCanonicalServiceAccounts.Name:  "ratings@gserviceaccount2.com",
		},
		[]corev1.ServicePort{
			{
				Name:       "http-port",
				Port:       8080,
				Protocol:   "TCP",
				TargetPort: intstr.IntOrString{Type: intstr.String, StrVal: "http"},
			},
		},
		map[string]string{"app": "ratings"}, t)

	wiRatings1 := &model.WorkloadInstance{
		Name:      "ratings-1",
		Namespace: "bookinfo-ratings",
		Endpoint: &model.IstioEndpoint{
			Labels:       labels.Instance{"app": "ratings"},
			Address:      "2.2.2.2",
			EndpointPort: 8081, // should be ignored since it doesn't define PortMap
		},
	}

	wiRatings2 := &model.WorkloadInstance{
		Name:      "ratings-2",
		Namespace: "bookinfo-ratings",
		Endpoint: &model.IstioEndpoint{
			Labels:  labels.Instance{"app": "ratings"},
			Address: "2.2.2.2",
		},
		PortMap: map[string]uint32{
			"http": 8082, // should be used
		},
	}

	wiRatings3 := &model.WorkloadInstance{
		Name:      "ratings-3",
		Namespace: "bookinfo-ratings",
		Endpoint: &model.IstioEndpoint{
			Labels:  labels.Instance{"app": "ratings"},
			Address: "2.2.2.2",
		},
		PortMap: map[string]uint32{
			"http": 8083, // should be used
		},
	}

	for _, wi := range []*model.WorkloadInstance{wiRatings1, wiRatings2, wiRatings3} {
		ctl.workloadInstanceHandler(wi, model.EventAdd) // simulate adding a workload entry
	}

	// get service object
	svcs := ctl.Services()
	if len(svcs) != 1 {
		t.Fatalf("failed to get services (%v)", svcs)
	}

	endpoints := GetEndpoints(svcs[0], ctl.Endpoints)

	want := []string{"2.2.2.2:8082", "2.2.2.2:8083"} // expect both WorkloadEntries even though they have the same IP

	got := make([]string, 0, len(endpoints))
	for _, instance := range endpoints {
		got = append(got, net.JoinHostPort(instance.Address, strconv.Itoa(int(instance.EndpointPort))))
	}
	// Sort for a deterministic comparison regardless of endpoint ordering.
	sort.Strings(got)

	assert.Equal(t, want, got)
}
1644
// TestExternalNameServiceInstances verifies ExternalName service conversion
// under both settings of the EnableExternalNameAlias feature flag:
//   - alias enabled: no synthesized endpoints; the service carries
//     K8sAttributes with Type=ExternalName and the external name.
//   - alias disabled: a single endpoint per port pointing at the external
//     hostname is synthesized.
func TestExternalNameServiceInstances(t *testing.T) {
	t.Run("alias", func(t *testing.T) {
		test.SetForTest(t, &features.EnableExternalNameAlias, true)
		controller, fx := NewFakeControllerWithOptions(t, FakeControllerOptions{})
		createExternalNameService(controller, "svc5", "nsA",
			[]int32{1, 2, 3}, "foo.co", t, fx)

		converted := controller.Services()
		assert.Equal(t, len(converted), 1)

		// No endpoints should be synthesized in alias mode.
		eps := GetEndpointsForPort(converted[0], controller.Endpoints, 1)
		assert.Equal(t, len(eps), 0)
		assert.Equal(t, converted[0].Attributes, model.ServiceAttributes{
			ServiceRegistry:          "Kubernetes",
			Name:                     "svc5",
			Namespace:                "nsA",
			Labels:                   nil,
			ExportTo:                 nil,
			LabelSelectors:           nil,
			Aliases:                  nil,
			ClusterExternalAddresses: nil,
			ClusterExternalPorts:     nil,
			K8sAttributes: model.K8sAttributes{
				Type:         string(corev1.ServiceTypeExternalName),
				ExternalName: "foo.co",
			},
		})
	})
	t.Run("no alias", func(t *testing.T) {
		test.SetForTest(t, &features.EnableExternalNameAlias, false)
		controller, fx := NewFakeControllerWithOptions(t, FakeControllerOptions{})
		createExternalNameService(controller, "svc5", "nsA",
			[]int32{1, 2, 3}, "foo.co", t, fx)

		converted := controller.Services()
		assert.Equal(t, len(converted), 1)
		// With aliasing off, each port gets an endpoint whose address is the
		// external hostname itself.
		eps := GetEndpointsForPort(converted[0], controller.Endpoints, 1)
		assert.Equal(t, len(eps), 1)
		assert.Equal(t, eps[0], &model.IstioEndpoint{
			Address:               "foo.co",
			ServicePortName:       "tcp-port-1",
			EndpointPort:          1,
			DiscoverabilityPolicy: model.AlwaysDiscoverable,
		})
	})
}
1691
1692func TestController_ExternalNameService(t *testing.T) {
1693test.SetForTest(t, &features.EnableExternalNameAlias, false)
1694deleteWg := sync.WaitGroup{}
1695controller, fx := NewFakeControllerWithOptions(t, FakeControllerOptions{
1696ServiceHandler: func(_, _ *model.Service, e model.Event) {
1697if e == model.EventDelete {
1698deleteWg.Done()
1699}
1700},
1701})
1702
1703k8sSvcs := []*corev1.Service{
1704createExternalNameService(controller, "svc1", "nsA",
1705[]int32{8080}, "test-app-1.test.svc."+defaultFakeDomainSuffix, t, fx),
1706createExternalNameService(controller, "svc2", "nsA",
1707[]int32{8081}, "test-app-2.test.svc."+defaultFakeDomainSuffix, t, fx),
1708createExternalNameService(controller, "svc3", "nsA",
1709[]int32{8082}, "test-app-3.test.pod."+defaultFakeDomainSuffix, t, fx),
1710createExternalNameService(controller, "svc4", "nsA",
1711[]int32{8083}, "g.co", t, fx),
1712}
1713
1714expectedSvcList := []*model.Service{
1715{
1716Hostname: kube.ServiceHostname("svc1", "nsA", defaultFakeDomainSuffix),
1717Ports: model.PortList{
1718&model.Port{
1719Name: "tcp-port-8080",
1720Port: 8080,
1721Protocol: protocol.TCP,
1722},
1723},
1724MeshExternal: true,
1725Resolution: model.DNSLB,
1726},
1727{
1728Hostname: kube.ServiceHostname("svc2", "nsA", defaultFakeDomainSuffix),
1729Ports: model.PortList{
1730&model.Port{
1731Name: "tcp-port-8081",
1732Port: 8081,
1733Protocol: protocol.TCP,
1734},
1735},
1736MeshExternal: true,
1737Resolution: model.DNSLB,
1738},
1739{
1740Hostname: kube.ServiceHostname("svc3", "nsA", defaultFakeDomainSuffix),
1741Ports: model.PortList{
1742&model.Port{
1743Name: "tcp-port-8082",
1744Port: 8082,
1745Protocol: protocol.TCP,
1746},
1747},
1748MeshExternal: true,
1749Resolution: model.DNSLB,
1750},
1751{
1752Hostname: kube.ServiceHostname("svc4", "nsA", defaultFakeDomainSuffix),
1753Ports: model.PortList{
1754&model.Port{
1755Name: "tcp-port-8083",
1756Port: 8083,
1757Protocol: protocol.TCP,
1758},
1759},
1760MeshExternal: true,
1761Resolution: model.DNSLB,
1762},
1763}
1764
1765svcList := controller.Services()
1766if len(svcList) != len(expectedSvcList) {
1767t.Fatalf("Expecting %d service but got %d\r\n", len(expectedSvcList), len(svcList))
1768}
1769for i, exp := range expectedSvcList {
1770if exp.Hostname != svcList[i].Hostname {
1771t.Fatalf("got hostname of %dst service, got:\n%#v\nwanted:\n%#v\n", i+1, svcList[i].Hostname, exp.Hostname)
1772}
1773if !reflect.DeepEqual(exp.Ports, svcList[i].Ports) {
1774t.Fatalf("got ports of %dst service, got:\n%#v\nwanted:\n%#v\n", i+1, svcList[i].Ports, exp.Ports)
1775}
1776if svcList[i].MeshExternal != exp.MeshExternal {
1777t.Fatalf("i=%v, MeshExternal==%v, should be %v: externalName='%s'", i+1, exp.MeshExternal, svcList[i].MeshExternal, k8sSvcs[i].Spec.ExternalName)
1778}
1779if svcList[i].Resolution != exp.Resolution {
1780t.Fatalf("i=%v, Resolution=='%v', should be '%v'", i+1, svcList[i].Resolution, exp.Resolution)
1781}
1782endpoints := GetEndpoints(svcList[i], controller.Endpoints)
1783assert.Equal(t, len(endpoints), 1)
1784assert.Equal(t, endpoints[0].Address, k8sSvcs[i].Spec.ExternalName)
1785}
1786
1787deleteWg.Add(len(k8sSvcs))
1788for _, s := range k8sSvcs {
1789deleteExternalNameService(controller, s.Name, s.Namespace, t, fx)
1790}
1791deleteWg.Wait()
1792
1793svcList = controller.Services()
1794if len(svcList) != 0 {
1795t.Fatalf("Should have 0 services at this point")
1796}
1797for _, exp := range expectedSvcList {
1798endpoints := GetEndpoints(exp, controller.Endpoints)
1799assert.Equal(t, len(endpoints), 0)
1800}
1801}
1802
1803func createEndpoints(t *testing.T, controller *FakeController, name, namespace string,
1804portNames, ips []string, refs []*corev1.ObjectReference, labels map[string]string,
1805) {
1806if labels == nil {
1807labels = make(map[string]string)
1808}
1809// Add the reference to the service. Used by EndpointSlice logic only.
1810labels[discovery.LabelServiceName] = name
1811
1812if refs == nil {
1813refs = make([]*corev1.ObjectReference, len(ips))
1814}
1815var portNum int32 = 1001
1816eas := make([]corev1.EndpointAddress, 0)
1817for i, ip := range ips {
1818eas = append(eas, corev1.EndpointAddress{IP: ip, TargetRef: refs[i]})
1819}
1820
1821eps := make([]corev1.EndpointPort, 0)
1822for _, name := range portNames {
1823eps = append(eps, corev1.EndpointPort{Name: name, Port: portNum})
1824}
1825
1826endpoint := &corev1.Endpoints{
1827ObjectMeta: metav1.ObjectMeta{
1828Name: name,
1829Namespace: namespace,
1830Labels: labels,
1831},
1832Subsets: []corev1.EndpointSubset{{
1833Addresses: eas,
1834Ports: eps,
1835}},
1836}
1837clienttest.NewWriter[*corev1.Endpoints](t, controller.client).CreateOrUpdate(endpoint)
1838
1839// Create endpoint slice as well
1840esps := make([]discovery.EndpointPort, 0)
1841for _, name := range portNames {
1842n := name // Create a stable reference to take the pointer from
1843esps = append(esps, discovery.EndpointPort{Name: &n, Port: &portNum})
1844}
1845
1846sliceEndpoint := make([]discovery.Endpoint, 0, len(ips))
1847for i, ip := range ips {
1848sliceEndpoint = append(sliceEndpoint, discovery.Endpoint{
1849Addresses: []string{ip},
1850TargetRef: refs[i],
1851})
1852}
1853endpointSlice := &discovery.EndpointSlice{
1854ObjectMeta: metav1.ObjectMeta{
1855Name: name,
1856Namespace: namespace,
1857Labels: labels,
1858},
1859Endpoints: sliceEndpoint,
1860Ports: esps,
1861}
1862clienttest.NewWriter[*discovery.EndpointSlice](t, controller.client).CreateOrUpdate(endpointSlice)
1863}
1864
1865func updateEndpoints(controller *FakeController, name, namespace string, portNames, ips []string, t *testing.T) {
1866var portNum int32 = 1001
1867eas := make([]corev1.EndpointAddress, 0)
1868for _, ip := range ips {
1869eas = append(eas, corev1.EndpointAddress{IP: ip})
1870}
1871
1872eps := make([]corev1.EndpointPort, 0)
1873for _, name := range portNames {
1874eps = append(eps, corev1.EndpointPort{Name: name, Port: portNum})
1875}
1876
1877endpoint := &corev1.Endpoints{
1878ObjectMeta: metav1.ObjectMeta{
1879Name: name,
1880Namespace: namespace,
1881},
1882Subsets: []corev1.EndpointSubset{{
1883Addresses: eas,
1884Ports: eps,
1885}},
1886}
1887if _, err := controller.client.Kube().CoreV1().Endpoints(namespace).Update(context.TODO(), endpoint, metav1.UpdateOptions{}); err != nil {
1888t.Fatalf("failed to update endpoints %s in namespace %s (error %v)", name, namespace, err)
1889}
1890
1891// Update endpoint slice as well
1892esps := make([]discovery.EndpointPort, 0)
1893for i := range portNames {
1894esps = append(esps, discovery.EndpointPort{Name: &portNames[i], Port: &portNum})
1895}
1896endpointSlice := &discovery.EndpointSlice{
1897ObjectMeta: metav1.ObjectMeta{
1898Name: name,
1899Namespace: namespace,
1900Labels: map[string]string{
1901discovery.LabelServiceName: name,
1902},
1903},
1904Endpoints: []discovery.Endpoint{
1905{
1906Addresses: ips,
1907},
1908},
1909Ports: esps,
1910}
1911if _, err := controller.client.Kube().DiscoveryV1().EndpointSlices(namespace).Update(context.TODO(), endpointSlice, metav1.UpdateOptions{}); err != nil {
1912t.Errorf("failed to create endpoint slice %s in namespace %s (error %v)", name, namespace, err)
1913}
1914}
1915
1916func createServiceWithTargetPorts(controller *FakeController, name, namespace string, annotations map[string]string,
1917svcPorts []corev1.ServicePort, selector map[string]string, t *testing.T,
1918) {
1919service := &corev1.Service{
1920ObjectMeta: metav1.ObjectMeta{
1921Name: name,
1922Namespace: namespace,
1923Annotations: annotations,
1924},
1925Spec: corev1.ServiceSpec{
1926ClusterIP: "10.0.0.1", // FIXME: generate?
1927Ports: svcPorts,
1928Selector: selector,
1929Type: corev1.ServiceTypeClusterIP,
1930},
1931}
1932
1933clienttest.Wrap(t, controller.services).Create(service)
1934controller.opts.XDSUpdater.(*xdsfake.Updater).WaitOrFail(t, "service")
1935}
1936
// createServiceWait creates a Service via createService and then blocks until
// the XDS updater reports the resulting "service" event, ensuring the
// controller has processed the service before the test proceeds.
func createServiceWait(controller *FakeController, name, namespace string, labels, annotations map[string]string,
	ports []int32, selector map[string]string, t *testing.T,
) {
	t.Helper()
	createService(controller, name, namespace, labels, annotations, ports, selector, t)
	controller.opts.XDSUpdater.(*xdsfake.Updater).WaitOrFail(t, "service")
}
1944
1945func createService(controller *FakeController, name, namespace string, labels, annotations map[string]string,
1946ports []int32, selector map[string]string, t *testing.T,
1947) {
1948service := generateService(name, namespace, labels, annotations, ports, selector, "10.0.0.1")
1949clienttest.Wrap(t, controller.services).CreateOrUpdate(service)
1950}
1951
1952func generateService(name, namespace string, labels, annotations map[string]string,
1953ports []int32, selector map[string]string, ip string,
1954) *corev1.Service {
1955svcPorts := make([]corev1.ServicePort, 0)
1956for _, p := range ports {
1957svcPorts = append(svcPorts, corev1.ServicePort{
1958Name: "tcp-port",
1959Port: p,
1960Protocol: "http",
1961})
1962}
1963
1964return &corev1.Service{
1965ObjectMeta: metav1.ObjectMeta{
1966Name: name,
1967Namespace: namespace,
1968Annotations: annotations,
1969Labels: labels,
1970},
1971Spec: corev1.ServiceSpec{
1972ClusterIP: ip,
1973Ports: svcPorts,
1974Selector: selector,
1975Type: corev1.ServiceTypeClusterIP,
1976},
1977}
1978}
1979
1980func createVirtualService(controller *FakeController, name, namespace string,
1981annotations map[string]string,
1982t *testing.T,
1983) {
1984vs := &v1alpha3.VirtualService{
1985ObjectMeta: metav1.ObjectMeta{
1986Name: name,
1987Namespace: namespace,
1988Annotations: annotations,
1989},
1990}
1991
1992clienttest.NewWriter[*v1alpha3.VirtualService](t, controller.client).Create(vs)
1993}
1994
1995func getService(controller *FakeController, name, namespace string, t *testing.T) *corev1.Service {
1996svc, err := controller.client.Kube().CoreV1().Services(namespace).Get(context.TODO(), name, metav1.GetOptions{})
1997if err != nil {
1998t.Fatalf("Cannot get service %s in namespace %s (error: %v)", name, namespace, err)
1999}
2000return svc
2001}
2002
2003func updateService(controller *FakeController, svc *corev1.Service, t *testing.T) *corev1.Service {
2004svcUpdated, err := controller.client.Kube().CoreV1().Services(svc.Namespace).Update(context.TODO(), svc, metav1.UpdateOptions{})
2005if err != nil {
2006t.Fatalf("Cannot update service %s in namespace %s (error: %v)", svc.Name, svc.Namespace, err)
2007}
2008return svcUpdated
2009}
2010
2011func createServiceWithoutClusterIP(controller *FakeController, name, namespace string, annotations map[string]string,
2012ports []int32, selector map[string]string, t *testing.T,
2013) {
2014svcPorts := make([]corev1.ServicePort, 0)
2015for _, p := range ports {
2016svcPorts = append(svcPorts, corev1.ServicePort{
2017Name: "tcp-port",
2018Port: p,
2019Protocol: "http",
2020})
2021}
2022service := &corev1.Service{
2023ObjectMeta: metav1.ObjectMeta{
2024Name: name,
2025Namespace: namespace,
2026Annotations: annotations,
2027},
2028Spec: corev1.ServiceSpec{
2029ClusterIP: corev1.ClusterIPNone,
2030Ports: svcPorts,
2031Selector: selector,
2032Type: corev1.ServiceTypeClusterIP,
2033},
2034}
2035
2036clienttest.Wrap(t, controller.services).Create(service)
2037}
2038
// nolint: unparam
// createExternalNameService creates an ExternalName Service with one
// "tcp-port-<port>" entry per requested port, then waits for the XDS events
// the controller is expected to emit: just a "service" event when
// EnableExternalNameAlias is on, plus an "eds cache" event when it is off.
// Returns the created Service so callers can inspect its spec.
func createExternalNameService(controller *FakeController, name, namespace string,
	ports []int32, externalName string, t *testing.T, xdsEvents *xdsfake.Updater,
) *corev1.Service {
	svcPorts := make([]corev1.ServicePort, 0)
	for _, p := range ports {
		svcPorts = append(svcPorts, corev1.ServicePort{
			Name:     fmt.Sprintf("tcp-port-%d", p),
			Port:     p,
			Protocol: "http",
		})
	}
	service := &corev1.Service{
		ObjectMeta: metav1.ObjectMeta{
			Name:      name,
			Namespace: namespace,
		},
		Spec: corev1.ServiceSpec{
			Ports:        svcPorts,
			Type:         corev1.ServiceTypeExternalName,
			ExternalName: externalName,
		},
	}

	clienttest.Wrap(t, controller.services).Create(service)
	if features.EnableExternalNameAlias {
		xdsEvents.MatchOrFail(t, xdsfake.Event{Type: "service"})
	} else {
		xdsEvents.MatchOrFail(t, xdsfake.Event{Type: "service"}, xdsfake.Event{Type: "eds cache"})
	}
	return service
}
2071
// deleteExternalNameService deletes the named Service and blocks until the
// corresponding "service" XDS event confirms the controller observed it.
func deleteExternalNameService(controller *FakeController, name, namespace string, t *testing.T, xdsEvents *xdsfake.Updater) {
	clienttest.Wrap(t, controller.services).Delete(name, namespace)
	xdsEvents.WaitOrFail(t, "service")
}
2076
2077func servicesEqual(svcList, expectedSvcList []*model.Service) bool {
2078if len(svcList) != len(expectedSvcList) {
2079return false
2080}
2081for i, exp := range expectedSvcList {
2082if exp.Hostname != svcList[i].Hostname {
2083return false
2084}
2085if exp.DefaultAddress != svcList[i].DefaultAddress {
2086return false
2087}
2088if !reflect.DeepEqual(exp.Ports, svcList[i].Ports) {
2089return false
2090}
2091}
2092return true
2093}
2094
// addPods creates (or updates) each pod, forces its status to Running/Ready
// with the desired PodIP via a status update, and waits until the
// controller's pod cache and a "proxy" XDS event confirm it was processed.
func addPods(t *testing.T, controller *FakeController, fx *xdsfake.Updater, pods ...*corev1.Pod) {
	pc := clienttest.Wrap(t, controller.podsClient)
	for _, pod := range pods {
		newPod := pc.CreateOrUpdate(pod)
		setPodReady(newPod)
		// Apiserver doesn't allow Create/Update to modify the pod status. Creating doesn't result in
		// events - since PodIP will be "".
		newPod.Status.PodIP = pod.Status.PodIP
		newPod.Status.Phase = corev1.PodRunning
		pc.UpdateStatus(newPod)
		waitForPod(t, controller, pod.Status.PodIP)
		// pod first time occur will trigger proxy push
		fx.WaitOrFail(t, "proxy")
	}
}
2110
2111func setPodReady(pod *corev1.Pod) {
2112pod.Status.Conditions = []corev1.PodCondition{
2113{
2114Type: corev1.PodReady,
2115Status: corev1.ConditionTrue,
2116LastTransitionTime: metav1.Now(),
2117},
2118}
2119}
2120
2121func generatePod(ip, name, namespace, saName, node string, labels map[string]string, annotations map[string]string) *corev1.Pod {
2122automount := false
2123return &corev1.Pod{
2124ObjectMeta: metav1.ObjectMeta{
2125Name: name,
2126Labels: labels,
2127Annotations: annotations,
2128Namespace: namespace,
2129},
2130Spec: corev1.PodSpec{
2131ServiceAccountName: saName,
2132NodeName: node,
2133AutomountServiceAccountToken: &automount,
2134// Validation requires this
2135Containers: []corev1.Container{
2136{
2137Name: "test",
2138Image: "ununtu",
2139},
2140},
2141},
2142// The cache controller uses this as key, required by our impl.
2143Status: corev1.PodStatus{
2144Conditions: []corev1.PodCondition{
2145{
2146Type: corev1.PodReady,
2147Status: corev1.ConditionTrue,
2148LastTransitionTime: metav1.Now(),
2149},
2150},
2151PodIP: ip,
2152HostIP: ip,
2153PodIPs: []corev1.PodIP{
2154{
2155IP: ip,
2156},
2157},
2158Phase: corev1.PodRunning,
2159},
2160}
2161}
2162
2163func generateNode(name string, labels map[string]string) *corev1.Node {
2164return &corev1.Node{
2165TypeMeta: metav1.TypeMeta{
2166Kind: "Node",
2167APIVersion: "v1",
2168},
2169ObjectMeta: metav1.ObjectMeta{
2170Name: name,
2171Labels: labels,
2172},
2173}
2174}
2175
2176func addNodes(t *testing.T, controller *FakeController, nodes ...*corev1.Node) {
2177for _, node := range nodes {
2178clienttest.Wrap(t, controller.nodes).CreateOrUpdate(node)
2179waitForNode(t, controller, node.Name)
2180}
2181}
2182
// TestEndpointUpdate verifies the push behavior for endpoint updates:
// a normal (ClusterIP) service gets an incremental "eds" push, while a
// headless service triggers a full xds push keyed by its hostname.
func TestEndpointUpdate(t *testing.T) {
	controller, fx := NewFakeControllerWithOptions(t, FakeControllerOptions{})

	pod1 := generatePod("128.0.0.1", "pod1", "nsA", "", "node1", map[string]string{"app": "prod-app"}, map[string]string{})
	pods := []*corev1.Pod{pod1}
	addPods(t, controller, fx, pods...)

	// 1. incremental eds for normal service endpoint update
	createServiceWait(controller, "svc1", "nsa", nil, nil,
		[]int32{8080}, map[string]string{"app": "prod-app"}, t)

	// Endpoints are generated by Kubernetes from pod labels and service selectors.
	// Here we manually create them for mocking purpose.
	svc1Ips := []string{"128.0.0.1"}
	portNames := []string{"tcp-port"}
	// Create 1 endpoint that refers to a pod in the same namespace.
	createEndpoints(t, controller, "svc1", "nsa", portNames, svc1Ips, nil, nil)
	// Normal services only need an incremental EDS push.
	fx.WaitOrFail(t, "eds")

	// delete normal service
	clienttest.Wrap(t, controller.services).Delete("svc1", "nsa")
	fx.WaitOrFail(t, "service")

	// 2. full xds push request for headless service endpoint update

	// create a headless service
	createServiceWithoutClusterIP(controller, "svc1", "nsa", nil,
		[]int32{8080}, map[string]string{"app": "prod-app"}, t)
	fx.WaitOrFail(t, "service")

	// Create 1 endpoint that refers to a pod in the same namespace.
	svc1Ips = append(svc1Ips, "128.0.0.2")
	updateEndpoints(controller, "svc1", "nsa", portNames, svc1Ips, t)
	host := string(kube.ServiceHostname("svc1", "nsa", controller.opts.DomainSuffix))
	// Headless endpoint changes require a full push scoped to the service hostname.
	fx.MatchOrFail(t, xdsfake.Event{Type: "xds full", ID: host})
}
2219
// Validates that when Pilot sees Endpoint before the corresponding Pod, it triggers endpoint event on pod event.
//
// The test drives every ordering of pod vs. endpoint creation/deletion and
// asserts the EDS events (IPs and derived SPIFFE service accounts) that the
// controller emits, plus the size of the pending-resync bookkeeping used to
// replay endpoints once their pod arrives.
func TestEndpointUpdateBeforePodUpdate(t *testing.T) {
	controller, fx := NewFakeControllerWithOptions(t, FakeControllerOptions{})

	addNodes(t, controller, generateNode("node1", map[string]string{NodeZoneLabel: "zone1", NodeRegionLabel: "region1", label.TopologySubzone.Name: "subzone1"}))
	// Setup help functions to make the test more explicit
	addPod := func(name, ip string) {
		pod := generatePod(ip, name, "nsA", name, "node1", map[string]string{"app": "prod-app"}, map[string]string{})
		addPods(t, controller, fx, pod)
	}
	deletePod := func(name, ip string) {
		if err := controller.client.Kube().CoreV1().Pods("nsA").Delete(context.TODO(), name, metav1.DeleteOptions{}); err != nil {
			t.Fatal(err)
		}
		// Deletion is observed asynchronously; wait until the pod cache no
		// longer indexes the IP before continuing.
		retry.UntilSuccessOrFail(t, func() error {
			controller.pods.RLock()
			defer controller.pods.RUnlock()
			if _, ok := controller.pods.podsByIP[ip]; ok {
				return fmt.Errorf("pod still present")
			}
			return nil
		}, retry.Timeout(time.Second))
	}
	addService := func(name string) {
		// create service
		createServiceWait(controller, name, "nsA", nil, nil,
			[]int32{8080}, map[string]string{"app": "prod-app"}, t)
	}
	// addEndpoint writes an Endpoints object for svcName; an empty pod name
	// produces a nil target reference, simulating an address with no backing pod.
	addEndpoint := func(svcName string, ips []string, pods []string) {
		var refs []*corev1.ObjectReference
		for _, pod := range pods {
			if pod == "" {
				refs = append(refs, nil)
			} else {
				refs = append(refs, &corev1.ObjectReference{
					Kind:      "Pod",
					Namespace: "nsA",
					Name:      pod,
				})
			}
		}
		createEndpoints(t, controller, svcName, "nsA", []string{"tcp-port"}, ips, refs, nil)
	}
	// assertEndpointsEvent waits for the next "eds" event and asserts both the
	// endpoint IPs and the SPIFFE service accounts derived from the pod names
	// (empty pod name => empty service account).
	assertEndpointsEvent := func(ips []string, pods []string) {
		t.Helper()
		ev := fx.WaitOrFail(t, "eds")
		var gotIps []string
		for _, e := range ev.Endpoints {
			gotIps = append(gotIps, e.Address)
		}
		var gotSA []string
		var expectedSa []string
		for _, e := range pods {
			if e == "" {
				expectedSa = append(expectedSa, "")
			} else {
				expectedSa = append(expectedSa, "spiffe://cluster.local/ns/nsA/sa/"+e)
			}
		}

		for _, e := range ev.Endpoints {
			gotSA = append(gotSA, e.ServiceAccount)
		}
		if !reflect.DeepEqual(gotIps, ips) {
			t.Fatalf("expected ips %v, got %v", ips, gotIps)
		}
		if !reflect.DeepEqual(gotSA, expectedSa) {
			t.Fatalf("expected SAs %v, got %v", expectedSa, gotSA)
		}
	}
	// assertPendingResync polls until the pod cache's needResync map has the
	// expected number of entries (endpoints waiting for their pod to appear).
	assertPendingResync := func(expected int) {
		t.Helper()
		retry.UntilSuccessOrFail(t, func() error {
			controller.pods.RLock()
			defer controller.pods.RUnlock()
			if len(controller.pods.needResync) != expected {
				return fmt.Errorf("expected %d pods needing resync, got %d", expected, len(controller.pods.needResync))
			}
			return nil
		}, retry.Timeout(time.Second))
	}

	// standard ordering
	addService("svc")
	addPod("pod1", "172.0.1.1")
	addEndpoint("svc", []string{"172.0.1.1"}, []string{"pod1"})
	assertEndpointsEvent([]string{"172.0.1.1"}, []string{"pod1"})
	fx.Clear()

	// Create the endpoint, then later add the pod. Should eventually get an update for the endpoint
	addEndpoint("svc", []string{"172.0.1.1", "172.0.1.2"}, []string{"pod1", "pod2"})
	assertEndpointsEvent([]string{"172.0.1.1"}, []string{"pod1"})
	fx.Clear()
	addPod("pod2", "172.0.1.2")
	assertEndpointsEvent([]string{"172.0.1.1", "172.0.1.2"}, []string{"pod1", "pod2"})
	fx.Clear()

	// Create the endpoint without a pod reference. We should see it immediately
	addEndpoint("svc", []string{"172.0.1.1", "172.0.1.2", "172.0.1.3"}, []string{"pod1", "pod2", ""})
	assertEndpointsEvent([]string{"172.0.1.1", "172.0.1.2", "172.0.1.3"}, []string{"pod1", "pod2", ""})
	fx.Clear()

	// Delete a pod before the endpoint
	addEndpoint("svc", []string{"172.0.1.1"}, []string{"pod1"})
	deletePod("pod2", "172.0.1.2")
	assertEndpointsEvent([]string{"172.0.1.1"}, []string{"pod1"})
	fx.Clear()

	// add another service
	addService("other")
	// Add endpoints for the new service, and the old one. Both should be missing the last IP
	addEndpoint("other", []string{"172.0.1.1", "172.0.1.2"}, []string{"pod1", "pod2"})
	addEndpoint("svc", []string{"172.0.1.1", "172.0.1.2"}, []string{"pod1", "pod2"})
	assertEndpointsEvent([]string{"172.0.1.1"}, []string{"pod1"})
	assertEndpointsEvent([]string{"172.0.1.1"}, []string{"pod1"})
	fx.Clear()
	// Add the pod, expect the endpoints update for both
	addPod("pod2", "172.0.1.2")
	assertEndpointsEvent([]string{"172.0.1.1", "172.0.1.2"}, []string{"pod1", "pod2"})
	assertEndpointsEvent([]string{"172.0.1.1", "172.0.1.2"}, []string{"pod1", "pod2"})

	// Check for memory leaks
	assertPendingResync(0)
	addEndpoint("svc", []string{"172.0.1.1", "172.0.1.2", "172.0.1.3"}, []string{"pod1", "pod2", "pod3"})
	// This is really an implementation detail here - but checking to sanity check our test
	assertPendingResync(1)
	// Remove the endpoint again, with no pod events in between. Should have no memory leaks
	addEndpoint("svc", []string{"172.0.1.1", "172.0.1.2"}, []string{"pod1", "pod2"})
	// TODO this case would leak
	// assertPendingResync(0)

	// completely remove the endpoint
	addEndpoint("svc", []string{"172.0.1.1", "172.0.1.2", "172.0.1.3"}, []string{"pod1", "pod2", "pod3"})
	assertPendingResync(1)
	if err := controller.client.Kube().CoreV1().Endpoints("nsA").Delete(context.TODO(), "svc", metav1.DeleteOptions{}); err != nil {
		t.Fatal(err)
	}
	if err := controller.client.Kube().DiscoveryV1().EndpointSlices("nsA").Delete(context.TODO(), "svc", metav1.DeleteOptions{}); err != nil {
		t.Fatal(err)
	}
	assertPendingResync(0)
}
2362
// TestWorkloadInstanceHandlerMultipleEndpoints verifies that a WorkloadInstance
// (e.g. from a WorkloadEntry) whose labels match a k8s Service selector is
// merged into that service's endpoints: EDS pushes and the controller's
// endpoint listing must contain both the pod IPs and the workload instance IP.
func TestWorkloadInstanceHandlerMultipleEndpoints(t *testing.T) {
	controller, fx := NewFakeControllerWithOptions(t, FakeControllerOptions{})

	// Create an initial pod with a service, and endpoint.
	pod1 := generatePod("172.0.1.1", "pod1", "nsA", "", "node1", map[string]string{"app": "prod-app"}, map[string]string{})
	pod2 := generatePod("172.0.1.2", "pod2", "nsA", "", "node1", map[string]string{"app": "prod-app"}, map[string]string{})
	pods := []*corev1.Pod{pod1, pod2}
	nodes := []*corev1.Node{
		generateNode("node1", map[string]string{NodeZoneLabel: "zone1", NodeRegionLabel: "region1", label.TopologySubzone.Name: "subzone1"}),
	}
	addNodes(t, controller, nodes...)
	addPods(t, controller, fx, pods...)
	createServiceWait(controller, "svc1", "nsA", nil, nil,
		[]int32{8080}, map[string]string{"app": "prod-app"}, t)
	pod1Ips := []string{"172.0.1.1"}
	portNames := []string{"tcp-port"}
	createEndpoints(t, controller, "svc1", "nsA", portNames, pod1Ips, nil, nil)
	fx.WaitOrFail(t, "eds")

	// Simulate adding a workload entry (fired through invocation of WorkloadInstanceHandler)
	controller.workloadInstanceHandler(&model.WorkloadInstance{
		Namespace: "nsA",
		Endpoint: &model.IstioEndpoint{
			Labels:         labels.Instance{"app": "prod-app"},
			ServiceAccount: "account",
			Address:        "2.2.2.2",
			EndpointPort:   8080,
		},
	}, model.EventAdd)

	expectedEndpointIPs := []string{"172.0.1.1", "2.2.2.2"}
	// Check if an EDS event is fired
	ev := fx.WaitOrFail(t, "eds")
	// check if the hostname matches that of k8s service svc1.nsA
	if ev.ID != "svc1.nsA.svc.company.com" {
		t.Fatalf("eds event for workload entry addition did not match the expected service. got %s, want %s",
			ev.ID, "svc1.nsA.svc.company.com")
	}
	// we should have the pod IP and the workload Entry's IP in the endpoints..
	// the first endpoint should be that of the k8s pod and the second one should be the workload entry

	gotEndpointIPs := make([]string, 0, len(ev.Endpoints))
	for _, ep := range ev.Endpoints {
		gotEndpointIPs = append(gotEndpointIPs, ep.Address)
	}
	if !reflect.DeepEqual(gotEndpointIPs, expectedEndpointIPs) {
		t.Fatalf("eds update after adding workload entry did not match expected list. got %v, want %v",
			gotEndpointIPs, expectedEndpointIPs)
	}

	// Check if InstancesByPort returns the same list
	converted := controller.Services()
	if len(converted) != 1 {
		t.Fatalf("failed to get services (%v), converted", converted)
	}
	endpoints := GetEndpoints(converted[0], controller.Endpoints)
	gotEndpointIPs = []string{}
	for _, instance := range endpoints {
		gotEndpointIPs = append(gotEndpointIPs, instance.Address)
	}
	if !reflect.DeepEqual(gotEndpointIPs, expectedEndpointIPs) {
		t.Fatalf("InstancesByPort after adding workload entry did not match expected list. got %v, want %v",
			gotEndpointIPs, expectedEndpointIPs)
	}

	// Now add a k8s pod to the service and ensure that eds updates contain both pod IPs and workload entry IPs.
	updateEndpoints(controller, "svc1", "nsA", portNames, []string{"172.0.1.1", "172.0.1.2"}, t)
	ev = fx.WaitOrFail(t, "eds")
	gotEndpointIPs = []string{}
	for _, ep := range ev.Endpoints {
		gotEndpointIPs = append(gotEndpointIPs, ep.Address)
	}
	expectedEndpointIPs = []string{"172.0.1.1", "172.0.1.2", "2.2.2.2"}
	if !reflect.DeepEqual(gotEndpointIPs, expectedEndpointIPs) {
		t.Fatalf("eds update after adding pod did not match expected list. got %v, want %v",
			gotEndpointIPs, expectedEndpointIPs)
	}
}
2441
// TestWorkloadInstanceHandler_WorkloadInstanceIndex verifies that the
// controller's workloadInstancesIndex tracks instances by IP across add,
// update (including an IP change, which must de-index the old IP), and delete.
func TestWorkloadInstanceHandler_WorkloadInstanceIndex(t *testing.T) {
	ctl, _ := NewFakeControllerWithOptions(t, FakeControllerOptions{})

	// verifyGetByIP asserts the exact set of instances indexed under an IP.
	verifyGetByIP := func(address string, want []*model.WorkloadInstance) {
		t.Helper()
		got := ctl.workloadInstancesIndex.GetByIP(address)

		assert.Equal(t, want, got)
	}

	wi1 := &model.WorkloadInstance{
		Name:      "ratings-1",
		Namespace: "bookinfo",
		Endpoint: &model.IstioEndpoint{
			Labels:       labels.Instance{"app": "ratings"},
			Address:      "2.2.2.2",
			EndpointPort: 8080,
		},
	}

	// simulate adding a workload entry
	ctl.workloadInstanceHandler(wi1, model.EventAdd)

	verifyGetByIP("2.2.2.2", []*model.WorkloadInstance{wi1})

	wi2 := &model.WorkloadInstance{
		Name:      "details-1",
		Namespace: "bookinfo",
		Endpoint: &model.IstioEndpoint{
			Labels:       labels.Instance{"app": "details"},
			Address:      "3.3.3.3",
			EndpointPort: 9090,
		},
	}

	// simulate adding a workload entry
	ctl.workloadInstanceHandler(wi2, model.EventAdd)

	verifyGetByIP("2.2.2.2", []*model.WorkloadInstance{wi1})
	verifyGetByIP("3.3.3.3", []*model.WorkloadInstance{wi2})

	// wi3 is the same name/namespace as wi2 but moved to wi1's IP; the update
	// must remove the 3.3.3.3 entry and index wi3 under 2.2.2.2 alongside wi1.
	wi3 := &model.WorkloadInstance{
		Name:      "details-1",
		Namespace: "bookinfo",
		Endpoint: &model.IstioEndpoint{
			Labels:       labels.Instance{"app": "details"},
			Address:      "2.2.2.2", // update IP
			EndpointPort: 9090,
		},
	}

	// simulate updating a workload entry
	ctl.workloadInstanceHandler(wi3, model.EventUpdate)

	verifyGetByIP("3.3.3.3", nil)
	verifyGetByIP("2.2.2.2", []*model.WorkloadInstance{wi3, wi1})

	// simulate deleting a workload entry
	ctl.workloadInstanceHandler(wi3, model.EventDelete)

	verifyGetByIP("2.2.2.2", []*model.WorkloadInstance{wi1})

	// simulate deleting a workload entry
	ctl.workloadInstanceHandler(wi1, model.EventDelete)

	verifyGetByIP("2.2.2.2", nil)
}
2509
// TestUpdateEdsCacheOnServiceUpdate verifies that a service selector change
// refreshes the EDS cache only when EnableK8SServiceSelectWorkloadEntries is
// on: with the flag off only a "service" event fires; with it on an
// "eds cache" event follows the selector update.
//
// NOTE(review): this test mutates the global feature flag (restored via
// defer), so it must not run in parallel with tests reading the same flag.
func TestUpdateEdsCacheOnServiceUpdate(t *testing.T) {
	controller, fx := NewFakeControllerWithOptions(t, FakeControllerOptions{})

	// Create an initial pod with a service, and endpoint.
	pod1 := generatePod("172.0.1.1", "pod1", "nsA", "", "node1", map[string]string{"app": "prod-app"}, map[string]string{})
	pod2 := generatePod("172.0.1.2", "pod2", "nsA", "", "node1", map[string]string{"app": "prod-app"}, map[string]string{})
	pods := []*corev1.Pod{pod1, pod2}
	nodes := []*corev1.Node{
		generateNode("node1", map[string]string{NodeZoneLabel: "zone1", NodeRegionLabel: "region1", label.TopologySubzone.Name: "subzone1"}),
	}
	addNodes(t, controller, nodes...)
	addPods(t, controller, fx, pods...)
	createServiceWait(controller, "svc1", "nsA", nil, nil,
		[]int32{8080}, map[string]string{"app": "prod-app"}, t)

	pod1Ips := []string{"172.0.1.1"}
	portNames := []string{"tcp-port"}
	createEndpoints(t, controller, "svc1", "nsA", portNames, pod1Ips, nil, nil)
	fx.WaitOrFail(t, "eds")

	// update service selector
	svc := getService(controller, "svc1", "nsA", t)
	svc.Spec.Selector = map[string]string{
		"app": "prod-app",
		"foo": "bar",
	}
	// set `K8SServiceSelectWorkloadEntries` to false temporarily
	tmp := features.EnableK8SServiceSelectWorkloadEntries
	features.EnableK8SServiceSelectWorkloadEntries = false
	defer func() {
		features.EnableK8SServiceSelectWorkloadEntries = tmp
	}()
	svc = updateService(controller, svc, t)
	// don't update eds cache if `K8S_SELECT_WORKLOAD_ENTRIES` is disabled
	fx.WaitOrFail(t, "service")
	fx.AssertEmpty(t, 0)

	features.EnableK8SServiceSelectWorkloadEntries = true
	svc.Spec.Selector = map[string]string{
		"app": "prod-app",
	}
	updateService(controller, svc, t)
	// update eds cache if `K8S_SELECT_WORKLOAD_ENTRIES` is enabled
	fx.WaitOrFail(t, "eds cache")
}
2555
// TestDiscoverySelector creates a service in a test namespace and verifies
// service discovery behavior: Services() eventually reports the service with
// its single HTTP port, GetService finds it by hostname, and a lookup for a
// nonexistent hostname returns nil.
//
// NOTE(review): the networks watcher configures two CIDR-based networks, but
// no assertion below depends on them - presumably it exercises the discovery
// path with mesh networks configured; confirm whether it is still needed.
func TestDiscoverySelector(t *testing.T) {
	networksWatcher := mesh.NewFixedNetworksWatcher(&meshconfig.MeshNetworks{
		Networks: map[string]*meshconfig.Network{
			"network1": {
				Endpoints: []*meshconfig.Network_NetworkEndpoints{
					{
						Ne: &meshconfig.Network_NetworkEndpoints_FromCidr{
							FromCidr: "10.10.1.1/24",
						},
					},
				},
			},
			"network2": {
				Endpoints: []*meshconfig.Network_NetworkEndpoints{
					{
						Ne: &meshconfig.Network_NetworkEndpoints_FromCidr{
							FromCidr: "10.11.1.1/24",
						},
					},
				},
			},
		},
	})
	ctl, _ := NewFakeControllerWithOptions(t, FakeControllerOptions{NetworksWatcher: networksWatcher})
	t.Parallel()
	ns := "ns-test"

	hostname := kube.ServiceHostname(testService, ns, defaultFakeDomainSuffix)

	var sds model.ServiceDiscovery = ctl
	// "test", ports: http-example on 80
	makeService(testService, ns, ctl, t)

	// Poll until the service shows up with the expected port, since the
	// controller processes the create asynchronously.
	eventually(t, func() bool {
		out := sds.Services()

		// Original test was checking for 'protocolTCP' - which is incorrect (the
		// port name is 'http'. It was working because the Service was created with
		// an invalid protocol, and the code was ignoring that ( not TCP/UDP).
		for _, item := range out {
			if item.Hostname == hostname &&
				len(item.Ports) == 1 &&
				item.Ports[0].Protocol == protocol.HTTP {
				return true
			}
		}
		return false
	})

	svc := sds.GetService(hostname)
	if svc == nil {
		t.Fatalf("GetService(%q) => should exists", hostname)
	}
	if svc.Hostname != hostname {
		t.Fatalf("GetService(%q) => %q", hostname, svc.Hostname)
	}

	missing := kube.ServiceHostname("does-not-exist", ns, defaultFakeDomainSuffix)
	svc = sds.GetService(missing)
	if svc != nil {
		t.Fatalf("GetService(%q) => %s, should not exist", missing, svc.Hostname)
	}
}
2619
// TestStripNodeUnusedFields verifies that the controller's node cache strips
// fields it does not need: a node stored with annotations, managed fields,
// owner references, and status comes back with only name and topology labels.
func TestStripNodeUnusedFields(t *testing.T) {
	// Input node carries every category of data expected to be stripped.
	inputNode := &corev1.Node{
		TypeMeta: metav1.TypeMeta{
			Kind:       "Node",
			APIVersion: "v1",
		},
		ObjectMeta: metav1.ObjectMeta{
			Name: "test",
			Labels: map[string]string{
				NodeZoneLabel:              "zone1",
				NodeRegionLabel:            "region1",
				label.TopologySubzone.Name: "subzone1",
			},
			Annotations: map[string]string{
				"annotation1": "foo",
				"annotation2": "bar",
			},
			ManagedFields: []metav1.ManagedFieldsEntry{
				{
					Manager: "test",
				},
			},
			OwnerReferences: []metav1.OwnerReference{
				{
					Name: "test",
				},
			},
		},
		Status: corev1.NodeStatus{
			Allocatable: map[corev1.ResourceName]resource.Quantity{
				"cpu": {
					Format: "500m",
				},
			},
			Capacity: map[corev1.ResourceName]resource.Quantity{
				"cpu": {
					Format: "500m",
				},
			},
			Images: []corev1.ContainerImage{
				{
					Names: []string{"test"},
				},
			},
			Conditions: []corev1.NodeCondition{
				{
					Type: corev1.NodeMemoryPressure,
				},
			},
		},
	}

	// Only the name and topology labels should survive in the cache.
	expectNode := &corev1.Node{
		TypeMeta: metav1.TypeMeta{
			Kind:       "Node",
			APIVersion: "v1",
		},
		ObjectMeta: metav1.ObjectMeta{
			Name: "test",
			Labels: map[string]string{
				NodeZoneLabel:              "zone1",
				NodeRegionLabel:            "region1",
				label.TopologySubzone.Name: "subzone1",
			},
		},
	}

	controller, _ := NewFakeControllerWithOptions(t, FakeControllerOptions{})
	addNodes(t, controller, inputNode)

	assert.Equal(t, expectNode, controller.nodes.Get(inputNode.Name, ""))
}
2692
// TestStripPodUnusedFields verifies that the controller's pod cache strips
// fields it does not need: init containers, volumes, container statuses, and
// container names are dropped, while labels, annotations, container ports, and
// the IP/phase status fields are kept.
func TestStripPodUnusedFields(t *testing.T) {
	// Input pod carries fields expected to be stripped by the cache transform.
	inputPod := &corev1.Pod{
		TypeMeta: metav1.TypeMeta{
			Kind:       "Pod",
			APIVersion: "v1",
		},
		ObjectMeta: metav1.ObjectMeta{
			Name:      "test",
			Namespace: "default",
			Labels: map[string]string{
				"app": "test",
			},
			Annotations: map[string]string{
				"annotation1": "foo",
				"annotation2": "bar",
			},
			ManagedFields: []metav1.ManagedFieldsEntry{
				{
					Manager: "test",
				},
			},
		},
		Spec: corev1.PodSpec{
			InitContainers: []corev1.Container{
				{
					Name: "init-container",
				},
			},
			Containers: []corev1.Container{
				{
					Name: "container-1",
					Ports: []corev1.ContainerPort{
						{
							Name: "http",
						},
					},
				},
				{
					Name: "container-2",
				},
			},
			Volumes: []corev1.Volume{
				{
					Name: "test",
				},
			},
		},
		Status: corev1.PodStatus{
			InitContainerStatuses: []corev1.ContainerStatus{
				{
					Name: "init-container",
				},
			},
			ContainerStatuses: []corev1.ContainerStatus{
				{
					Name: "container-1",
				},
				{
					Name: "container-2",
				},
			},
			PodIP:  "1.1.1.1",
			HostIP: "1.1.1.1",
			Phase:  corev1.PodRunning,
		},
	}

	// Expected cached form: metadata kept (minus managed fields), only the
	// container with ports retained (and only its ports), status reduced to
	// IPs and phase.
	expectPod := &corev1.Pod{
		TypeMeta: metav1.TypeMeta{
			Kind:       "Pod",
			APIVersion: "v1",
		},
		ObjectMeta: metav1.ObjectMeta{
			Name:      "test",
			Namespace: "default",
			Labels: map[string]string{
				"app": "test",
			},
			Annotations: map[string]string{
				"annotation1": "foo",
				"annotation2": "bar",
			},
		},
		Spec: corev1.PodSpec{
			Containers: []corev1.Container{
				{
					Ports: []corev1.ContainerPort{
						{
							Name: "http",
						},
					},
				},
			},
		},
		Status: corev1.PodStatus{
			PodIP:  "1.1.1.1",
			HostIP: "1.1.1.1",
			Phase:  corev1.PodRunning,
		},
	}

	controller, fx := NewFakeControllerWithOptions(t, FakeControllerOptions{})
	addPods(t, controller, fx, inputPod)

	output := controller.pods.getPodByKey(config.NamespacedName(expectPod))
	// The final pod status conditions will be determined by the function addPods.
	// So we assign these status conditions to expect pod.
	expectPod.Status.Conditions = output.Status.Conditions
	if !reflect.DeepEqual(expectPod, output) {
		t.Fatalf("Wanted: %v\n. Got: %v", expectPod, output)
	}
}
2805
// TestServiceUpdateNeedsPush is a table-driven test for serviceUpdateNeedsPush:
// pushes are required for new or changed visible services (including
// target-port-only changes, which need the raw k8s objects to detect), and
// suppressed when nothing changed or the service has visibility "None".
func TestServiceUpdateNeedsPush(t *testing.T) {
	// newService builds a minimal model.Service with the given export
	// visibility and port numbers.
	newService := func(exportTo visibility.Instance, ports []int) *model.Service {
		s := &model.Service{
			Attributes: model.ServiceAttributes{
				ExportTo: sets.New(exportTo),
			},
		}
		for _, port := range ports {
			s.Ports = append(s.Ports, &model.Port{
				Port: port,
			})
		}
		return s
	}

	// prev/curr are the raw k8s services; prevConv/currConv the converted
	// model services; expect is the desired serviceUpdateNeedsPush result.
	type testcase struct {
		name     string
		prev     *corev1.Service
		curr     *corev1.Service
		prevConv *model.Service
		currConv *model.Service
		expect   bool
	}

	tests := []testcase{
		{
			name:     "no change",
			prevConv: newService(visibility.Public, []int{80}),
			currConv: newService(visibility.Public, []int{80}),
			expect:   false,
		},
		{
			name:     "new service",
			prevConv: nil,
			currConv: newService(visibility.Public, []int{80}),
			expect:   true,
		},
		{
			name:     "new service with none visibility",
			prevConv: nil,
			currConv: newService(visibility.None, []int{80}),
			expect:   false,
		},
		{
			name:     "public visibility, spec change",
			prevConv: newService(visibility.Public, []int{80}),
			currConv: newService(visibility.Public, []int{80, 443}),
			expect:   true,
		},
		{
			name:     "none visibility, spec change",
			prevConv: newService(visibility.None, []int{80}),
			currConv: newService(visibility.None, []int{80, 443}),
			expect:   false,
		},
	}

	// Two raw services differing only in target port (8080 vs 8081), used to
	// check that target-port changes are detected from the k8s objects.
	svc := corev1.Service{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "foo",
			Namespace: "bar",
		},
		Spec: corev1.ServiceSpec{
			Ports: []corev1.ServicePort{{Port: 80, TargetPort: intstr.FromInt32(8080)}},
		},
	}
	updatedSvc := corev1.Service{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "foo",
			Namespace: "bar",
		},
		Spec: corev1.ServiceSpec{
			Ports: []corev1.ServicePort{{Port: 80, TargetPort: intstr.FromInt32(8081)}},
		},
	}
	tests = append(tests,
		testcase{
			name:     "target ports changed",
			prev:     &svc,
			curr:     &updatedSvc,
			prevConv: kube.ConvertService(svc, constants.DefaultClusterLocalDomain, ""),
			currConv: kube.ConvertService(updatedSvc, constants.DefaultClusterLocalDomain, ""),
			expect:   true,
		},
		testcase{
			name:     "target ports unchanged",
			prev:     &svc,
			curr:     &svc,
			prevConv: kube.ConvertService(svc, constants.DefaultClusterLocalDomain, ""),
			currConv: kube.ConvertService(svc, constants.DefaultClusterLocalDomain, ""),
			expect:   false,
		})

	for _, test := range tests {
		actual := serviceUpdateNeedsPush(test.prev, test.curr, test.prevConv, test.currConv)
		if actual != test.expect {
			t.Fatalf("%s: expected %v, got %v", test.name, test.expect, actual)
		}
	}
}
2906