8
. "github.com/onsi/ginkgo/v2"
9
. "github.com/onsi/gomega"
11
mesh_proto "github.com/kumahq/kuma/api/mesh/v1alpha1"
12
system_proto "github.com/kumahq/kuma/api/system/v1alpha1"
13
"github.com/kumahq/kuma/pkg/core"
14
"github.com/kumahq/kuma/pkg/core/resources/apis/mesh"
15
"github.com/kumahq/kuma/pkg/core/resources/apis/system"
16
"github.com/kumahq/kuma/pkg/core/resources/model"
17
"github.com/kumahq/kuma/pkg/core/resources/registry"
18
"github.com/kumahq/kuma/pkg/core/resources/store"
19
"github.com/kumahq/kuma/pkg/kds/global"
20
sync_store "github.com/kumahq/kuma/pkg/kds/store"
21
sync_store_v2 "github.com/kumahq/kuma/pkg/kds/v2/store"
22
core_metrics "github.com/kumahq/kuma/pkg/metrics"
23
"github.com/kumahq/kuma/pkg/plugins/resources/memory"
24
"github.com/kumahq/kuma/pkg/test/grpc"
25
kds_setup "github.com/kumahq/kuma/pkg/test/kds/setup"
26
util_proto "github.com/kumahq/kuma/pkg/util/proto"
29
// Global Sync specs: the helper closures declared in this container create
// dataplanes in per-zone stores and check that they show up in the global
// store; the "KDS v1" and "Delta KDS" contexts each run those helpers.
var _ = Describe("Global Sync", func() {
30
// zoneStores holds one resource store per simulated zone. It is reassigned
// and appended to inside each Context before the helpers use it.
var zoneStores []store.ResourceStore
31
// globalStore is the store the Verify* helpers list from when waiting for
// zone resources to arrive.
var globalStore store.ResourceStore
34
// dataplaneFunc builds a Dataplane spec fixture. The inbound is tagged with
// the given zone and service; the address ("192.168.0.1") and the outbound
// tags (service "web", protocol "http") are fixed.
// NOTE(review): some fields of the literal are not visible in this view.
dataplaneFunc := func(zone, service string) *mesh_proto.Dataplane {
35
return &mesh_proto.Dataplane{
36
Networking: &mesh_proto.Dataplane_Networking{
37
Address: "192.168.0.1",
38
Inbound: []*mesh_proto.Dataplane_Networking_Inbound{{
40
// Inbound tags are the only part that varies between fixtures.
Tags: map[string]string{
41
mesh_proto.ZoneTag: zone,
42
mesh_proto.ServiceTag: service,
45
Outbound: []*mesh_proto.Dataplane_Networking_Outbound{
47
Tags: map[string]string{
48
mesh_proto.ServiceTag: "web",
49
mesh_proto.ProtocolTag: "http",
57
// VerifyResourcesWereSynchronizedToGlobal creates 10 dataplanes (dp-1-0
// through dp-1-9, mesh "mesh-1") in the first zone store and then waits for
// the global store to list exactly 10 dataplanes.
VerifyResourcesWereSynchronizedToGlobal := func() {
58
for i := 0; i < 10; i++ {
59
dp := dataplaneFunc("kuma-cluster-1", fmt.Sprintf("service-1-%d", i))
60
err := zoneStores[0].Create(context.Background(), &mesh.DataplaneResource{Spec: dp}, store.CreateByKey(fmt.Sprintf("dp-1-%d", i), "mesh-1"))
61
Expect(err).ToNot(HaveOccurred())
63
// Poll the global store every 100ms, for at most 5s, until the dataplane
// count reaches 10.
Eventually(func() int {
64
actual := mesh.DataplaneResourceList{}
65
err := globalStore.List(context.Background(), &actual)
66
Expect(err).ToNot(HaveOccurred())
67
return len(actual.Items)
68
}, "5s", "100ms").Should(Equal(10))
72
// VerifyResourcesWereSynchronizedIndependentlyForEachZone creates 10
// dataplanes in each of the first two zone stores, waits for the global store
// to list 20, then deletes two dataplanes from the first zone store and waits
// for the global count to drop to 18.
VerifyResourcesWereSynchronizedIndependentlyForEachZone := func() {
73
// Zone 0: dp-1-0 through dp-1-9.
for i := 0; i < 10; i++ {
74
dp := dataplaneFunc("kuma-cluster-1", fmt.Sprintf("service-1-%d", i))
75
err := zoneStores[0].Create(context.Background(), &mesh.DataplaneResource{Spec: dp}, store.CreateByKey(fmt.Sprintf("dp-1-%d", i), "mesh-1"))
76
Expect(err).ToNot(HaveOccurred())
79
// Zone 1: dp-2-0 through dp-2-9.
for i := 0; i < 10; i++ {
80
dp := dataplaneFunc("kuma-cluster-2", fmt.Sprintf("service-2-%d", i))
81
err := zoneStores[1].Create(context.Background(), &mesh.DataplaneResource{Spec: dp}, store.CreateByKey(fmt.Sprintf("dp-2-%d", i), "mesh-1"))
82
Expect(err).ToNot(HaveOccurred())
85
// Both zones' dataplanes must be visible in the global store (3s timeout,
// 100ms polling).
Eventually(func() int {
86
actual := mesh.DataplaneResourceList{}
87
err := globalStore.List(context.Background(), &actual)
88
Expect(err).ToNot(HaveOccurred())
89
return len(actual.Items)
90
}, "3s", "100ms").Should(Equal(20))
92
// Remove two dataplanes from zone 0 only; zone 1 is left untouched.
err := zoneStores[0].Delete(context.Background(), mesh.NewDataplaneResource(), store.DeleteByKey("dp-1-0", "mesh-1"))
93
Expect(err).ToNot(HaveOccurred())
95
err = zoneStores[0].Delete(context.Background(), mesh.NewDataplaneResource(), store.DeleteByKey("dp-1-1", "mesh-1"))
96
Expect(err).ToNot(HaveOccurred())
98
// The global count is expected to shrink by exactly the two deleted
// dataplanes (5s timeout, 100ms polling).
Eventually(func() int {
99
actual := mesh.DataplaneResourceList{}
100
err := globalStore.List(context.Background(), &actual)
101
Expect(err).ToNot(HaveOccurred())
102
return len(actual.Items)
103
}, "5s", "100ms").Should(Equal(18))
108
// VerifySupportForTheSameNameOfDataplanesInDifferentClusters creates a
// dataplane with the same key ("dp-1" in "mesh-1") in two different zone
// stores and waits for the global store to list 2 dataplanes, i.e. both
// entries are expected to be present in the global store.
VerifySupportForTheSameNameOfDataplanesInDifferentClusters := func() {
109
dp1 := dataplaneFunc("kuma-cluster-1", "backend")
110
err := zoneStores[0].Create(context.Background(), &mesh.DataplaneResource{Spec: dp1}, store.CreateByKey("dp-1", "mesh-1"))
111
Expect(err).ToNot(HaveOccurred())
113
// Same name and mesh as above, but created in the second zone store.
dp2 := dataplaneFunc("kuma-cluster-2", "web")
114
err = zoneStores[1].Create(context.Background(), &mesh.DataplaneResource{Spec: dp2}, store.CreateByKey("dp-1", "mesh-1"))
115
Expect(err).ToNot(HaveOccurred())
117
// Poll every 100ms for at most 3s until the global store lists 2 dataplanes.
Eventually(func() int {
118
actual := mesh.DataplaneResourceList{}
119
err := globalStore.List(context.Background(), &actual)
120
Expect(err).ToNot(HaveOccurred())
121
return len(actual.Items)
122
}, "3s", "100ms").Should(Equal(2))
125
// VerifyUpToDateListOfProvidedType rebuilds the expected list of types by
// hand (mesh-scoped types minus an exclusion set, plus a few extra types) and
// asserts that it matches, in any order, the registry's list of types flagged
// with model.GlobalToZoneSelector.
VerifyUpToDateListOfProvidedType := func() {
126
// Mesh-scoped types that are filtered out of the expected list below.
excludeTypes := map[model.ResourceType]bool{
127
mesh.DataplaneInsightType: true,
128
mesh.DataplaneType: true,
129
mesh.DataplaneOverviewType: true,
130
mesh.ServiceOverviewType: true,
133
// take all mesh-scoped types and exclude types that won't be synced
134
actualProvidedTypes := registry.Global().ObjectTypes(model.HasScope(model.ScopeMesh), model.TypeFilterFn(func(descriptor model.ResourceTypeDescriptor) bool {
135
return !excludeTypes[descriptor.Name]
138
// plus 4 global-scope types
// NOTE(review): only some entries of this list are visible in this view;
// confirm the count in the comment above against the full list.
139
extraTypes := []model.ResourceType{
141
mesh.ZoneIngressType,
143
system.GlobalSecretType,
146
actualProvidedTypes = append(actualProvidedTypes, extraTypes...)
147
Expect(actualProvidedTypes).To(ConsistOf(registry.Global().ObjectTypes(model.HasKDSFlag(model.GlobalToZoneSelector))))
150
// KDS v1: runs the shared Verify* helpers against zone servers built with
// Sotw() and a global syncer from the sync_store package.
// NOTE(review): the wrapper around the setup code (and the declaration of
// numOfZones) is not visible in this view.
Context("KDS v1", func() {
151
var globalSyncer sync_store.ResourceSyncer
155
// Format string for zone names; the zone index is substituted with fmt.Sprintf.
const zoneName = "zone-%d"
157
// Start `numOfZones` Kuma CP Zone
158
serverStreams := []*grpc.MockServerStream{}
159
wg := &sync.WaitGroup{}
160
// Reset the shared slice so zone stores do not carry over from earlier setup.
zoneStores = []store.ResourceStore{}
161
for i := 0; i < numOfZones; i++ {
162
// Each zone gets its own store from the memory plugin and its own mock stream.
zoneStore := memory.NewStore()
163
srv, err := kds_setup.NewKdsServerBuilder(zoneStore).AsZone(fmt.Sprintf(zoneName, i)).Sotw()
164
Expect(err).ToNot(HaveOccurred())
165
serverStream := grpc.NewMockServerStream()
172
// NOTE(review): the lines around this call are not visible here; presumably
// it runs in a goroutine tracked by wg — confirm.
Expect(srv.StreamKumaResources(serverStream)).To(Succeed())
175
serverStreams = append(serverStreams, serverStream)
176
zoneStores = append(zoneStores, zoneStore)
179
// Start 1 Kuma CP Global
180
globalStore = memory.NewStore()
181
globalSyncer = sync_store.NewResourceSyncer(core.Log, globalStore)
182
stopCh := make(chan struct{})
183
// One client stream per zone server stream, all sharing stopCh.
clientStreams := []*grpc.MockClientStream{}
184
for _, ss := range serverStreams {
185
clientStreams = append(clientStreams, ss.ClientStream(stopCh))
187
// DataplaneType is the only resource type passed to the client.
kds_setup.StartClient(clientStreams, []model.ResourceType{mesh.DataplaneType}, stopCh, global.Callbacks(globalSyncer, false, nil))
189
// Create Zone resources for each Kuma CP Zone
190
for i := 0; i < numOfZones; i++ {
191
// Zone resources are created enabled, in the global store, outside any mesh.
zone := &system.ZoneResource{Spec: &system_proto.Zone{Enabled: util_proto.Bool(true)}}
192
err := globalStore.Create(context.Background(), zone, store.CreateByKey(fmt.Sprintf(zoneName, i), model.NoMesh))
193
Expect(err).ToNot(HaveOccurred())
202
It("should add resource to global store after adding it to Zone", func() {
203
VerifyResourcesWereSynchronizedToGlobal()
206
It("should sync resources independently for each Zone", func() {
207
VerifyResourcesWereSynchronizedIndependentlyForEachZone()
210
It("should support same dataplane names through clusters", func() {
211
VerifySupportForTheSameNameOfDataplanesInDifferentClusters()
214
It("should have up to date list of provided types", func() {
215
VerifyUpToDateListOfProvidedType()
219
// Delta KDS: runs the same shared Verify* helpers as the "KDS v1" context,
// but against zone servers built with Delta(), delta mock streams and the
// sync_store_v2 resource syncer.
// NOTE(review): the wrapper around the setup code (and the declaration of
// numOfZones) is not visible in this view.
Context("Delta KDS", func() {
220
var globalSyncer sync_store_v2.ResourceSyncer
224
// Format string for zone names; the zone index is substituted with fmt.Sprintf.
const zoneName = "zone-%d"
226
// Start `numOfZones` Kuma CP Zone
227
serverStreams := []*grpc.MockDeltaServerStream{}
228
wg := &sync.WaitGroup{}
229
// Reset the shared slice so zone stores do not carry over from earlier setup.
zoneStores = []store.ResourceStore{}
230
for i := 0; i < numOfZones; i++ {
231
// Each zone gets its own store from the memory plugin and its own mock stream.
zoneStore := memory.NewStore()
232
srv, err := kds_setup.NewKdsServerBuilder(zoneStore).AsZone(fmt.Sprintf(zoneName, i)).Delta()
233
Expect(err).ToNot(HaveOccurred())
234
serverStream := grpc.NewMockDeltaServerStream()
241
// NOTE(review): the lines around this call are not visible here; presumably
// it runs in a goroutine tracked by wg — confirm.
Expect(srv.ZoneToGlobal(serverStream)).To(Succeed())
244
serverStreams = append(serverStreams, serverStream)
245
zoneStores = append(zoneStores, zoneStore)
248
// Start 1 Kuma CP Global
249
globalStore = memory.NewStore()
250
// The v2 syncer takes a metrics instance in addition to the logger and store.
metrics, err := core_metrics.NewMetrics("")
251
Expect(err).ToNot(HaveOccurred())
252
globalSyncer, err = sync_store_v2.NewResourceSyncer(core.Log, globalStore, store.NoTransactions{}, metrics, context.Background())
253
Expect(err).ToNot(HaveOccurred())
254
stopCh := make(chan struct{})
255
// One client stream per zone server stream, all sharing stopCh.
clientStreams := []*grpc.MockDeltaClientStream{}
256
for _, ss := range serverStreams {
257
clientStreams = append(clientStreams, ss.ClientStream(stopCh))
259
// DataplaneType is the only resource type passed to the client.
kds_setup.StartDeltaClient(clientStreams, []model.ResourceType{mesh.DataplaneType}, stopCh, sync_store_v2.GlobalSyncCallback(context.Background(), globalSyncer, false, nil, "kuma-system"))
261
// Create Zone resources for each Kuma CP Zone
262
for i := 0; i < numOfZones; i++ {
263
// Zone resources are created enabled, in the global store, outside any mesh.
zone := &system.ZoneResource{Spec: &system_proto.Zone{Enabled: util_proto.Bool(true)}}
264
err := globalStore.Create(context.Background(), zone, store.CreateByKey(fmt.Sprintf(zoneName, i), model.NoMesh))
265
Expect(err).ToNot(HaveOccurred())
274
It("should add resource to global store after adding it to Zone", func() {
275
VerifyResourcesWereSynchronizedToGlobal()
278
It("should sync resources independently for each Zone", func() {
279
VerifyResourcesWereSynchronizedIndependentlyForEachZone()
282
It("should support same dataplane names through clusters", func() {
283
VerifySupportForTheSameNameOfDataplanesInDifferentClusters()
286
It("should have up to date list of provided types", func() {
287
VerifyUpToDateListOfProvidedType()