9
"github.com/go-logr/logr"
10
"github.com/pkg/errors"
12
mesh_proto "github.com/kumahq/kuma/api/mesh/v1alpha1"
13
system_proto "github.com/kumahq/kuma/api/system/v1alpha1"
14
config_core "github.com/kumahq/kuma/pkg/config/core"
15
store_config "github.com/kumahq/kuma/pkg/config/core/resources/store"
16
"github.com/kumahq/kuma/pkg/core"
17
core_mesh "github.com/kumahq/kuma/pkg/core/resources/apis/mesh"
18
"github.com/kumahq/kuma/pkg/core/resources/apis/system"
19
core_manager "github.com/kumahq/kuma/pkg/core/resources/manager"
20
"github.com/kumahq/kuma/pkg/core/resources/model"
21
"github.com/kumahq/kuma/pkg/core/resources/registry"
22
"github.com/kumahq/kuma/pkg/core/resources/store"
23
"github.com/kumahq/kuma/pkg/core/runtime"
24
"github.com/kumahq/kuma/pkg/core/runtime/component"
25
"github.com/kumahq/kuma/pkg/core/user"
26
"github.com/kumahq/kuma/pkg/kds/client"
27
"github.com/kumahq/kuma/pkg/kds/mux"
28
kds_server "github.com/kumahq/kuma/pkg/kds/server"
29
"github.com/kumahq/kuma/pkg/kds/service"
30
sync_store "github.com/kumahq/kuma/pkg/kds/store"
31
"github.com/kumahq/kuma/pkg/kds/util"
32
kds_client_v2 "github.com/kumahq/kuma/pkg/kds/v2/client"
33
kds_server_v2 "github.com/kumahq/kuma/pkg/kds/v2/server"
34
kds_sync_store_v2 "github.com/kumahq/kuma/pkg/kds/v2/store"
35
kuma_log "github.com/kumahq/kuma/pkg/log"
36
resources_k8s "github.com/kumahq/kuma/pkg/plugins/resources/k8s"
37
k8s_model "github.com/kumahq/kuma/pkg/plugins/resources/k8s/native/pkg/model"
38
util_proto "github.com/kumahq/kuma/pkg/util/proto"
42
// kdsGlobalLog is the logger named "kds-global"; in this file it is used by the
// session handler, the resource syncer, the zone watch and the mux server.
kdsGlobalLog = core.Log.WithName("kds-global")
43
// kdsDeltaGlobalLog is the logger named "kds-delta-global"; in this file it is
// used by the V2 sync handlers and the V2 resource syncer.
kdsDeltaGlobalLog = core.Log.WithName("kds-delta-global")
46
// Setup builds the KDS servers, resource syncers and stream handlers used by a
// Global control plane and adds the resulting components to the runtime.
//
// It returns a wrapped "couldn't create ZoneWatch" error when the zone watch
// cannot be constructed, and otherwise the result of adding the mux server
// component via rt.Add.
func Setup(rt runtime.Runtime) error {
47
// Guard on the configured control plane mode.
// NOTE(review): presumably returns early when the mode is not Global — confirm.
if rt.Config().Mode != config_core.Global {
51
reg := registry.Global()
52
// KDS server for resource types selected by GlobalToZoneSelector; it is the
// server whose StreamKumaResources is called in the session handler below.
kdsServer, err := kds_server.New(
55
reg.ObjectTypes(model.HasKDSFlag(model.GlobalToZoneSelector)),
57
rt.Config().Multizone.Global.KDS.RefreshInterval.Duration,
58
rt.KDSContext().GlobalProvidedFilter,
59
rt.KDSContext().GlobalResourceMapper,
60
rt.Config().Multizone.Global.KDS.NackBackoff.Duration,
66
// V2 server built from the same type selector, filter, mapper and durations;
// it serves GlobalToZoneSync in the V2 handler below.
kdsServerV2, err := kds_server_v2.New(
69
reg.ObjectTypes(model.HasKDSFlag(model.GlobalToZoneSelector)),
71
rt.Config().Multizone.Global.KDS.RefreshInterval.Duration,
72
rt.KDSContext().GlobalProvidedFilter,
73
rt.KDSContext().GlobalResourceMapper,
74
rt.Config().Multizone.Global.KDS.NackBackoff.Duration,
80
// Resource syncers backed by rt.ResourceStore(); they are handed to the
// Zone-to-Global callbacks below.
resourceSyncer := sync_store.NewResourceSyncer(kdsGlobalLog, rt.ResourceStore())
81
resourceSyncerV2, err := kds_sync_store_v2.NewResourceSyncer(kdsDeltaGlobalLog, rt.ResourceStore(), rt.Transactions(), rt.Metrics(), rt.Extensions())
85
kubeFactory := resources_k8s.NewSimpleKubeFactory()
86
// Session handler: streams resources to the peer over the session's server
// stream and receives the peer's resources through a KDS sink on the client stream.
onSessionStarted := mux.OnSessionStartedFunc(func(session mux.Session) error {
87
// Tag every log line of this session with the peer ID and context-derived fields.
log := kdsGlobalLog.WithValues("peer-id", session.PeerID())
88
log = kuma_log.AddFieldsFromCtx(log, session.ClientStream().Context(), rt.Extensions())
89
log.Info("new session created")
91
if err := kdsServer.StreamKumaResources(session.ServerStream()); err != nil {
92
log.Error(err, "StreamKumaResources finished with an error")
95
log.V(1).Info("StreamKumaResources finished gracefully")
98
kdsStream := client.NewKDSStream(session.ClientStream(), session.PeerID(), "") // we only care about Zone CP config. Zone CP should not receive Global CP config.
99
// Ensure a Zone resource named after the peer exists before receiving its resources.
if err := createZoneIfAbsent(session.ClientStream().Context(), log, session.PeerID(), rt.ResourceManager()); err != nil {
100
log.Error(err, "Global CP could not create a zone")
101
return errors.New("Global CP could not create a zone") // send back message without details. Zone CP will retry
103
// The sink handles resource types flagged ZoneToGlobalFlag; the boolean tells
// Callbacks whether the configured store type is Kubernetes.
sink := client.NewKDSSink(log, reg.ObjectTypes(model.HasKDSFlag(model.ZoneToGlobalFlag)), kdsStream, Callbacks(resourceSyncer, rt.Config().Store.Type == store_config.KubernetesStore, kubeFactory))
105
if err := sink.Receive(); err != nil {
106
log.Error(err, "KDSSink finished with an error")
107
// Record the receive failure on the session.
session.SetError(err)
109
log.V(1).Info("KDSSink finished gracefully")
115
// V2 handler for the Global-to-Zone sync stream; failures are sent on errChan.
onGlobalToZoneSyncConnect := mux.OnGlobalToZoneSyncConnectFunc(func(stream mesh_proto.KDSSyncService_GlobalToZoneSyncServer, errChan chan error) {
116
// The zone ID is read from the incoming stream context.
zoneID, err := util.ClientIDFromIncomingCtx(stream.Context())
120
log := kdsDeltaGlobalLog.WithValues("peer-id", zoneID)
121
log = kuma_log.AddFieldsFromCtx(log, stream.Context(), rt.Extensions())
122
log.Info("Global To Zone new session created")
123
if err := createZoneIfAbsent(stream.Context(), log, zoneID, rt.ResourceManager()); err != nil {
124
errChan <- errors.Wrap(err, "Global CP could not create a zone")
126
if err := kdsServerV2.GlobalToZoneSync(stream); err != nil {
129
log.V(1).Info("GlobalToZoneSync finished gracefully")
133
// V2 handler for the Zone-to-Global sync stream: receives resources from the
// zone and passes them to resourceSyncerV2 through GlobalSyncCallback.
onZoneToGlobalSyncConnect := mux.OnZoneToGlobalSyncConnectFunc(func(stream mesh_proto.KDSSyncService_ZoneToGlobalSyncServer, errChan chan error) {
134
zoneID, err := util.ClientIDFromIncomingCtx(stream.Context())
138
log := kdsDeltaGlobalLog.WithValues("peer-id", zoneID)
139
log = kuma_log.AddFieldsFromCtx(log, stream.Context(), rt.Extensions())
140
kdsStream := kds_client_v2.NewDeltaKDSStream(stream, zoneID, rt, "")
141
sink := kds_client_v2.NewKDSSyncClient(
143
reg.ObjectTypes(model.HasKDSFlag(model.ZoneToGlobalFlag)),
145
kds_sync_store_v2.GlobalSyncCallback(stream.Context(), resourceSyncerV2, rt.Config().Store.Type == store_config.KubernetesStore, kubeFactory, rt.Config().Store.Kubernetes.SystemNamespace),
146
rt.Config().Multizone.Global.KDS.ResponseBackoff.Duration,
149
if err := sink.Receive(); err != nil {
150
errChan <- errors.Wrap(err, "KDSSyncClient finished with an error")
152
log.V(1).Info("KDSSyncClient finished gracefully")
156
// Collect the V2 server filters as a slice of stream interceptors.
var streamInterceptors []service.StreamInterceptor
157
for _, filter := range rt.KDSContext().GlobalServerFiltersV2 {
158
streamInterceptors = append(streamInterceptors, filter)
161
// The zone watch is only created when a positive health-check timeout is configured.
if rt.Config().Multizone.Global.KDS.ZoneHealthCheck.Timeout.Duration > time.Duration(0) {
162
zwLog := kdsGlobalLog.WithName("zone-watch")
163
zw, err := mux.NewZoneWatch(
165
rt.Config().Multizone.Global.KDS.ZoneHealthCheck,
168
rt.ReadOnlyResourceManager(),
172
return errors.Wrap(err, "couldn't create ZoneWatch")
174
// Add the zone watch to the runtime wrapped in a resilient component.
if err := rt.Add(component.NewResilientComponent(zwLog, zw)); err != nil {
178
// Add the mux server, wrapped in a resilient component, to the runtime; it is
// constructed with the KDS filters/interceptors, the Global KDS config and the
// handlers defined above.
return rt.Add(component.NewResilientComponent(kdsGlobalLog.WithName("kds-mux-client"), mux.NewServer(
180
rt.KDSContext().GlobalServerFilters,
181
rt.KDSContext().ServerStreamInterceptors,
182
rt.KDSContext().ServerUnaryInterceptor,
183
*rt.Config().Multizone.Global.KDS,
185
service.NewGlobalKDSServiceServer(
187
rt.KDSContext().EnvoyAdminRPCs,
188
rt.ResourceManager(),
192
rt.Config().Store.Upsert,
194
rt.Config().Multizone.Global.KDS.ZoneHealthCheck.PollInterval.Duration,
196
mux.NewKDSSyncServiceServer(
198
onGlobalToZoneSyncConnect,
199
onZoneToGlobalSyncConnect,
200
rt.KDSContext().GlobalServerFiltersV2,
207
// createZoneIfAbsent looks up the Zone resource called name (with no mesh) and,
// when the lookup fails with a "resource not found" error, creates a Zone with
// that name whose spec has Enabled set to true.
//
// NOTE(review): lookup errors other than "not found" and Create errors are
// presumably returned to the caller — confirm against the full function.
func createZoneIfAbsent(ctx context.Context, log logr.Logger, name string, resManager core_manager.ResourceManager) error {
208
// Attach the ControlPlane user to the context used for the manager calls.
ctx = user.Ctx(ctx, user.ControlPlane)
209
if err := resManager.Get(ctx, system.NewZoneResource(), store.GetByKey(name, model.NoMesh)); err != nil {
210
// Only a "not found" result leads on to creation.
if !store.IsResourceNotFound(err) {
213
log.Info("creating Zone", "name", name)
214
zone := &system.ZoneResource{
215
Spec: &system_proto.Zone{
216
Enabled: util_proto.Bool(true),
219
if err := resManager.Create(ctx, zone, store.CreateByKey(name, model.NoMesh)); err != nil {
226
// Callbacks returns the client callbacks whose OnResourcesReceived handler
// processes a list of resources received from the zone named clusterName:
// it adjusts names and labels, sets the zone on ZoneIngress/ZoneEgress specs,
// and finally hands the list to s.Sync scoped to that zone.
//
// k8sStore reports whether the configured store is Kubernetes, and kubeFactory
// is used to determine the scope of the corresponding Kubernetes object.
func Callbacks(s sync_store.ResourceSyncer, k8sStore bool, kubeFactory resources_k8s.KubeFactory) *client.Callbacks {
227
return &client.Callbacks{
228
OnResourcesReceived: func(clusterName string, rs model.ResourceList) error {
229
// isOldZone inspects the first received resource for the zone label; zones
// it classifies as old are handled with name prefixes instead of labels.
supportsHashSuffixes := !isOldZone(rs)
231
if !supportsHashSuffixes {
232
// todo: remove in 2 releases after 2.6.x
233
util.AddPrefixToNames(rs.GetItems(), clusterName)
236
// Stamp every resource with the zone it came from and mark its origin as zone.
for _, r := range rs.GetItems() {
237
r.SetMeta(util.CloneResourceMeta(r.GetMeta(),
238
util.WithLabel(mesh_proto.ZoneTag, clusterName),
239
util.WithLabel(mesh_proto.ResourceOriginLabel, string(mesh_proto.ZoneResourceOrigin)),
244
// if type of Store is Kubernetes then we want to store upstream resources in dedicated Namespace.
245
// KubernetesStore parses Name and considers substring after the last dot as a Namespace's Name.
246
kubeObject, err := kubeFactory.NewObject(rs.NewItem())
248
return errors.Wrap(err, "could not convert object")
250
// Namespace-scoped objects get the "default" suffix appended to their names.
if kubeObject.Scope() == k8s_model.ScopeNamespace {
251
util.AddSuffixToNames(rs.GetItems(), "default")
255
// ZoneIngress and ZoneEgress also carry the zone name in their spec.
if rs.GetItemType() == core_mesh.ZoneIngressType {
256
for _, zi := range rs.(*core_mesh.ZoneIngressResourceList).Items {
257
zi.Spec.Zone = clusterName
259
} else if rs.GetItemType() == core_mesh.ZoneEgressType {
260
for _, ze := range rs.(*core_mesh.ZoneEgressResourceList).Items {
261
ze.Spec.Zone = clusterName
265
// The prefilter predicate matches resources of this zone: by the
// "<clusterName>." name prefix for old zones, by the zone label otherwise.
return s.Sync(rs, sync_store.PrefilterBy(func(r model.Resource) bool {
266
if !supportsHashSuffixes {
267
// todo: remove in 2 releases after 2.6.x
268
return strings.HasPrefix(r.GetMeta().GetName(), fmt.Sprintf("%s.", clusterName))
270
return r.GetMeta().GetLabels()[mesh_proto.ZoneTag] == clusterName
271
}), sync_store.Zone(clusterName))
276
// isOldZone reports whether the zone is running an older version of Kuma CP that doesn't support hash suffixes
277
func isOldZone(rs model.ResourceList) bool {
278
if len(rs.GetItems()) == 0 {
279
// if there are no resources it doesn't matter if it's old or new Zone
283
r := rs.GetItems()[0]
284
_, exist := r.GetMeta().GetLabels()[mesh_proto.ZoneTag]