31
cluster "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
32
core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
33
endpoint "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
34
listener "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
35
route "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
36
discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
37
"github.com/envoyproxy/go-control-plane/pkg/conversion"
38
"google.golang.org/grpc"
39
"google.golang.org/grpc/credentials"
40
"google.golang.org/grpc/credentials/insecure"
41
"google.golang.org/protobuf/proto"
42
anypb "google.golang.org/protobuf/types/known/anypb"
43
pstruct "google.golang.org/protobuf/types/known/structpb"
45
mcp "istio.io/api/mcp/v1alpha1"
46
"istio.io/api/mesh/v1alpha1"
47
mem "istio.io/istio/pilot/pkg/config/memory"
48
"istio.io/istio/pilot/pkg/model"
49
"istio.io/istio/pilot/pkg/networking/util"
50
"istio.io/istio/pilot/pkg/serviceregistry/memory"
51
"istio.io/istio/pilot/pkg/util/network"
52
v3 "istio.io/istio/pilot/pkg/xds/v3"
53
"istio.io/istio/pkg/backoff"
54
"istio.io/istio/pkg/config"
55
"istio.io/istio/pkg/config/constants"
56
"istio.io/istio/pkg/config/schema/collections"
57
"istio.io/istio/pkg/config/schema/gvk"
58
"istio.io/istio/pkg/log"
59
"istio.io/istio/pkg/security"
60
"istio.io/istio/pkg/util/protomarshal"
61
"istio.io/istio/pkg/util/sets"
62
"istio.io/istio/pkg/wellknown"
66
defaultClientMaxReceiveMessageSize = math.MaxInt32
67
defaultInitialConnWindowSize = 1024 * 1024
68
defaultInitialWindowSize = 1024 * 1024
87
Locality *core.Locality
90
NodeType model.NodeType
102
SecretManager security.SecretManager
120
InsecureSkipVerify bool
123
BackoffPolicy backoff.BackOff
125
GrpcOpts []grpc.DialOption
129
type ADSConfig struct {
134
InitialDiscoveryRequests []*discovery.DiscoveryRequest
138
ResponseHandler ResponseHandler
141
func defaultGrpcDialOptions() []grpc.DialOption {
142
return []grpc.DialOption{
144
grpc.WithInitialWindowSize(int32(defaultInitialWindowSize)),
145
grpc.WithInitialConnWindowSize(int32(defaultInitialConnWindowSize)),
146
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(defaultClientMaxReceiveMessageSize)),
155
stream discovery.AggregatedDiscoveryService_StreamAggregatedResourcesClient
157
client discovery.AggregatedDiscoveryServiceClient
158
conn *grpc.ClientConn
169
initialLoad time.Duration
175
httpListeners map[string]*listener.Listener
178
tcpListeners map[string]*listener.Listener
181
edsClusters map[string]*cluster.Cluster
184
clusters map[string]*cluster.Cluster
187
routes map[string]*route.RouteConfiguration
190
eds map[string]*endpoint.ClusterLoadAssignment
194
Metadata *pstruct.Struct
199
XDSUpdates chan *discovery.DiscoveryResponse
200
VersionInfo map[string]string
203
Received map[string]*discovery.DiscoveryResponse
207
Mesh *v1alpha1.MeshConfig
210
Store model.ConfigStore
213
Registry *memory.ServiceDiscovery
226
sync map[string]time.Time
227
Locality *core.Locality
230
type ResponseHandler interface {
231
HandleResponse(con *ADSC, response *discovery.DiscoveryResponse)
235
type jsonMarshalProtoWithName struct {
236
Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"`
237
Message proto.Message
240
func (p jsonMarshalProtoWithName) MarshalJSON() ([]byte, error) {
241
strSer, serr := protomarshal.ToJSONWithIndent(p.Message, " ")
243
adscLog.Warnf("Error for marshaling [%s]: %v", p.Name, serr)
244
return []byte(""), serr
246
serialItem := []byte("{\"" + p.Name + "\":" + strSer + "}")
247
return serialItem, nil
250
// adscLog is the logging scope used by the ADS client code in this file.
// It is registered under the scope name "adsc".
var adscLog = log.RegisterScope("adsc", "adsc debugging")
252
func NewWithBackoffPolicy(discoveryAddr string, opts *ADSConfig, backoffPolicy backoff.BackOff) (*ADSC, error) {
253
adsc, err := New(discoveryAddr, opts)
257
adsc.cfg.BackoffPolicy = backoffPolicy
268
func New(discoveryAddr string, opts *ADSConfig) (*ADSC, error) {
272
opts.Config = setDefaultConfig(&opts.Config)
273
opts.Address = discoveryAddr
275
Updates: make(chan string, 100),
276
XDSUpdates: make(chan *discovery.DiscoveryResponse, 100),
277
VersionInfo: map[string]string{},
278
Received: map[string]*discovery.DiscoveryResponse{},
280
sync: map[string]time.Time{},
281
errChan: make(chan error, 10),
284
adsc.Metadata = opts.Meta
285
adsc.Locality = opts.Locality
287
adsc.nodeID = nodeID(&adsc.cfg.Config)
289
if err := adsc.Dial(); err != nil {
296
func setDefaultConfig(config *Config) Config {
300
if config.Namespace == "" {
301
config.Namespace = "default"
303
if config.NodeType == "" {
304
config.NodeType = model.SidecarProxy
307
ips, ok := network.GetPrivateIPsIfAvailable()
308
if ok && len(ips) > 0 {
312
if config.Workload == "" {
313
config.Workload = "test-1"
315
if config.BackoffPolicy == nil {
316
config.BackoffPolicy = backoff.NewExponentialBackOff(backoff.DefaultOption())
322
func (a *ADSC) Dial() error {
323
conn, err := dialWithConfig(&a.cfg.Config)
331
func dialWithConfig(config *Config) (*grpc.ClientConn, error) {
332
defaultGrpcDialOptions := defaultGrpcDialOptions()
333
var grpcDialOptions []grpc.DialOption
334
grpcDialOptions = append(grpcDialOptions, defaultGrpcDialOptions...)
335
grpcDialOptions = append(grpcDialOptions, config.GrpcOpts...)
339
if len(config.CertDir) > 0 || config.SecretManager != nil {
340
tlsCfg, err := tlsConfig(config)
344
creds := credentials.NewTLS(tlsCfg)
345
grpcDialOptions = append(grpcDialOptions, grpc.WithTransportCredentials(creds))
348
if len(grpcDialOptions) == len(defaultGrpcDialOptions) {
350
grpcDialOptions = append(grpcDialOptions, grpc.WithTransportCredentials(insecure.NewCredentials()))
353
conn, err := grpc.Dial(config.Address, grpcDialOptions...)
360
func tlsConfig(config *Config) (*tls.Config, error) {
361
var clientCerts []tls.Certificate
362
var serverCABytes []byte
365
getClientCertificate := getClientCertFn(config)
368
if config.RootCert != nil {
369
serverCABytes = config.RootCert
370
} else if config.XDSRootCAFile != "" {
371
serverCABytes, err = os.ReadFile(config.XDSRootCAFile)
375
} else if config.SecretManager != nil {
377
rootCA, err := config.SecretManager.GenerateSecret(security.RootCertReqResourceName)
382
serverCABytes = rootCA.RootCert
383
} else if config.CertDir != "" {
384
serverCABytes, err = os.ReadFile(config.CertDir + "/root-cert.pem")
390
serverCAs := x509.NewCertPool()
391
if ok := serverCAs.AppendCertsFromPEM(serverCABytes); !ok {
395
shost, _, _ := net.SplitHostPort(config.Address)
396
if config.XDSSAN != "" {
397
shost = config.XDSSAN
403
GetClientCertificate: getClientCertificate,
404
Certificates: clientCerts,
407
InsecureSkipVerify: config.InsecureSkipVerify,
412
func (a *ADSC) Close() {
422
func (a *ADSC) Run() error {
424
a.client = discovery.NewAggregatedDiscoveryServiceClient(a.conn)
425
a.stream, err = a.client.StreamAggregatedResources(context.Background())
429
a.sendNodeMeta = true
433
for _, r := range a.cfg.InitialDiscoveryRequests {
434
if r.TypeUrl == v3.ClusterType {
435
a.watchTime = time.Now()
445
func (a *ADSC) HasSynced() bool {
446
if a.cfg == nil || len(a.cfg.InitialDiscoveryRequests) == 0 {
451
defer a.mutex.RUnlock()
453
for _, req := range a.cfg.InitialDiscoveryRequests {
454
_, isMCP := convertTypeURLToMCPGVK(req.TypeUrl)
459
if _, ok := a.sync[req.TypeUrl]; !ok {
468
func (a *ADSC) reconnect() {
479
time.AfterFunc(a.cfg.BackoffPolicy.NextBackOff(), a.reconnect)
483
func (a *ADSC) handleRecv() {
485
if a.cfg.BackoffPolicy != nil {
486
a.cfg.BackoffPolicy.Reset()
490
msg, err := a.stream.Recv()
492
adscLog.Infof("Connection closed for node %v with err: %v", a.nodeID, err)
494
case a.errChan <- err:
498
if a.cfg.BackoffPolicy != nil {
499
time.AfterFunc(a.cfg.BackoffPolicy.NextBackOff(), a.reconnect)
511
resourceGvk, isMCP := convertTypeURLToMCPGVK(msg.TypeUrl)
513
adscLog.WithLabels("type", msg.TypeUrl, "count", len(msg.Resources), "nonce", msg.Nonce).Info("Received")
514
if a.cfg.ResponseHandler != nil {
515
a.cfg.ResponseHandler.HandleResponse(a, msg)
518
if msg.TypeUrl == gvk.MeshConfig.String() &&
519
len(msg.Resources) > 0 {
520
rsc := msg.Resources[0]
521
m := &v1alpha1.MeshConfig{}
522
err = proto.Unmarshal(rsc.Value, m)
524
adscLog.Warnf("Failed to unmarshal mesh config: %v", err)
527
if a.LocalCacheDir != "" {
528
strResponse, err := protomarshal.ToJSONWithIndent(m, " ")
532
err = os.WriteFile(a.LocalCacheDir+"_mesh.json", []byte(strResponse), 0o644)
541
a.VersionInfo[msg.TypeUrl] = msg.VersionInfo
543
case v3.ListenerType:
544
listeners := make([]*listener.Listener, 0, len(msg.Resources))
545
for _, rsc := range msg.Resources {
546
valBytes := rsc.Value
547
ll := &listener.Listener{}
548
_ = proto.Unmarshal(valBytes, ll)
549
listeners = append(listeners, ll)
551
a.handleLDS(listeners)
553
clusters := make([]*cluster.Cluster, 0, len(msg.Resources))
554
for _, rsc := range msg.Resources {
555
valBytes := rsc.Value
556
cl := &cluster.Cluster{}
557
_ = proto.Unmarshal(valBytes, cl)
558
clusters = append(clusters, cl)
560
a.handleCDS(clusters)
561
case v3.EndpointType:
562
eds := make([]*endpoint.ClusterLoadAssignment, 0, len(msg.Resources))
563
for _, rsc := range msg.Resources {
564
valBytes := rsc.Value
565
el := &endpoint.ClusterLoadAssignment{}
566
_ = proto.Unmarshal(valBytes, el)
567
eds = append(eds, el)
571
routes := make([]*route.RouteConfiguration, 0, len(msg.Resources))
572
for _, rsc := range msg.Resources {
573
valBytes := rsc.Value
574
rl := &route.RouteConfiguration{}
575
_ = proto.Unmarshal(valBytes, rl)
576
routes = append(routes, rl)
581
a.handleMCP(resourceGvk, msg.Resources)
592
if _, exist := a.sync[resourceGvk.String()]; !exist {
593
a.sync[resourceGvk.String()] = time.Now()
596
a.Received[msg.TypeUrl] = msg
601
case a.XDSUpdates <- msg:
607
func (a *ADSC) mcpToPilot(m *mcp.Resource) (*config.Config, error) {
608
if m == nil || m.Metadata == nil {
609
return &config.Config{}, nil
613
ResourceVersion: m.Metadata.Version,
614
Labels: m.Metadata.Labels,
615
Annotations: m.Metadata.Annotations,
619
if !config.ObjectInRevision(c, a.cfg.Revision) {
623
if c.Meta.Annotations == nil {
624
c.Meta.Annotations = make(map[string]string)
626
nsn := strings.Split(m.Metadata.Name, "/")
628
return nil, fmt.Errorf("invalid name %s", m.Metadata.Name)
633
c.CreationTimestamp = m.Metadata.CreateTime.AsTime()
635
pb, err := m.Body.UnmarshalNew()
644
func (a *ADSC) handleLDS(ll []*listener.Listener) {
645
lh := map[string]*listener.Listener{}
646
lt := map[string]*listener.Listener{}
651
for _, l := range ll {
652
ldsSize += proto.Size(l)
655
if l.ApiListener != nil {
660
fc := l.FilterChains[len(l.FilterChains)-1]
662
filter := fc.Filters[len(fc.Filters)-1]
665
if fc.GetName() == util.PassthroughFilterChain {
666
fc = l.FilterChains[len(l.FilterChains)-2]
667
filter = fc.Filters[len(fc.Filters)-1]
671
case wellknown.TCPProxy:
673
config, _ := conversion.MessageToStruct(filter.GetTypedConfig())
674
c := config.Fields["cluster"].GetStringValue()
675
adscLog.Debugf("TCP: %s -> %s", l.Name, c)
676
case wellknown.HTTPConnectionManager:
680
port := l.Address.GetSocketAddress().GetPortValue()
682
routes = append(routes, "http_proxy")
684
routes = append(routes, fmt.Sprintf("%d", port))
686
case wellknown.MongoProxy:
688
case wellknown.RedisProxy:
690
case wellknown.MySQLProxy:
693
adscLog.Infof(protomarshal.ToJSONWithIndent(l, " "))
697
adscLog.Infof("LDS: http=%d tcp=%d size=%d", len(lh), len(lt), ldsSize)
698
if adscLog.DebugEnabled() {
699
b, _ := json.MarshalIndent(ll, " ", " ")
700
adscLog.Debugf(string(b))
703
defer a.mutex.Unlock()
705
a.sendRsc(v3.RouteType, routes)
711
case a.Updates <- v3.ListenerType:
717
func (a *ADSC) Save(base string) error {
719
defer a.mutex.Unlock()
722
var sortTCPListeners []string
723
for key := range a.tcpListeners {
724
sortTCPListeners = append(sortTCPListeners, key)
726
sort.Strings(sortTCPListeners)
727
arrTCPListenersJSONProto := make([]jsonMarshalProtoWithName, 0, len(sortTCPListeners))
728
for _, element := range sortTCPListeners {
729
sliceItem := &jsonMarshalProtoWithName{element, a.tcpListeners[element]}
730
arrTCPListenersJSONProto = append(arrTCPListenersJSONProto, *sliceItem)
732
byteJSONResponse, err := json.MarshalIndent(arrTCPListenersJSONProto, "", " ")
734
adscLog.Warnf("Error for marshaling TCPListeners: %v", err)
736
err = os.WriteFile(base+"_lds_tcp.json", byteJSONResponse, 0o644)
742
var sortHTTPListeners []string
743
for key := range a.httpListeners {
744
sortHTTPListeners = append(sortHTTPListeners, key)
746
sort.Strings(sortHTTPListeners)
747
arrHTTPListenersJSONProto := make([]jsonMarshalProtoWithName, 0, len(sortHTTPListeners))
748
for _, element := range sortHTTPListeners {
749
sliceItem := &jsonMarshalProtoWithName{element, a.httpListeners[element]}
750
arrHTTPListenersJSONProto = append(arrHTTPListenersJSONProto, *sliceItem)
752
byteJSONResponse, err = json.MarshalIndent(arrHTTPListenersJSONProto, "", " ")
756
err = os.WriteFile(base+"_lds_http.json", byteJSONResponse, 0o644)
762
var sortRoutes []string
763
for key := range a.routes {
764
sortRoutes = append(sortRoutes, key)
766
sort.Strings(sortRoutes)
767
arrRoutesJSONProto := make([]jsonMarshalProtoWithName, 0, len(sortRoutes))
768
for _, element := range sortRoutes {
769
sliceItem := &jsonMarshalProtoWithName{element, a.routes[element]}
770
arrRoutesJSONProto = append(arrRoutesJSONProto, *sliceItem)
772
byteJSONResponse, err = json.MarshalIndent(arrRoutesJSONProto, "", " ")
776
err = os.WriteFile(base+"_rds.json", byteJSONResponse, 0o644)
782
var sortEdsClusters []string
783
for key := range a.edsClusters {
784
sortEdsClusters = append(sortEdsClusters, key)
786
sort.Strings(sortEdsClusters)
787
arrEdsClustersJSONProto := make([]jsonMarshalProtoWithName, 0, len(sortEdsClusters))
788
for _, element := range sortEdsClusters {
789
sliceItem := &jsonMarshalProtoWithName{element, a.edsClusters[element]}
790
arrEdsClustersJSONProto = append(arrEdsClustersJSONProto, *sliceItem)
792
byteJSONResponse, err = json.MarshalIndent(arrEdsClustersJSONProto, "", " ")
796
err = os.WriteFile(base+"_ecds.json", byteJSONResponse, 0o644)
802
var sortClusters []string
803
for key := range a.clusters {
804
sortClusters = append(sortClusters, key)
806
sort.Strings(sortClusters)
807
arrClustersJSONProto := make([]jsonMarshalProtoWithName, 0, len(sortClusters))
808
for _, element := range sortClusters {
809
sliceItem := &jsonMarshalProtoWithName{element, a.clusters[element]}
810
arrClustersJSONProto = append(arrClustersJSONProto, *sliceItem)
812
byteJSONResponse, err = json.MarshalIndent(arrClustersJSONProto, "", " ")
816
err = os.WriteFile(base+"_cds.json", byteJSONResponse, 0o644)
823
for key := range a.eds {
824
sortEds = append(sortEds, key)
826
sort.Strings(sortEds)
827
arrEdsJSONProto := make([]jsonMarshalProtoWithName, 0, len(sortEds))
828
for _, element := range sortEds {
829
sliceItem := &jsonMarshalProtoWithName{element, a.eds[element]}
830
arrEdsJSONProto = append(arrEdsJSONProto, *sliceItem)
832
byteJSONResponse, err = json.MarshalIndent(arrEdsJSONProto, "", " ")
836
err = os.WriteFile(base+"_eds.json", byteJSONResponse, 0o644)
844
func (a *ADSC) handleCDS(ll []*cluster.Cluster) {
845
cn := make([]string, 0, len(ll))
847
edscds := map[string]*cluster.Cluster{}
848
cds := map[string]*cluster.Cluster{}
849
for _, c := range ll {
850
cdsSize += proto.Size(c)
851
switch v := c.ClusterDiscoveryType.(type) {
852
case *cluster.Cluster_Type:
853
if v.Type != cluster.Cluster_EDS {
858
cn = append(cn, c.Name)
862
adscLog.Infof("CDS: %d size=%d", len(cn), cdsSize)
865
a.sendRsc(v3.EndpointType, cn)
867
if adscLog.DebugEnabled() {
868
b, _ := json.MarshalIndent(ll, " ", " ")
869
adscLog.Debugf(string(b))
873
defer a.mutex.Unlock()
874
a.edsClusters = edscds
878
case a.Updates <- v3.ClusterType:
883
func (a *ADSC) node() *core.Node {
884
return buildNode(&a.cfg.Config)
887
func nodeID(config *Config) string {
888
return fmt.Sprintf("%s~%s~%s.%s~%s.svc.%s", config.NodeType, config.IP,
889
config.Workload, config.Namespace, config.Namespace, constants.DefaultClusterLocalDomain)
892
func buildNode(config *Config) *core.Node {
895
Locality: config.Locality,
897
if config.Meta == nil {
898
n.Metadata = &pstruct.Struct{
899
Fields: map[string]*pstruct.Value{
900
"ISTIO_VERSION": {Kind: &pstruct.Value_StringValue{StringValue: "65536.65536.65536"}},
904
n.Metadata = config.Meta
905
if config.Meta.Fields["ISTIO_VERSION"] == nil {
906
config.Meta.Fields["ISTIO_VERSION"] = &pstruct.Value{Kind: &pstruct.Value_StringValue{StringValue: "65536.65536.65536"}}
913
func (a *ADSC) Send(req *discovery.DiscoveryRequest) error {
916
a.sendNodeMeta = false
918
req.ResponseNonce = time.Now().String()
919
if adscLog.DebugEnabled() {
920
strReq, _ := protomarshal.ToJSONWithIndent(req, " ")
921
adscLog.Debugf("Sending Discovery Request to istiod: %s", strReq)
923
return a.stream.Send(req)
926
func (a *ADSC) handleEDS(eds []*endpoint.ClusterLoadAssignment) {
927
la := map[string]*endpoint.ClusterLoadAssignment{}
930
for _, cla := range eds {
931
edsSize += proto.Size(cla)
932
la[cla.ClusterName] = cla
933
ep += len(cla.Endpoints)
936
adscLog.Infof("eds: %d size=%d ep=%d", len(eds), edsSize, ep)
937
if adscLog.DebugEnabled() {
938
b, _ := json.MarshalIndent(eds, " ", " ")
939
adscLog.Debugf(string(b))
941
if a.initialLoad == 0 && !a.initialLds {
943
_ = a.stream.Send(&discovery.DiscoveryRequest{
945
TypeUrl: v3.ListenerType,
951
defer a.mutex.Unlock()
955
case a.Updates <- v3.EndpointType:
960
func (a *ADSC) handleRDS(configurations []*route.RouteConfiguration) {
965
rds := map[string]*route.RouteConfiguration{}
967
for _, r := range configurations {
968
for _, h := range r.VirtualHosts {
970
for _, rt := range h.Routes {
973
adscLog.Debugf("Handle route %v, path %v, cluster %v", h.Name, rt.Match.PathSpecifier, rt.GetRoute().GetCluster())
977
size += proto.Size(r)
979
if a.initialLoad == 0 {
980
a.initialLoad = time.Since(a.watchTime)
981
adscLog.Infof("RDS: %d size=%d vhosts=%d routes=%d time=%d", len(configurations), size, vh, rcount, a.initialLoad)
983
adscLog.Infof("RDS: %d size=%d vhosts=%d routes=%d", len(configurations), size, vh, rcount)
986
if adscLog.DebugEnabled() {
987
b, _ := json.MarshalIndent(configurations, " ", " ")
988
adscLog.Debugf(string(b))
996
case a.Updates <- v3.RouteType:
1003
func (a *ADSC) WaitClear() {
1019
func (a *ADSC) WaitSingle(to time.Duration, want string, reject string) error {
1020
t := time.NewTimer(to)
1023
case t := <-a.Updates:
1025
return fmt.Errorf("closed")
1027
if t != want && t == reject {
1028
return fmt.Errorf("wanted update for %v got %v", want, t)
1035
return fmt.Errorf("timeout, still waiting for update for %v", want)
1042
func (a *ADSC) Wait(to time.Duration, updates ...string) ([]string, error) {
1043
t := time.NewTimer(to)
1044
want := sets.New[string](updates...)
1045
got := make([]string, 0, len(updates))
1048
case toDelete := <-a.Updates:
1050
return got, fmt.Errorf("closed")
1052
want.Delete(toDelete)
1053
got = append(got, toDelete)
1054
if want.Len() == 0 {
1058
return got, fmt.Errorf("timeout, still waiting for updates: %v", want)
1064
func (a *ADSC) WaitVersion(to time.Duration, typeURL, lastVersion string) (*discovery.DiscoveryResponse, error) {
1065
t := time.NewTimer(to)
1067
ex := a.Received[typeURL]
1070
if lastVersion == "" {
1073
if lastVersion != ex.VersionInfo {
1080
case t := <-a.XDSUpdates:
1082
return nil, fmt.Errorf("closed")
1084
if t.TypeUrl == typeURL {
1089
return nil, fmt.Errorf("timeout, still waiting for updates: %v", typeURL)
1090
case err, ok := <-a.errChan:
1094
return nil, fmt.Errorf("connection closed")
1100
func (a *ADSC) EndpointsJSON() string {
1102
defer a.mutex.Unlock()
1103
out, _ := json.MarshalIndent(a.eds, " ", " ")
1107
func ConfigInitialRequests() []*discovery.DiscoveryRequest {
1108
out := make([]*discovery.DiscoveryRequest, 0, len(collections.Pilot.All())+1)
1109
out = append(out, &discovery.DiscoveryRequest{
1110
TypeUrl: gvk.MeshConfig.String(),
1112
for _, sch := range collections.Pilot.All() {
1113
out = append(out, &discovery.DiscoveryRequest{
1114
TypeUrl: sch.GroupVersionKind().String(),
1121
func (a *ADSC) sendRsc(typeurl string, rsc []string) {
1122
ex := a.Received[typeurl]
1126
version = ex.VersionInfo
1129
_ = a.stream.Send(&discovery.DiscoveryRequest{
1130
ResponseNonce: nonce,
1131
VersionInfo: version,
1138
func (a *ADSC) ack(msg *discovery.DiscoveryResponse) {
1139
var resources []string
1141
if strings.HasPrefix(msg.TypeUrl, v3.DebugType) {
1147
if msg.TypeUrl == v3.EndpointType {
1148
for c := range a.edsClusters {
1149
resources = append(resources, c)
1152
if msg.TypeUrl == v3.RouteType {
1153
for r := range a.routes {
1154
resources = append(resources, r)
1158
_ = a.stream.Send(&discovery.DiscoveryRequest{
1159
ResponseNonce: msg.Nonce,
1160
TypeUrl: msg.TypeUrl,
1162
VersionInfo: msg.VersionInfo,
1163
ResourceNames: resources,
1168
func (a *ADSC) GetHTTPListeners() map[string]*listener.Listener {
1170
defer a.mutex.Unlock()
1171
return a.httpListeners
1175
func (a *ADSC) GetTCPListeners() map[string]*listener.Listener {
1177
defer a.mutex.RUnlock()
1178
return a.tcpListeners
1182
func (a *ADSC) GetEdsClusters() map[string]*cluster.Cluster {
1184
defer a.mutex.RUnlock()
1185
return a.edsClusters
1189
func (a *ADSC) GetClusters() map[string]*cluster.Cluster {
1191
defer a.mutex.RUnlock()
1196
func (a *ADSC) GetRoutes() map[string]*route.RouteConfiguration {
1198
defer a.mutex.RUnlock()
1203
func (a *ADSC) GetEndpoints() map[string]*endpoint.ClusterLoadAssignment {
1205
defer a.mutex.RUnlock()
1209
func (a *ADSC) handleMCP(groupVersionKind config.GroupVersionKind, resources []*anypb.Any) {
1215
existingConfigs := a.Store.List(groupVersionKind, "")
1217
received := make(map[string]*config.Config)
1218
for _, rsc := range resources {
1219
m := &mcp.Resource{}
1220
err := rsc.UnmarshalTo(m)
1222
adscLog.Warnf("Error unmarshalling received MCP config %v", err)
1225
newCfg, err := a.mcpToPilot(m)
1227
adscLog.Warnf("Invalid data: %v (%v)", err, string(rsc.Value))
1233
received[newCfg.Namespace+"/"+newCfg.Name] = newCfg
1235
newCfg.GroupVersionKind = groupVersionKind
1236
oldCfg := a.Store.Get(newCfg.GroupVersionKind, newCfg.Name, newCfg.Namespace)
1239
if _, err = a.Store.Create(*newCfg); err != nil {
1240
adscLog.Warnf("Error adding a new resource to the store %v", err)
1243
} else if oldCfg.ResourceVersion != newCfg.ResourceVersion || newCfg.ResourceVersion == "" {
1245
newCfg.Annotations[mem.ResourceVersion] = newCfg.ResourceVersion
1246
newCfg.ResourceVersion = oldCfg.ResourceVersion
1247
if _, err = a.Store.Update(*newCfg); err != nil {
1248
adscLog.Warnf("Error updating an existing resource in the store %v", err)
1252
if a.LocalCacheDir != "" {
1253
strResponse, err := json.MarshalIndent(newCfg, " ", " ")
1255
adscLog.Warnf("Error marshaling received MCP config %v", err)
1258
err = os.WriteFile(a.LocalCacheDir+"_res."+
1259
newCfg.GroupVersionKind.Kind+"."+newCfg.Namespace+"."+newCfg.Name+".json", strResponse, 0o644)
1261
adscLog.Warnf("Error writing received MCP config to local file %v", err)
1267
for _, config := range existingConfigs {
1268
if _, ok := received[config.Namespace+"/"+config.Name]; !ok {
1269
if err := a.Store.Delete(config.GroupVersionKind, config.Name, config.Namespace, nil); err != nil {
1270
adscLog.Warnf("Error deleting an outdated resource from the store %v", err)
1273
if a.LocalCacheDir != "" {
1274
err := os.Remove(a.LocalCacheDir + "_res." +
1275
config.GroupVersionKind.Kind + "." + config.Namespace + "." + config.Name + ".json")
1277
adscLog.Warnf("Error deleting received MCP config to local file %v", err)