kuma

Форк
0
/
context.go 
271 строка · 9.7 Кб
1
package context
2

3
import (
4
	"context"
5
	"fmt"
6
	"reflect"
7
	"strings"
8

9
	"github.com/pkg/errors"
10
	"google.golang.org/grpc"
11
	"google.golang.org/protobuf/proto"
12
	"google.golang.org/protobuf/reflect/protoreflect"
13
	"google.golang.org/protobuf/types/known/wrapperspb"
14

15
	mesh_proto "github.com/kumahq/kuma/api/mesh/v1alpha1"
16
	system_proto "github.com/kumahq/kuma/api/system/v1alpha1"
17
	kuma_cp "github.com/kumahq/kuma/pkg/config/app/kuma-cp"
18
	config_core "github.com/kumahq/kuma/pkg/config/core"
19
	"github.com/kumahq/kuma/pkg/core"
20
	config_manager "github.com/kumahq/kuma/pkg/core/config/manager"
21
	core_mesh "github.com/kumahq/kuma/pkg/core/resources/apis/mesh"
22
	"github.com/kumahq/kuma/pkg/core/resources/apis/system"
23
	"github.com/kumahq/kuma/pkg/core/resources/manager"
24
	core_model "github.com/kumahq/kuma/pkg/core/resources/model"
25
	"github.com/kumahq/kuma/pkg/core/resources/store"
26
	"github.com/kumahq/kuma/pkg/kds"
27
	"github.com/kumahq/kuma/pkg/kds/hash"
28
	"github.com/kumahq/kuma/pkg/kds/mux"
29
	"github.com/kumahq/kuma/pkg/kds/reconcile"
30
	"github.com/kumahq/kuma/pkg/kds/service"
31
	"github.com/kumahq/kuma/pkg/kds/util"
32
	zone_tokens "github.com/kumahq/kuma/pkg/tokens/builtin/zone"
33
	"github.com/kumahq/kuma/pkg/tokens/builtin/zoneingress"
34
	"github.com/kumahq/kuma/pkg/util/rsa"
35
)
36

37
var log = core.Log.WithName("kds")
38

39
type Context struct {
40
	ZoneClientCtx         context.Context
41
	GlobalProvidedFilter  reconcile.ResourceFilter
42
	ZoneProvidedFilter    reconcile.ResourceFilter
43
	GlobalServerFilters   []mux.Filter
44
	GlobalServerFiltersV2 []mux.FilterV2
45
	// Configs contains the names of system.ConfigResource that will be transferred from Global to Zone
46
	Configs map[string]bool
47

48
	GlobalResourceMapper reconcile.ResourceMapper
49
	ZoneResourceMapper   reconcile.ResourceMapper
50

51
	EnvoyAdminRPCs           service.EnvoyAdminRPCs
52
	ServerStreamInterceptors []grpc.StreamServerInterceptor
53
	ServerUnaryInterceptor   []grpc.UnaryServerInterceptor
54
}
55

56
func DefaultContext(
57
	ctx context.Context,
58
	manager manager.ResourceManager,
59
	cfg kuma_cp.Config,
60
) *Context {
61
	configs := map[string]bool{
62
		config_manager.ClusterIdConfigKey: true,
63
	}
64

65
	globalMappers := []reconcile.ResourceMapper{
66
		UpdateResourceMeta(util.WithLabel(mesh_proto.ResourceOriginLabel, string(mesh_proto.GlobalResourceOrigin))),
67
		reconcile.If(
68
			reconcile.And(
69
				reconcile.TypeIs(system.GlobalSecretType),
70
				reconcile.NameHasPrefix(zone_tokens.SigningKeyPrefix),
71
			),
72
			MapZoneTokenSigningKeyGlobalToPublicKey),
73
		reconcile.If(
74
			reconcile.IsKubernetes(cfg.Store.Type),
75
			RemoveK8sSystemNamespaceSuffixMapper(cfg.Store.Kubernetes.SystemNamespace)),
76
		reconcile.If(
77
			reconcile.And(
78
				reconcile.ScopeIs(core_model.ScopeMesh),
79
				// secrets already named with mesh prefix for uniqueness on k8s, also Zone CP expects secret names to be in
80
				// particular format to be able to reference them
81
				reconcile.Not(reconcile.TypeIs(system.SecretType)),
82
			),
83
			HashSuffixMapper(true)),
84
	}
85

86
	zoneMappers := []reconcile.ResourceMapper{
87
		UpdateResourceMeta(
88
			util.WithLabel(mesh_proto.ResourceOriginLabel, string(mesh_proto.ZoneResourceOrigin)),
89
			util.WithLabel(mesh_proto.ZoneTag, cfg.Multizone.Zone.Name),
90
		),
91
		MapInsightResourcesZeroGeneration,
92
		reconcile.If(
93
			reconcile.IsKubernetes(cfg.Store.Type),
94
			RemoveK8sSystemNamespaceSuffixMapper(cfg.Store.Kubernetes.SystemNamespace)),
95
		HashSuffixMapper(false, mesh_proto.ZoneTag, mesh_proto.KubeNamespaceTag),
96
	}
97

98
	return &Context{
99
		ZoneClientCtx:        ctx,
100
		GlobalProvidedFilter: GlobalProvidedFilter(manager, configs),
101
		ZoneProvidedFilter:   ZoneProvidedFilter,
102
		Configs:              configs,
103
		GlobalResourceMapper: CompositeResourceMapper(globalMappers...),
104
		ZoneResourceMapper:   CompositeResourceMapper(zoneMappers...),
105
		EnvoyAdminRPCs:       service.NewEnvoyAdminRPCs(),
106
	}
107
}
108

109
// CompositeResourceMapper combines the given ResourceMappers into
110
// a single ResourceMapper which calls each in order. If an error
111
// occurs, the first one is returned and no further mappers are executed.
112
func CompositeResourceMapper(mappers ...reconcile.ResourceMapper) reconcile.ResourceMapper {
113
	return func(features kds.Features, r core_model.Resource) (core_model.Resource, error) {
114
		var err error
115
		for _, mapper := range mappers {
116
			if mapper == nil {
117
				continue
118
			}
119

120
			r, err = mapper(features, r)
121
			if err != nil {
122
				return r, err
123
			}
124
		}
125
		return r, nil
126
	}
127
}
128

129
type specWithDiscoverySubscriptions interface {
130
	GetSubscriptions() []*mesh_proto.DiscoverySubscription
131
	ProtoReflect() protoreflect.Message
132
}
133

134
// MapInsightResourcesZeroGeneration zeros "generation" field in resources for which
135
// the field has only local relevance. This prevents reconciliation from unnecessarily
136
// deeming the object to have changed.
137
func MapInsightResourcesZeroGeneration(_ kds.Features, r core_model.Resource) (core_model.Resource, error) {
138
	if spec, ok := r.GetSpec().(specWithDiscoverySubscriptions); ok {
139
		spec = proto.Clone(spec).(specWithDiscoverySubscriptions)
140
		for _, sub := range spec.GetSubscriptions() {
141
			sub.Generation = 0
142
		}
143

144
		meta := r.GetMeta()
145
		resType := reflect.TypeOf(r).Elem()
146

147
		newR := reflect.New(resType).Interface().(core_model.Resource)
148
		newR.SetMeta(meta)
149
		if err := newR.SetSpec(spec.(core_model.ResourceSpec)); err != nil {
150
			panic(any(errors.Wrap(err, "error setting spec on resource")))
151
		}
152

153
		return newR, nil
154
	}
155

156
	return r, nil
157
}
158

159
func MapZoneTokenSigningKeyGlobalToPublicKey(_ kds.Features, r core_model.Resource) (core_model.Resource, error) {
160
	signingKeyBytes := r.(*system.GlobalSecretResource).Spec.GetData().GetValue()
161
	publicKeyBytes, err := rsa.FromPrivateKeyPEMBytesToPublicKeyPEMBytes(signingKeyBytes)
162
	if err != nil {
163
		return nil, err
164
	}
165

166
	publicSigningKeyResource := system.NewGlobalSecretResource()
167
	newResName := strings.ReplaceAll(
168
		r.GetMeta().GetName(),
169
		zone_tokens.SigningKeyPrefix,
170
		zone_tokens.SigningPublicKeyPrefix,
171
	)
172
	publicSigningKeyResource.SetMeta(util.CloneResourceMeta(r.GetMeta(), util.WithName(newResName)))
173

174
	if err := publicSigningKeyResource.SetSpec(&system_proto.Secret{
175
		Data: &wrapperspb.BytesValue{Value: publicKeyBytes},
176
	}); err != nil {
177
		return nil, err
178
	}
179

180
	return publicSigningKeyResource, nil
181
}
182

183
// RemoveK8sSystemNamespaceSuffixMapper is a mapper responsible for removing control plane system namespace suffixes
184
// from names of resources if resources are stored in kubernetes.
185
func RemoveK8sSystemNamespaceSuffixMapper(k8sSystemNamespace string) reconcile.ResourceMapper {
186
	return func(_ kds.Features, r core_model.Resource) (core_model.Resource, error) {
187
		newObj := r.Descriptor().NewObject()
188
		dotSuffix := fmt.Sprintf(".%s", k8sSystemNamespace)
189
		newName := strings.TrimSuffix(r.GetMeta().GetName(), dotSuffix)
190
		newMeta := util.CloneResourceMeta(r.GetMeta(), util.WithName(newName))
191
		newObj.SetMeta(newMeta)
192
		_ = newObj.SetSpec(r.GetSpec())
193
		return newObj, nil
194
	}
195
}
196

197
// HashSuffixMapper returns mapper that adds a hash suffix to the name during KDS sync
198
func HashSuffixMapper(checkKDSFeature bool, labelsToUse ...string) reconcile.ResourceMapper {
199
	return func(features kds.Features, r core_model.Resource) (core_model.Resource, error) {
200
		if checkKDSFeature && !features.HasFeature(kds.FeatureHashSuffix) {
201
			return r, nil
202
		}
203

204
		name := core_model.GetDisplayName(r)
205
		values := make([]string, 0, len(labelsToUse))
206
		for _, lbl := range labelsToUse {
207
			values = append(values, r.GetMeta().GetLabels()[lbl])
208
		}
209

210
		newObj := r.Descriptor().NewObject()
211
		newMeta := util.CloneResourceMeta(r.GetMeta(), util.WithName(hash.HashedName(r.GetMeta().GetMesh(), name, values...)))
212
		newObj.SetMeta(newMeta)
213
		_ = newObj.SetSpec(r.GetSpec())
214

215
		return newObj, nil
216
	}
217
}
218

219
func UpdateResourceMeta(fs ...util.CloneResourceMetaOpt) reconcile.ResourceMapper {
220
	return func(_ kds.Features, r core_model.Resource) (core_model.Resource, error) {
221
		r.SetMeta(util.CloneResourceMeta(r.GetMeta(), fs...))
222
		return r, nil
223
	}
224
}
225

226
func GlobalProvidedFilter(rm manager.ResourceManager, configs map[string]bool) reconcile.ResourceFilter {
227
	return func(ctx context.Context, clusterID string, features kds.Features, r core_model.Resource) bool {
228
		resName := r.GetMeta().GetName()
229

230
		switch {
231
		case r.Descriptor().Name == system.ConfigType:
232
			return configs[resName]
233
		case r.Descriptor().Name == system.GlobalSecretType:
234
			return util.ResourceNameHasAtLeastOneOfPrefixes(resName, []string{
235
				zoneingress.ZoneIngressSigningKeyPrefix,
236
				zone_tokens.SigningKeyPrefix,
237
			}...)
238
		case r.Descriptor().KDSFlags.Has(core_model.GlobalToAllButOriginalZoneFlag):
239
			zoneTag := util.ZoneTag(r)
240

241
			if clusterID == zoneTag {
242
				// don't need to sync resource to the zone where resource is originated from
243
				return false
244
			}
245

246
			zone := system.NewZoneResource()
247
			if err := rm.Get(ctx, zone, store.GetByKey(zoneTag, core_model.NoMesh)); err != nil {
248
				log.Error(err, "failed to get zone", "zone", zoneTag)
249
				// since there is no explicit 'enabled: false' then we don't
250
				// make any strong decisions which might affect connectivity
251
				return true
252
			}
253

254
			return zone.Spec.IsEnabled()
255
		default:
256
			return core_model.IsLocallyOriginated(config_core.Global, r)
257
		}
258
	}
259
}
260

261
func ZoneProvidedFilter(_ context.Context, localZone string, _ kds.Features, r core_model.Resource) bool {
262
	if zi, ok := r.(*core_mesh.ZoneIngressResource); ok {
263
		// Old zones don't have a 'kuma.io/zone' label on ZoneIngress, when upgrading to the new 2.6 version
264
		// we don't want Zone CP to sync ZoneIngresses without 'kuma.io/zone' label to Global pretending
265
		// they're originating here. That's why upgrade from 2.5 to 2.6 (and 2.7) requires casting resource
266
		// to *core_mesh.ZoneIngressResource and checking its 'spec.zone' field.
267
		// todo: remove in 2 releases after 2.6.x
268
		return !zi.IsRemoteIngress(localZone)
269
	}
270
	return core_model.IsLocallyOriginated(config_core.Zone, r) || r.Descriptor().KDSFlags == core_model.ZoneToGlobalFlag
271
}
272

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.