9
"github.com/pkg/errors"
10
"google.golang.org/grpc"
11
"google.golang.org/protobuf/proto"
12
"google.golang.org/protobuf/reflect/protoreflect"
13
"google.golang.org/protobuf/types/known/wrapperspb"
15
mesh_proto "github.com/kumahq/kuma/api/mesh/v1alpha1"
16
system_proto "github.com/kumahq/kuma/api/system/v1alpha1"
17
kuma_cp "github.com/kumahq/kuma/pkg/config/app/kuma-cp"
18
config_core "github.com/kumahq/kuma/pkg/config/core"
19
"github.com/kumahq/kuma/pkg/core"
20
config_manager "github.com/kumahq/kuma/pkg/core/config/manager"
21
core_mesh "github.com/kumahq/kuma/pkg/core/resources/apis/mesh"
22
"github.com/kumahq/kuma/pkg/core/resources/apis/system"
23
"github.com/kumahq/kuma/pkg/core/resources/manager"
24
core_model "github.com/kumahq/kuma/pkg/core/resources/model"
25
"github.com/kumahq/kuma/pkg/core/resources/store"
26
"github.com/kumahq/kuma/pkg/kds"
27
"github.com/kumahq/kuma/pkg/kds/hash"
28
"github.com/kumahq/kuma/pkg/kds/mux"
29
"github.com/kumahq/kuma/pkg/kds/reconcile"
30
"github.com/kumahq/kuma/pkg/kds/service"
31
"github.com/kumahq/kuma/pkg/kds/util"
32
zone_tokens "github.com/kumahq/kuma/pkg/tokens/builtin/zone"
33
"github.com/kumahq/kuma/pkg/tokens/builtin/zoneingress"
34
"github.com/kumahq/kuma/pkg/util/rsa"
37
// log is the package-level logger, scoped under the name "kds".
var log = core.Log.WithName("kds")
40
ZoneClientCtx context.Context
41
GlobalProvidedFilter reconcile.ResourceFilter
42
ZoneProvidedFilter reconcile.ResourceFilter
43
GlobalServerFilters []mux.Filter
44
GlobalServerFiltersV2 []mux.FilterV2
45
// Configs contains the names of system.ConfigResource that will be transferred from Global to Zone
46
Configs map[string]bool
48
GlobalResourceMapper reconcile.ResourceMapper
49
ZoneResourceMapper reconcile.ResourceMapper
51
EnvoyAdminRPCs service.EnvoyAdminRPCs
52
ServerStreamInterceptors []grpc.StreamServerInterceptor
53
ServerUnaryInterceptor []grpc.UnaryServerInterceptor
58
manager manager.ResourceManager,
61
configs := map[string]bool{
62
config_manager.ClusterIdConfigKey: true,
65
globalMappers := []reconcile.ResourceMapper{
66
UpdateResourceMeta(util.WithLabel(mesh_proto.ResourceOriginLabel, string(mesh_proto.GlobalResourceOrigin))),
69
reconcile.TypeIs(system.GlobalSecretType),
70
reconcile.NameHasPrefix(zone_tokens.SigningKeyPrefix),
72
MapZoneTokenSigningKeyGlobalToPublicKey),
74
reconcile.IsKubernetes(cfg.Store.Type),
75
RemoveK8sSystemNamespaceSuffixMapper(cfg.Store.Kubernetes.SystemNamespace)),
78
reconcile.ScopeIs(core_model.ScopeMesh),
79
// Secrets are already named with a mesh prefix for uniqueness on k8s; also, Zone CP expects secret names to be in
80
// a particular format to be able to reference them.
81
reconcile.Not(reconcile.TypeIs(system.SecretType)),
83
HashSuffixMapper(true)),
86
zoneMappers := []reconcile.ResourceMapper{
88
util.WithLabel(mesh_proto.ResourceOriginLabel, string(mesh_proto.ZoneResourceOrigin)),
89
util.WithLabel(mesh_proto.ZoneTag, cfg.Multizone.Zone.Name),
91
MapInsightResourcesZeroGeneration,
93
reconcile.IsKubernetes(cfg.Store.Type),
94
RemoveK8sSystemNamespaceSuffixMapper(cfg.Store.Kubernetes.SystemNamespace)),
95
HashSuffixMapper(false, mesh_proto.ZoneTag, mesh_proto.KubeNamespaceTag),
100
GlobalProvidedFilter: GlobalProvidedFilter(manager, configs),
101
ZoneProvidedFilter: ZoneProvidedFilter,
103
GlobalResourceMapper: CompositeResourceMapper(globalMappers...),
104
ZoneResourceMapper: CompositeResourceMapper(zoneMappers...),
105
EnvoyAdminRPCs: service.NewEnvoyAdminRPCs(),
109
// CompositeResourceMapper combines the given ResourceMappers into
110
// a single ResourceMapper which calls each in order. If an error
111
// occurs, the first one is returned and no further mappers are executed.
112
func CompositeResourceMapper(mappers ...reconcile.ResourceMapper) reconcile.ResourceMapper {
113
return func(features kds.Features, r core_model.Resource) (core_model.Resource, error) {
115
for _, mapper := range mappers {
120
// r is reassigned on every iteration, so each mapper receives the
// resource returned by the previous one.
r, err = mapper(features, r)
129
// specWithDiscoverySubscriptions describes a resource spec that exposes a list
// of discovery subscriptions and supports protobuf reflection.
type specWithDiscoverySubscriptions interface {
130
// GetSubscriptions returns the spec's discovery subscriptions.
GetSubscriptions() []*mesh_proto.DiscoverySubscription
131
// ProtoReflect returns the protobuf reflection view of the spec.
ProtoReflect() protoreflect.Message
134
// MapInsightResourcesZeroGeneration zeros the "generation" field in resources for which
135
// the field has only local relevance. This prevents reconciliation from unnecessarily
136
// deeming the object to have changed.
137
func MapInsightResourcesZeroGeneration(_ kds.Features, r core_model.Resource) (core_model.Resource, error) {
138
// Only specs that expose discovery subscriptions are handled by this branch.
if spec, ok := r.GetSpec().(specWithDiscoverySubscriptions); ok {
139
// Work on a clone so that the spec held by r is not modified.
spec = proto.Clone(spec).(specWithDiscoverySubscriptions)
140
for _, sub := range spec.GetSubscriptions() {
145
// Build a fresh resource of the same concrete type as r via reflection.
resType := reflect.TypeOf(r).Elem()
147
newR := reflect.New(resType).Interface().(core_model.Resource)
149
if err := newR.SetSpec(spec.(core_model.ResourceSpec)); err != nil {
150
// Failure to set the cloned spec panics instead of returning an error.
panic(any(errors.Wrap(err, "error setting spec on resource")))
159
// MapZoneTokenSigningKeyGlobalToPublicKey converts a zone token signing key
// stored in a GlobalSecretResource into a new GlobalSecretResource that
// carries the corresponding public key. The private key PEM bytes are read
// from r's spec and converted to public key PEM bytes. The new resource's
// meta is built with util.CloneResourceMeta from r's meta, with the name
// rewritten from zone_tokens.SigningKeyPrefix to
// zone_tokens.SigningPublicKeyPrefix.
//
// r is type-asserted to *system.GlobalSecretResource without a check, so the
// mapper panics when given any other resource type.
func MapZoneTokenSigningKeyGlobalToPublicKey(_ kds.Features, r core_model.Resource) (core_model.Resource, error) {
160
signingKeyBytes := r.(*system.GlobalSecretResource).Spec.GetData().GetValue()
161
publicKeyBytes, err := rsa.FromPrivateKeyPEMBytesToPublicKeyPEMBytes(signingKeyBytes)
166
publicSigningKeyResource := system.NewGlobalSecretResource()
167
// ReplaceAll rewrites every occurrence of the signing key prefix string in
// the name, not only a leading one.
newResName := strings.ReplaceAll(
168
r.GetMeta().GetName(),
169
zone_tokens.SigningKeyPrefix,
170
zone_tokens.SigningPublicKeyPrefix,
172
publicSigningKeyResource.SetMeta(util.CloneResourceMeta(r.GetMeta(), util.WithName(newResName)))
174
// The new secret's data holds the public key bytes only.
if err := publicSigningKeyResource.SetSpec(&system_proto.Secret{
175
Data: &wrapperspb.BytesValue{Value: publicKeyBytes},
180
return publicSigningKeyResource, nil
183
// RemoveK8sSystemNamespaceSuffixMapper returns a mapper responsible for removing the control plane system namespace
184
// suffix (".<k8sSystemNamespace>") from names of resources if resources are stored in kubernetes.
185
func RemoveK8sSystemNamespaceSuffixMapper(k8sSystemNamespace string) reconcile.ResourceMapper {
186
return func(_ kds.Features, r core_model.Resource) (core_model.Resource, error) {
187
// Populate a new object created from r's descriptor rather than renaming r itself.
newObj := r.Descriptor().NewObject()
188
dotSuffix := fmt.Sprintf(".%s", k8sSystemNamespace)
189
// TrimSuffix leaves the name unchanged when it does not end with the suffix.
newName := strings.TrimSuffix(r.GetMeta().GetName(), dotSuffix)
190
newMeta := util.CloneResourceMeta(r.GetMeta(), util.WithName(newName))
191
newObj.SetMeta(newMeta)
192
// The spec is passed as-is (not cloned) and the SetSpec error is discarded.
// NOTE(review): presumably SetSpec cannot fail here because newObj comes from r's own descriptor — confirm.
_ = newObj.SetSpec(r.GetSpec())
197
// HashSuffixMapper returns a mapper that adds a hash suffix to the name during KDS sync.
// The new name is computed by hash.HashedName from the resource's mesh, its
// display name, and the values of the labels listed in labelsToUse. When
// checkKDSFeature is true, the mapper first checks whether features includes
// kds.FeatureHashSuffix.
198
func HashSuffixMapper(checkKDSFeature bool, labelsToUse ...string) reconcile.ResourceMapper {
199
return func(features kds.Features, r core_model.Resource) (core_model.Resource, error) {
200
if checkKDSFeature && !features.HasFeature(kds.FeatureHashSuffix) {
204
name := core_model.GetDisplayName(r)
205
// Collect label values in the order of labelsToUse; a label that is not
// set contributes an empty string.
values := make([]string, 0, len(labelsToUse))
206
for _, lbl := range labelsToUse {
207
values = append(values, r.GetMeta().GetLabels()[lbl])
210
// Populate a new object created from r's descriptor rather than renaming r itself.
newObj := r.Descriptor().NewObject()
211
newMeta := util.CloneResourceMeta(r.GetMeta(), util.WithName(hash.HashedName(r.GetMeta().GetMesh(), name, values...)))
212
newObj.SetMeta(newMeta)
213
// The spec is passed as-is (not cloned) and the SetSpec error is discarded.
// NOTE(review): presumably SetSpec cannot fail here because newObj comes from r's own descriptor — confirm.
_ = newObj.SetSpec(r.GetSpec())
219
// UpdateResourceMeta returns a mapper that replaces a resource's meta with the
// result of util.CloneResourceMeta applied to its current meta and the options fs.
// The meta is set on the resource passed to the mapper, not on a new object.
func UpdateResourceMeta(fs ...util.CloneResourceMetaOpt) reconcile.ResourceMapper {
220
return func(_ kds.Features, r core_model.Resource) (core_model.Resource, error) {
221
r.SetMeta(util.CloneResourceMeta(r.GetMeta(), fs...))
226
// GlobalProvidedFilter returns a ResourceFilter that reports whether resource r
// passes the Global-side filter for the peer identified by clusterID. rm is
// used to look up Zone resources, and configs holds the names of config
// resources that pass the filter.
func GlobalProvidedFilter(rm manager.ResourceManager, configs map[string]bool) reconcile.ResourceFilter {
227
return func(ctx context.Context, clusterID string, features kds.Features, r core_model.Resource) bool {
228
resName := r.GetMeta().GetName()
231
// Config resources pass only when their name is listed in configs.
case r.Descriptor().Name == system.ConfigType:
232
return configs[resName]
233
// Global secrets pass only when their name starts with the zone ingress
// signing key prefix or the zone token signing key prefix.
case r.Descriptor().Name == system.GlobalSecretType:
234
return util.ResourceNameHasAtLeastOneOfPrefixes(resName, []string{
235
zoneingress.ZoneIngressSigningKeyPrefix,
236
zone_tokens.SigningKeyPrefix,
238
// Resources flagged GlobalToAllButOriginalZone are checked against the zone
// they are tagged with.
case r.Descriptor().KDSFlags.Has(core_model.GlobalToAllButOriginalZoneFlag):
239
zoneTag := util.ZoneTag(r)
241
if clusterID == zoneTag {
242
// There is no need to sync the resource to the zone it originated from.
246
// Load the Zone resource named by the tag to check whether that zone is enabled.
zone := system.NewZoneResource()
247
if err := rm.Get(ctx, zone, store.GetByKey(zoneTag, core_model.NoMesh)); err != nil {
248
log.Error(err, "failed to get zone", "zone", zoneTag)
249
// Since there is no explicit 'enabled: false', we don't
250
// make any strong decisions which might affect connectivity.
254
return zone.Spec.IsEnabled()
256
return core_model.IsLocallyOriginated(config_core.Global, r)
261
// ZoneProvidedFilter reports whether resource r passes the Zone-side filter
// for the zone named localZone. A ZoneIngress passes unless
// IsRemoteIngress(localZone) is true. Any other resource passes when it is
// locally originated for config_core.Zone or when its descriptor's KDSFlags
// equal core_model.ZoneToGlobalFlag.
func ZoneProvidedFilter(_ context.Context, localZone string, _ kds.Features, r core_model.Resource) bool {
262
if zi, ok := r.(*core_mesh.ZoneIngressResource); ok {
263
// Old zones don't have a 'kuma.io/zone' label on ZoneIngress. When upgrading to the new 2.6 version
264
// we don't want Zone CP to sync ZoneIngresses without the 'kuma.io/zone' label to Global, pretending
265
// they originate here. That's why the upgrade from 2.5 to 2.6 (and 2.7) requires casting the resource
266
// to *core_mesh.ZoneIngressResource and checking its 'spec.zone' field.
267
// todo: remove in 2 releases after 2.6.x
268
return !zi.IsRemoteIngress(localZone)
270
// KDSFlags is compared with ==, so only descriptors whose flags are exactly
// ZoneToGlobalFlag match the second condition.
return core_model.IsLocallyOriginated(config_core.Zone, r) || r.Descriptor().KDSFlags == core_model.ZoneToGlobalFlag