kuma

Форк
0
/
forwarding_kds_client.go 
219 строк · 7.1 Кб
1
package envoyadmin
2

3
import (
4
	"context"
5
	"fmt"
6
	"reflect"
7

8
	"github.com/pkg/errors"
9

10
	mesh_proto "github.com/kumahq/kuma/api/mesh/v1alpha1"
11
	"github.com/kumahq/kuma/pkg/core"
12
	core_mesh "github.com/kumahq/kuma/pkg/core/resources/apis/mesh"
13
	core_system "github.com/kumahq/kuma/pkg/core/resources/apis/system"
14
	"github.com/kumahq/kuma/pkg/core/resources/manager"
15
	core_model "github.com/kumahq/kuma/pkg/core/resources/model"
16
	core_store "github.com/kumahq/kuma/pkg/core/resources/store"
17
	"github.com/kumahq/kuma/pkg/envoy/admin"
18
	"github.com/kumahq/kuma/pkg/intercp/catalog"
19
	"github.com/kumahq/kuma/pkg/kds/service"
20
	"github.com/kumahq/kuma/pkg/multitenant"
21
)
22

23
// clientLog is the named logger used by the forwarding Envoy Admin client in this file.
var clientLog = core.Log.
	WithName("intercp").
	WithName("envoyadmin").
	WithName("client")
24

25
// NewClientFn creates an InterCPEnvoyAdminForwardServiceClient for the given URL.
// In this file it is invoked with the inter-CP URL of the Global CP instance a request is forwarded to.
type NewClientFn = func(url string) (mesh_proto.InterCPEnvoyAdminForwardServiceClient, error)
26

27
type forwardingKdsEnvoyAdminClient struct {
28
	resManager     manager.ReadOnlyResourceManager
29
	cat            catalog.Catalog
30
	instanceID     string
31
	newClientFn    NewClientFn
32
	fallbackClient admin.EnvoyAdminClient
33
}
34

35
// NewForwardingEnvoyAdminClient returns EnvoyAdminClient which is only used on Global CP in multizone environment.
36
// It forwards the request to an instance of the Global CP to which Zone CP of given DPP is connected.
37
//
38
// For example:
39
// We have 2 instances of Global CP (ins-1, ins-2). Dataplane "backend" is in zone "east".
40
// The leader CP of zone "east" is connected to ins-1.
41
// If we execute config dump for "backend" on ins-1, we follow the regular flow of pkg/envoy/admin/kds_client.go
42
// If we execute config dump for "backend" on ins-2, we forward the request to ins-1 and then execute the regular flow.
43
func NewForwardingEnvoyAdminClient(
44
	resManager manager.ReadOnlyResourceManager,
45
	cat catalog.Catalog,
46
	instanceID string,
47
	newClientFn NewClientFn,
48
	fallbackClient admin.EnvoyAdminClient,
49
) admin.EnvoyAdminClient {
50
	return &forwardingKdsEnvoyAdminClient{
51
		resManager:     resManager,
52
		cat:            cat,
53
		instanceID:     instanceID,
54
		newClientFn:    newClientFn,
55
		fallbackClient: fallbackClient,
56
	}
57
}
58

59
// Compile-time check that *forwardingKdsEnvoyAdminClient satisfies admin.EnvoyAdminClient.
var _ admin.EnvoyAdminClient = (*forwardingKdsEnvoyAdminClient)(nil)
60

61
// PostQuit is not supported by the forwarding client.
//
// It always returns a non-nil error instead of panicking, so that an accidental
// call surfaces as a regular failure to the caller rather than crashing the
// control plane process.
func (f *forwardingKdsEnvoyAdminClient) PostQuit(context.Context, *core_mesh.DataplaneResource) error {
	return errors.New("operation PostQuit is not implemented by the forwarding envoy admin client")
}
64

65
func (f *forwardingKdsEnvoyAdminClient) ConfigDump(ctx context.Context, proxy core_model.ResourceWithAddress) ([]byte, error) {
66
	ctx = appendTenantMetadata(ctx)
67
	instanceID, err := f.globalInstanceID(ctx, core_model.ZoneOfResource(proxy), service.ConfigDumpRPC)
68
	if err != nil {
69
		return nil, err
70
	}
71
	f.logIntendedAction(proxy, instanceID)
72
	if instanceID == f.instanceID {
73
		return f.fallbackClient.ConfigDump(ctx, proxy)
74
	}
75
	client, err := f.clientForInstanceID(ctx, instanceID)
76
	if err != nil {
77
		return nil, err
78
	}
79
	req := &mesh_proto.XDSConfigRequest{
80
		ResourceType: string(proxy.Descriptor().Name),
81
		ResourceName: proxy.GetMeta().GetName(),
82
		ResourceMesh: proxy.GetMeta().GetMesh(),
83
	}
84
	resp, err := client.XDSConfig(ctx, req)
85
	if err != nil {
86
		return nil, err
87
	}
88
	if resp != nil && resp.GetError() != "" {
89
		return nil, &ForwardKDSRequestError{reason: resp.GetError()}
90
	}
91
	return resp.GetConfig(), nil
92
}
93

94
func (f *forwardingKdsEnvoyAdminClient) Stats(ctx context.Context, proxy core_model.ResourceWithAddress) ([]byte, error) {
95
	ctx = appendTenantMetadata(ctx)
96
	instanceID, err := f.globalInstanceID(ctx, core_model.ZoneOfResource(proxy), service.StatsRPC)
97
	if err != nil {
98
		return nil, err
99
	}
100
	f.logIntendedAction(proxy, instanceID)
101
	if instanceID == f.instanceID {
102
		return f.fallbackClient.Stats(ctx, proxy)
103
	}
104
	client, err := f.clientForInstanceID(ctx, instanceID)
105
	if err != nil {
106
		return nil, err
107
	}
108
	req := &mesh_proto.StatsRequest{
109
		ResourceType: string(proxy.Descriptor().Name),
110
		ResourceName: proxy.GetMeta().GetName(),
111
		ResourceMesh: proxy.GetMeta().GetMesh(),
112
	}
113
	resp, err := client.Stats(ctx, req)
114
	if err != nil {
115
		return nil, err
116
	}
117
	if resp != nil && resp.GetError() != "" {
118
		return nil, &ForwardKDSRequestError{reason: resp.GetError()}
119
	}
120
	return resp.GetStats(), nil
121
}
122

123
func (f *forwardingKdsEnvoyAdminClient) Clusters(ctx context.Context, proxy core_model.ResourceWithAddress) ([]byte, error) {
124
	ctx = appendTenantMetadata(ctx)
125
	instanceID, err := f.globalInstanceID(ctx, core_model.ZoneOfResource(proxy), service.ClustersRPC)
126
	if err != nil {
127
		return nil, err
128
	}
129
	f.logIntendedAction(proxy, instanceID)
130
	if instanceID == f.instanceID {
131
		return f.fallbackClient.Clusters(ctx, proxy)
132
	}
133
	client, err := f.clientForInstanceID(ctx, instanceID)
134
	if err != nil {
135
		return nil, err
136
	}
137
	req := &mesh_proto.ClustersRequest{
138
		ResourceType: string(proxy.Descriptor().Name),
139
		ResourceName: proxy.GetMeta().GetName(),
140
		ResourceMesh: proxy.GetMeta().GetMesh(),
141
	}
142
	resp, err := client.Clusters(ctx, req)
143
	if err != nil {
144
		return nil, err
145
	}
146
	if resp != nil && resp.GetError() != "" {
147
		return nil, &ForwardKDSRequestError{reason: resp.GetError()}
148
	}
149
	return resp.GetClusters(), nil
150
}
151

152
func (f *forwardingKdsEnvoyAdminClient) logIntendedAction(proxy core_model.ResourceWithAddress, instanceID string) {
153
	log := clientLog.WithValues(
154
		"name", proxy.GetMeta().GetName(),
155
		"mesh", proxy.GetMeta().GetMesh(),
156
		"type", proxy.Descriptor().Name,
157
		"instanceID", instanceID,
158
	)
159
	if instanceID == f.instanceID {
160
		log.V(1).Info("zone CP of the resource is connected to this Global CP instance. Executing operation")
161
	} else {
162
		log.V(1).Info("zone CP of the resource is connected to other Global CP instance. Forwarding the request")
163
	}
164
}
165

166
func (f *forwardingKdsEnvoyAdminClient) globalInstanceID(ctx context.Context, zone string, rpcName string) (string, error) {
167
	zoneInsightRes := core_system.NewZoneInsightResource()
168
	if err := f.resManager.Get(ctx, zoneInsightRes, core_store.GetByKey(zone, core_model.NoMesh)); err != nil {
169
		return "", err
170
	}
171
	streams := zoneInsightRes.Spec.GetEnvoyAdminStreams()
172
	var globalInstanceID string
173
	switch rpcName {
174
	case service.ConfigDumpRPC:
175
		globalInstanceID = streams.GetConfigDumpGlobalInstanceId()
176
	case service.StatsRPC:
177
		globalInstanceID = streams.GetStatsGlobalInstanceId()
178
	case service.ClustersRPC:
179
		globalInstanceID = streams.GetClustersGlobalInstanceId()
180
	default:
181
		return "", errors.Errorf("invalid operation %s", rpcName)
182
	}
183
	if globalInstanceID == "" {
184
		return "", &StreamNotConnectedError{rpcName: rpcName}
185
	}
186
	return globalInstanceID, nil
187
}
188

189
// clientForInstanceID looks up the instance with the given ID in the catalog,
// using a context scoped to the global tenant, and builds a forwarding client
// for that instance's inter-CP URL.
func (f *forwardingKdsEnvoyAdminClient) clientForInstanceID(ctx context.Context, instanceID string) (mesh_proto.InterCPEnvoyAdminForwardServiceClient, error) {
	globalCtx := multitenant.WithTenant(ctx, multitenant.GlobalTenantID)
	target, err := catalog.InstanceOfID(globalCtx, f.cat, instanceID)
	if err != nil {
		return nil, err
	}
	url := target.InterCpURL()
	return f.newClientFn(url)
}
196

197
// StreamNotConnectedError reports that the stream needed to execute a given
// operation is not yet connected.
type StreamNotConnectedError struct {
	// rpcName is the name of the operation the missing stream would serve.
	rpcName string
}

// Error implements the error interface.
func (e *StreamNotConnectedError) Error() string {
	const format = "stream to execute %s operations is not yet connected"
	return fmt.Sprintf(format, e.rpcName)
}

// Is reports whether err has the same dynamic type as e, which lets errors.Is
// match any *StreamNotConnectedError regardless of its rpcName.
func (e *StreamNotConnectedError) Is(err error) bool {
	ownType := reflect.TypeOf(e)
	return ownType == reflect.TypeOf(err)
}
208

209
// ForwardKDSRequestError carries the error text reported in the response to a
// forwarded request.
type ForwardKDSRequestError struct {
	// reason is the error text taken from the response.
	reason string
}

// Error implements the error interface by returning the reason verbatim.
func (e *ForwardKDSRequestError) Error() string {
	return e.reason
}

// Is reports whether err has the same dynamic type as e, which lets errors.Is
// match any *ForwardKDSRequestError regardless of its reason.
func (e *ForwardKDSRequestError) Is(err error) bool {
	ownType := reflect.TypeOf(e)
	return ownType == reflect.TypeOf(err)
}
220

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.