istio
304 строки · 9.1 Кб
// Copyright Istio Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15package controller16
17import (18"context"19"fmt"20"testing"21
22metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"23"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"24"k8s.io/apimachinery/pkg/runtime/schema"25"k8s.io/apimachinery/pkg/types"26mcsapi "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1"27
28"istio.io/istio/pilot/pkg/features"29"istio.io/istio/pilot/pkg/model"30"istio.io/istio/pilot/pkg/serviceregistry/kube"31"istio.io/istio/pilot/pkg/serviceregistry/util/xdsfake"32"istio.io/istio/pkg/config/host"33"istio.io/istio/pkg/kube/mcs"34"istio.io/istio/pkg/maps"35"istio.io/istio/pkg/slices"36istiotest "istio.io/istio/pkg/test"37"istio.io/istio/pkg/test/util/retry"38)
39
// Fixture values shared by the ServiceExport cache tests in this file.
const (
	// serviceExportName names both the test Service and its ServiceExport.
	serviceExportName = "test-svc"
	// serviceExportNamespace is the namespace holding the test resources.
	serviceExportNamespace = "test-ns"
	// serviceExportPodIP is the single address passed to createEndpoints.
	serviceExportPodIP = "128.0.0.2"
	// testCluster is the ClusterID handed to the fake controller.
	testCluster = "test-cluster"
)
46
47var serviceExportNamespacedName = types.NamespacedName{48Namespace: serviceExportNamespace,49Name: serviceExportName,50}
51
// ClusterLocalMode names a configuration under which each test in this file
// is run. Its string value doubles as the subtest name.
type ClusterLocalMode string

// String returns the mode's underlying string value.
func (mode ClusterLocalMode) String() string {
	return string(mode)
}
57
58const (59alwaysClusterLocal ClusterLocalMode = "always cluster local"60meshWide ClusterLocalMode = "mesh wide"61)
62
63var ClusterLocalModes = []ClusterLocalMode{alwaysClusterLocal, meshWide}64
65func TestServiceNotExported(t *testing.T) {66for _, clusterLocalMode := range ClusterLocalModes {67t.Run(clusterLocalMode.String(), func(t *testing.T) {68// Create and run the controller.69ec, endpoints := newTestServiceExportCache(t, clusterLocalMode)70// Check that the endpoint is cluster-local71ec.checkServiceInstancesOrFail(t, false, endpoints)72})73}74}
75
76func TestServiceExported(t *testing.T) {77for _, clusterLocalMode := range ClusterLocalModes {78t.Run(clusterLocalMode.String(), func(t *testing.T) {79// Create and run the controller.80ec, endpoints := newTestServiceExportCache(t, clusterLocalMode)81// Export the service.82ec.export(t)83
84// Check that the endpoint is mesh-wide85ec.checkServiceInstancesOrFail(t, true, endpoints)86})87}88}
89
90func TestServiceUnexported(t *testing.T) {91for _, clusterLocalMode := range ClusterLocalModes {92t.Run(clusterLocalMode.String(), func(t *testing.T) {93// Create and run the controller.94ec, endpoints := newTestServiceExportCache(t, clusterLocalMode)95// Export the service and then unexport it immediately.96ec.export(t)97ec.unExport(t)98
99// Check that the endpoint is cluster-local100ec.checkServiceInstancesOrFail(t, false, endpoints)101})102}103}
104
105func newServiceExport() *unstructured.Unstructured {106se := &mcsapi.ServiceExport{107TypeMeta: metav1.TypeMeta{108Kind: "ServiceExport",109APIVersion: mcs.MCSSchemeGroupVersion.String(),110},111ObjectMeta: metav1.ObjectMeta{112Name: serviceExportName,113Namespace: serviceExportNamespace,114},115}116return toUnstructured(se)117}
118
119func newTestServiceExportCache(t *testing.T, clusterLocalMode ClusterLocalMode) (*serviceExportCacheImpl, *model.EndpointIndex) {120t.Helper()121
122istiotest.SetForTest(t, &features.EnableMCSServiceDiscovery, true)123istiotest.SetForTest(t, &features.EnableMCSClusterLocal, clusterLocalMode == alwaysClusterLocal)124
125c, _ := NewFakeControllerWithOptions(t, FakeControllerOptions{126ClusterID: testCluster,127CRDs: []schema.GroupVersionResource{mcs.ServiceExportGVR},128})129
130// Create the test service and endpoints.131createService(c, serviceExportName, serviceExportNamespace, map[string]string{}, map[string]string{},132[]int32{8080}, map[string]string{"app": "prod-app"}, t)133createEndpoints(t, c, serviceExportName, serviceExportNamespace, []string{"tcp-port"}, []string{serviceExportPodIP}, nil, nil)134
135ec := c.exports.(*serviceExportCacheImpl)136// Wait for the resources to be processed by the controller.137retry.UntilOrFail(t, func() bool {138if svc := ec.GetService(ec.serviceHostname()); svc == nil {139return false140}141inst := ec.getEndpoint(c.Endpoints)142return inst != nil143}, serviceExportTimeout)144return ec, c.Endpoints145}
146
147func (ec *serviceExportCacheImpl) serviceHostname() host.Name {148return kube.ServiceHostname(serviceExportName, serviceExportNamespace, ec.opts.DomainSuffix)149}
150
151func (ec *serviceExportCacheImpl) export(t *testing.T) {152t.Helper()153
154_, err := ec.client.Dynamic().Resource(mcs.ServiceExportGVR).Namespace(serviceExportNamespace).Create(context.TODO(),155newServiceExport(),156metav1.CreateOptions{})157if err != nil {158t.Fatal(err)159}160
161// Wait for the export to be processed by the controller.162retry.UntilOrFail(t, func() bool {163return ec.isExported(serviceExportNamespacedName)164}, serviceExportTimeout, retry.Message("expected to be exported"))165
166// Wait for the XDS event.167ec.waitForXDS(t, true)168}
169
170func (ec *serviceExportCacheImpl) unExport(t *testing.T) {171t.Helper()172
173_ = ec.client.Dynamic().Resource(mcs.ServiceExportGVR).Namespace(serviceExportNamespace).Delete(174context.TODO(),175serviceExportName,176metav1.DeleteOptions{})177
178// Wait for the delete to be processed by the controller.179retry.UntilOrFail(t, func() bool {180return !ec.isExported(serviceExportNamespacedName)181}, serviceExportTimeout)182
183// Wait for the XDS event.184ec.waitForXDS(t, false)185}
186
187func (ec *serviceExportCacheImpl) waitForXDS(t *testing.T, exported bool) {188t.Helper()189retry.UntilSuccessOrFail(t, func() error {190event := ec.opts.XDSUpdater.(*xdsfake.Updater).WaitOrFail(t, "eds")191if len(event.Endpoints) != 1 {192return fmt.Errorf("waitForXDS failed: expected 1 endpoint, found %d", len(event.Endpoints))193}194
195hostName := host.Name(event.ID)196svc := ec.GetService(hostName)197if svc == nil {198return fmt.Errorf("unable to find service for host %s", hostName)199}200return ec.checkEndpoint(exported, event.Endpoints[0])201}, serviceExportTimeout)202}
203
204func (ec *serviceExportCacheImpl) getEndpoint(endpoints *model.EndpointIndex) *model.IstioEndpoint {205svcs := ec.Services()206for _, s := range svcs {207ep := GetEndpoints(s, endpoints)208if len(ep) > 0 {209return ep[0]210}211}212return nil213}
214
215func GetEndpoints(s *model.Service, endpoints *model.EndpointIndex) []*model.IstioEndpoint {216return GetEndpointsForPort(s, endpoints, 0)217}
218
219func GetEndpointsForPort(s *model.Service, endpoints *model.EndpointIndex, port int) []*model.IstioEndpoint {220shards, ok := endpoints.ShardsForService(string(s.Hostname), s.Attributes.Namespace)221if !ok {222return nil223}224var pn string225for _, p := range s.Ports {226if p.Port == port {227pn = p.Name228break229}230}231if pn == "" && port != 0 {232return nil233}234shards.RLock()235defer shards.RUnlock()236return slices.FilterInPlace(slices.Flatten(maps.Values(shards.Shards)), func(endpoint *model.IstioEndpoint) bool {237return pn == "" || endpoint.ServicePortName == pn238})239}
240
241func (ec *serviceExportCacheImpl) checkServiceInstancesOrFail(t *testing.T, exported bool, endpoints *model.EndpointIndex) {242t.Helper()243if err := ec.checkEndpoints(exported, endpoints); err != nil {244t.Fatal(err)245}246}
247
248func (ec *serviceExportCacheImpl) checkEndpoints(exported bool, endpoints *model.EndpointIndex) error {249ep := ec.getEndpoint(endpoints)250if ep == nil {251return fmt.Errorf("expected an endpoint, found none")252}253return ec.checkEndpoint(exported, ep)254}
255
256func (ec *serviceExportCacheImpl) checkEndpoint(exported bool, ep *model.IstioEndpoint) error {257// Should always be discoverable from the same cluster.258if err := ec.checkDiscoverableFromSameCluster(ep); err != nil {259return err260}261
262if exported && !features.EnableMCSClusterLocal {263return ec.checkDiscoverableFromDifferentCluster(ep)264}265
266return ec.checkNotDiscoverableFromDifferentCluster(ep)267}
268
269func (ec *serviceExportCacheImpl) checkDiscoverableFromSameCluster(ep *model.IstioEndpoint) error {270if !ec.isDiscoverableFromSameCluster(ep) {271return fmt.Errorf("endpoint was not discoverable from the same cluster")272}273return nil274}
275
276func (ec *serviceExportCacheImpl) checkDiscoverableFromDifferentCluster(ep *model.IstioEndpoint) error {277if !ec.isDiscoverableFromDifferentCluster(ep) {278return fmt.Errorf("endpoint was not discoverable from a different cluster")279}280return nil281}
282
283func (ec *serviceExportCacheImpl) checkNotDiscoverableFromDifferentCluster(ep *model.IstioEndpoint) error {284if ec.isDiscoverableFromDifferentCluster(ep) {285return fmt.Errorf("endpoint was discoverable from a different cluster")286}287return nil288}
289
290func (ec *serviceExportCacheImpl) isDiscoverableFromSameCluster(ep *model.IstioEndpoint) bool {291return ep.IsDiscoverableFromProxy(&model.Proxy{292Metadata: &model.NodeMetadata{293ClusterID: ec.Cluster(),294},295})296}
297
298func (ec *serviceExportCacheImpl) isDiscoverableFromDifferentCluster(ep *model.IstioEndpoint) bool {299return ep.IsDiscoverableFromProxy(&model.Proxy{300Metadata: &model.NodeMetadata{301ClusterID: "some-other-cluster",302},303})304}
305