istio
498 строк · 15.2 Кб
1// Copyright Istio Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package controller
16
17import (
18"context"
19"fmt"
20"reflect"
21"testing"
22"time"
23
24core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
25metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
26"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
27"k8s.io/apimachinery/pkg/runtime"
28"k8s.io/apimachinery/pkg/runtime/schema"
29"k8s.io/apimachinery/pkg/types"
30mcsapi "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1"
31
32"istio.io/api/label"
33"istio.io/istio/pilot/pkg/features"
34"istio.io/istio/pilot/pkg/model"
35"istio.io/istio/pilot/pkg/serviceregistry/kube"
36"istio.io/istio/pilot/pkg/serviceregistry/util/xdsfake"
37"istio.io/istio/pkg/config/host"
38"istio.io/istio/pkg/kube/mcs"
39"istio.io/istio/pkg/test"
40"istio.io/istio/pkg/test/util/assert"
41"istio.io/istio/pkg/test/util/retry"
42)
43
44const (
45serviceImportName = "test-svc"
46serviceImportNamespace = "test-ns"
47serviceImportPodIP = "128.0.0.2"
48serviceImportCluster = "test-cluster"
49)
50
51var (
52serviceImportNamespacedName = types.NamespacedName{
53Namespace: serviceImportNamespace,
54Name: serviceImportName,
55}
56serviceImportClusterSetHost = serviceClusterSetLocalHostname(serviceImportNamespacedName)
57serviceImportVIPs = []string{"1.1.1.1"}
58serviceImportTimeout = retry.Timeout(2 * time.Second)
59)
60
61func TestServiceNotImported(t *testing.T) {
62c, ic := newTestServiceImportCache(t)
63ic.createKubeService(t, c)
64
65// Check that the service does not have ClusterSet IPs.
66ic.checkServiceInstances(t)
67}
68
69func TestServiceImportedAfterCreated(t *testing.T) {
70c, ic := newTestServiceImportCache(t)
71
72ic.createKubeService(t, c)
73ic.createServiceImport(t, mcsapi.ClusterSetIP, serviceImportVIPs)
74
75// Check that the service has been assigned ClusterSet IPs.
76ic.checkServiceInstances(t)
77}
78
79func TestServiceCreatedAfterImported(t *testing.T) {
80c, ic := newTestServiceImportCache(t)
81
82ic.createServiceImport(t, mcsapi.ClusterSetIP, serviceImportVIPs)
83ic.createKubeService(t, c)
84
85// Check that the service has been assigned ClusterSet IPs.
86ic.checkServiceInstances(t)
87}
88
89func TestUpdateImportedService(t *testing.T) {
90c, ic := newTestServiceImportCache(t)
91
92ic.createKubeService(t, c)
93ic.createServiceImport(t, mcsapi.ClusterSetIP, serviceImportVIPs)
94ic.checkServiceInstances(t)
95
96// Update the k8s service and verify that both services are updated.
97ic.updateKubeService(t)
98}
99
100func TestHeadlessServiceImported(t *testing.T) {
101// Create and run the controller.
102c, ic := newTestServiceImportCache(t)
103
104ic.createKubeService(t, c)
105ic.createServiceImport(t, mcsapi.Headless, nil)
106
107// Verify that we did not generate the synthetic service for the headless service.
108ic.checkServiceInstances(t)
109}
110
111func TestDeleteImportedService(t *testing.T) {
112// Create and run the controller.
113c1, ic := newTestServiceImportCache(t)
114
115// Create and run another controller.
116c2, _ := NewFakeControllerWithOptions(t, FakeControllerOptions{
117ClusterID: "test-cluster2",
118})
119
120c1.opts.MeshServiceController.AddRegistryAndRun(c2, c2.stop)
121
122ic.createKubeService(t, c1)
123ic.createServiceImport(t, mcsapi.ClusterSetIP, serviceImportVIPs)
124ic.checkServiceInstances(t)
125
126// create the same service in cluster2
127createService(c2, serviceImportName, serviceImportNamespace, map[string]string{}, map[string]string{},
128[]int32{8080}, map[string]string{"app": "prod-app"}, t)
129
130// Delete the k8s service and verify that all internal services are removed.
131ic.deleteKubeService(t, c2)
132}
133
134func TestUnimportService(t *testing.T) {
135// Create and run the controller.
136c, ic := newTestServiceImportCache(t)
137
138ic.createKubeService(t, c)
139ic.createServiceImport(t, mcsapi.ClusterSetIP, serviceImportVIPs)
140ic.checkServiceInstances(t)
141
142ic.unimportService(t)
143}
144
145func TestAddServiceImportVIPs(t *testing.T) {
146// Create and run the controller.
147c, ic := newTestServiceImportCache(t)
148
149ic.createKubeService(t, c)
150ic.createServiceImport(t, mcsapi.ClusterSetIP, nil)
151ic.checkServiceInstances(t)
152
153ic.setServiceImportVIPs(t, serviceImportVIPs)
154}
155
156func TestUpdateServiceImportVIPs(t *testing.T) {
157// Create and run the controller.
158c, ic := newTestServiceImportCache(t)
159
160ic.createKubeService(t, c)
161ic.createServiceImport(t, mcsapi.ClusterSetIP, serviceImportVIPs)
162ic.checkServiceInstances(t)
163
164updatedVIPs := []string{"1.1.1.1", "1.1.1.2"}
165ic.setServiceImportVIPs(t, updatedVIPs)
166}
167
168func newTestServiceImportCache(t test.Failer) (*FakeController, *serviceImportCacheImpl) {
169test.SetForTest(t, &features.EnableMCSHost, true)
170
171c, _ := NewFakeControllerWithOptions(t, FakeControllerOptions{
172ClusterID: serviceImportCluster,
173CRDs: []schema.GroupVersionResource{mcs.ServiceImportGVR},
174})
175
176return c, c.imports.(*serviceImportCacheImpl)
177}
178
179func (ic *serviceImportCacheImpl) createKubeService(t *testing.T, c *FakeController) {
180t.Helper()
181
182// Create the test service and endpoints.
183createService(c, serviceImportName, serviceImportNamespace, map[string]string{}, map[string]string{},
184[]int32{8080}, map[string]string{"app": "prod-app"}, t)
185createEndpoints(t, c, serviceImportName, serviceImportNamespace, []string{"tcp-port"}, []string{serviceImportPodIP}, nil, nil)
186
187isImported := ic.isImported(serviceImportNamespacedName)
188
189// Wait for the resources to be processed by the controller.
190retry.UntilSuccessOrFail(t, func() error {
191clusterLocalHost := ic.clusterLocalHost()
192if svc := c.GetService(clusterLocalHost); svc == nil {
193return fmt.Errorf("failed looking up service for host %s", clusterLocalHost)
194}
195
196var expectedHosts map[host.Name]struct{}
197if isImported {
198expectedHosts = map[host.Name]struct{}{
199clusterLocalHost: {},
200serviceImportClusterSetHost: {},
201}
202} else {
203expectedHosts = map[host.Name]struct{}{
204clusterLocalHost: {},
205}
206}
207
208instances := ic.getProxyServiceTargets()
209if len(instances) != len(expectedHosts) {
210return fmt.Errorf("expected 1 service instance, found %d", len(instances))
211}
212for _, si := range instances {
213if si.Service == nil {
214return fmt.Errorf("proxy ServiceInstance has nil service")
215}
216if _, found := expectedHosts[si.Service.Hostname]; !found {
217return fmt.Errorf("found proxy ServiceInstance for unexpected host: %s", si.Service.Hostname)
218}
219delete(expectedHosts, si.Service.Hostname)
220}
221
222if len(expectedHosts) > 0 {
223return fmt.Errorf("failed to find proxy ServiceEndpoints for hosts: %v", expectedHosts)
224}
225
226return nil
227}, serviceImportTimeout)
228}
229
230func (ic *serviceImportCacheImpl) updateKubeService(t *testing.T) {
231t.Helper()
232svc, _ := ic.client.Kube().CoreV1().Services(serviceImportNamespace).Get(context.TODO(), serviceImportName, metav1.GetOptions{})
233if svc == nil {
234t.Fatalf("failed to find k8s service: %s/%s", serviceImportNamespace, serviceImportName)
235}
236
237// Just add a new label.
238svc.Labels = map[string]string{
239"foo": "bar",
240}
241if _, err := ic.client.Kube().CoreV1().Services(serviceImportNamespace).Update(context.TODO(), svc, metav1.UpdateOptions{}); err != nil {
242t.Fatal(err)
243}
244
245hostNames := []host.Name{
246ic.clusterLocalHost(),
247serviceImportClusterSetHost,
248}
249
250// Wait for the services to pick up the label.
251retry.UntilSuccessOrFail(t, func() error {
252for _, hostName := range hostNames {
253svc := ic.GetService(hostName)
254if svc == nil {
255return fmt.Errorf("failed to find service for host %s", hostName)
256}
257if svc.Attributes.Labels["foo"] != "bar" {
258return fmt.Errorf("service not updated for %s", hostName)
259}
260}
261
262return nil
263}, serviceImportTimeout)
264}
265
266func (ic *serviceImportCacheImpl) deleteKubeService(t *testing.T, anotherCluster *FakeController) {
267t.Helper()
268
269if err := anotherCluster.client.Kube().
270CoreV1().Services(serviceImportNamespace).Delete(context.TODO(), serviceImportName, metav1.DeleteOptions{}); err != nil {
271t.Fatal(err)
272}
273// Wait for the resources to be processed by the controller.
274if err := ic.client.Kube().CoreV1().Services(serviceImportNamespace).Delete(context.TODO(), serviceImportName, metav1.DeleteOptions{}); err != nil {
275t.Fatal(err)
276}
277
278// Wait for the resources to be processed by the controller.
279retry.UntilSuccessOrFail(t, func() error {
280if svc := ic.GetService(ic.clusterLocalHost()); svc != nil {
281return fmt.Errorf("found deleted service for host %s", ic.clusterLocalHost())
282}
283if svc := ic.GetService(serviceImportClusterSetHost); svc != nil {
284return fmt.Errorf("found deleted service for host %s", serviceImportClusterSetHost)
285}
286
287instances := ic.getProxyServiceTargets()
288if len(instances) != 0 {
289return fmt.Errorf("expected 0 service instance, found %d", len(instances))
290}
291
292return nil
293}, serviceImportTimeout)
294}
295
296func (ic *serviceImportCacheImpl) getProxyServiceTargets() []model.ServiceTarget {
297return ic.GetProxyServiceTargets(&model.Proxy{
298Type: model.SidecarProxy,
299IPAddresses: []string{serviceImportPodIP},
300Locality: &core.Locality{Region: "r", Zone: "z"},
301ConfigNamespace: serviceImportNamespace,
302Labels: map[string]string{
303"app": "prod-app",
304label.SecurityTlsMode.Name: "mutual",
305},
306Metadata: &model.NodeMetadata{
307ServiceAccount: "account",
308ClusterID: ic.Cluster(),
309Labels: map[string]string{
310"app": "prod-app",
311label.SecurityTlsMode.Name: "mutual",
312},
313},
314})
315}
316
317func (ic *serviceImportCacheImpl) getServiceImport(t *testing.T) *mcsapi.ServiceImport {
318t.Helper()
319
320// Get the ServiceImport as unstructured
321u, err := ic.client.Dynamic().Resource(mcs.ServiceImportGVR).Namespace(serviceImportNamespace).Get(
322context.TODO(), serviceImportName, metav1.GetOptions{})
323if err != nil {
324return nil
325}
326
327// Convert to ServiceImport
328si := &mcsapi.ServiceImport{}
329if err := runtime.DefaultUnstructuredConverter.FromUnstructured(u.Object, si); err != nil {
330t.Fatal(err)
331}
332return si
333}
334
335func (ic *serviceImportCacheImpl) checkServiceInstances(t *testing.T) {
336t.Helper()
337
338si := ic.getServiceImport(t)
339
340var expectedIPs []string
341expectedServiceCount := 1
342expectMCSService := false
343if si != nil && si.Spec.Type == mcsapi.ClusterSetIP && len(si.Spec.IPs) > 0 {
344expectedIPs = si.Spec.IPs
345expectedServiceCount = 2
346expectMCSService = true
347}
348
349instances := ic.getProxyServiceTargets()
350assert.Equal(t, len(instances), expectedServiceCount)
351
352for _, inst := range instances {
353svc := inst.Service
354if svc.Hostname == serviceImportClusterSetHost {
355if !expectMCSService {
356t.Fatalf("found ServiceInstance for unimported service %s", serviceImportClusterSetHost)
357}
358// Check the ClusterSet IPs.
359assert.Equal(t, svc.ClusterVIPs.GetAddressesFor(ic.Cluster()), expectedIPs)
360return
361}
362}
363
364if expectMCSService {
365t.Fatalf("failed finding ServiceInstance for %s", serviceImportClusterSetHost)
366}
367}
368
369func (ic *serviceImportCacheImpl) createServiceImport(t *testing.T, importType mcsapi.ServiceImportType, vips []string) {
370t.Helper()
371
372// Create the ServiceImport resource in the cluster.
373_, err := ic.client.Dynamic().Resource(mcs.ServiceImportGVR).Namespace(serviceImportNamespace).Create(context.TODO(),
374newServiceImport(importType, vips),
375metav1.CreateOptions{})
376if err != nil {
377t.Fatal(err)
378}
379
380shouldCreateMCSService := importType == mcsapi.ClusterSetIP && len(vips) > 0 &&
381ic.GetService(ic.clusterLocalHost()) != nil
382
383// Wait for the import to be processed by the controller.
384retry.UntilSuccessOrFail(t, func() error {
385if !ic.isImported(serviceImportNamespacedName) {
386return fmt.Errorf("serviceImport not found for %s", serviceImportClusterSetHost)
387}
388if shouldCreateMCSService && ic.GetService(serviceImportClusterSetHost) == nil {
389return fmt.Errorf("failed to find service for %s", serviceImportClusterSetHost)
390}
391return nil
392}, serviceImportTimeout)
393
394if shouldCreateMCSService {
395// Wait for the XDS event.
396ic.checkXDS(t)
397}
398}
399
400func (ic *serviceImportCacheImpl) setServiceImportVIPs(t *testing.T, vips []string) {
401t.Helper()
402
403// Get the ServiceImport
404si := ic.getServiceImport(t)
405
406// Apply the ClusterSet IPs.
407si.Spec.IPs = vips
408if _, err := ic.client.Dynamic().Resource(mcs.ServiceImportGVR).Namespace(serviceImportNamespace).Update(
409context.TODO(), toUnstructured(si), metav1.UpdateOptions{}); err != nil {
410t.Fatal(err)
411}
412
413if len(vips) > 0 {
414// Wait for the import to be processed by the controller.
415retry.UntilSuccessOrFail(t, func() error {
416svc := ic.GetService(serviceImportClusterSetHost)
417if svc == nil {
418return fmt.Errorf("failed to find service for %s", serviceImportClusterSetHost)
419}
420
421actualVIPs := svc.ClusterVIPs.GetAddressesFor(ic.Cluster())
422if !reflect.DeepEqual(vips, actualVIPs) {
423return fmt.Errorf("expected ClusterSet VIPs %v, but found %v", vips, actualVIPs)
424}
425return nil
426}, serviceImportTimeout)
427
428// Wait for the XDS event.
429ic.checkXDS(t)
430} else {
431// Wait for the import to be processed by the controller.
432retry.UntilSuccessOrFail(t, func() error {
433if svc := ic.GetService(serviceImportClusterSetHost); svc != nil {
434return fmt.Errorf("found unexpected service for %s", serviceImportClusterSetHost)
435}
436return nil
437}, serviceImportTimeout)
438}
439}
440
441func (ic *serviceImportCacheImpl) unimportService(t *testing.T) {
442t.Helper()
443
444if err := ic.client.Dynamic().Resource(mcs.ServiceImportGVR).Namespace(serviceImportNamespace).Delete(
445context.TODO(), serviceImportName, metav1.DeleteOptions{}); err != nil {
446t.Fatal(err)
447}
448
449// Wait for the import to be processed by the controller.
450retry.UntilSuccessOrFail(t, func() error {
451if ic.isImported(serviceImportNamespacedName) {
452return fmt.Errorf("serviceImport found for %s", serviceImportClusterSetHost)
453}
454if ic.GetService(serviceImportClusterSetHost) != nil {
455return fmt.Errorf("found MCS service for unimported service %s", serviceImportClusterSetHost)
456}
457return nil
458}, serviceImportTimeout)
459}
460
461func (ic *serviceImportCacheImpl) isImported(name types.NamespacedName) bool {
462return ic.serviceImports.Get(name.Name, name.Namespace) != nil
463}
464
465func (ic *serviceImportCacheImpl) checkXDS(t test.Failer) {
466t.Helper()
467ic.opts.XDSUpdater.(*xdsfake.Updater).MatchOrFail(t, xdsfake.Event{Type: "service", ID: serviceImportClusterSetHost.String()})
468}
469
470func (ic *serviceImportCacheImpl) clusterLocalHost() host.Name {
471return kube.ServiceHostname(serviceImportName, serviceImportNamespace, ic.opts.DomainSuffix)
472}
473
474func newServiceImport(importType mcsapi.ServiceImportType, vips []string) *unstructured.Unstructured {
475si := &mcsapi.ServiceImport{
476TypeMeta: metav1.TypeMeta{
477Kind: "ServiceImport",
478APIVersion: "multicluster.x-k8s.io/v1alpha1",
479},
480ObjectMeta: metav1.ObjectMeta{
481Name: serviceImportName,
482Namespace: serviceImportNamespace,
483},
484Spec: mcsapi.ServiceImportSpec{
485Type: importType,
486IPs: vips,
487},
488}
489return toUnstructured(si)
490}
491
492func toUnstructured(o any) *unstructured.Unstructured {
493u, err := runtime.DefaultUnstructuredConverter.ToUnstructured(o)
494if err != nil {
495panic(err)
496}
497return &unstructured.Unstructured{Object: u}
498}
499