istio

Форк
0
/
controller_test.go 
465 строк · 14.1 Кб
1
// Copyright Istio Authors
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14

15
package aggregate
16

17
import (
18
	"fmt"
19
	"reflect"
20
	"testing"
21
	"time"
22

23
	"go.uber.org/atomic"
24

25
	meshconfig "istio.io/api/mesh/v1alpha1"
26
	"istio.io/istio/pilot/pkg/model"
27
	"istio.io/istio/pilot/pkg/serviceregistry"
28
	"istio.io/istio/pilot/pkg/serviceregistry/memory"
29
	"istio.io/istio/pilot/pkg/serviceregistry/mock"
30
	"istio.io/istio/pilot/pkg/serviceregistry/provider"
31
	"istio.io/istio/pkg/cluster"
32
	"istio.io/istio/pkg/config/host"
33
	"istio.io/istio/pkg/test/util/retry"
34
)
35

36
// mockMeshConfigHolder is a minimal test stand-in that stores only the trust
// domain aliases needed to build a mesh config.
type mockMeshConfigHolder struct {
	// trustDomainAliases is the value placed in MeshConfig.TrustDomainAliases.
	trustDomainAliases []string
}
39

40
func (mh mockMeshConfigHolder) Mesh() *meshconfig.MeshConfig {
41
	return &meshconfig.MeshConfig{
42
		TrustDomainAliases: mh.trustDomainAliases,
43
	}
44
}
45

46
func buildMockController() *Controller {
47
	discovery1 := memory.NewServiceDiscovery(mock.ReplicatedFooServiceV1.DeepCopy(),
48
		mock.HelloService.DeepCopy(),
49
		mock.ExtHTTPService.DeepCopy(),
50
	)
51
	for _, port := range mock.HelloService.Ports {
52
		discovery1.AddInstance(mock.MakeServiceInstance(mock.HelloService, port, 0, model.Locality{}))
53
		discovery1.AddInstance(mock.MakeServiceInstance(mock.HelloService, port, 1, model.Locality{}))
54
	}
55

56
	discovery2 := memory.NewServiceDiscovery(mock.ReplicatedFooServiceV2.DeepCopy(),
57
		mock.WorldService.DeepCopy(),
58
		mock.ExtHTTPSService.DeepCopy(),
59
	)
60
	for _, port := range mock.WorldService.Ports {
61
		discovery2.AddInstance(mock.MakeServiceInstance(mock.WorldService, port, 0, model.Locality{}))
62
		discovery2.AddInstance(mock.MakeServiceInstance(mock.WorldService, port, 1, model.Locality{}))
63
	}
64
	registry1 := serviceregistry.Simple{
65
		ProviderID:          provider.ID("mockAdapter1"),
66
		DiscoveryController: discovery1,
67
	}
68

69
	registry2 := serviceregistry.Simple{
70
		ProviderID:          provider.ID("mockAdapter2"),
71
		DiscoveryController: discovery2,
72
	}
73

74
	ctls := NewController(Options{&mockMeshConfigHolder{}})
75
	ctls.AddRegistry(registry1)
76
	ctls.AddRegistry(registry2)
77

78
	return ctls
79
}
80

81
// return aggregator and cluster1 and cluster2 service discovery
82
func buildMockControllerForMultiCluster() (*Controller, *memory.ServiceDiscovery, *memory.ServiceDiscovery) {
83
	discovery1 := memory.NewServiceDiscovery(mock.HelloService)
84

85
	discovery2 := memory.NewServiceDiscovery(mock.MakeService(mock.ServiceArgs{
86
		Hostname:        mock.HelloService.Hostname,
87
		Address:         "10.1.2.0",
88
		ServiceAccounts: []string{},
89
		ClusterID:       "cluster-2",
90
	}), mock.WorldService)
91

92
	registry1 := serviceregistry.Simple{
93
		ProviderID:          provider.Kubernetes,
94
		ClusterID:           "cluster-1",
95
		DiscoveryController: discovery1,
96
	}
97

98
	registry2 := serviceregistry.Simple{
99
		ProviderID:          provider.Kubernetes,
100
		ClusterID:           "cluster-2",
101
		DiscoveryController: discovery2,
102
	}
103

104
	ctls := NewController(Options{})
105
	ctls.AddRegistry(registry1)
106
	ctls.AddRegistry(registry2)
107

108
	return ctls, discovery1, discovery2
109
}
110

111
func TestServicesForMultiCluster(t *testing.T) {
112
	originalHelloService := mock.HelloService.DeepCopy()
113
	aggregateCtl, _, registry2 := buildMockControllerForMultiCluster()
114
	// List Services from aggregate controller
115
	services := aggregateCtl.Services()
116

117
	// Set up ground truth hostname values
118
	hosts := map[host.Name]bool{
119
		mock.HelloService.Hostname: false,
120
		mock.WorldService.Hostname: false,
121
	}
122

123
	count := 0
124
	// Compare return value to ground truth
125
	for _, svc := range services {
126
		if counted, existed := hosts[svc.Hostname]; existed && !counted {
127
			count++
128
			hosts[svc.Hostname] = true
129
		}
130
	}
131

132
	if count != len(hosts) {
133
		t.Fatalf("Cluster local service map expected size %d, actual %v vs %v", count, hosts, services)
134
	}
135

136
	// Now verify ClusterVIPs for each service
137
	ClusterVIPs := map[host.Name]map[cluster.ID][]string{
138
		mock.HelloService.Hostname: {
139
			"cluster-1": []string{"10.1.0.0"},
140
			"cluster-2": []string{"10.1.2.0"},
141
		},
142
		mock.WorldService.Hostname: {
143
			"cluster-2": []string{"10.2.0.0"},
144
		},
145
	}
146
	for _, svc := range services {
147
		if !reflect.DeepEqual(svc.ClusterVIPs.Addresses, ClusterVIPs[svc.Hostname]) {
148
			t.Fatalf("Service %s ClusterVIPs actual %v, expected %v", svc.Hostname,
149
				svc.ClusterVIPs.Addresses, ClusterVIPs[svc.Hostname])
150
		}
151
	}
152

153
	registry2.RemoveService(mock.HelloService.Hostname)
154
	// List Services from aggregate controller
155
	services = aggregateCtl.Services()
156
	// Now verify ClusterVIPs for each service
157
	ClusterVIPs = map[host.Name]map[cluster.ID][]string{
158
		mock.HelloService.Hostname: {
159
			"cluster-1": []string{"10.1.0.0"},
160
		},
161
		mock.WorldService.Hostname: {
162
			"cluster-2": []string{"10.2.0.0"},
163
		},
164
	}
165
	for _, svc := range services {
166
		if !reflect.DeepEqual(svc.ClusterVIPs.Addresses, ClusterVIPs[svc.Hostname]) {
167
			t.Fatalf("Service %s ClusterVIPs actual %v, expected %v", svc.Hostname,
168
				svc.ClusterVIPs.Addresses, ClusterVIPs[svc.Hostname])
169
		}
170
	}
171

172
	// check HelloService is not mutated
173
	if !reflect.DeepEqual(originalHelloService, mock.HelloService) {
174
		t.Errorf("Original hello service is mutated")
175
	}
176
}
177

178
func TestServices(t *testing.T) {
179
	aggregateCtl := buildMockController()
180
	// List Services from aggregate controller
181
	services := aggregateCtl.Services()
182

183
	// Set up ground truth hostname values
184
	serviceMap := map[host.Name]bool{
185
		mock.HelloService.Hostname:    false,
186
		mock.ExtHTTPService.Hostname:  false,
187
		mock.WorldService.Hostname:    false,
188
		mock.ExtHTTPSService.Hostname: false,
189
	}
190

191
	svcCount := 0
192
	// Compare return value to ground truth
193
	for _, svc := range services {
194
		if counted, existed := serviceMap[svc.Hostname]; existed && !counted {
195
			svcCount++
196
			serviceMap[svc.Hostname] = true
197
		}
198
	}
199

200
	if svcCount != len(serviceMap) {
201
		t.Fatal("Return services does not match ground truth")
202
	}
203
}
204

205
func TestGetService(t *testing.T) {
206
	aggregateCtl := buildMockController()
207

208
	// Get service from mockAdapter1
209
	svc := aggregateCtl.GetService(mock.HelloService.Hostname)
210
	if svc == nil {
211
		t.Fatal("Fail to get service")
212
	}
213
	if svc.Hostname != mock.HelloService.Hostname {
214
		t.Fatal("Returned service is incorrect")
215
	}
216

217
	// Get service from mockAdapter2
218
	svc = aggregateCtl.GetService(mock.WorldService.Hostname)
219
	if svc == nil {
220
		t.Fatal("Fail to get service")
221
	}
222
	if svc.Hostname != mock.WorldService.Hostname {
223
		t.Fatal("Returned service is incorrect")
224
	}
225
}
226

227
func TestGetProxyServiceTargets(t *testing.T) {
228
	aggregateCtl := buildMockController()
229

230
	// Get Instances from mockAdapter1
231
	instances := aggregateCtl.GetProxyServiceTargets(&model.Proxy{IPAddresses: []string{mock.HelloInstanceV0}})
232
	if len(instances) != 6 {
233
		t.Fatalf("Returned GetProxyServiceTargets' amount %d is not correct", len(instances))
234
	}
235
	for _, inst := range instances {
236
		if inst.Service.Hostname != mock.HelloService.Hostname {
237
			t.Fatal("Returned Instance is incorrect")
238
		}
239
	}
240

241
	// Get Instances from mockAdapter2
242
	instances = aggregateCtl.GetProxyServiceTargets(&model.Proxy{IPAddresses: []string{mock.MakeIP(mock.WorldService, 1)}})
243
	if len(instances) != 6 {
244
		t.Fatalf("Returned GetProxyServiceTargets' amount %d is not correct", len(instances))
245
	}
246
	for _, inst := range instances {
247
		if inst.Service.Hostname != mock.WorldService.Hostname {
248
			t.Fatal("Returned Instance is incorrect")
249
		}
250
	}
251
}
252

253
func TestGetProxyWorkloadLabels(t *testing.T) {
254
	// If no registries return workload labels, we must return nil, rather than an empty list.
255
	// This ensures callers can distinguish between no labels, and labels not found.
256
	aggregateCtl := buildMockController()
257

258
	instances := aggregateCtl.GetProxyWorkloadLabels(&model.Proxy{IPAddresses: []string{mock.HelloInstanceV0}})
259
	if instances != nil {
260
		t.Fatalf("expected nil workload labels, got: %v", instances)
261
	}
262
}
263

264
func TestAddRegistry(t *testing.T) {
265
	registries := []serviceregistry.Simple{
266
		{
267
			ProviderID:          "registry1",
268
			ClusterID:           "cluster1",
269
			DiscoveryController: memory.NewServiceDiscovery(),
270
		},
271
		{
272
			ProviderID:          "registry2",
273
			ClusterID:           "cluster2",
274
			DiscoveryController: memory.NewServiceDiscovery(),
275
		},
276
	}
277
	ctrl := NewController(Options{})
278

279
	registry1Counter := atomic.NewInt32(0)
280
	registry2Counter := atomic.NewInt32(0)
281

282
	for _, r := range registries {
283
		clusterID := r.Cluster()
284
		counter := registry1Counter
285
		if clusterID == "cluster2" {
286
			counter = registry2Counter
287
		}
288
		ctrl.AppendServiceHandlerForCluster(clusterID, func(_, curr *model.Service, event model.Event) {
289
			counter.Add(1)
290
		})
291
		ctrl.AddRegistry(r)
292
	}
293
	if l := len(ctrl.registries); l != 2 {
294
		t.Fatalf("Expected length of the registries slice should be 2, got %d", l)
295
	}
296

297
	registries[0].DiscoveryController.(*memory.ServiceDiscovery).AddService(mock.HelloService)
298
	registries[1].DiscoveryController.(*memory.ServiceDiscovery).AddService(mock.HelloService)
299

300
	ctrl.DeleteRegistry(registries[1].Cluster(), registries[1].Provider())
301
	ctrl.UnRegisterHandlersForCluster(registries[1].Cluster())
302
	registries[0].DiscoveryController.(*memory.ServiceDiscovery).AddService(mock.HelloService)
303

304
	if registry1Counter.Load() != 3 {
305
		t.Errorf("cluster1 expected 3 event, but got %d", registry1Counter.Load())
306
	}
307
	if registry2Counter.Load() != 2 {
308
		t.Errorf("cluster2 expected 2 event, but got %d", registry2Counter.Load())
309
	}
310
}
311

312
func TestGetDeleteRegistry(t *testing.T) {
313
	registries := []serviceregistry.Simple{
314
		{
315
			ProviderID:          "registry1",
316
			ClusterID:           "cluster1",
317
			DiscoveryController: memory.NewServiceDiscovery(),
318
		},
319
		{
320
			ProviderID:          "registry2",
321
			ClusterID:           "cluster2",
322
			DiscoveryController: memory.NewServiceDiscovery(),
323
		},
324
		{
325
			ProviderID:          "registry3",
326
			ClusterID:           "cluster3",
327
			DiscoveryController: memory.NewServiceDiscovery(),
328
		},
329
	}
330
	wrapRegistry := func(r serviceregistry.Instance) serviceregistry.Instance {
331
		return &registryEntry{Instance: r}
332
	}
333

334
	ctrl := NewController(Options{})
335
	for _, r := range registries {
336
		ctrl.AddRegistry(r)
337
	}
338

339
	// Test Get
340
	result := ctrl.GetRegistries()
341
	if l := len(result); l != 3 {
342
		t.Fatalf("Expected length of the registries slice should be 3, got %d", l)
343
	}
344

345
	// Test Delete cluster2
346
	ctrl.DeleteRegistry(registries[1].ClusterID, registries[1].ProviderID)
347
	result = ctrl.GetRegistries()
348
	if l := len(result); l != 2 {
349
		t.Fatalf("Expected length of the registries slice should be 2, got %d", l)
350
	}
351
	// check left registries are orders as before
352
	if !reflect.DeepEqual(result[0], wrapRegistry(registries[0])) || !reflect.DeepEqual(result[1], wrapRegistry(registries[2])) {
353
		t.Fatalf("Expected registries order has been changed")
354
	}
355
}
356

357
func TestSkipSearchingRegistryForProxy(t *testing.T) {
358
	cluster1 := serviceregistry.Simple{
359
		ClusterID:           "cluster-1",
360
		ProviderID:          provider.Kubernetes,
361
		DiscoveryController: memory.NewServiceDiscovery(),
362
	}
363
	cluster2 := serviceregistry.Simple{
364
		ClusterID:           "cluster-2",
365
		ProviderID:          provider.Kubernetes,
366
		DiscoveryController: memory.NewServiceDiscovery(),
367
	}
368
	// external registries may eventually be associated with a cluster
369
	external := serviceregistry.Simple{
370
		ClusterID:           "cluster-1",
371
		ProviderID:          provider.External,
372
		DiscoveryController: memory.NewServiceDiscovery(),
373
	}
374

375
	cases := []struct {
376
		nodeClusterID cluster.ID
377
		registry      serviceregistry.Instance
378
		want          bool
379
	}{
380
		// matching kube registry
381
		{"cluster-1", cluster1, false},
382
		// unmatching kube registry
383
		{"cluster-1", cluster2, true},
384
		// always search external
385
		{"cluster-1", external, false},
386
		{"cluster-2", external, false},
387
		{"", external, false},
388
		// always search for empty node cluster id
389
		{"", cluster1, false},
390
		{"", cluster2, false},
391
		{"", external, false},
392
	}
393

394
	for i, c := range cases {
395
		got := skipSearchingRegistryForProxy(c.nodeClusterID, c.registry)
396
		if got != c.want {
397
			t.Errorf("%s: got %v want %v",
398
				fmt.Sprintf("[%v] registry=%v node=%v", i, c.registry, c.nodeClusterID),
399
				got, c.want)
400
		}
401
	}
402
}
403

404
func runnableRegistry(name string) *RunnableRegistry {
405
	return &RunnableRegistry{
406
		Instance: serviceregistry.Simple{
407
			ClusterID: cluster.ID(name), ProviderID: "test",
408
			DiscoveryController: memory.NewServiceDiscovery(),
409
		},
410
		running: atomic.NewBool(false),
411
	}
412
}
413

414
type RunnableRegistry struct {
415
	serviceregistry.Instance
416
	running *atomic.Bool
417
}
418

419
// Run marks the registry as running and then blocks until a receive on stop
// completes (typically when the channel is closed).
//
// It panics if the registry was already marked as running. The flag is
// flipped with a single atomic swap rather than a separate load and store, so
// two concurrent Run calls cannot both pass the check.
func (rr *RunnableRegistry) Run(stop <-chan struct{}) {
	if alreadyRunning := rr.running.Swap(true); alreadyRunning {
		panic("--- registry has been run twice ---")
	}
	<-stop
}
426

427
// expectRunningOrFail fails the test unless every registry held by ctrl
// reports a running state equal to want within a short timeout.
//
// Every registry is expected to wrap a *RunnableRegistry; one that does not is
// reported as an error through the retry loop instead of panicking on the type
// assertion.
func expectRunningOrFail(t *testing.T, ctrl *Controller, want bool) {
	// Attribute failures to the calling test line rather than to this helper.
	t.Helper()
	// running gets flipped in a goroutine, retry to avoid race
	retry.UntilSuccessOrFail(t, func() error {
		for _, registry := range ctrl.registries {
			runnable, ok := registry.Instance.(*RunnableRegistry)
			if !ok {
				return fmt.Errorf("%s registry is %T, not *RunnableRegistry", registry.Cluster(), registry.Instance)
			}
			if running := runnable.running.Load(); running != want {
				return fmt.Errorf("%s running is %v but wanted %v", registry.Cluster(), running, want)
			}
		}
		return nil
	}, retry.Timeout(50*time.Millisecond), retry.Delay(0))
}
438

439
func TestDeferredRun(t *testing.T) {
440
	stop := make(chan struct{})
441
	defer close(stop)
442
	ctrl := NewController(Options{})
443

444
	t.Run("AddRegistry before aggregate Run does not run", func(t *testing.T) {
445
		ctrl.AddRegistry(runnableRegistry("earlyAdd"))
446
		ctrl.AddRegistryAndRun(runnableRegistry("earlyAddAndRun"), nil)
447
		expectRunningOrFail(t, ctrl, false)
448
	})
449
	t.Run("aggregate Run starts all registries", func(t *testing.T) {
450
		go ctrl.Run(stop)
451
		expectRunningOrFail(t, ctrl, true)
452
		ctrl.DeleteRegistry("earlyAdd", "test")
453
		ctrl.DeleteRegistry("earlyAddAndRun", "test")
454
	})
455
	t.Run("AddRegistry after aggregate Run does not start registry", func(t *testing.T) {
456
		ctrl.AddRegistry(runnableRegistry("missed"))
457
		expectRunningOrFail(t, ctrl, false)
458
		ctrl.DeleteRegistry("missed", "test")
459
		expectRunningOrFail(t, ctrl, true)
460
	})
461
	t.Run("AddRegistryAndRun after aggregate Run starts registry", func(t *testing.T) {
462
		ctrl.AddRegistryAndRun(runnableRegistry("late"), nil)
463
		expectRunningOrFail(t, ctrl, true)
464
	})
465
}
466

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.