istio

Форк
0
/
adsc.go 
1282 строки · 35.0 Кб
1
// Copyright Istio Authors
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14

15
package adsc
16

17
import (
18
	"context"
19
	"crypto/tls"
20
	"crypto/x509"
21
	"encoding/json"
22
	"fmt"
23
	"math"
24
	"net"
25
	"os"
26
	"sort"
27
	"strings"
28
	"sync"
29
	"time"
30

31
	cluster "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
32
	core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
33
	endpoint "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
34
	listener "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
35
	route "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
36
	discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
37
	"github.com/envoyproxy/go-control-plane/pkg/conversion"
38
	"google.golang.org/grpc"
39
	"google.golang.org/grpc/credentials"
40
	"google.golang.org/grpc/credentials/insecure"
41
	"google.golang.org/protobuf/proto"
42
	anypb "google.golang.org/protobuf/types/known/anypb"
43
	pstruct "google.golang.org/protobuf/types/known/structpb"
44

45
	mcp "istio.io/api/mcp/v1alpha1"
46
	"istio.io/api/mesh/v1alpha1"
47
	mem "istio.io/istio/pilot/pkg/config/memory"
48
	"istio.io/istio/pilot/pkg/model"
49
	"istio.io/istio/pilot/pkg/networking/util"
50
	"istio.io/istio/pilot/pkg/serviceregistry/memory"
51
	"istio.io/istio/pilot/pkg/util/network"
52
	v3 "istio.io/istio/pilot/pkg/xds/v3"
53
	"istio.io/istio/pkg/backoff"
54
	"istio.io/istio/pkg/config"
55
	"istio.io/istio/pkg/config/constants"
56
	"istio.io/istio/pkg/config/schema/collections"
57
	"istio.io/istio/pkg/config/schema/gvk"
58
	"istio.io/istio/pkg/log"
59
	"istio.io/istio/pkg/security"
60
	"istio.io/istio/pkg/util/protomarshal"
61
	"istio.io/istio/pkg/util/sets"
62
	"istio.io/istio/pkg/wellknown"
63
)
64

65
const (
66
	defaultClientMaxReceiveMessageSize = math.MaxInt32
67
	defaultInitialConnWindowSize       = 1024 * 1024 // default gRPC InitialWindowSize
68
	defaultInitialWindowSize           = 1024 * 1024 // default gRPC ConnWindowSize
69
)
70

71
type Config struct {
72
	// Address of the xDS server
73
	Address string
74

75
	// Namespace defaults to 'default'
76
	Namespace string
77

78
	// Workload defaults to 'test'
79
	Workload string
80

81
	// Revision for this control plane instance. We will only read configs that match this revision.
82
	Revision string
83

84
	// Meta includes additional metadata for the node
85
	Meta *pstruct.Struct
86

87
	Locality *core.Locality
88

89
	// NodeType defaults to sidecar. "ingress" and "router" are also supported.
90
	NodeType model.NodeType
91

92
	// IP is currently the primary key used to locate inbound configs. It is sent by client,
93
	// must match a known endpoint IP. Tests can use a ServiceEntry to register fake IPs.
94
	IP string
95

96
	// CertDir is the directory where mTLS certs are configured.
97
	// If CertDir and Secret are empty, an insecure connection will be used.
98
	// TODO: implement SecretManager for cert dir
99
	CertDir string
100

101
	// Secrets is the interface used for getting keys and rootCA.
102
	SecretManager security.SecretManager
103

104
	// For getting the certificate, using same code as SDS server.
105
	// Either the JWTPath or the certs must be present.
106
	JWTPath string
107

108
	// XDSSAN is the expected SAN of the XDS server. If not set, the ProxyConfig.DiscoveryAddress is used.
109
	XDSSAN string
110

111
	// XDSRootCAFile explicitly set the root CA to be used for the XDS connection.
112
	// Mirrors Envoy file.
113
	XDSRootCAFile string
114

115
	// RootCert contains the XDS root certificate. Used mainly for tests, apps will normally use
116
	// XDSRootCAFile
117
	RootCert []byte
118

119
	// InsecureSkipVerify skips client verification the server's certificate chain and host name.
120
	InsecureSkipVerify bool
121

122
	// BackoffPolicy determines the reconnect policy. Based on MCP client.
123
	BackoffPolicy backoff.BackOff
124

125
	GrpcOpts []grpc.DialOption
126
}
127

128
// ADSConfig for the ADS connection.
129
type ADSConfig struct {
130
	Config
131

132
	// InitialDiscoveryRequests is a list of resources to watch at first, represented as URLs (for new XDS resource naming)
133
	// or type URLs.
134
	InitialDiscoveryRequests []*discovery.DiscoveryRequest
135

136
	// ResponseHandler will be called on each DiscoveryResponse.
137
	// TODO: mirror Generator, allow adding handler per type
138
	ResponseHandler ResponseHandler
139
}
140

141
func defaultGrpcDialOptions() []grpc.DialOption {
142
	return []grpc.DialOption{
143
		// TODO(SpecialYang) maybe need to make it configurable.
144
		grpc.WithInitialWindowSize(int32(defaultInitialWindowSize)),
145
		grpc.WithInitialConnWindowSize(int32(defaultInitialConnWindowSize)),
146
		grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(defaultClientMaxReceiveMessageSize)),
147
	}
148
}
149

150
// ADSC implements a basic client for ADS, for use in stress tests and tools
151
// or libraries that need to connect to Istio pilot or other ADS servers.
152
type ADSC struct {
153
	// Stream is the GRPC connection stream, allowing direct GRPC send operations.
154
	// Set after Dial is called.
155
	stream discovery.AggregatedDiscoveryService_StreamAggregatedResourcesClient
156
	// xds client used to create a stream
157
	client discovery.AggregatedDiscoveryServiceClient
158
	conn   *grpc.ClientConn
159

160
	// Indicates if the ADSC client is closed
161
	closed bool
162

163
	// NodeID is the node identity sent to Pilot.
164
	nodeID string
165

166
	watchTime time.Time
167

168
	// initialLoad tracks the time to receive the initial configuration.
169
	initialLoad time.Duration
170

171
	// indicates if the initial LDS request is sent
172
	initialLds bool
173

174
	// httpListeners contains received listeners with a http_connection_manager filter.
175
	httpListeners map[string]*listener.Listener
176

177
	// tcpListeners contains all listeners of type TCP (not-HTTP)
178
	tcpListeners map[string]*listener.Listener
179

180
	// All received clusters of type eds, keyed by name
181
	edsClusters map[string]*cluster.Cluster
182

183
	// All received clusters of no-eds type, keyed by name
184
	clusters map[string]*cluster.Cluster
185

186
	// All received routes, keyed by route name
187
	routes map[string]*route.RouteConfiguration
188

189
	// All received endpoints, keyed by cluster name
190
	eds map[string]*endpoint.ClusterLoadAssignment
191

192
	// Metadata has the node metadata to send to pilot.
193
	// If nil, the defaults will be used.
194
	Metadata *pstruct.Struct
195

196
	// Updates includes the type of the last update received from the server.
197
	Updates     chan string
198
	errChan     chan error
199
	XDSUpdates  chan *discovery.DiscoveryResponse
200
	VersionInfo map[string]string
201

202
	// Last received message, by type
203
	Received map[string]*discovery.DiscoveryResponse
204

205
	mutex sync.RWMutex
206

207
	Mesh *v1alpha1.MeshConfig
208

209
	// Retrieved configurations can be stored using the common istio model interface.
210
	Store model.ConfigStore
211

212
	// Retrieved endpoints can be stored in the memory registry. This is used for CDS and EDS responses.
213
	Registry *memory.ServiceDiscovery
214

215
	// LocalCacheDir is set to a base name used to save fetched resources.
216
	// If set, each update will be saved.
217
	// TODO: also load at startup - so we can support warm up in init-container, and survive
218
	// restarts.
219
	LocalCacheDir string
220

221
	cfg *ADSConfig
222

223
	// sendNodeMeta is set to true if the connection is new - and we need to send node meta.,
224
	sendNodeMeta bool
225

226
	sync     map[string]time.Time
227
	Locality *core.Locality
228
}
229

230
type ResponseHandler interface {
231
	HandleResponse(con *ADSC, response *discovery.DiscoveryResponse)
232
}
233

234
// jsonMarshalProtoWithName wraps a proto.Message with name so it can be marshaled with the standard encoding/json library
235
type jsonMarshalProtoWithName struct {
236
	Name    string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"`
237
	Message proto.Message
238
}
239

240
func (p jsonMarshalProtoWithName) MarshalJSON() ([]byte, error) {
241
	strSer, serr := protomarshal.ToJSONWithIndent(p.Message, "  ")
242
	if serr != nil {
243
		adscLog.Warnf("Error for marshaling [%s]: %v", p.Name, serr)
244
		return []byte(""), serr
245
	}
246
	serialItem := []byte("{\"" + p.Name + "\":" + strSer + "}")
247
	return serialItem, nil
248
}
249

250
var adscLog = log.RegisterScope("adsc", "adsc debugging")
251

252
func NewWithBackoffPolicy(discoveryAddr string, opts *ADSConfig, backoffPolicy backoff.BackOff) (*ADSC, error) {
253
	adsc, err := New(discoveryAddr, opts)
254
	if err != nil {
255
		return nil, err
256
	}
257
	adsc.cfg.BackoffPolicy = backoffPolicy
258
	return adsc, err
259
}
260

261
// New creates a new ADSC, maintaining a connection to an XDS server.
262
// Will:
263
// - get certificate using the Secret provider, if CertRequired
264
// - connect to the XDS server specified in ProxyConfig
265
// - send initial request for watched resources
266
// - wait for response from XDS server
267
// - on success, start a background thread to maintain the connection, with exp. backoff.
268
func New(discoveryAddr string, opts *ADSConfig) (*ADSC, error) {
269
	if opts == nil {
270
		opts = &ADSConfig{}
271
	}
272
	opts.Config = setDefaultConfig(&opts.Config)
273
	opts.Address = discoveryAddr
274
	adsc := &ADSC{
275
		Updates:     make(chan string, 100),
276
		XDSUpdates:  make(chan *discovery.DiscoveryResponse, 100),
277
		VersionInfo: map[string]string{},
278
		Received:    map[string]*discovery.DiscoveryResponse{},
279
		cfg:         opts,
280
		sync:        map[string]time.Time{},
281
		errChan:     make(chan error, 10),
282
	}
283

284
	adsc.Metadata = opts.Meta
285
	adsc.Locality = opts.Locality
286

287
	adsc.nodeID = nodeID(&adsc.cfg.Config)
288

289
	if err := adsc.Dial(); err != nil {
290
		return nil, err
291
	}
292

293
	return adsc, nil
294
}
295

296
func setDefaultConfig(config *Config) Config {
297
	if config == nil {
298
		config = &Config{}
299
	}
300
	if config.Namespace == "" {
301
		config.Namespace = "default"
302
	}
303
	if config.NodeType == "" {
304
		config.NodeType = model.SidecarProxy
305
	}
306
	if config.IP == "" {
307
		ips, ok := network.GetPrivateIPsIfAvailable()
308
		if ok && len(ips) > 0 {
309
			config.IP = ips[0]
310
		}
311
	}
312
	if config.Workload == "" {
313
		config.Workload = "test-1"
314
	}
315
	if config.BackoffPolicy == nil {
316
		config.BackoffPolicy = backoff.NewExponentialBackOff(backoff.DefaultOption())
317
	}
318
	return *config
319
}
320

321
// Dial connects to a ADS server, with optional MTLS authentication if a cert dir is specified.
322
func (a *ADSC) Dial() error {
323
	conn, err := dialWithConfig(&a.cfg.Config)
324
	if err != nil {
325
		return err
326
	}
327
	a.conn = conn
328
	return nil
329
}
330

331
func dialWithConfig(config *Config) (*grpc.ClientConn, error) {
332
	defaultGrpcDialOptions := defaultGrpcDialOptions()
333
	var grpcDialOptions []grpc.DialOption
334
	grpcDialOptions = append(grpcDialOptions, defaultGrpcDialOptions...)
335
	grpcDialOptions = append(grpcDialOptions, config.GrpcOpts...)
336

337
	var err error
338
	// If we need MTLS - CertDir or Secrets provider is set.
339
	if len(config.CertDir) > 0 || config.SecretManager != nil {
340
		tlsCfg, err := tlsConfig(config)
341
		if err != nil {
342
			return nil, err
343
		}
344
		creds := credentials.NewTLS(tlsCfg)
345
		grpcDialOptions = append(grpcDialOptions, grpc.WithTransportCredentials(creds))
346
	}
347

348
	if len(grpcDialOptions) == len(defaultGrpcDialOptions) {
349
		// Only disable transport security if the user didn't supply custom dial options
350
		grpcDialOptions = append(grpcDialOptions, grpc.WithTransportCredentials(insecure.NewCredentials()))
351
	}
352

353
	conn, err := grpc.Dial(config.Address, grpcDialOptions...)
354
	if err != nil {
355
		return nil, err
356
	}
357
	return conn, nil
358
}
359

360
func tlsConfig(config *Config) (*tls.Config, error) {
361
	var clientCerts []tls.Certificate
362
	var serverCABytes []byte
363
	var err error
364

365
	getClientCertificate := getClientCertFn(config)
366

367
	// Load the root CAs
368
	if config.RootCert != nil {
369
		serverCABytes = config.RootCert
370
	} else if config.XDSRootCAFile != "" {
371
		serverCABytes, err = os.ReadFile(config.XDSRootCAFile)
372
		if err != nil {
373
			return nil, err
374
		}
375
	} else if config.SecretManager != nil {
376
		// This is a bit crazy - we could just use the file
377
		rootCA, err := config.SecretManager.GenerateSecret(security.RootCertReqResourceName)
378
		if err != nil {
379
			return nil, err
380
		}
381

382
		serverCABytes = rootCA.RootCert
383
	} else if config.CertDir != "" {
384
		serverCABytes, err = os.ReadFile(config.CertDir + "/root-cert.pem")
385
		if err != nil {
386
			return nil, err
387
		}
388
	}
389

390
	serverCAs := x509.NewCertPool()
391
	if ok := serverCAs.AppendCertsFromPEM(serverCABytes); !ok {
392
		return nil, err
393
	}
394

395
	shost, _, _ := net.SplitHostPort(config.Address)
396
	if config.XDSSAN != "" {
397
		shost = config.XDSSAN
398
	}
399

400
	// nolint: gosec
401
	// it's insecure only when a user explicitly enable insecure mode.
402
	return &tls.Config{
403
		GetClientCertificate: getClientCertificate,
404
		Certificates:         clientCerts,
405
		RootCAs:              serverCAs,
406
		ServerName:           shost,
407
		InsecureSkipVerify:   config.InsecureSkipVerify,
408
	}, nil
409
}
410

411
// Close the stream.
412
func (a *ADSC) Close() {
413
	a.mutex.Lock()
414
	_ = a.conn.Close()
415
	a.closed = true
416
	a.mutex.Unlock()
417
}
418

419
// Run will create a new stream using the existing grpc client connection and send the initial xds requests.
420
// And then it will run a go routine receiving and handling xds response.
421
// Note: it is non blocking
422
func (a *ADSC) Run() error {
423
	var err error
424
	a.client = discovery.NewAggregatedDiscoveryServiceClient(a.conn)
425
	a.stream, err = a.client.StreamAggregatedResources(context.Background())
426
	if err != nil {
427
		return err
428
	}
429
	a.sendNodeMeta = true
430
	a.initialLoad = 0
431
	a.initialLds = false
432
	// Send the initial requests
433
	for _, r := range a.cfg.InitialDiscoveryRequests {
434
		if r.TypeUrl == v3.ClusterType {
435
			a.watchTime = time.Now()
436
		}
437
		_ = a.Send(r)
438
	}
439

440
	go a.handleRecv()
441
	return nil
442
}
443

444
// HasSynced returns true if MCP configs have synced
445
func (a *ADSC) HasSynced() bool {
446
	if a.cfg == nil || len(a.cfg.InitialDiscoveryRequests) == 0 {
447
		return true
448
	}
449

450
	a.mutex.RLock()
451
	defer a.mutex.RUnlock()
452

453
	for _, req := range a.cfg.InitialDiscoveryRequests {
454
		_, isMCP := convertTypeURLToMCPGVK(req.TypeUrl)
455
		if !isMCP {
456
			continue
457
		}
458

459
		if _, ok := a.sync[req.TypeUrl]; !ok {
460
			return false
461
		}
462
	}
463

464
	return true
465
}
466

467
// reconnect will create a new stream
468
func (a *ADSC) reconnect() {
469
	a.mutex.RLock()
470
	if a.closed {
471
		a.mutex.RUnlock()
472
		return
473
	}
474
	a.mutex.RUnlock()
475

476
	err := a.Run()
477
	if err != nil {
478
		// TODO: fix reconnect
479
		time.AfterFunc(a.cfg.BackoffPolicy.NextBackOff(), a.reconnect)
480
	}
481
}
482

483
func (a *ADSC) handleRecv() {
484
	// We connected, so reset the backoff
485
	if a.cfg.BackoffPolicy != nil {
486
		a.cfg.BackoffPolicy.Reset()
487
	}
488
	for {
489
		var err error
490
		msg, err := a.stream.Recv()
491
		if err != nil {
492
			adscLog.Infof("Connection closed for node %v with err: %v", a.nodeID, err)
493
			select {
494
			case a.errChan <- err:
495
			default:
496
			}
497
			// if 'reconnect' enabled - schedule a new Run
498
			if a.cfg.BackoffPolicy != nil {
499
				time.AfterFunc(a.cfg.BackoffPolicy.NextBackOff(), a.reconnect)
500
			} else {
501
				a.Close()
502
				a.WaitClear()
503
				a.Updates <- ""
504
				a.XDSUpdates <- nil
505
				close(a.errChan)
506
			}
507
			return
508
		}
509

510
		// Group-value-kind - used for high level api generator.
511
		resourceGvk, isMCP := convertTypeURLToMCPGVK(msg.TypeUrl)
512

513
		adscLog.WithLabels("type", msg.TypeUrl, "count", len(msg.Resources), "nonce", msg.Nonce).Info("Received")
514
		if a.cfg.ResponseHandler != nil {
515
			a.cfg.ResponseHandler.HandleResponse(a, msg)
516
		}
517

518
		if msg.TypeUrl == gvk.MeshConfig.String() &&
519
			len(msg.Resources) > 0 {
520
			rsc := msg.Resources[0]
521
			m := &v1alpha1.MeshConfig{}
522
			err = proto.Unmarshal(rsc.Value, m)
523
			if err != nil {
524
				adscLog.Warnf("Failed to unmarshal mesh config: %v", err)
525
			}
526
			a.Mesh = m
527
			if a.LocalCacheDir != "" {
528
				strResponse, err := protomarshal.ToJSONWithIndent(m, "  ")
529
				if err != nil {
530
					continue
531
				}
532
				err = os.WriteFile(a.LocalCacheDir+"_mesh.json", []byte(strResponse), 0o644)
533
				if err != nil {
534
					continue
535
				}
536
			}
537
			continue
538
		}
539

540
		// Process the resources.
541
		a.VersionInfo[msg.TypeUrl] = msg.VersionInfo
542
		switch msg.TypeUrl {
543
		case v3.ListenerType:
544
			listeners := make([]*listener.Listener, 0, len(msg.Resources))
545
			for _, rsc := range msg.Resources {
546
				valBytes := rsc.Value
547
				ll := &listener.Listener{}
548
				_ = proto.Unmarshal(valBytes, ll)
549
				listeners = append(listeners, ll)
550
			}
551
			a.handleLDS(listeners)
552
		case v3.ClusterType:
553
			clusters := make([]*cluster.Cluster, 0, len(msg.Resources))
554
			for _, rsc := range msg.Resources {
555
				valBytes := rsc.Value
556
				cl := &cluster.Cluster{}
557
				_ = proto.Unmarshal(valBytes, cl)
558
				clusters = append(clusters, cl)
559
			}
560
			a.handleCDS(clusters)
561
		case v3.EndpointType:
562
			eds := make([]*endpoint.ClusterLoadAssignment, 0, len(msg.Resources))
563
			for _, rsc := range msg.Resources {
564
				valBytes := rsc.Value
565
				el := &endpoint.ClusterLoadAssignment{}
566
				_ = proto.Unmarshal(valBytes, el)
567
				eds = append(eds, el)
568
			}
569
			a.handleEDS(eds)
570
		case v3.RouteType:
571
			routes := make([]*route.RouteConfiguration, 0, len(msg.Resources))
572
			for _, rsc := range msg.Resources {
573
				valBytes := rsc.Value
574
				rl := &route.RouteConfiguration{}
575
				_ = proto.Unmarshal(valBytes, rl)
576
				routes = append(routes, rl)
577
			}
578
			a.handleRDS(routes)
579
		default:
580
			if isMCP {
581
				a.handleMCP(resourceGvk, msg.Resources)
582
			}
583
		}
584

585
		// If we got no resource - still save to the store with empty name/namespace, to notify sync
586
		// This scheme also allows us to chunk large responses !
587

588
		// TODO: add hook to inject nacks
589

590
		a.mutex.Lock()
591
		if isMCP {
592
			if _, exist := a.sync[resourceGvk.String()]; !exist {
593
				a.sync[resourceGvk.String()] = time.Now()
594
			}
595
		}
596
		a.Received[msg.TypeUrl] = msg
597
		a.ack(msg)
598
		a.mutex.Unlock()
599

600
		select {
601
		case a.XDSUpdates <- msg:
602
		default:
603
		}
604
	}
605
}
606

607
func (a *ADSC) mcpToPilot(m *mcp.Resource) (*config.Config, error) {
608
	if m == nil || m.Metadata == nil {
609
		return &config.Config{}, nil
610
	}
611
	c := &config.Config{
612
		Meta: config.Meta{
613
			ResourceVersion: m.Metadata.Version,
614
			Labels:          m.Metadata.Labels,
615
			Annotations:     m.Metadata.Annotations,
616
		},
617
	}
618

619
	if !config.ObjectInRevision(c, a.cfg.Revision) { // In case upstream does not support rev in node meta.
620
		return nil, nil
621
	}
622

623
	if c.Meta.Annotations == nil {
624
		c.Meta.Annotations = make(map[string]string)
625
	}
626
	nsn := strings.Split(m.Metadata.Name, "/")
627
	if len(nsn) != 2 {
628
		return nil, fmt.Errorf("invalid name %s", m.Metadata.Name)
629
	}
630
	c.Namespace = nsn[0]
631
	c.Name = nsn[1]
632
	var err error
633
	c.CreationTimestamp = m.Metadata.CreateTime.AsTime()
634

635
	pb, err := m.Body.UnmarshalNew()
636
	if err != nil {
637
		return nil, err
638
	}
639
	c.Spec = pb
640
	return c, nil
641
}
642

643
// nolint: staticcheck
644
func (a *ADSC) handleLDS(ll []*listener.Listener) {
645
	lh := map[string]*listener.Listener{}
646
	lt := map[string]*listener.Listener{}
647

648
	routes := []string{}
649
	ldsSize := 0
650

651
	for _, l := range ll {
652
		ldsSize += proto.Size(l)
653

654
		// The last filter is the actual destination for inbound listener
655
		if l.ApiListener != nil {
656
			// This is an API Listener
657
			// TODO: extract VIP and RDS or cluster
658
			continue
659
		}
660
		fc := l.FilterChains[len(l.FilterChains)-1]
661
		// Find the terminal filter
662
		filter := fc.Filters[len(fc.Filters)-1]
663

664
		// The actual destination will be the next to the last if the last filter is a passthrough filter
665
		if fc.GetName() == util.PassthroughFilterChain {
666
			fc = l.FilterChains[len(l.FilterChains)-2]
667
			filter = fc.Filters[len(fc.Filters)-1]
668
		}
669

670
		switch filter.Name {
671
		case wellknown.TCPProxy:
672
			lt[l.Name] = l
673
			config, _ := conversion.MessageToStruct(filter.GetTypedConfig())
674
			c := config.Fields["cluster"].GetStringValue()
675
			adscLog.Debugf("TCP: %s -> %s", l.Name, c)
676
		case wellknown.HTTPConnectionManager:
677
			lh[l.Name] = l
678

679
			// Getting from config is too painful..
680
			port := l.Address.GetSocketAddress().GetPortValue()
681
			if port == 15002 {
682
				routes = append(routes, "http_proxy")
683
			} else {
684
				routes = append(routes, fmt.Sprintf("%d", port))
685
			}
686
		case wellknown.MongoProxy:
687
			// ignore for now
688
		case wellknown.RedisProxy:
689
			// ignore for now
690
		case wellknown.MySQLProxy:
691
			// ignore for now
692
		default:
693
			adscLog.Infof(protomarshal.ToJSONWithIndent(l, "  "))
694
		}
695
	}
696

697
	adscLog.Infof("LDS: http=%d tcp=%d size=%d", len(lh), len(lt), ldsSize)
698
	if adscLog.DebugEnabled() {
699
		b, _ := json.MarshalIndent(ll, " ", " ")
700
		adscLog.Debugf(string(b))
701
	}
702
	a.mutex.Lock()
703
	defer a.mutex.Unlock()
704
	if len(routes) > 0 {
705
		a.sendRsc(v3.RouteType, routes)
706
	}
707
	a.httpListeners = lh
708
	a.tcpListeners = lt
709

710
	select {
711
	case a.Updates <- v3.ListenerType:
712
	default:
713
	}
714
}
715

716
// Save will save the json configs to files, using the base directory
717
func (a *ADSC) Save(base string) error {
718
	a.mutex.Lock()
719
	defer a.mutex.Unlock()
720

721
	// guarantee the persistence order for each element in tcpListeners
722
	var sortTCPListeners []string
723
	for key := range a.tcpListeners {
724
		sortTCPListeners = append(sortTCPListeners, key)
725
	}
726
	sort.Strings(sortTCPListeners)
727
	arrTCPListenersJSONProto := make([]jsonMarshalProtoWithName, 0, len(sortTCPListeners))
728
	for _, element := range sortTCPListeners {
729
		sliceItem := &jsonMarshalProtoWithName{element, a.tcpListeners[element]}
730
		arrTCPListenersJSONProto = append(arrTCPListenersJSONProto, *sliceItem)
731
	}
732
	byteJSONResponse, err := json.MarshalIndent(arrTCPListenersJSONProto, "", "  ")
733
	if err != nil {
734
		adscLog.Warnf("Error for marshaling TCPListeners: %v", err)
735
	}
736
	err = os.WriteFile(base+"_lds_tcp.json", byteJSONResponse, 0o644)
737
	if err != nil {
738
		return err
739
	}
740

741
	// guarantee the persistence order for each element in httpListeners
742
	var sortHTTPListeners []string
743
	for key := range a.httpListeners {
744
		sortHTTPListeners = append(sortHTTPListeners, key)
745
	}
746
	sort.Strings(sortHTTPListeners)
747
	arrHTTPListenersJSONProto := make([]jsonMarshalProtoWithName, 0, len(sortHTTPListeners))
748
	for _, element := range sortHTTPListeners {
749
		sliceItem := &jsonMarshalProtoWithName{element, a.httpListeners[element]}
750
		arrHTTPListenersJSONProto = append(arrHTTPListenersJSONProto, *sliceItem)
751
	}
752
	byteJSONResponse, err = json.MarshalIndent(arrHTTPListenersJSONProto, "", "  ")
753
	if err != nil {
754
		return err
755
	}
756
	err = os.WriteFile(base+"_lds_http.json", byteJSONResponse, 0o644)
757
	if err != nil {
758
		return err
759
	}
760

761
	// guarantee the persistence order for each element in routes
762
	var sortRoutes []string
763
	for key := range a.routes {
764
		sortRoutes = append(sortRoutes, key)
765
	}
766
	sort.Strings(sortRoutes)
767
	arrRoutesJSONProto := make([]jsonMarshalProtoWithName, 0, len(sortRoutes))
768
	for _, element := range sortRoutes {
769
		sliceItem := &jsonMarshalProtoWithName{element, a.routes[element]}
770
		arrRoutesJSONProto = append(arrRoutesJSONProto, *sliceItem)
771
	}
772
	byteJSONResponse, err = json.MarshalIndent(arrRoutesJSONProto, "", "  ")
773
	if err != nil {
774
		return err
775
	}
776
	err = os.WriteFile(base+"_rds.json", byteJSONResponse, 0o644)
777
	if err != nil {
778
		return err
779
	}
780

781
	// guarantee the persistence order for each element in edsClusters
782
	var sortEdsClusters []string
783
	for key := range a.edsClusters {
784
		sortEdsClusters = append(sortEdsClusters, key)
785
	}
786
	sort.Strings(sortEdsClusters)
787
	arrEdsClustersJSONProto := make([]jsonMarshalProtoWithName, 0, len(sortEdsClusters))
788
	for _, element := range sortEdsClusters {
789
		sliceItem := &jsonMarshalProtoWithName{element, a.edsClusters[element]}
790
		arrEdsClustersJSONProto = append(arrEdsClustersJSONProto, *sliceItem)
791
	}
792
	byteJSONResponse, err = json.MarshalIndent(arrEdsClustersJSONProto, "", "  ")
793
	if err != nil {
794
		return err
795
	}
796
	err = os.WriteFile(base+"_ecds.json", byteJSONResponse, 0o644)
797
	if err != nil {
798
		return err
799
	}
800

801
	// guarantee the persistence order for each element in clusters
802
	var sortClusters []string
803
	for key := range a.clusters {
804
		sortClusters = append(sortClusters, key)
805
	}
806
	sort.Strings(sortClusters)
807
	arrClustersJSONProto := make([]jsonMarshalProtoWithName, 0, len(sortClusters))
808
	for _, element := range sortClusters {
809
		sliceItem := &jsonMarshalProtoWithName{element, a.clusters[element]}
810
		arrClustersJSONProto = append(arrClustersJSONProto, *sliceItem)
811
	}
812
	byteJSONResponse, err = json.MarshalIndent(arrClustersJSONProto, "", "  ")
813
	if err != nil {
814
		return err
815
	}
816
	err = os.WriteFile(base+"_cds.json", byteJSONResponse, 0o644)
817
	if err != nil {
818
		return err
819
	}
820

821
	// guarantee the persistence order for each element in eds
822
	var sortEds []string
823
	for key := range a.eds {
824
		sortEds = append(sortEds, key)
825
	}
826
	sort.Strings(sortEds)
827
	arrEdsJSONProto := make([]jsonMarshalProtoWithName, 0, len(sortEds))
828
	for _, element := range sortEds {
829
		sliceItem := &jsonMarshalProtoWithName{element, a.eds[element]}
830
		arrEdsJSONProto = append(arrEdsJSONProto, *sliceItem)
831
	}
832
	byteJSONResponse, err = json.MarshalIndent(arrEdsJSONProto, "", "  ")
833
	if err != nil {
834
		return err
835
	}
836
	err = os.WriteFile(base+"_eds.json", byteJSONResponse, 0o644)
837
	if err != nil {
838
		return err
839
	}
840

841
	return err
842
}
843

844
func (a *ADSC) handleCDS(ll []*cluster.Cluster) {
845
	cn := make([]string, 0, len(ll))
846
	cdsSize := 0
847
	edscds := map[string]*cluster.Cluster{}
848
	cds := map[string]*cluster.Cluster{}
849
	for _, c := range ll {
850
		cdsSize += proto.Size(c)
851
		switch v := c.ClusterDiscoveryType.(type) {
852
		case *cluster.Cluster_Type:
853
			if v.Type != cluster.Cluster_EDS {
854
				cds[c.Name] = c
855
				continue
856
			}
857
		}
858
		cn = append(cn, c.Name)
859
		edscds[c.Name] = c
860
	}
861

862
	adscLog.Infof("CDS: %d size=%d", len(cn), cdsSize)
863

864
	if len(cn) > 0 {
865
		a.sendRsc(v3.EndpointType, cn)
866
	}
867
	if adscLog.DebugEnabled() {
868
		b, _ := json.MarshalIndent(ll, " ", " ")
869
		adscLog.Debugf(string(b))
870
	}
871

872
	a.mutex.Lock()
873
	defer a.mutex.Unlock()
874
	a.edsClusters = edscds
875
	a.clusters = cds
876

877
	select {
878
	case a.Updates <- v3.ClusterType:
879
	default:
880
	}
881
}
882

883
func (a *ADSC) node() *core.Node {
884
	return buildNode(&a.cfg.Config)
885
}
886

887
func nodeID(config *Config) string {
888
	return fmt.Sprintf("%s~%s~%s.%s~%s.svc.%s", config.NodeType, config.IP,
889
		config.Workload, config.Namespace, config.Namespace, constants.DefaultClusterLocalDomain)
890
}
891

892
func buildNode(config *Config) *core.Node {
893
	n := &core.Node{
894
		Id:       nodeID(config),
895
		Locality: config.Locality,
896
	}
897
	if config.Meta == nil {
898
		n.Metadata = &pstruct.Struct{
899
			Fields: map[string]*pstruct.Value{
900
				"ISTIO_VERSION": {Kind: &pstruct.Value_StringValue{StringValue: "65536.65536.65536"}},
901
			},
902
		}
903
	} else {
904
		n.Metadata = config.Meta
905
		if config.Meta.Fields["ISTIO_VERSION"] == nil {
906
			config.Meta.Fields["ISTIO_VERSION"] = &pstruct.Value{Kind: &pstruct.Value_StringValue{StringValue: "65536.65536.65536"}}
907
		}
908
	}
909
	return n
910
}
911

912
// Raw send of a request.
913
func (a *ADSC) Send(req *discovery.DiscoveryRequest) error {
914
	if a.sendNodeMeta {
915
		req.Node = a.node()
916
		a.sendNodeMeta = false
917
	}
918
	req.ResponseNonce = time.Now().String()
919
	if adscLog.DebugEnabled() {
920
		strReq, _ := protomarshal.ToJSONWithIndent(req, "  ")
921
		adscLog.Debugf("Sending Discovery Request to istiod: %s", strReq)
922
	}
923
	return a.stream.Send(req)
924
}
925

926
func (a *ADSC) handleEDS(eds []*endpoint.ClusterLoadAssignment) {
927
	la := map[string]*endpoint.ClusterLoadAssignment{}
928
	edsSize := 0
929
	ep := 0
930
	for _, cla := range eds {
931
		edsSize += proto.Size(cla)
932
		la[cla.ClusterName] = cla
933
		ep += len(cla.Endpoints)
934
	}
935

936
	adscLog.Infof("eds: %d size=%d ep=%d", len(eds), edsSize, ep)
937
	if adscLog.DebugEnabled() {
938
		b, _ := json.MarshalIndent(eds, " ", " ")
939
		adscLog.Debugf(string(b))
940
	}
941
	if a.initialLoad == 0 && !a.initialLds {
942
		// first load - Envoy loads listeners after endpoints
943
		_ = a.stream.Send(&discovery.DiscoveryRequest{
944
			Node:    a.node(),
945
			TypeUrl: v3.ListenerType,
946
		})
947
		a.initialLds = true
948
	}
949

950
	a.mutex.Lock()
951
	defer a.mutex.Unlock()
952
	a.eds = la
953

954
	select {
955
	case a.Updates <- v3.EndpointType:
956
	default:
957
	}
958
}
959

960
func (a *ADSC) handleRDS(configurations []*route.RouteConfiguration) {
961
	vh := 0
962
	rcount := 0
963
	size := 0
964

965
	rds := map[string]*route.RouteConfiguration{}
966

967
	for _, r := range configurations {
968
		for _, h := range r.VirtualHosts {
969
			vh++
970
			for _, rt := range h.Routes {
971
				rcount++
972
				// Example: match:<prefix:"/" > route:<cluster:"outbound|9154||load-se-154.local" ...
973
				adscLog.Debugf("Handle route %v, path %v, cluster %v", h.Name, rt.Match.PathSpecifier, rt.GetRoute().GetCluster())
974
			}
975
		}
976
		rds[r.Name] = r
977
		size += proto.Size(r)
978
	}
979
	if a.initialLoad == 0 {
980
		a.initialLoad = time.Since(a.watchTime)
981
		adscLog.Infof("RDS: %d size=%d vhosts=%d routes=%d time=%d", len(configurations), size, vh, rcount, a.initialLoad)
982
	} else {
983
		adscLog.Infof("RDS: %d size=%d vhosts=%d routes=%d", len(configurations), size, vh, rcount)
984
	}
985

986
	if adscLog.DebugEnabled() {
987
		b, _ := json.MarshalIndent(configurations, " ", " ")
988
		adscLog.Debugf(string(b))
989
	}
990

991
	a.mutex.Lock()
992
	a.routes = rds
993
	a.mutex.Unlock()
994

995
	select {
996
	case a.Updates <- v3.RouteType:
997
	default:
998
	}
999
}
1000

1001
// WaitClear will clear the waiting events, so next call to Wait will get
1002
// the next push type.
1003
func (a *ADSC) WaitClear() {
1004
	for {
1005
		select {
1006
		case <-a.Updates:
1007
		default:
1008
			return
1009
		}
1010
	}
1011
}
1012

1013
// WaitSingle waits for a single resource, and fails if the rejected type is
1014
// returned. We avoid rejecting all other types to avoid race conditions. For
1015
// example, a test asserting an incremental update of EDS may fail if a previous
1016
// push's RDS response comes in later. Instead, we can reject events coming
1017
// before (ie CDS). The only real alternative is to wait which introduces its own
1018
// issues.
1019
func (a *ADSC) WaitSingle(to time.Duration, want string, reject string) error {
1020
	t := time.NewTimer(to)
1021
	for {
1022
		select {
1023
		case t := <-a.Updates:
1024
			if t == "" {
1025
				return fmt.Errorf("closed")
1026
			}
1027
			if t != want && t == reject {
1028
				return fmt.Errorf("wanted update for %v got %v", want, t)
1029
			}
1030
			if t == want {
1031
				return nil
1032
			}
1033
			continue
1034
		case <-t.C:
1035
			return fmt.Errorf("timeout, still waiting for update for %v", want)
1036
		}
1037
	}
1038
}
1039

1040
// Wait for an updates for all the specified types
1041
// If updates is empty, this will wait for any update
1042
func (a *ADSC) Wait(to time.Duration, updates ...string) ([]string, error) {
1043
	t := time.NewTimer(to)
1044
	want := sets.New[string](updates...)
1045
	got := make([]string, 0, len(updates))
1046
	for {
1047
		select {
1048
		case toDelete := <-a.Updates:
1049
			if toDelete == "" {
1050
				return got, fmt.Errorf("closed")
1051
			}
1052
			want.Delete(toDelete)
1053
			got = append(got, toDelete)
1054
			if want.Len() == 0 {
1055
				return got, nil
1056
			}
1057
		case <-t.C:
1058
			return got, fmt.Errorf("timeout, still waiting for updates: %v", want)
1059
		}
1060
	}
1061
}
1062

1063
// WaitVersion waits for a new or updated for a typeURL.
1064
func (a *ADSC) WaitVersion(to time.Duration, typeURL, lastVersion string) (*discovery.DiscoveryResponse, error) {
1065
	t := time.NewTimer(to)
1066
	a.mutex.Lock()
1067
	ex := a.Received[typeURL]
1068
	a.mutex.Unlock()
1069
	if ex != nil {
1070
		if lastVersion == "" {
1071
			return ex, nil
1072
		}
1073
		if lastVersion != ex.VersionInfo {
1074
			return ex, nil
1075
		}
1076
	}
1077

1078
	for {
1079
		select {
1080
		case t := <-a.XDSUpdates:
1081
			if t == nil {
1082
				return nil, fmt.Errorf("closed")
1083
			}
1084
			if t.TypeUrl == typeURL {
1085
				return t, nil
1086
			}
1087

1088
		case <-t.C:
1089
			return nil, fmt.Errorf("timeout, still waiting for updates: %v", typeURL)
1090
		case err, ok := <-a.errChan:
1091
			if ok {
1092
				return nil, err
1093
			}
1094
			return nil, fmt.Errorf("connection closed")
1095
		}
1096
	}
1097
}
1098

1099
// EndpointsJSON returns the endpoints, formatted as JSON, for debugging.
1100
func (a *ADSC) EndpointsJSON() string {
1101
	a.mutex.Lock()
1102
	defer a.mutex.Unlock()
1103
	out, _ := json.MarshalIndent(a.eds, " ", " ")
1104
	return string(out)
1105
}
1106

1107
func ConfigInitialRequests() []*discovery.DiscoveryRequest {
1108
	out := make([]*discovery.DiscoveryRequest, 0, len(collections.Pilot.All())+1)
1109
	out = append(out, &discovery.DiscoveryRequest{
1110
		TypeUrl: gvk.MeshConfig.String(),
1111
	})
1112
	for _, sch := range collections.Pilot.All() {
1113
		out = append(out, &discovery.DiscoveryRequest{
1114
			TypeUrl: sch.GroupVersionKind().String(),
1115
		})
1116
	}
1117

1118
	return out
1119
}
1120

1121
func (a *ADSC) sendRsc(typeurl string, rsc []string) {
1122
	ex := a.Received[typeurl]
1123
	version := ""
1124
	nonce := ""
1125
	if ex != nil {
1126
		version = ex.VersionInfo
1127
		nonce = ex.Nonce
1128
	}
1129
	_ = a.stream.Send(&discovery.DiscoveryRequest{
1130
		ResponseNonce: nonce,
1131
		VersionInfo:   version,
1132
		Node:          a.node(),
1133
		TypeUrl:       typeurl,
1134
		ResourceNames: rsc,
1135
	})
1136
}
1137

1138
func (a *ADSC) ack(msg *discovery.DiscoveryResponse) {
1139
	var resources []string
1140

1141
	if strings.HasPrefix(msg.TypeUrl, v3.DebugType) {
1142
		// If the response is for istio.io/debug or istio.io/debug/*,
1143
		// skip to send ACK.
1144
		return
1145
	}
1146

1147
	if msg.TypeUrl == v3.EndpointType {
1148
		for c := range a.edsClusters {
1149
			resources = append(resources, c)
1150
		}
1151
	}
1152
	if msg.TypeUrl == v3.RouteType {
1153
		for r := range a.routes {
1154
			resources = append(resources, r)
1155
		}
1156
	}
1157

1158
	_ = a.stream.Send(&discovery.DiscoveryRequest{
1159
		ResponseNonce: msg.Nonce,
1160
		TypeUrl:       msg.TypeUrl,
1161
		Node:          a.node(),
1162
		VersionInfo:   msg.VersionInfo,
1163
		ResourceNames: resources,
1164
	})
1165
}
1166

1167
// GetHTTPListeners returns all the http listeners.
1168
func (a *ADSC) GetHTTPListeners() map[string]*listener.Listener {
1169
	a.mutex.Lock()
1170
	defer a.mutex.Unlock()
1171
	return a.httpListeners
1172
}
1173

1174
// GetTCPListeners returns all the tcp listeners.
1175
func (a *ADSC) GetTCPListeners() map[string]*listener.Listener {
1176
	a.mutex.RLock()
1177
	defer a.mutex.RUnlock()
1178
	return a.tcpListeners
1179
}
1180

1181
// GetEdsClusters returns all the eds type clusters.
1182
func (a *ADSC) GetEdsClusters() map[string]*cluster.Cluster {
1183
	a.mutex.RLock()
1184
	defer a.mutex.RUnlock()
1185
	return a.edsClusters
1186
}
1187

1188
// GetClusters returns all the non-eds type clusters.
1189
func (a *ADSC) GetClusters() map[string]*cluster.Cluster {
1190
	a.mutex.RLock()
1191
	defer a.mutex.RUnlock()
1192
	return a.clusters
1193
}
1194

1195
// GetRoutes returns all the routes.
1196
func (a *ADSC) GetRoutes() map[string]*route.RouteConfiguration {
1197
	a.mutex.RLock()
1198
	defer a.mutex.RUnlock()
1199
	return a.routes
1200
}
1201

1202
// GetEndpoints returns all the routes.
1203
func (a *ADSC) GetEndpoints() map[string]*endpoint.ClusterLoadAssignment {
1204
	a.mutex.RLock()
1205
	defer a.mutex.RUnlock()
1206
	return a.eds
1207
}
1208

1209
func (a *ADSC) handleMCP(groupVersionKind config.GroupVersionKind, resources []*anypb.Any) {
1210
	// Generic - fill up the store
1211
	if a.Store == nil {
1212
		return
1213
	}
1214

1215
	existingConfigs := a.Store.List(groupVersionKind, "")
1216

1217
	received := make(map[string]*config.Config)
1218
	for _, rsc := range resources {
1219
		m := &mcp.Resource{}
1220
		err := rsc.UnmarshalTo(m)
1221
		if err != nil {
1222
			adscLog.Warnf("Error unmarshalling received MCP config %v", err)
1223
			continue
1224
		}
1225
		newCfg, err := a.mcpToPilot(m)
1226
		if err != nil {
1227
			adscLog.Warnf("Invalid data: %v (%v)", err, string(rsc.Value))
1228
			continue
1229
		}
1230
		if newCfg == nil {
1231
			continue
1232
		}
1233
		received[newCfg.Namespace+"/"+newCfg.Name] = newCfg
1234

1235
		newCfg.GroupVersionKind = groupVersionKind
1236
		oldCfg := a.Store.Get(newCfg.GroupVersionKind, newCfg.Name, newCfg.Namespace)
1237

1238
		if oldCfg == nil {
1239
			if _, err = a.Store.Create(*newCfg); err != nil {
1240
				adscLog.Warnf("Error adding a new resource to the store %v", err)
1241
				continue
1242
			}
1243
		} else if oldCfg.ResourceVersion != newCfg.ResourceVersion || newCfg.ResourceVersion == "" {
1244
			// update the store only when resource version differs or unset.
1245
			newCfg.Annotations[mem.ResourceVersion] = newCfg.ResourceVersion
1246
			newCfg.ResourceVersion = oldCfg.ResourceVersion
1247
			if _, err = a.Store.Update(*newCfg); err != nil {
1248
				adscLog.Warnf("Error updating an existing resource in the store %v", err)
1249
				continue
1250
			}
1251
		}
1252
		if a.LocalCacheDir != "" {
1253
			strResponse, err := json.MarshalIndent(newCfg, "  ", "  ")
1254
			if err != nil {
1255
				adscLog.Warnf("Error marshaling received MCP config %v", err)
1256
				continue
1257
			}
1258
			err = os.WriteFile(a.LocalCacheDir+"_res."+
1259
				newCfg.GroupVersionKind.Kind+"."+newCfg.Namespace+"."+newCfg.Name+".json", strResponse, 0o644)
1260
			if err != nil {
1261
				adscLog.Warnf("Error writing received MCP config to local file %v", err)
1262
			}
1263
		}
1264
	}
1265

1266
	// remove deleted resources from cache
1267
	for _, config := range existingConfigs {
1268
		if _, ok := received[config.Namespace+"/"+config.Name]; !ok {
1269
			if err := a.Store.Delete(config.GroupVersionKind, config.Name, config.Namespace, nil); err != nil {
1270
				adscLog.Warnf("Error deleting an outdated resource from the store %v", err)
1271
				continue
1272
			}
1273
			if a.LocalCacheDir != "" {
1274
				err := os.Remove(a.LocalCacheDir + "_res." +
1275
					config.GroupVersionKind.Kind + "." + config.Namespace + "." + config.Name + ".json")
1276
				if err != nil {
1277
					adscLog.Warnf("Error deleting received MCP config to local file %v", err)
1278
				}
1279
			}
1280
		}
1281
	}
1282
}
1283

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.