9
"github.com/pkg/errors"
11
mesh_proto "github.com/kumahq/kuma/api/mesh/v1alpha1"
12
system_proto "github.com/kumahq/kuma/api/system/v1alpha1"
13
config_util "github.com/kumahq/kuma/pkg/config"
14
config_cp "github.com/kumahq/kuma/pkg/config/app/kuma-cp"
15
"github.com/kumahq/kuma/pkg/config/core/resources/store"
16
"github.com/kumahq/kuma/pkg/core"
17
core_mesh "github.com/kumahq/kuma/pkg/core/resources/apis/mesh"
18
core_system "github.com/kumahq/kuma/pkg/core/resources/apis/system"
19
"github.com/kumahq/kuma/pkg/core/resources/manager"
20
core_model "github.com/kumahq/kuma/pkg/core/resources/model"
21
core_store "github.com/kumahq/kuma/pkg/core/resources/store"
22
"github.com/kumahq/kuma/pkg/kds/service"
23
util_grpc "github.com/kumahq/kuma/pkg/util/grpc"
24
"github.com/kumahq/kuma/pkg/util/k8s"
27
type kdsEnvoyAdminClient struct {
28
rpcs service.EnvoyAdminRPCs
29
resManager manager.ReadOnlyResourceManager
32
func NewKDSEnvoyAdminClient(rpcs service.EnvoyAdminRPCs, resManager manager.ReadOnlyResourceManager) EnvoyAdminClient {
33
return &kdsEnvoyAdminClient{
35
resManager: resManager,
39
var _ EnvoyAdminClient = &kdsEnvoyAdminClient{}
41
func (k *kdsEnvoyAdminClient) PostQuit(context.Context, *core_mesh.DataplaneResource) error {
42
panic("not implemented")
45
func (k *kdsEnvoyAdminClient) ConfigDump(ctx context.Context, proxy core_model.ResourceWithAddress) ([]byte, error) {
46
zone := core_model.ZoneOfResource(proxy)
47
nameInZone, err := resNameInZone(ctx, k.resManager, proxy)
49
return nil, &KDSTransportError{requestType: "XDSConfigRequest", reason: err.Error()}
51
reqId := core.NewUUID()
52
tenantZoneID := service.TenantZoneClientIDFromCtx(ctx, zone)
54
err = k.rpcs.XDSConfigDump.Send(tenantZoneID.String(), &mesh_proto.XDSConfigRequest{
56
ResourceType: string(proxy.Descriptor().Name),
57
ResourceName: nameInZone, // send the name which without the added prefix
58
ResourceMesh: proxy.GetMeta().GetMesh(), // should be empty for ZoneIngress/ZoneEgress
61
return nil, &KDSTransportError{requestType: "XDSConfigRequest", reason: err.Error()}
64
defer k.rpcs.XDSConfigDump.DeleteWatch(tenantZoneID.String(), reqId)
65
ch := make(chan util_grpc.ReverseUnaryMessage)
66
if err := k.rpcs.XDSConfigDump.WatchResponse(tenantZoneID.String(), reqId, ch); err != nil {
67
return nil, errors.Wrapf(err, "could not watch the response")
74
configResp, ok := resp.(*mesh_proto.XDSConfigResponse)
76
return nil, errors.New("invalid request type")
78
if configResp.GetError() != "" {
79
return nil, &KDSTransportError{requestType: "XDSConfigRequest", reason: configResp.GetError()}
81
return configResp.GetConfig(), nil
85
func (k *kdsEnvoyAdminClient) Stats(ctx context.Context, proxy core_model.ResourceWithAddress) ([]byte, error) {
86
zone := core_model.ZoneOfResource(proxy)
87
nameInZone, err := resNameInZone(ctx, k.resManager, proxy)
89
return nil, &KDSTransportError{requestType: "StatsRequest", reason: err.Error()}
91
reqId := core.NewUUID()
92
tenantZoneId := service.TenantZoneClientIDFromCtx(ctx, zone)
94
err = k.rpcs.Stats.Send(tenantZoneId.String(), &mesh_proto.StatsRequest{
96
ResourceType: string(proxy.Descriptor().Name),
97
ResourceName: nameInZone, // send the name which without the added prefix
98
ResourceMesh: proxy.GetMeta().GetMesh(), // should be empty for ZoneIngress/ZoneEgress
101
return nil, &KDSTransportError{requestType: "StatsRequest", reason: err.Error()}
104
defer k.rpcs.Stats.DeleteWatch(tenantZoneId.String(), reqId)
105
ch := make(chan util_grpc.ReverseUnaryMessage)
106
if err := k.rpcs.Stats.WatchResponse(tenantZoneId.String(), reqId, ch); err != nil {
107
return nil, errors.Wrapf(err, "could not watch the response")
112
return nil, ctx.Err()
114
statsResp, ok := resp.(*mesh_proto.StatsResponse)
116
return nil, errors.New("invalid request type")
118
if statsResp.GetError() != "" {
119
return nil, &KDSTransportError{requestType: "StatsRequest", reason: statsResp.GetError()}
121
return statsResp.GetStats(), nil
125
func (k *kdsEnvoyAdminClient) Clusters(ctx context.Context, proxy core_model.ResourceWithAddress) ([]byte, error) {
126
zone := core_model.ZoneOfResource(proxy)
127
nameInZone, err := resNameInZone(ctx, k.resManager, proxy)
129
return nil, &KDSTransportError{requestType: "ClustersRequest", reason: err.Error()}
131
reqId := core.NewUUID()
132
tenantZoneID := service.TenantZoneClientIDFromCtx(ctx, zone)
134
err = k.rpcs.Clusters.Send(tenantZoneID.String(), &mesh_proto.ClustersRequest{
136
ResourceType: string(proxy.Descriptor().Name),
137
ResourceName: nameInZone, // send the name which without the added prefix
138
ResourceMesh: proxy.GetMeta().GetMesh(), // should be empty for ZoneIngress/ZoneEgress
141
return nil, &KDSTransportError{requestType: "ClustersRequest", reason: err.Error()}
144
defer k.rpcs.Clusters.DeleteWatch(tenantZoneID.String(), reqId)
145
ch := make(chan util_grpc.ReverseUnaryMessage)
146
if err := k.rpcs.Clusters.WatchResponse(tenantZoneID.String(), reqId, ch); err != nil {
147
return nil, errors.Wrapf(err, "could not watch the response")
152
return nil, ctx.Err()
154
clustersResp, ok := resp.(*mesh_proto.ClustersResponse)
156
return nil, errors.New("invalid request type")
158
if clustersResp.GetError() != "" {
159
return nil, &KDSTransportError{requestType: "ClustersRequest", reason: clustersResp.GetError()}
161
return clustersResp.GetClusters(), nil
167
resManager manager.ReadOnlyResourceManager,
168
r core_model.Resource,
170
name := core_model.GetDisplayName(r)
171
zone := core_model.ZoneOfResource(r)
172
// we need to check for the legacy name which starts with zoneName
173
if strings.HasPrefix(r.GetMeta().GetName(), zone) {
176
storeType, err := getZoneStoreType(ctx, resManager, zone)
180
// only K8s needs namespace added to the resource name
181
if storeType != store.KubernetesStore {
185
if ns := r.GetMeta().GetLabels()[mesh_proto.KubeNamespaceTag]; ns != "" {
186
name = k8s.K8sNamespacedNameToCoreName(name, ns)
191
func getZoneStoreType(
193
resManager manager.ReadOnlyResourceManager,
195
) (store.StoreType, error) {
196
zoneInsightRes := core_system.NewZoneInsightResource()
197
if err := resManager.Get(ctx, zoneInsightRes, core_store.GetByKey(zone, core_model.NoMesh)); err != nil {
200
subscription := zoneInsightRes.Spec.GetLastSubscription()
201
if !subscription.IsOnline() {
202
return "", fmt.Errorf("zone is offline")
204
kdsSubscription, ok := subscription.(*system_proto.KDSSubscription)
206
return "", fmt.Errorf("cannot map subscription")
208
config := kdsSubscription.GetConfig()
209
cfg := &config_cp.Config{}
210
if err := config_util.FromYAML([]byte(config), cfg); err != nil {
211
return "", fmt.Errorf("cannot read control-plane configuration")
213
return cfg.Store.Type, nil
216
type KDSTransportError struct {
221
func (e *KDSTransportError) Error() string {
223
return fmt.Sprintf("could not send %s", e.requestType)
225
return fmt.Sprintf("could not send %s: %s", e.requestType, e.reason)
229
func (e *KDSTransportError) Is(err error) bool {
230
return reflect.TypeOf(e) == reflect.TypeOf(err)