istio
465 строк · 14.1 Кб
1// Copyright Istio Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package aggregate
16
17import (
18"fmt"
19"reflect"
20"testing"
21"time"
22
23"go.uber.org/atomic"
24
25meshconfig "istio.io/api/mesh/v1alpha1"
26"istio.io/istio/pilot/pkg/model"
27"istio.io/istio/pilot/pkg/serviceregistry"
28"istio.io/istio/pilot/pkg/serviceregistry/memory"
29"istio.io/istio/pilot/pkg/serviceregistry/mock"
30"istio.io/istio/pilot/pkg/serviceregistry/provider"
31"istio.io/istio/pkg/cluster"
32"istio.io/istio/pkg/config/host"
33"istio.io/istio/pkg/test/util/retry"
34)
35
36type mockMeshConfigHolder struct {
37trustDomainAliases []string
38}
39
40func (mh mockMeshConfigHolder) Mesh() *meshconfig.MeshConfig {
41return &meshconfig.MeshConfig{
42TrustDomainAliases: mh.trustDomainAliases,
43}
44}
45
46func buildMockController() *Controller {
47discovery1 := memory.NewServiceDiscovery(mock.ReplicatedFooServiceV1.DeepCopy(),
48mock.HelloService.DeepCopy(),
49mock.ExtHTTPService.DeepCopy(),
50)
51for _, port := range mock.HelloService.Ports {
52discovery1.AddInstance(mock.MakeServiceInstance(mock.HelloService, port, 0, model.Locality{}))
53discovery1.AddInstance(mock.MakeServiceInstance(mock.HelloService, port, 1, model.Locality{}))
54}
55
56discovery2 := memory.NewServiceDiscovery(mock.ReplicatedFooServiceV2.DeepCopy(),
57mock.WorldService.DeepCopy(),
58mock.ExtHTTPSService.DeepCopy(),
59)
60for _, port := range mock.WorldService.Ports {
61discovery2.AddInstance(mock.MakeServiceInstance(mock.WorldService, port, 0, model.Locality{}))
62discovery2.AddInstance(mock.MakeServiceInstance(mock.WorldService, port, 1, model.Locality{}))
63}
64registry1 := serviceregistry.Simple{
65ProviderID: provider.ID("mockAdapter1"),
66DiscoveryController: discovery1,
67}
68
69registry2 := serviceregistry.Simple{
70ProviderID: provider.ID("mockAdapter2"),
71DiscoveryController: discovery2,
72}
73
74ctls := NewController(Options{&mockMeshConfigHolder{}})
75ctls.AddRegistry(registry1)
76ctls.AddRegistry(registry2)
77
78return ctls
79}
80
81// return aggregator and cluster1 and cluster2 service discovery
82func buildMockControllerForMultiCluster() (*Controller, *memory.ServiceDiscovery, *memory.ServiceDiscovery) {
83discovery1 := memory.NewServiceDiscovery(mock.HelloService)
84
85discovery2 := memory.NewServiceDiscovery(mock.MakeService(mock.ServiceArgs{
86Hostname: mock.HelloService.Hostname,
87Address: "10.1.2.0",
88ServiceAccounts: []string{},
89ClusterID: "cluster-2",
90}), mock.WorldService)
91
92registry1 := serviceregistry.Simple{
93ProviderID: provider.Kubernetes,
94ClusterID: "cluster-1",
95DiscoveryController: discovery1,
96}
97
98registry2 := serviceregistry.Simple{
99ProviderID: provider.Kubernetes,
100ClusterID: "cluster-2",
101DiscoveryController: discovery2,
102}
103
104ctls := NewController(Options{})
105ctls.AddRegistry(registry1)
106ctls.AddRegistry(registry2)
107
108return ctls, discovery1, discovery2
109}
110
111func TestServicesForMultiCluster(t *testing.T) {
112originalHelloService := mock.HelloService.DeepCopy()
113aggregateCtl, _, registry2 := buildMockControllerForMultiCluster()
114// List Services from aggregate controller
115services := aggregateCtl.Services()
116
117// Set up ground truth hostname values
118hosts := map[host.Name]bool{
119mock.HelloService.Hostname: false,
120mock.WorldService.Hostname: false,
121}
122
123count := 0
124// Compare return value to ground truth
125for _, svc := range services {
126if counted, existed := hosts[svc.Hostname]; existed && !counted {
127count++
128hosts[svc.Hostname] = true
129}
130}
131
132if count != len(hosts) {
133t.Fatalf("Cluster local service map expected size %d, actual %v vs %v", count, hosts, services)
134}
135
136// Now verify ClusterVIPs for each service
137ClusterVIPs := map[host.Name]map[cluster.ID][]string{
138mock.HelloService.Hostname: {
139"cluster-1": []string{"10.1.0.0"},
140"cluster-2": []string{"10.1.2.0"},
141},
142mock.WorldService.Hostname: {
143"cluster-2": []string{"10.2.0.0"},
144},
145}
146for _, svc := range services {
147if !reflect.DeepEqual(svc.ClusterVIPs.Addresses, ClusterVIPs[svc.Hostname]) {
148t.Fatalf("Service %s ClusterVIPs actual %v, expected %v", svc.Hostname,
149svc.ClusterVIPs.Addresses, ClusterVIPs[svc.Hostname])
150}
151}
152
153registry2.RemoveService(mock.HelloService.Hostname)
154// List Services from aggregate controller
155services = aggregateCtl.Services()
156// Now verify ClusterVIPs for each service
157ClusterVIPs = map[host.Name]map[cluster.ID][]string{
158mock.HelloService.Hostname: {
159"cluster-1": []string{"10.1.0.0"},
160},
161mock.WorldService.Hostname: {
162"cluster-2": []string{"10.2.0.0"},
163},
164}
165for _, svc := range services {
166if !reflect.DeepEqual(svc.ClusterVIPs.Addresses, ClusterVIPs[svc.Hostname]) {
167t.Fatalf("Service %s ClusterVIPs actual %v, expected %v", svc.Hostname,
168svc.ClusterVIPs.Addresses, ClusterVIPs[svc.Hostname])
169}
170}
171
172// check HelloService is not mutated
173if !reflect.DeepEqual(originalHelloService, mock.HelloService) {
174t.Errorf("Original hello service is mutated")
175}
176}
177
178func TestServices(t *testing.T) {
179aggregateCtl := buildMockController()
180// List Services from aggregate controller
181services := aggregateCtl.Services()
182
183// Set up ground truth hostname values
184serviceMap := map[host.Name]bool{
185mock.HelloService.Hostname: false,
186mock.ExtHTTPService.Hostname: false,
187mock.WorldService.Hostname: false,
188mock.ExtHTTPSService.Hostname: false,
189}
190
191svcCount := 0
192// Compare return value to ground truth
193for _, svc := range services {
194if counted, existed := serviceMap[svc.Hostname]; existed && !counted {
195svcCount++
196serviceMap[svc.Hostname] = true
197}
198}
199
200if svcCount != len(serviceMap) {
201t.Fatal("Return services does not match ground truth")
202}
203}
204
205func TestGetService(t *testing.T) {
206aggregateCtl := buildMockController()
207
208// Get service from mockAdapter1
209svc := aggregateCtl.GetService(mock.HelloService.Hostname)
210if svc == nil {
211t.Fatal("Fail to get service")
212}
213if svc.Hostname != mock.HelloService.Hostname {
214t.Fatal("Returned service is incorrect")
215}
216
217// Get service from mockAdapter2
218svc = aggregateCtl.GetService(mock.WorldService.Hostname)
219if svc == nil {
220t.Fatal("Fail to get service")
221}
222if svc.Hostname != mock.WorldService.Hostname {
223t.Fatal("Returned service is incorrect")
224}
225}
226
227func TestGetProxyServiceTargets(t *testing.T) {
228aggregateCtl := buildMockController()
229
230// Get Instances from mockAdapter1
231instances := aggregateCtl.GetProxyServiceTargets(&model.Proxy{IPAddresses: []string{mock.HelloInstanceV0}})
232if len(instances) != 6 {
233t.Fatalf("Returned GetProxyServiceTargets' amount %d is not correct", len(instances))
234}
235for _, inst := range instances {
236if inst.Service.Hostname != mock.HelloService.Hostname {
237t.Fatal("Returned Instance is incorrect")
238}
239}
240
241// Get Instances from mockAdapter2
242instances = aggregateCtl.GetProxyServiceTargets(&model.Proxy{IPAddresses: []string{mock.MakeIP(mock.WorldService, 1)}})
243if len(instances) != 6 {
244t.Fatalf("Returned GetProxyServiceTargets' amount %d is not correct", len(instances))
245}
246for _, inst := range instances {
247if inst.Service.Hostname != mock.WorldService.Hostname {
248t.Fatal("Returned Instance is incorrect")
249}
250}
251}
252
253func TestGetProxyWorkloadLabels(t *testing.T) {
254// If no registries return workload labels, we must return nil, rather than an empty list.
255// This ensures callers can distinguish between no labels, and labels not found.
256aggregateCtl := buildMockController()
257
258instances := aggregateCtl.GetProxyWorkloadLabels(&model.Proxy{IPAddresses: []string{mock.HelloInstanceV0}})
259if instances != nil {
260t.Fatalf("expected nil workload labels, got: %v", instances)
261}
262}
263
264func TestAddRegistry(t *testing.T) {
265registries := []serviceregistry.Simple{
266{
267ProviderID: "registry1",
268ClusterID: "cluster1",
269DiscoveryController: memory.NewServiceDiscovery(),
270},
271{
272ProviderID: "registry2",
273ClusterID: "cluster2",
274DiscoveryController: memory.NewServiceDiscovery(),
275},
276}
277ctrl := NewController(Options{})
278
279registry1Counter := atomic.NewInt32(0)
280registry2Counter := atomic.NewInt32(0)
281
282for _, r := range registries {
283clusterID := r.Cluster()
284counter := registry1Counter
285if clusterID == "cluster2" {
286counter = registry2Counter
287}
288ctrl.AppendServiceHandlerForCluster(clusterID, func(_, curr *model.Service, event model.Event) {
289counter.Add(1)
290})
291ctrl.AddRegistry(r)
292}
293if l := len(ctrl.registries); l != 2 {
294t.Fatalf("Expected length of the registries slice should be 2, got %d", l)
295}
296
297registries[0].DiscoveryController.(*memory.ServiceDiscovery).AddService(mock.HelloService)
298registries[1].DiscoveryController.(*memory.ServiceDiscovery).AddService(mock.HelloService)
299
300ctrl.DeleteRegistry(registries[1].Cluster(), registries[1].Provider())
301ctrl.UnRegisterHandlersForCluster(registries[1].Cluster())
302registries[0].DiscoveryController.(*memory.ServiceDiscovery).AddService(mock.HelloService)
303
304if registry1Counter.Load() != 3 {
305t.Errorf("cluster1 expected 3 event, but got %d", registry1Counter.Load())
306}
307if registry2Counter.Load() != 2 {
308t.Errorf("cluster2 expected 2 event, but got %d", registry2Counter.Load())
309}
310}
311
312func TestGetDeleteRegistry(t *testing.T) {
313registries := []serviceregistry.Simple{
314{
315ProviderID: "registry1",
316ClusterID: "cluster1",
317DiscoveryController: memory.NewServiceDiscovery(),
318},
319{
320ProviderID: "registry2",
321ClusterID: "cluster2",
322DiscoveryController: memory.NewServiceDiscovery(),
323},
324{
325ProviderID: "registry3",
326ClusterID: "cluster3",
327DiscoveryController: memory.NewServiceDiscovery(),
328},
329}
330wrapRegistry := func(r serviceregistry.Instance) serviceregistry.Instance {
331return ®istryEntry{Instance: r}
332}
333
334ctrl := NewController(Options{})
335for _, r := range registries {
336ctrl.AddRegistry(r)
337}
338
339// Test Get
340result := ctrl.GetRegistries()
341if l := len(result); l != 3 {
342t.Fatalf("Expected length of the registries slice should be 3, got %d", l)
343}
344
345// Test Delete cluster2
346ctrl.DeleteRegistry(registries[1].ClusterID, registries[1].ProviderID)
347result = ctrl.GetRegistries()
348if l := len(result); l != 2 {
349t.Fatalf("Expected length of the registries slice should be 2, got %d", l)
350}
351// check left registries are orders as before
352if !reflect.DeepEqual(result[0], wrapRegistry(registries[0])) || !reflect.DeepEqual(result[1], wrapRegistry(registries[2])) {
353t.Fatalf("Expected registries order has been changed")
354}
355}
356
357func TestSkipSearchingRegistryForProxy(t *testing.T) {
358cluster1 := serviceregistry.Simple{
359ClusterID: "cluster-1",
360ProviderID: provider.Kubernetes,
361DiscoveryController: memory.NewServiceDiscovery(),
362}
363cluster2 := serviceregistry.Simple{
364ClusterID: "cluster-2",
365ProviderID: provider.Kubernetes,
366DiscoveryController: memory.NewServiceDiscovery(),
367}
368// external registries may eventually be associated with a cluster
369external := serviceregistry.Simple{
370ClusterID: "cluster-1",
371ProviderID: provider.External,
372DiscoveryController: memory.NewServiceDiscovery(),
373}
374
375cases := []struct {
376nodeClusterID cluster.ID
377registry serviceregistry.Instance
378want bool
379}{
380// matching kube registry
381{"cluster-1", cluster1, false},
382// unmatching kube registry
383{"cluster-1", cluster2, true},
384// always search external
385{"cluster-1", external, false},
386{"cluster-2", external, false},
387{"", external, false},
388// always search for empty node cluster id
389{"", cluster1, false},
390{"", cluster2, false},
391{"", external, false},
392}
393
394for i, c := range cases {
395got := skipSearchingRegistryForProxy(c.nodeClusterID, c.registry)
396if got != c.want {
397t.Errorf("%s: got %v want %v",
398fmt.Sprintf("[%v] registry=%v node=%v", i, c.registry, c.nodeClusterID),
399got, c.want)
400}
401}
402}
403
404func runnableRegistry(name string) *RunnableRegistry {
405return &RunnableRegistry{
406Instance: serviceregistry.Simple{
407ClusterID: cluster.ID(name), ProviderID: "test",
408DiscoveryController: memory.NewServiceDiscovery(),
409},
410running: atomic.NewBool(false),
411}
412}
413
414type RunnableRegistry struct {
415serviceregistry.Instance
416running *atomic.Bool
417}
418
419func (rr *RunnableRegistry) Run(stop <-chan struct{}) {
420if rr.running.Load() {
421panic("--- registry has been run twice ---")
422}
423rr.running.Store(true)
424<-stop
425}
426
427func expectRunningOrFail(t *testing.T, ctrl *Controller, want bool) {
428// running gets flipped in a goroutine, retry to avoid race
429retry.UntilSuccessOrFail(t, func() error {
430for _, registry := range ctrl.registries {
431if running := registry.Instance.(*RunnableRegistry).running.Load(); running != want {
432return fmt.Errorf("%s running is %v but wanted %v", registry.Cluster(), running, want)
433}
434}
435return nil
436}, retry.Timeout(50*time.Millisecond), retry.Delay(0))
437}
438
439func TestDeferredRun(t *testing.T) {
440stop := make(chan struct{})
441defer close(stop)
442ctrl := NewController(Options{})
443
444t.Run("AddRegistry before aggregate Run does not run", func(t *testing.T) {
445ctrl.AddRegistry(runnableRegistry("earlyAdd"))
446ctrl.AddRegistryAndRun(runnableRegistry("earlyAddAndRun"), nil)
447expectRunningOrFail(t, ctrl, false)
448})
449t.Run("aggregate Run starts all registries", func(t *testing.T) {
450go ctrl.Run(stop)
451expectRunningOrFail(t, ctrl, true)
452ctrl.DeleteRegistry("earlyAdd", "test")
453ctrl.DeleteRegistry("earlyAddAndRun", "test")
454})
455t.Run("AddRegistry after aggregate Run does not start registry", func(t *testing.T) {
456ctrl.AddRegistry(runnableRegistry("missed"))
457expectRunningOrFail(t, ctrl, false)
458ctrl.DeleteRegistry("missed", "test")
459expectRunningOrFail(t, ctrl, true)
460})
461t.Run("AddRegistryAndRun after aggregate Run starts registry", func(t *testing.T) {
462ctrl.AddRegistryAndRun(runnableRegistry("late"), nil)
463expectRunningOrFail(t, ctrl, true)
464})
465}
466