istio

Форк
0
/
serviceimportcache_test.go 
498 строк · 15.2 Кб
1
// Copyright Istio Authors
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14

15
package controller
16

17
import (
18
	"context"
19
	"fmt"
20
	"reflect"
21
	"testing"
22
	"time"
23

24
	core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
25
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
26
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
27
	"k8s.io/apimachinery/pkg/runtime"
28
	"k8s.io/apimachinery/pkg/runtime/schema"
29
	"k8s.io/apimachinery/pkg/types"
30
	mcsapi "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1"
31

32
	"istio.io/api/label"
33
	"istio.io/istio/pilot/pkg/features"
34
	"istio.io/istio/pilot/pkg/model"
35
	"istio.io/istio/pilot/pkg/serviceregistry/kube"
36
	"istio.io/istio/pilot/pkg/serviceregistry/util/xdsfake"
37
	"istio.io/istio/pkg/config/host"
38
	"istio.io/istio/pkg/kube/mcs"
39
	"istio.io/istio/pkg/test"
40
	"istio.io/istio/pkg/test/util/assert"
41
	"istio.io/istio/pkg/test/util/retry"
42
)
43

44
// Fixed identifiers shared by every ServiceImport cache test in this file.
const (
	// serviceImportName is the name used for both the test service and its ServiceImport.
	serviceImportName = "test-svc"

	// serviceImportNamespace is the namespace holding the test service and its ServiceImport.
	serviceImportNamespace = "test-ns"

	// serviceImportPodIP is the endpoint address backing the test service and
	// the IP address of the proxy used to look up service targets.
	serviceImportPodIP = "128.0.0.2"

	// serviceImportCluster is the cluster ID given to the primary fake controller.
	serviceImportCluster = "test-cluster"
)
50

51
var (
52
	serviceImportNamespacedName = types.NamespacedName{
53
		Namespace: serviceImportNamespace,
54
		Name:      serviceImportName,
55
	}
56
	serviceImportClusterSetHost = serviceClusterSetLocalHostname(serviceImportNamespacedName)
57
	serviceImportVIPs           = []string{"1.1.1.1"}
58
	serviceImportTimeout        = retry.Timeout(2 * time.Second)
59
)
60

61
func TestServiceNotImported(t *testing.T) {
62
	c, ic := newTestServiceImportCache(t)
63
	ic.createKubeService(t, c)
64

65
	// Check that the service does not have ClusterSet IPs.
66
	ic.checkServiceInstances(t)
67
}
68

69
func TestServiceImportedAfterCreated(t *testing.T) {
70
	c, ic := newTestServiceImportCache(t)
71

72
	ic.createKubeService(t, c)
73
	ic.createServiceImport(t, mcsapi.ClusterSetIP, serviceImportVIPs)
74

75
	// Check that the service has been assigned ClusterSet IPs.
76
	ic.checkServiceInstances(t)
77
}
78

79
func TestServiceCreatedAfterImported(t *testing.T) {
80
	c, ic := newTestServiceImportCache(t)
81

82
	ic.createServiceImport(t, mcsapi.ClusterSetIP, serviceImportVIPs)
83
	ic.createKubeService(t, c)
84

85
	// Check that the service has been assigned ClusterSet IPs.
86
	ic.checkServiceInstances(t)
87
}
88

89
func TestUpdateImportedService(t *testing.T) {
90
	c, ic := newTestServiceImportCache(t)
91

92
	ic.createKubeService(t, c)
93
	ic.createServiceImport(t, mcsapi.ClusterSetIP, serviceImportVIPs)
94
	ic.checkServiceInstances(t)
95

96
	// Update the k8s service and verify that both services are updated.
97
	ic.updateKubeService(t)
98
}
99

100
func TestHeadlessServiceImported(t *testing.T) {
101
	// Create and run the controller.
102
	c, ic := newTestServiceImportCache(t)
103

104
	ic.createKubeService(t, c)
105
	ic.createServiceImport(t, mcsapi.Headless, nil)
106

107
	// Verify that we did not generate the synthetic service for the headless service.
108
	ic.checkServiceInstances(t)
109
}
110

111
func TestDeleteImportedService(t *testing.T) {
112
	// Create and run the controller.
113
	c1, ic := newTestServiceImportCache(t)
114

115
	// Create and run another controller.
116
	c2, _ := NewFakeControllerWithOptions(t, FakeControllerOptions{
117
		ClusterID: "test-cluster2",
118
	})
119

120
	c1.opts.MeshServiceController.AddRegistryAndRun(c2, c2.stop)
121

122
	ic.createKubeService(t, c1)
123
	ic.createServiceImport(t, mcsapi.ClusterSetIP, serviceImportVIPs)
124
	ic.checkServiceInstances(t)
125

126
	// create the same service in cluster2
127
	createService(c2, serviceImportName, serviceImportNamespace, map[string]string{}, map[string]string{},
128
		[]int32{8080}, map[string]string{"app": "prod-app"}, t)
129

130
	// Delete the k8s service and verify that all internal services are removed.
131
	ic.deleteKubeService(t, c2)
132
}
133

134
func TestUnimportService(t *testing.T) {
135
	// Create and run the controller.
136
	c, ic := newTestServiceImportCache(t)
137

138
	ic.createKubeService(t, c)
139
	ic.createServiceImport(t, mcsapi.ClusterSetIP, serviceImportVIPs)
140
	ic.checkServiceInstances(t)
141

142
	ic.unimportService(t)
143
}
144

145
func TestAddServiceImportVIPs(t *testing.T) {
146
	// Create and run the controller.
147
	c, ic := newTestServiceImportCache(t)
148

149
	ic.createKubeService(t, c)
150
	ic.createServiceImport(t, mcsapi.ClusterSetIP, nil)
151
	ic.checkServiceInstances(t)
152

153
	ic.setServiceImportVIPs(t, serviceImportVIPs)
154
}
155

156
func TestUpdateServiceImportVIPs(t *testing.T) {
157
	// Create and run the controller.
158
	c, ic := newTestServiceImportCache(t)
159

160
	ic.createKubeService(t, c)
161
	ic.createServiceImport(t, mcsapi.ClusterSetIP, serviceImportVIPs)
162
	ic.checkServiceInstances(t)
163

164
	updatedVIPs := []string{"1.1.1.1", "1.1.1.2"}
165
	ic.setServiceImportVIPs(t, updatedVIPs)
166
}
167

168
func newTestServiceImportCache(t test.Failer) (*FakeController, *serviceImportCacheImpl) {
169
	test.SetForTest(t, &features.EnableMCSHost, true)
170

171
	c, _ := NewFakeControllerWithOptions(t, FakeControllerOptions{
172
		ClusterID: serviceImportCluster,
173
		CRDs:      []schema.GroupVersionResource{mcs.ServiceImportGVR},
174
	})
175

176
	return c, c.imports.(*serviceImportCacheImpl)
177
}
178

179
func (ic *serviceImportCacheImpl) createKubeService(t *testing.T, c *FakeController) {
180
	t.Helper()
181

182
	// Create the test service and endpoints.
183
	createService(c, serviceImportName, serviceImportNamespace, map[string]string{}, map[string]string{},
184
		[]int32{8080}, map[string]string{"app": "prod-app"}, t)
185
	createEndpoints(t, c, serviceImportName, serviceImportNamespace, []string{"tcp-port"}, []string{serviceImportPodIP}, nil, nil)
186

187
	isImported := ic.isImported(serviceImportNamespacedName)
188

189
	// Wait for the resources to be processed by the controller.
190
	retry.UntilSuccessOrFail(t, func() error {
191
		clusterLocalHost := ic.clusterLocalHost()
192
		if svc := c.GetService(clusterLocalHost); svc == nil {
193
			return fmt.Errorf("failed looking up service for host %s", clusterLocalHost)
194
		}
195

196
		var expectedHosts map[host.Name]struct{}
197
		if isImported {
198
			expectedHosts = map[host.Name]struct{}{
199
				clusterLocalHost:            {},
200
				serviceImportClusterSetHost: {},
201
			}
202
		} else {
203
			expectedHosts = map[host.Name]struct{}{
204
				clusterLocalHost: {},
205
			}
206
		}
207

208
		instances := ic.getProxyServiceTargets()
209
		if len(instances) != len(expectedHosts) {
210
			return fmt.Errorf("expected 1 service instance, found %d", len(instances))
211
		}
212
		for _, si := range instances {
213
			if si.Service == nil {
214
				return fmt.Errorf("proxy ServiceInstance has nil service")
215
			}
216
			if _, found := expectedHosts[si.Service.Hostname]; !found {
217
				return fmt.Errorf("found proxy ServiceInstance for unexpected host: %s", si.Service.Hostname)
218
			}
219
			delete(expectedHosts, si.Service.Hostname)
220
		}
221

222
		if len(expectedHosts) > 0 {
223
			return fmt.Errorf("failed to find proxy ServiceEndpoints for hosts: %v", expectedHosts)
224
		}
225

226
		return nil
227
	}, serviceImportTimeout)
228
}
229

230
func (ic *serviceImportCacheImpl) updateKubeService(t *testing.T) {
231
	t.Helper()
232
	svc, _ := ic.client.Kube().CoreV1().Services(serviceImportNamespace).Get(context.TODO(), serviceImportName, metav1.GetOptions{})
233
	if svc == nil {
234
		t.Fatalf("failed to find k8s service: %s/%s", serviceImportNamespace, serviceImportName)
235
	}
236

237
	// Just add a new label.
238
	svc.Labels = map[string]string{
239
		"foo": "bar",
240
	}
241
	if _, err := ic.client.Kube().CoreV1().Services(serviceImportNamespace).Update(context.TODO(), svc, metav1.UpdateOptions{}); err != nil {
242
		t.Fatal(err)
243
	}
244

245
	hostNames := []host.Name{
246
		ic.clusterLocalHost(),
247
		serviceImportClusterSetHost,
248
	}
249

250
	// Wait for the services to pick up the label.
251
	retry.UntilSuccessOrFail(t, func() error {
252
		for _, hostName := range hostNames {
253
			svc := ic.GetService(hostName)
254
			if svc == nil {
255
				return fmt.Errorf("failed to find service for host %s", hostName)
256
			}
257
			if svc.Attributes.Labels["foo"] != "bar" {
258
				return fmt.Errorf("service not updated for %s", hostName)
259
			}
260
		}
261

262
		return nil
263
	}, serviceImportTimeout)
264
}
265

266
func (ic *serviceImportCacheImpl) deleteKubeService(t *testing.T, anotherCluster *FakeController) {
267
	t.Helper()
268

269
	if err := anotherCluster.client.Kube().
270
		CoreV1().Services(serviceImportNamespace).Delete(context.TODO(), serviceImportName, metav1.DeleteOptions{}); err != nil {
271
		t.Fatal(err)
272
	}
273
	// Wait for the resources to be processed by the controller.
274
	if err := ic.client.Kube().CoreV1().Services(serviceImportNamespace).Delete(context.TODO(), serviceImportName, metav1.DeleteOptions{}); err != nil {
275
		t.Fatal(err)
276
	}
277

278
	// Wait for the resources to be processed by the controller.
279
	retry.UntilSuccessOrFail(t, func() error {
280
		if svc := ic.GetService(ic.clusterLocalHost()); svc != nil {
281
			return fmt.Errorf("found deleted service for host %s", ic.clusterLocalHost())
282
		}
283
		if svc := ic.GetService(serviceImportClusterSetHost); svc != nil {
284
			return fmt.Errorf("found deleted service for host %s", serviceImportClusterSetHost)
285
		}
286

287
		instances := ic.getProxyServiceTargets()
288
		if len(instances) != 0 {
289
			return fmt.Errorf("expected 0 service instance, found %d", len(instances))
290
		}
291

292
		return nil
293
	}, serviceImportTimeout)
294
}
295

296
func (ic *serviceImportCacheImpl) getProxyServiceTargets() []model.ServiceTarget {
297
	return ic.GetProxyServiceTargets(&model.Proxy{
298
		Type:            model.SidecarProxy,
299
		IPAddresses:     []string{serviceImportPodIP},
300
		Locality:        &core.Locality{Region: "r", Zone: "z"},
301
		ConfigNamespace: serviceImportNamespace,
302
		Labels: map[string]string{
303
			"app":                      "prod-app",
304
			label.SecurityTlsMode.Name: "mutual",
305
		},
306
		Metadata: &model.NodeMetadata{
307
			ServiceAccount: "account",
308
			ClusterID:      ic.Cluster(),
309
			Labels: map[string]string{
310
				"app":                      "prod-app",
311
				label.SecurityTlsMode.Name: "mutual",
312
			},
313
		},
314
	})
315
}
316

317
func (ic *serviceImportCacheImpl) getServiceImport(t *testing.T) *mcsapi.ServiceImport {
318
	t.Helper()
319

320
	// Get the ServiceImport as unstructured
321
	u, err := ic.client.Dynamic().Resource(mcs.ServiceImportGVR).Namespace(serviceImportNamespace).Get(
322
		context.TODO(), serviceImportName, metav1.GetOptions{})
323
	if err != nil {
324
		return nil
325
	}
326

327
	// Convert to ServiceImport
328
	si := &mcsapi.ServiceImport{}
329
	if err := runtime.DefaultUnstructuredConverter.FromUnstructured(u.Object, si); err != nil {
330
		t.Fatal(err)
331
	}
332
	return si
333
}
334

335
// checkServiceInstances asserts that the proxy service targets match the
// current ServiceImport.
//
// When a ClusterSetIP import with at least one IP exists, two targets are
// expected and the ClusterSet service must carry exactly the import's IPs for
// this cluster. Otherwise a single target is expected and a target for the
// ClusterSet host is a failure.
func (ic *serviceImportCacheImpl) checkServiceInstances(t *testing.T) {
	t.Helper()

	serviceImport := ic.getServiceImport(t)

	// The MCS service is expected only for a ClusterSetIP import that has IPs.
	wantMCS := serviceImport != nil &&
		serviceImport.Spec.Type == mcsapi.ClusterSetIP &&
		len(serviceImport.Spec.IPs) > 0

	var wantIPs []string
	wantCount := 1
	if wantMCS {
		wantIPs = serviceImport.Spec.IPs
		wantCount = 2
	}

	targets := ic.getProxyServiceTargets()
	assert.Equal(t, len(targets), wantCount)

	for _, target := range targets {
		service := target.Service
		if service.Hostname != serviceImportClusterSetHost {
			continue
		}
		if !wantMCS {
			t.Fatalf("found ServiceInstance for unimported service %s", serviceImportClusterSetHost)
		}
		// Check the ClusterSet IPs.
		assert.Equal(t, service.ClusterVIPs.GetAddressesFor(ic.Cluster()), wantIPs)
		return
	}

	// Reaching this point means no target matched the ClusterSet host.
	if wantMCS {
		t.Fatalf("failed finding ServiceInstance for %s", serviceImportClusterSetHost)
	}
}
368

369
func (ic *serviceImportCacheImpl) createServiceImport(t *testing.T, importType mcsapi.ServiceImportType, vips []string) {
370
	t.Helper()
371

372
	// Create the ServiceImport resource in the cluster.
373
	_, err := ic.client.Dynamic().Resource(mcs.ServiceImportGVR).Namespace(serviceImportNamespace).Create(context.TODO(),
374
		newServiceImport(importType, vips),
375
		metav1.CreateOptions{})
376
	if err != nil {
377
		t.Fatal(err)
378
	}
379

380
	shouldCreateMCSService := importType == mcsapi.ClusterSetIP && len(vips) > 0 &&
381
		ic.GetService(ic.clusterLocalHost()) != nil
382

383
	// Wait for the import to be processed by the controller.
384
	retry.UntilSuccessOrFail(t, func() error {
385
		if !ic.isImported(serviceImportNamespacedName) {
386
			return fmt.Errorf("serviceImport not found for %s", serviceImportClusterSetHost)
387
		}
388
		if shouldCreateMCSService && ic.GetService(serviceImportClusterSetHost) == nil {
389
			return fmt.Errorf("failed to find service for %s", serviceImportClusterSetHost)
390
		}
391
		return nil
392
	}, serviceImportTimeout)
393

394
	if shouldCreateMCSService {
395
		// Wait for the XDS event.
396
		ic.checkXDS(t)
397
	}
398
}
399

400
func (ic *serviceImportCacheImpl) setServiceImportVIPs(t *testing.T, vips []string) {
401
	t.Helper()
402

403
	// Get the ServiceImport
404
	si := ic.getServiceImport(t)
405

406
	// Apply the ClusterSet IPs.
407
	si.Spec.IPs = vips
408
	if _, err := ic.client.Dynamic().Resource(mcs.ServiceImportGVR).Namespace(serviceImportNamespace).Update(
409
		context.TODO(), toUnstructured(si), metav1.UpdateOptions{}); err != nil {
410
		t.Fatal(err)
411
	}
412

413
	if len(vips) > 0 {
414
		// Wait for the import to be processed by the controller.
415
		retry.UntilSuccessOrFail(t, func() error {
416
			svc := ic.GetService(serviceImportClusterSetHost)
417
			if svc == nil {
418
				return fmt.Errorf("failed to find service for %s", serviceImportClusterSetHost)
419
			}
420

421
			actualVIPs := svc.ClusterVIPs.GetAddressesFor(ic.Cluster())
422
			if !reflect.DeepEqual(vips, actualVIPs) {
423
				return fmt.Errorf("expected ClusterSet VIPs %v, but found %v", vips, actualVIPs)
424
			}
425
			return nil
426
		}, serviceImportTimeout)
427

428
		// Wait for the XDS event.
429
		ic.checkXDS(t)
430
	} else {
431
		// Wait for the import to be processed by the controller.
432
		retry.UntilSuccessOrFail(t, func() error {
433
			if svc := ic.GetService(serviceImportClusterSetHost); svc != nil {
434
				return fmt.Errorf("found unexpected service for %s", serviceImportClusterSetHost)
435
			}
436
			return nil
437
		}, serviceImportTimeout)
438
	}
439
}
440

441
func (ic *serviceImportCacheImpl) unimportService(t *testing.T) {
442
	t.Helper()
443

444
	if err := ic.client.Dynamic().Resource(mcs.ServiceImportGVR).Namespace(serviceImportNamespace).Delete(
445
		context.TODO(), serviceImportName, metav1.DeleteOptions{}); err != nil {
446
		t.Fatal(err)
447
	}
448

449
	// Wait for the import to be processed by the controller.
450
	retry.UntilSuccessOrFail(t, func() error {
451
		if ic.isImported(serviceImportNamespacedName) {
452
			return fmt.Errorf("serviceImport found for %s", serviceImportClusterSetHost)
453
		}
454
		if ic.GetService(serviceImportClusterSetHost) != nil {
455
			return fmt.Errorf("found MCS service for unimported service %s", serviceImportClusterSetHost)
456
		}
457
		return nil
458
	}, serviceImportTimeout)
459
}
460

461
// isImported reports whether the cache's serviceImports store holds an entry
// for the given namespace/name.
func (ic *serviceImportCacheImpl) isImported(name types.NamespacedName) bool {
	entry := ic.serviceImports.Get(name.Name, name.Namespace)
	return entry != nil
}
464

465
// checkXDS fails the test unless the fake XDS updater matches a "service"
// event whose ID is the ClusterSet hostname. It panics if the configured
// XDSUpdater is not an *xdsfake.Updater.
func (ic *serviceImportCacheImpl) checkXDS(t test.Failer) {
	t.Helper()

	updater := ic.opts.XDSUpdater.(*xdsfake.Updater)
	want := xdsfake.Event{
		Type: "service",
		ID:   serviceImportClusterSetHost.String(),
	}
	updater.MatchOrFail(t, want)
}
469

470
// clusterLocalHost returns the hostname kube.ServiceHostname builds for the
// test service from its name, namespace, and the controller's domain suffix.
func (ic *serviceImportCacheImpl) clusterLocalHost() host.Name {
	domainSuffix := ic.opts.DomainSuffix
	return kube.ServiceHostname(serviceImportName, serviceImportNamespace, domainSuffix)
}
473

474
func newServiceImport(importType mcsapi.ServiceImportType, vips []string) *unstructured.Unstructured {
475
	si := &mcsapi.ServiceImport{
476
		TypeMeta: metav1.TypeMeta{
477
			Kind:       "ServiceImport",
478
			APIVersion: "multicluster.x-k8s.io/v1alpha1",
479
		},
480
		ObjectMeta: metav1.ObjectMeta{
481
			Name:      serviceImportName,
482
			Namespace: serviceImportNamespace,
483
		},
484
		Spec: mcsapi.ServiceImportSpec{
485
			Type: importType,
486
			IPs:  vips,
487
		},
488
	}
489
	return toUnstructured(si)
490
}
491

492
// toUnstructured converts o into an *unstructured.Unstructured using the
// default unstructured converter. It panics if the conversion fails.
func toUnstructured(o any) *unstructured.Unstructured {
	content, convErr := runtime.DefaultUnstructuredConverter.ToUnstructured(o)
	if convErr != nil {
		panic(convErr)
	}

	result := &unstructured.Unstructured{}
	result.Object = content
	return result
}
499

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.