istio

Форк
0
/
serviceexportcache_test.go 
304 строки · 9.1 Кб
1
// Copyright Istio Authors
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14

15
package controller
16

17
import (
18
	"context"
19
	"fmt"
20
	"testing"
21

22
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
23
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
24
	"k8s.io/apimachinery/pkg/runtime/schema"
25
	"k8s.io/apimachinery/pkg/types"
26
	mcsapi "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1"
27

28
	"istio.io/istio/pilot/pkg/features"
29
	"istio.io/istio/pilot/pkg/model"
30
	"istio.io/istio/pilot/pkg/serviceregistry/kube"
31
	"istio.io/istio/pilot/pkg/serviceregistry/util/xdsfake"
32
	"istio.io/istio/pkg/config/host"
33
	"istio.io/istio/pkg/kube/mcs"
34
	"istio.io/istio/pkg/maps"
35
	"istio.io/istio/pkg/slices"
36
	istiotest "istio.io/istio/pkg/test"
37
	"istio.io/istio/pkg/test/util/retry"
38
)
39

40
const (
41
	serviceExportName      = "test-svc"
42
	serviceExportNamespace = "test-ns"
43
	serviceExportPodIP     = "128.0.0.2"
44
	testCluster            = "test-cluster"
45
)
46

47
var serviceExportNamespacedName = types.NamespacedName{
48
	Namespace: serviceExportNamespace,
49
	Name:      serviceExportName,
50
}
51

52
type ClusterLocalMode string
53

54
func (m ClusterLocalMode) String() string {
55
	return string(m)
56
}
57

58
const (
59
	alwaysClusterLocal ClusterLocalMode = "always cluster local"
60
	meshWide           ClusterLocalMode = "mesh wide"
61
)
62

63
var ClusterLocalModes = []ClusterLocalMode{alwaysClusterLocal, meshWide}
64

65
func TestServiceNotExported(t *testing.T) {
66
	for _, clusterLocalMode := range ClusterLocalModes {
67
		t.Run(clusterLocalMode.String(), func(t *testing.T) {
68
			// Create and run the controller.
69
			ec, endpoints := newTestServiceExportCache(t, clusterLocalMode)
70
			// Check that the endpoint is cluster-local
71
			ec.checkServiceInstancesOrFail(t, false, endpoints)
72
		})
73
	}
74
}
75

76
func TestServiceExported(t *testing.T) {
77
	for _, clusterLocalMode := range ClusterLocalModes {
78
		t.Run(clusterLocalMode.String(), func(t *testing.T) {
79
			// Create and run the controller.
80
			ec, endpoints := newTestServiceExportCache(t, clusterLocalMode)
81
			// Export the service.
82
			ec.export(t)
83

84
			// Check that the endpoint is mesh-wide
85
			ec.checkServiceInstancesOrFail(t, true, endpoints)
86
		})
87
	}
88
}
89

90
func TestServiceUnexported(t *testing.T) {
91
	for _, clusterLocalMode := range ClusterLocalModes {
92
		t.Run(clusterLocalMode.String(), func(t *testing.T) {
93
			// Create and run the controller.
94
			ec, endpoints := newTestServiceExportCache(t, clusterLocalMode)
95
			// Export the service and then unexport it immediately.
96
			ec.export(t)
97
			ec.unExport(t)
98

99
			// Check that the endpoint is cluster-local
100
			ec.checkServiceInstancesOrFail(t, false, endpoints)
101
		})
102
	}
103
}
104

105
func newServiceExport() *unstructured.Unstructured {
106
	se := &mcsapi.ServiceExport{
107
		TypeMeta: metav1.TypeMeta{
108
			Kind:       "ServiceExport",
109
			APIVersion: mcs.MCSSchemeGroupVersion.String(),
110
		},
111
		ObjectMeta: metav1.ObjectMeta{
112
			Name:      serviceExportName,
113
			Namespace: serviceExportNamespace,
114
		},
115
	}
116
	return toUnstructured(se)
117
}
118

119
func newTestServiceExportCache(t *testing.T, clusterLocalMode ClusterLocalMode) (*serviceExportCacheImpl, *model.EndpointIndex) {
120
	t.Helper()
121

122
	istiotest.SetForTest(t, &features.EnableMCSServiceDiscovery, true)
123
	istiotest.SetForTest(t, &features.EnableMCSClusterLocal, clusterLocalMode == alwaysClusterLocal)
124

125
	c, _ := NewFakeControllerWithOptions(t, FakeControllerOptions{
126
		ClusterID: testCluster,
127
		CRDs:      []schema.GroupVersionResource{mcs.ServiceExportGVR},
128
	})
129

130
	// Create the test service and endpoints.
131
	createService(c, serviceExportName, serviceExportNamespace, map[string]string{}, map[string]string{},
132
		[]int32{8080}, map[string]string{"app": "prod-app"}, t)
133
	createEndpoints(t, c, serviceExportName, serviceExportNamespace, []string{"tcp-port"}, []string{serviceExportPodIP}, nil, nil)
134

135
	ec := c.exports.(*serviceExportCacheImpl)
136
	// Wait for the resources to be processed by the controller.
137
	retry.UntilOrFail(t, func() bool {
138
		if svc := ec.GetService(ec.serviceHostname()); svc == nil {
139
			return false
140
		}
141
		inst := ec.getEndpoint(c.Endpoints)
142
		return inst != nil
143
	}, serviceExportTimeout)
144
	return ec, c.Endpoints
145
}
146

147
func (ec *serviceExportCacheImpl) serviceHostname() host.Name {
148
	return kube.ServiceHostname(serviceExportName, serviceExportNamespace, ec.opts.DomainSuffix)
149
}
150

151
func (ec *serviceExportCacheImpl) export(t *testing.T) {
152
	t.Helper()
153

154
	_, err := ec.client.Dynamic().Resource(mcs.ServiceExportGVR).Namespace(serviceExportNamespace).Create(context.TODO(),
155
		newServiceExport(),
156
		metav1.CreateOptions{})
157
	if err != nil {
158
		t.Fatal(err)
159
	}
160

161
	// Wait for the export to be processed by the controller.
162
	retry.UntilOrFail(t, func() bool {
163
		return ec.isExported(serviceExportNamespacedName)
164
	}, serviceExportTimeout, retry.Message("expected to be exported"))
165

166
	// Wait for the XDS event.
167
	ec.waitForXDS(t, true)
168
}
169

170
func (ec *serviceExportCacheImpl) unExport(t *testing.T) {
171
	t.Helper()
172

173
	_ = ec.client.Dynamic().Resource(mcs.ServiceExportGVR).Namespace(serviceExportNamespace).Delete(
174
		context.TODO(),
175
		serviceExportName,
176
		metav1.DeleteOptions{})
177

178
	// Wait for the delete to be processed by the controller.
179
	retry.UntilOrFail(t, func() bool {
180
		return !ec.isExported(serviceExportNamespacedName)
181
	}, serviceExportTimeout)
182

183
	// Wait for the XDS event.
184
	ec.waitForXDS(t, false)
185
}
186

187
func (ec *serviceExportCacheImpl) waitForXDS(t *testing.T, exported bool) {
188
	t.Helper()
189
	retry.UntilSuccessOrFail(t, func() error {
190
		event := ec.opts.XDSUpdater.(*xdsfake.Updater).WaitOrFail(t, "eds")
191
		if len(event.Endpoints) != 1 {
192
			return fmt.Errorf("waitForXDS failed: expected 1 endpoint, found %d", len(event.Endpoints))
193
		}
194

195
		hostName := host.Name(event.ID)
196
		svc := ec.GetService(hostName)
197
		if svc == nil {
198
			return fmt.Errorf("unable to find service for host %s", hostName)
199
		}
200
		return ec.checkEndpoint(exported, event.Endpoints[0])
201
	}, serviceExportTimeout)
202
}
203

204
func (ec *serviceExportCacheImpl) getEndpoint(endpoints *model.EndpointIndex) *model.IstioEndpoint {
205
	svcs := ec.Services()
206
	for _, s := range svcs {
207
		ep := GetEndpoints(s, endpoints)
208
		if len(ep) > 0 {
209
			return ep[0]
210
		}
211
	}
212
	return nil
213
}
214

215
func GetEndpoints(s *model.Service, endpoints *model.EndpointIndex) []*model.IstioEndpoint {
216
	return GetEndpointsForPort(s, endpoints, 0)
217
}
218

219
func GetEndpointsForPort(s *model.Service, endpoints *model.EndpointIndex, port int) []*model.IstioEndpoint {
220
	shards, ok := endpoints.ShardsForService(string(s.Hostname), s.Attributes.Namespace)
221
	if !ok {
222
		return nil
223
	}
224
	var pn string
225
	for _, p := range s.Ports {
226
		if p.Port == port {
227
			pn = p.Name
228
			break
229
		}
230
	}
231
	if pn == "" && port != 0 {
232
		return nil
233
	}
234
	shards.RLock()
235
	defer shards.RUnlock()
236
	return slices.FilterInPlace(slices.Flatten(maps.Values(shards.Shards)), func(endpoint *model.IstioEndpoint) bool {
237
		return pn == "" || endpoint.ServicePortName == pn
238
	})
239
}
240

241
func (ec *serviceExportCacheImpl) checkServiceInstancesOrFail(t *testing.T, exported bool, endpoints *model.EndpointIndex) {
242
	t.Helper()
243
	if err := ec.checkEndpoints(exported, endpoints); err != nil {
244
		t.Fatal(err)
245
	}
246
}
247

248
func (ec *serviceExportCacheImpl) checkEndpoints(exported bool, endpoints *model.EndpointIndex) error {
249
	ep := ec.getEndpoint(endpoints)
250
	if ep == nil {
251
		return fmt.Errorf("expected an endpoint, found none")
252
	}
253
	return ec.checkEndpoint(exported, ep)
254
}
255

256
func (ec *serviceExportCacheImpl) checkEndpoint(exported bool, ep *model.IstioEndpoint) error {
257
	// Should always be discoverable from the same cluster.
258
	if err := ec.checkDiscoverableFromSameCluster(ep); err != nil {
259
		return err
260
	}
261

262
	if exported && !features.EnableMCSClusterLocal {
263
		return ec.checkDiscoverableFromDifferentCluster(ep)
264
	}
265

266
	return ec.checkNotDiscoverableFromDifferentCluster(ep)
267
}
268

269
func (ec *serviceExportCacheImpl) checkDiscoverableFromSameCluster(ep *model.IstioEndpoint) error {
270
	if !ec.isDiscoverableFromSameCluster(ep) {
271
		return fmt.Errorf("endpoint was not discoverable from the same cluster")
272
	}
273
	return nil
274
}
275

276
func (ec *serviceExportCacheImpl) checkDiscoverableFromDifferentCluster(ep *model.IstioEndpoint) error {
277
	if !ec.isDiscoverableFromDifferentCluster(ep) {
278
		return fmt.Errorf("endpoint was not discoverable from a different cluster")
279
	}
280
	return nil
281
}
282

283
func (ec *serviceExportCacheImpl) checkNotDiscoverableFromDifferentCluster(ep *model.IstioEndpoint) error {
284
	if ec.isDiscoverableFromDifferentCluster(ep) {
285
		return fmt.Errorf("endpoint was discoverable from a different cluster")
286
	}
287
	return nil
288
}
289

290
func (ec *serviceExportCacheImpl) isDiscoverableFromSameCluster(ep *model.IstioEndpoint) bool {
291
	return ep.IsDiscoverableFromProxy(&model.Proxy{
292
		Metadata: &model.NodeMetadata{
293
			ClusterID: ec.Cluster(),
294
		},
295
	})
296
}
297

298
func (ec *serviceExportCacheImpl) isDiscoverableFromDifferentCluster(ep *model.IstioEndpoint) bool {
299
	return ep.IsDiscoverableFromProxy(&model.Proxy{
300
		Metadata: &model.NodeMetadata{
301
			ClusterID: "some-other-cluster",
302
		},
303
	})
304
}
305

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.