kuma
219 строк · 7.1 Кб
1package envoyadmin
2
3import (
4"context"
5"fmt"
6"reflect"
7
8"github.com/pkg/errors"
9
10mesh_proto "github.com/kumahq/kuma/api/mesh/v1alpha1"
11"github.com/kumahq/kuma/pkg/core"
12core_mesh "github.com/kumahq/kuma/pkg/core/resources/apis/mesh"
13core_system "github.com/kumahq/kuma/pkg/core/resources/apis/system"
14"github.com/kumahq/kuma/pkg/core/resources/manager"
15core_model "github.com/kumahq/kuma/pkg/core/resources/model"
16core_store "github.com/kumahq/kuma/pkg/core/resources/store"
17"github.com/kumahq/kuma/pkg/envoy/admin"
18"github.com/kumahq/kuma/pkg/intercp/catalog"
19"github.com/kumahq/kuma/pkg/kds/service"
20"github.com/kumahq/kuma/pkg/multitenant"
21)
22
23var clientLog = core.Log.WithName("intercp").WithName("envoyadmin").WithName("client")
24
25type NewClientFn = func(url string) (mesh_proto.InterCPEnvoyAdminForwardServiceClient, error)
26
27type forwardingKdsEnvoyAdminClient struct {
28resManager manager.ReadOnlyResourceManager
29cat catalog.Catalog
30instanceID string
31newClientFn NewClientFn
32fallbackClient admin.EnvoyAdminClient
33}
34
35// NewForwardingEnvoyAdminClient returns EnvoyAdminClient which is only used on Global CP in multizone environment.
36// It forwards the request to an instance of the Global CP to which Zone CP of given DPP is connected.
37//
38// For example:
39// We have 2 instances of Global CP (ins-1, ins-2). Dataplane "backend" is in zone "east".
40// The leader CP of zone "east" is connected to ins-1.
41// If we execute config dump for "backend" on ins-1, we follow the regular flow of pkg/envoy/admin/kds_client.go
42// If we execute config dump for "backend" on ins-2, we forward the request to ins-1 and then execute the regular flow.
43func NewForwardingEnvoyAdminClient(
44resManager manager.ReadOnlyResourceManager,
45cat catalog.Catalog,
46instanceID string,
47newClientFn NewClientFn,
48fallbackClient admin.EnvoyAdminClient,
49) admin.EnvoyAdminClient {
50return &forwardingKdsEnvoyAdminClient{
51resManager: resManager,
52cat: cat,
53instanceID: instanceID,
54newClientFn: newClientFn,
55fallbackClient: fallbackClient,
56}
57}
58
59var _ admin.EnvoyAdminClient = &forwardingKdsEnvoyAdminClient{}
60
61func (f *forwardingKdsEnvoyAdminClient) PostQuit(context.Context, *core_mesh.DataplaneResource) error {
62panic("not implemented")
63}
64
65func (f *forwardingKdsEnvoyAdminClient) ConfigDump(ctx context.Context, proxy core_model.ResourceWithAddress) ([]byte, error) {
66ctx = appendTenantMetadata(ctx)
67instanceID, err := f.globalInstanceID(ctx, core_model.ZoneOfResource(proxy), service.ConfigDumpRPC)
68if err != nil {
69return nil, err
70}
71f.logIntendedAction(proxy, instanceID)
72if instanceID == f.instanceID {
73return f.fallbackClient.ConfigDump(ctx, proxy)
74}
75client, err := f.clientForInstanceID(ctx, instanceID)
76if err != nil {
77return nil, err
78}
79req := &mesh_proto.XDSConfigRequest{
80ResourceType: string(proxy.Descriptor().Name),
81ResourceName: proxy.GetMeta().GetName(),
82ResourceMesh: proxy.GetMeta().GetMesh(),
83}
84resp, err := client.XDSConfig(ctx, req)
85if err != nil {
86return nil, err
87}
88if resp != nil && resp.GetError() != "" {
89return nil, &ForwardKDSRequestError{reason: resp.GetError()}
90}
91return resp.GetConfig(), nil
92}
93
94func (f *forwardingKdsEnvoyAdminClient) Stats(ctx context.Context, proxy core_model.ResourceWithAddress) ([]byte, error) {
95ctx = appendTenantMetadata(ctx)
96instanceID, err := f.globalInstanceID(ctx, core_model.ZoneOfResource(proxy), service.StatsRPC)
97if err != nil {
98return nil, err
99}
100f.logIntendedAction(proxy, instanceID)
101if instanceID == f.instanceID {
102return f.fallbackClient.Stats(ctx, proxy)
103}
104client, err := f.clientForInstanceID(ctx, instanceID)
105if err != nil {
106return nil, err
107}
108req := &mesh_proto.StatsRequest{
109ResourceType: string(proxy.Descriptor().Name),
110ResourceName: proxy.GetMeta().GetName(),
111ResourceMesh: proxy.GetMeta().GetMesh(),
112}
113resp, err := client.Stats(ctx, req)
114if err != nil {
115return nil, err
116}
117if resp != nil && resp.GetError() != "" {
118return nil, &ForwardKDSRequestError{reason: resp.GetError()}
119}
120return resp.GetStats(), nil
121}
122
123func (f *forwardingKdsEnvoyAdminClient) Clusters(ctx context.Context, proxy core_model.ResourceWithAddress) ([]byte, error) {
124ctx = appendTenantMetadata(ctx)
125instanceID, err := f.globalInstanceID(ctx, core_model.ZoneOfResource(proxy), service.ClustersRPC)
126if err != nil {
127return nil, err
128}
129f.logIntendedAction(proxy, instanceID)
130if instanceID == f.instanceID {
131return f.fallbackClient.Clusters(ctx, proxy)
132}
133client, err := f.clientForInstanceID(ctx, instanceID)
134if err != nil {
135return nil, err
136}
137req := &mesh_proto.ClustersRequest{
138ResourceType: string(proxy.Descriptor().Name),
139ResourceName: proxy.GetMeta().GetName(),
140ResourceMesh: proxy.GetMeta().GetMesh(),
141}
142resp, err := client.Clusters(ctx, req)
143if err != nil {
144return nil, err
145}
146if resp != nil && resp.GetError() != "" {
147return nil, &ForwardKDSRequestError{reason: resp.GetError()}
148}
149return resp.GetClusters(), nil
150}
151
152func (f *forwardingKdsEnvoyAdminClient) logIntendedAction(proxy core_model.ResourceWithAddress, instanceID string) {
153log := clientLog.WithValues(
154"name", proxy.GetMeta().GetName(),
155"mesh", proxy.GetMeta().GetMesh(),
156"type", proxy.Descriptor().Name,
157"instanceID", instanceID,
158)
159if instanceID == f.instanceID {
160log.V(1).Info("zone CP of the resource is connected to this Global CP instance. Executing operation")
161} else {
162log.V(1).Info("zone CP of the resource is connected to other Global CP instance. Forwarding the request")
163}
164}
165
166func (f *forwardingKdsEnvoyAdminClient) globalInstanceID(ctx context.Context, zone string, rpcName string) (string, error) {
167zoneInsightRes := core_system.NewZoneInsightResource()
168if err := f.resManager.Get(ctx, zoneInsightRes, core_store.GetByKey(zone, core_model.NoMesh)); err != nil {
169return "", err
170}
171streams := zoneInsightRes.Spec.GetEnvoyAdminStreams()
172var globalInstanceID string
173switch rpcName {
174case service.ConfigDumpRPC:
175globalInstanceID = streams.GetConfigDumpGlobalInstanceId()
176case service.StatsRPC:
177globalInstanceID = streams.GetStatsGlobalInstanceId()
178case service.ClustersRPC:
179globalInstanceID = streams.GetClustersGlobalInstanceId()
180default:
181return "", errors.Errorf("invalid operation %s", rpcName)
182}
183if globalInstanceID == "" {
184return "", &StreamNotConnectedError{rpcName: rpcName}
185}
186return globalInstanceID, nil
187}
188
189func (f *forwardingKdsEnvoyAdminClient) clientForInstanceID(ctx context.Context, instanceID string) (mesh_proto.InterCPEnvoyAdminForwardServiceClient, error) {
190instance, err := catalog.InstanceOfID(multitenant.WithTenant(ctx, multitenant.GlobalTenantID), f.cat, instanceID)
191if err != nil {
192return nil, err
193}
194return f.newClientFn(instance.InterCpURL())
195}
196
197type StreamNotConnectedError struct {
198rpcName string
199}
200
201func (e *StreamNotConnectedError) Error() string {
202return fmt.Sprintf("stream to execute %s operations is not yet connected", e.rpcName)
203}
204
205func (e *StreamNotConnectedError) Is(err error) bool {
206return reflect.TypeOf(e) == reflect.TypeOf(err)
207}
208
209type ForwardKDSRequestError struct {
210reason string
211}
212
213func (e *ForwardKDSRequestError) Error() string {
214return e.reason
215}
216
217func (e *ForwardKDSRequestError) Is(err error) bool {
218return reflect.TypeOf(e) == reflect.TypeOf(err)
219}
220