1
// Copyright Istio Authors
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
7
// http://www.apache.org/licenses/LICENSE-2.0
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
23
cluster "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
24
core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
25
endpoint "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
26
listener "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
27
route "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
28
hcm "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/http_connection_manager/v3"
29
tls "github.com/envoyproxy/go-control-plane/envoy/extensions/transport_sockets/tls/v3"
30
discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
31
"github.com/envoyproxy/go-control-plane/pkg/wellknown"
32
"github.com/google/go-cmp/cmp"
33
"google.golang.org/grpc"
34
"google.golang.org/protobuf/testing/protocmp"
36
authn_model "istio.io/istio/pilot/pkg/security/model"
37
"istio.io/istio/pilot/pkg/util/protoconv"
38
v3 "istio.io/istio/pilot/pkg/xds/v3"
39
"istio.io/istio/pilot/test/xdstest"
40
"istio.io/istio/pkg/log"
41
"istio.io/istio/pkg/test/util/assert"
42
"istio.io/istio/pkg/test/util/retry"
45
type mockDeltaXdsServer struct{}
47
var deltaHandler func(stream discovery.AggregatedDiscoveryService_DeltaAggregatedResourcesServer) error
49
func (t *mockDeltaXdsServer) StreamAggregatedResources(discovery.AggregatedDiscoveryService_StreamAggregatedResourcesServer) error {
53
func (t *mockDeltaXdsServer) DeltaAggregatedResources(delta discovery.AggregatedDiscoveryService_DeltaAggregatedResourcesServer) error {
54
return deltaHandler(delta)
57
var testCluster = &cluster.Cluster{
59
ClusterDiscoveryType: &cluster.Cluster_Type{Type: cluster.Cluster_EDS},
60
EdsClusterConfig: &cluster.Cluster_EdsClusterConfig{
61
EdsConfig: &core.ConfigSource{
62
ConfigSourceSpecifier: &core.ConfigSource_Ads{
63
Ads: &core.AggregatedConfigSource{},
67
LbPolicy: cluster.Cluster_ROUND_ROBIN,
68
TransportSocket: &core.TransportSocket{
69
Name: wellknown.TransportSocketTLS,
70
ConfigType: &core.TransportSocket_TypedConfig{
71
TypedConfig: protoconv.MessageToAny(&tls.UpstreamTlsContext{
72
CommonTlsContext: &tls.CommonTlsContext{
73
ValidationContextType: &tls.CommonTlsContext_CombinedValidationContext{
74
CombinedValidationContext: &tls.CommonTlsContext_CombinedCertificateValidationContext{
75
ValidationContextSdsSecretConfig: &tls.SdsSecretConfig{
76
Name: "kubernetes://test",
77
SdsConfig: authn_model.SDSAdsConfig,
87
var testClusterNoSecret = &cluster.Cluster{
89
ClusterDiscoveryType: &cluster.Cluster_Type{Type: cluster.Cluster_EDS},
90
EdsClusterConfig: &cluster.Cluster_EdsClusterConfig{
91
EdsConfig: &core.ConfigSource{
92
ConfigSourceSpecifier: &core.ConfigSource_Ads{
93
Ads: &core.AggregatedConfigSource{},
97
LbPolicy: cluster.Cluster_ROUND_ROBIN,
100
var testListener = &listener.Listener{
101
Name: "test-listener",
102
Address: &core.Address{
103
Address: &core.Address_SocketAddress{
104
SocketAddress: &core.SocketAddress{
105
Protocol: core.SocketAddress_TCP,
107
PortSpecifier: &core.SocketAddress_PortValue{
113
FilterChains: []*listener.FilterChain{
115
Filters: []*listener.Filter{
117
Name: wellknown.HTTPConnectionManager,
118
ConfigType: &listener.Filter_TypedConfig{
119
TypedConfig: protoconv.MessageToAny(&hcm.HttpConnectionManager{
120
RouteSpecifier: &hcm.HttpConnectionManager_Rds{
122
RouteConfigName: "test-rds-config",
123
ConfigSource: &core.ConfigSource{
124
ConfigSourceSpecifier: &core.ConfigSource_Ads{
125
Ads: &core.AggregatedConfigSource{},
134
TransportSocket: &core.TransportSocket{
135
Name: wellknown.TransportSocketTLS,
136
ConfigType: &core.TransportSocket_TypedConfig{
137
TypedConfig: protoconv.MessageToAny(&tls.DownstreamTlsContext{
138
CommonTlsContext: &tls.CommonTlsContext{
139
ValidationContextType: &tls.CommonTlsContext_CombinedValidationContext{
140
CombinedValidationContext: &tls.CommonTlsContext_CombinedCertificateValidationContext{
141
ValidationContextSdsSecretConfig: &tls.SdsSecretConfig{
142
Name: "kubernetes://test",
143
SdsConfig: authn_model.SDSAdsConfig,
155
var testListenerNoSecret = &listener.Listener{
156
Name: "test-listener",
157
Address: &core.Address{
158
Address: &core.Address_SocketAddress{
159
SocketAddress: &core.SocketAddress{
160
Protocol: core.SocketAddress_TCP,
162
PortSpecifier: &core.SocketAddress_PortValue{
168
FilterChains: []*listener.FilterChain{
170
Filters: []*listener.Filter{
172
Name: wellknown.HTTPConnectionManager,
173
ConfigType: &listener.Filter_TypedConfig{
174
TypedConfig: protoconv.MessageToAny(&hcm.HttpConnectionManager{
175
RouteSpecifier: &hcm.HttpConnectionManager_Rds{
177
RouteConfigName: "test-rds-config",
178
ConfigSource: &core.ConfigSource{
179
ConfigSourceSpecifier: &core.ConfigSource_Ads{
180
Ads: &core.AggregatedConfigSource{},
193
var testRouteConfig = &route.RouteConfiguration{
195
// Define the route entries here
196
VirtualHosts: []*route.VirtualHost{
199
Domains: []string{"*"},
200
Routes: []*route.Route{
202
Match: &route.RouteMatch{
203
PathSpecifier: &route.RouteMatch_Prefix{Prefix: "/"},
205
Action: &route.Route_Route{
206
Route: &route.RouteAction{
207
ClusterSpecifier: &route.RouteAction_Cluster{Cluster: "test-cluster"},
216
func TestDeltaClient(t *testing.T) {
217
type testCase struct {
219
deltaHandler func(server discovery.AggregatedDiscoveryService_DeltaAggregatedResourcesServer) error
221
expectedDeltaResources *Client
227
clusterHandler := Register(func(ctx HandlerContext, resourceName string, resourceVersion string, resourceEntity *cluster.Cluster, event Event) {
228
if event == EventDelete {
231
ctx.RegisterDependency(v3.SecretType, xdstest.ExtractClusterSecretResources(t, resourceEntity)...)
232
ctx.RegisterDependency(v3.EndpointType, xdstest.ExtractEdsClusterNames([]*cluster.Cluster{resourceEntity})...)
234
endpointsHandler := Register(func(ctx HandlerContext, resourceName string, resourceVersion string, resourceEntity *endpoint.ClusterLoadAssignment,
237
listenerHandler := Register(func(ctx HandlerContext, resourceName string, resourceVersion string, resourceEntity *listener.Listener, event Event) {
238
if event == EventDelete {
241
ctx.RegisterDependency(v3.SecretType, xdstest.ExtractListenerSecretResources(t, resourceEntity)...)
242
ctx.RegisterDependency(v3.RouteType, xdstest.ExtractRoutesFromListeners([]*listener.Listener{resourceEntity})...)
245
routesHandler := Register(func(ctx HandlerContext, resourceName string, resourceVersion string, resourceEntity *route.RouteConfiguration, event Event) {
247
secretsHandler := Register(func(ctx HandlerContext, resourceName string, resourceVersion string, resourceEntity *tls.Secret, event Event) {
250
handlers := []Option{
252
Watch[*cluster.Cluster]("*"),
254
Watch[*listener.Listener]("*"),
263
serverResponses []*discovery.DeltaDiscoveryResponse
267
desc: "initial request cluster with no secret",
268
serverResponses: []*discovery.DeltaDiscoveryResponse{
270
TypeUrl: v3.ClusterType,
271
Resources: []*discovery.Resource{
274
Resource: protoconv.MessageToAny(testClusterNoSecret),
286
desc: "initial request cluster with secret",
287
serverResponses: []*discovery.DeltaDiscoveryResponse{
289
TypeUrl: v3.ClusterType,
290
Resources: []*discovery.Resource{
293
Resource: protoconv.MessageToAny(testCluster),
301
SDS/kubernetes://test:
306
desc: "initial request listener with no secret",
307
serverResponses: []*discovery.DeltaDiscoveryResponse{
309
TypeUrl: v3.ListenerType,
310
Resources: []*discovery.Resource{
312
Name: "test-listener",
313
Resource: protoconv.MessageToAny(testListenerNoSecret),
325
desc: "initial request listener with secret",
326
serverResponses: []*discovery.DeltaDiscoveryResponse{
328
TypeUrl: v3.ListenerType,
329
Resources: []*discovery.Resource{
331
Name: "test-listener",
332
Resource: protoconv.MessageToAny(testListener),
341
SDS/kubernetes://test:
345
desc: "put things together",
346
serverResponses: []*discovery.DeltaDiscoveryResponse{
348
TypeUrl: v3.ClusterType,
349
Resources: []*discovery.Resource{
352
Resource: protoconv.MessageToAny(testCluster),
357
TypeUrl: v3.ListenerType,
358
Resources: []*discovery.Resource{
360
Name: "test-listener",
361
Resource: protoconv.MessageToAny(testListener),
366
TypeUrl: v3.RouteType,
367
Resources: []*discovery.Resource{
370
Resource: protoconv.MessageToAny(testRouteConfig),
378
SDS/kubernetes://test:
382
SDS/kubernetes://test:
388
desc: "begin two clusters then remove one",
389
serverResponses: []*discovery.DeltaDiscoveryResponse{
391
TypeUrl: v3.ClusterType,
392
Resources: []*discovery.Resource{
394
Name: "test-cluster1",
395
Resource: protoconv.MessageToAny(testCluster),
398
Name: "test-cluster2",
399
Resource: protoconv.MessageToAny(testClusterNoSecret),
404
TypeUrl: v3.ClusterType,
405
Resources: []*discovery.Resource{},
406
RemovedResources: []string{"test-cluster2"},
412
SDS/kubernetes://test:
417
for _, item := range descs {
418
desc := item // avoid refer to on-stack-var
419
expected := make(map[string]*discovery.DeltaDiscoveryResponse)
420
for _, response := range item.serverResponses {
421
expected[response.TypeUrl] = response
425
inClient: NewDeltaWithBackoffPolicy("", &DeltaADSConfig{}, nil),
426
deltaHandler: func(delta discovery.AggregatedDiscoveryService_DeltaAggregatedResourcesServer) error {
427
for _, response := range desc.serverResponses {
428
_ = delta.Send(response)
432
expectedDeltaResources: &Client{
433
lastReceived: expected,
435
expectedTree: desc.expectedTree,
437
tests = append(tests, tc)
440
for _, tt := range tests {
441
t.Run(tt.desc, func(t *testing.T) {
442
deltaHandler = tt.deltaHandler
443
l, err := net.Listen("tcp", ":0")
445
t.Errorf("Unable to listen with tcp err %v", err)
448
tt.inClient.cfg.Address = l.Addr().String()
449
xds := grpc.NewServer()
450
discovery.RegisterAggregatedDiscoveryServiceServer(xds, new(mockDeltaXdsServer))
457
defer xds.GracefulStop()
459
t.Errorf("Could not start serving ads server %v", err)
463
tt.inClient = NewDeltaWithBackoffPolicy(tt.inClient.cfg.Address, tt.inClient.cfg, nil, handlers...)
464
if err := tt.inClient.Run(context.TODO()); err != nil {
465
t.Errorf("ADSC: failed running %v", err)
468
assert.EventuallyEqual(t, func() bool {
469
tt.inClient.mutex.Lock()
470
defer tt.inClient.mutex.Unlock()
471
rec := tt.inClient.lastReceived
473
if rec == nil && len(rec) != len(tt.expectedDeltaResources.lastReceived) {
476
for tpe, rsrcs := range tt.expectedDeltaResources.lastReceived {
477
if _, ok := rec[tpe]; !ok {
480
if len(rsrcs.Resources) != len(rec[tpe].Resources) {
485
}, true, retry.Timeout(time.Second), retry.Delay(time.Millisecond))
487
if !cmp.Equal(tt.inClient.lastReceived, tt.expectedDeltaResources.lastReceived, protocmp.Transform()) {
488
t.Errorf("%s: expected recv %v got %v", tt.desc, tt.expectedDeltaResources.lastReceived, tt.inClient.lastReceived)
491
tree := tt.inClient.dumpTree()
492
if diff := cmp.Diff(tt.expectedTree, tree); diff != "" {
493
t.Errorf("%s: expected tree %v got %v", tt.desc, tt.expectedTree, tree)