kuma

Форк
0
/
kds_client.go 
231 строка · 7.6 Кб
1
package admin
2

3
import (
4
	"context"
5
	"fmt"
6
	"reflect"
7
	"strings"
8

9
	"github.com/pkg/errors"
10

11
	mesh_proto "github.com/kumahq/kuma/api/mesh/v1alpha1"
12
	system_proto "github.com/kumahq/kuma/api/system/v1alpha1"
13
	config_util "github.com/kumahq/kuma/pkg/config"
14
	config_cp "github.com/kumahq/kuma/pkg/config/app/kuma-cp"
15
	"github.com/kumahq/kuma/pkg/config/core/resources/store"
16
	"github.com/kumahq/kuma/pkg/core"
17
	core_mesh "github.com/kumahq/kuma/pkg/core/resources/apis/mesh"
18
	core_system "github.com/kumahq/kuma/pkg/core/resources/apis/system"
19
	"github.com/kumahq/kuma/pkg/core/resources/manager"
20
	core_model "github.com/kumahq/kuma/pkg/core/resources/model"
21
	core_store "github.com/kumahq/kuma/pkg/core/resources/store"
22
	"github.com/kumahq/kuma/pkg/kds/service"
23
	util_grpc "github.com/kumahq/kuma/pkg/util/grpc"
24
	"github.com/kumahq/kuma/pkg/util/k8s"
25
)
26

27
type kdsEnvoyAdminClient struct {
28
	rpcs       service.EnvoyAdminRPCs
29
	resManager manager.ReadOnlyResourceManager
30
}
31

32
func NewKDSEnvoyAdminClient(rpcs service.EnvoyAdminRPCs, resManager manager.ReadOnlyResourceManager) EnvoyAdminClient {
33
	return &kdsEnvoyAdminClient{
34
		rpcs:       rpcs,
35
		resManager: resManager,
36
	}
37
}
38

39
// Compile-time check that *kdsEnvoyAdminClient satisfies EnvoyAdminClient.
var _ EnvoyAdminClient = (*kdsEnvoyAdminClient)(nil)
40

41
// PostQuit is not supported by the KDS-backed client: calling it always
// panics with "not implemented". Both arguments are ignored.
func (*kdsEnvoyAdminClient) PostQuit(_ context.Context, _ *core_mesh.DataplaneResource) error {
	panic("not implemented")
}
44

45
func (k *kdsEnvoyAdminClient) ConfigDump(ctx context.Context, proxy core_model.ResourceWithAddress) ([]byte, error) {
46
	zone := core_model.ZoneOfResource(proxy)
47
	nameInZone, err := resNameInZone(ctx, k.resManager, proxy)
48
	if err != nil {
49
		return nil, &KDSTransportError{requestType: "XDSConfigRequest", reason: err.Error()}
50
	}
51
	reqId := core.NewUUID()
52
	tenantZoneID := service.TenantZoneClientIDFromCtx(ctx, zone)
53

54
	err = k.rpcs.XDSConfigDump.Send(tenantZoneID.String(), &mesh_proto.XDSConfigRequest{
55
		RequestId:    reqId,
56
		ResourceType: string(proxy.Descriptor().Name),
57
		ResourceName: nameInZone,                // send the name which without the added prefix
58
		ResourceMesh: proxy.GetMeta().GetMesh(), // should be empty for ZoneIngress/ZoneEgress
59
	})
60
	if err != nil {
61
		return nil, &KDSTransportError{requestType: "XDSConfigRequest", reason: err.Error()}
62
	}
63

64
	defer k.rpcs.XDSConfigDump.DeleteWatch(tenantZoneID.String(), reqId)
65
	ch := make(chan util_grpc.ReverseUnaryMessage)
66
	if err := k.rpcs.XDSConfigDump.WatchResponse(tenantZoneID.String(), reqId, ch); err != nil {
67
		return nil, errors.Wrapf(err, "could not watch the response")
68
	}
69

70
	select {
71
	case <-ctx.Done():
72
		return nil, ctx.Err()
73
	case resp := <-ch:
74
		configResp, ok := resp.(*mesh_proto.XDSConfigResponse)
75
		if !ok {
76
			return nil, errors.New("invalid request type")
77
		}
78
		if configResp.GetError() != "" {
79
			return nil, &KDSTransportError{requestType: "XDSConfigRequest", reason: configResp.GetError()}
80
		}
81
		return configResp.GetConfig(), nil
82
	}
83
}
84

85
func (k *kdsEnvoyAdminClient) Stats(ctx context.Context, proxy core_model.ResourceWithAddress) ([]byte, error) {
86
	zone := core_model.ZoneOfResource(proxy)
87
	nameInZone, err := resNameInZone(ctx, k.resManager, proxy)
88
	if err != nil {
89
		return nil, &KDSTransportError{requestType: "StatsRequest", reason: err.Error()}
90
	}
91
	reqId := core.NewUUID()
92
	tenantZoneId := service.TenantZoneClientIDFromCtx(ctx, zone)
93

94
	err = k.rpcs.Stats.Send(tenantZoneId.String(), &mesh_proto.StatsRequest{
95
		RequestId:    reqId,
96
		ResourceType: string(proxy.Descriptor().Name),
97
		ResourceName: nameInZone,                // send the name which without the added prefix
98
		ResourceMesh: proxy.GetMeta().GetMesh(), // should be empty for ZoneIngress/ZoneEgress
99
	})
100
	if err != nil {
101
		return nil, &KDSTransportError{requestType: "StatsRequest", reason: err.Error()}
102
	}
103

104
	defer k.rpcs.Stats.DeleteWatch(tenantZoneId.String(), reqId)
105
	ch := make(chan util_grpc.ReverseUnaryMessage)
106
	if err := k.rpcs.Stats.WatchResponse(tenantZoneId.String(), reqId, ch); err != nil {
107
		return nil, errors.Wrapf(err, "could not watch the response")
108
	}
109

110
	select {
111
	case <-ctx.Done():
112
		return nil, ctx.Err()
113
	case resp := <-ch:
114
		statsResp, ok := resp.(*mesh_proto.StatsResponse)
115
		if !ok {
116
			return nil, errors.New("invalid request type")
117
		}
118
		if statsResp.GetError() != "" {
119
			return nil, &KDSTransportError{requestType: "StatsRequest", reason: statsResp.GetError()}
120
		}
121
		return statsResp.GetStats(), nil
122
	}
123
}
124

125
func (k *kdsEnvoyAdminClient) Clusters(ctx context.Context, proxy core_model.ResourceWithAddress) ([]byte, error) {
126
	zone := core_model.ZoneOfResource(proxy)
127
	nameInZone, err := resNameInZone(ctx, k.resManager, proxy)
128
	if err != nil {
129
		return nil, &KDSTransportError{requestType: "ClustersRequest", reason: err.Error()}
130
	}
131
	reqId := core.NewUUID()
132
	tenantZoneID := service.TenantZoneClientIDFromCtx(ctx, zone)
133

134
	err = k.rpcs.Clusters.Send(tenantZoneID.String(), &mesh_proto.ClustersRequest{
135
		RequestId:    reqId,
136
		ResourceType: string(proxy.Descriptor().Name),
137
		ResourceName: nameInZone,                // send the name which without the added prefix
138
		ResourceMesh: proxy.GetMeta().GetMesh(), // should be empty for ZoneIngress/ZoneEgress
139
	})
140
	if err != nil {
141
		return nil, &KDSTransportError{requestType: "ClustersRequest", reason: err.Error()}
142
	}
143

144
	defer k.rpcs.Clusters.DeleteWatch(tenantZoneID.String(), reqId)
145
	ch := make(chan util_grpc.ReverseUnaryMessage)
146
	if err := k.rpcs.Clusters.WatchResponse(tenantZoneID.String(), reqId, ch); err != nil {
147
		return nil, errors.Wrapf(err, "could not watch the response")
148
	}
149

150
	select {
151
	case <-ctx.Done():
152
		return nil, ctx.Err()
153
	case resp := <-ch:
154
		clustersResp, ok := resp.(*mesh_proto.ClustersResponse)
155
		if !ok {
156
			return nil, errors.New("invalid request type")
157
		}
158
		if clustersResp.GetError() != "" {
159
			return nil, &KDSTransportError{requestType: "ClustersRequest", reason: clustersResp.GetError()}
160
		}
161
		return clustersResp.GetClusters(), nil
162
	}
163
}
164

165
func resNameInZone(
166
	ctx context.Context,
167
	resManager manager.ReadOnlyResourceManager,
168
	r core_model.Resource,
169
) (string, error) {
170
	name := core_model.GetDisplayName(r)
171
	zone := core_model.ZoneOfResource(r)
172
	// we need to check for the legacy name which starts with zoneName
173
	if strings.HasPrefix(r.GetMeta().GetName(), zone) {
174
		return name, nil
175
	}
176
	storeType, err := getZoneStoreType(ctx, resManager, zone)
177
	if err != nil {
178
		return "", err
179
	}
180
	// only K8s needs namespace added to the resource name
181
	if storeType != store.KubernetesStore {
182
		return name, nil
183
	}
184

185
	if ns := r.GetMeta().GetLabels()[mesh_proto.KubeNamespaceTag]; ns != "" {
186
		name = k8s.K8sNamespacedNameToCoreName(name, ns)
187
	}
188
	return name, nil
189
}
190

191
func getZoneStoreType(
192
	ctx context.Context,
193
	resManager manager.ReadOnlyResourceManager,
194
	zone string,
195
) (store.StoreType, error) {
196
	zoneInsightRes := core_system.NewZoneInsightResource()
197
	if err := resManager.Get(ctx, zoneInsightRes, core_store.GetByKey(zone, core_model.NoMesh)); err != nil {
198
		return "", err
199
	}
200
	subscription := zoneInsightRes.Spec.GetLastSubscription()
201
	if !subscription.IsOnline() {
202
		return "", fmt.Errorf("zone is offline")
203
	}
204
	kdsSubscription, ok := subscription.(*system_proto.KDSSubscription)
205
	if !ok {
206
		return "", fmt.Errorf("cannot map subscription")
207
	}
208
	config := kdsSubscription.GetConfig()
209
	cfg := &config_cp.Config{}
210
	if err := config_util.FromYAML([]byte(config), cfg); err != nil {
211
		return "", fmt.Errorf("cannot read control-plane configuration")
212
	}
213
	return cfg.Store.Type, nil
214
}
215

216
// KDSTransportError describes a failed request of the KDS Envoy admin client.
// It carries the name of the request and, optionally, the reason it failed.
type KDSTransportError struct {
	// requestType is the name of the request message, e.g. "StatsRequest".
	requestType string
	// reason is the failure detail; it may be empty.
	reason string
}

// Error implements the error interface. The message is
// "could not send <requestType>", followed by ": <reason>" when a reason is set.
func (e *KDSTransportError) Error() string {
	if e.reason == "" {
		return fmt.Sprintf("could not send %s", e.requestType)
	}
	return fmt.Sprintf("could not send %s: %s", e.requestType, e.reason)
}

// Is reports whether err has the same dynamic type as e, i.e. whether err is
// also a *KDSTransportError. Field values are not compared, so any
// *KDSTransportError matches any other; a nil err never matches.
func (e *KDSTransportError) Is(err error) bool {
	return reflect.TypeOf(e) == reflect.TypeOf(err)
}
232

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.