kuma

Форк
0
/
components_test.go 
290 строк · 9.7 Кб
1
package global_test
2

3
import (
4
	"context"
5
	"fmt"
6
	"sync"
7

8
	. "github.com/onsi/ginkgo/v2"
9
	. "github.com/onsi/gomega"
10

11
	mesh_proto "github.com/kumahq/kuma/api/mesh/v1alpha1"
12
	system_proto "github.com/kumahq/kuma/api/system/v1alpha1"
13
	"github.com/kumahq/kuma/pkg/core"
14
	"github.com/kumahq/kuma/pkg/core/resources/apis/mesh"
15
	"github.com/kumahq/kuma/pkg/core/resources/apis/system"
16
	"github.com/kumahq/kuma/pkg/core/resources/model"
17
	"github.com/kumahq/kuma/pkg/core/resources/registry"
18
	"github.com/kumahq/kuma/pkg/core/resources/store"
19
	"github.com/kumahq/kuma/pkg/kds/global"
20
	sync_store "github.com/kumahq/kuma/pkg/kds/store"
21
	sync_store_v2 "github.com/kumahq/kuma/pkg/kds/v2/store"
22
	core_metrics "github.com/kumahq/kuma/pkg/metrics"
23
	"github.com/kumahq/kuma/pkg/plugins/resources/memory"
24
	"github.com/kumahq/kuma/pkg/test/grpc"
25
	kds_setup "github.com/kumahq/kuma/pkg/test/kds/setup"
26
	util_proto "github.com/kumahq/kuma/pkg/util/proto"
27
)
28

29
var _ = Describe("Global Sync", func() {
30
	var zoneStores []store.ResourceStore
31
	var globalStore store.ResourceStore
32
	var closeFunc func()
33

34
	dataplaneFunc := func(zone, service string) *mesh_proto.Dataplane {
35
		return &mesh_proto.Dataplane{
36
			Networking: &mesh_proto.Dataplane_Networking{
37
				Address: "192.168.0.1",
38
				Inbound: []*mesh_proto.Dataplane_Networking_Inbound{{
39
					Port: 1212,
40
					Tags: map[string]string{
41
						mesh_proto.ZoneTag:    zone,
42
						mesh_proto.ServiceTag: service,
43
					},
44
				}},
45
				Outbound: []*mesh_proto.Dataplane_Networking_Outbound{
46
					{
47
						Tags: map[string]string{
48
							mesh_proto.ServiceTag:  "web",
49
							mesh_proto.ProtocolTag: "http",
50
						},
51
					},
52
				},
53
			},
54
		}
55
	}
56

57
	VerifyResourcesWereSynchronizedToGlobal := func() {
58
		for i := 0; i < 10; i++ {
59
			dp := dataplaneFunc("kuma-cluster-1", fmt.Sprintf("service-1-%d", i))
60
			err := zoneStores[0].Create(context.Background(), &mesh.DataplaneResource{Spec: dp}, store.CreateByKey(fmt.Sprintf("dp-1-%d", i), "mesh-1"))
61
			Expect(err).ToNot(HaveOccurred())
62
		}
63
		Eventually(func() int {
64
			actual := mesh.DataplaneResourceList{}
65
			err := globalStore.List(context.Background(), &actual)
66
			Expect(err).ToNot(HaveOccurred())
67
			return len(actual.Items)
68
		}, "5s", "100ms").Should(Equal(10))
69
		closeFunc()
70
	}
71

72
	VerifyResourcesWereSynchronizedIndependentlyForEachZone := func() {
73
		for i := 0; i < 10; i++ {
74
			dp := dataplaneFunc("kuma-cluster-1", fmt.Sprintf("service-1-%d", i))
75
			err := zoneStores[0].Create(context.Background(), &mesh.DataplaneResource{Spec: dp}, store.CreateByKey(fmt.Sprintf("dp-1-%d", i), "mesh-1"))
76
			Expect(err).ToNot(HaveOccurred())
77
		}
78

79
		for i := 0; i < 10; i++ {
80
			dp := dataplaneFunc("kuma-cluster-2", fmt.Sprintf("service-2-%d", i))
81
			err := zoneStores[1].Create(context.Background(), &mesh.DataplaneResource{Spec: dp}, store.CreateByKey(fmt.Sprintf("dp-2-%d", i), "mesh-1"))
82
			Expect(err).ToNot(HaveOccurred())
83
		}
84

85
		Eventually(func() int {
86
			actual := mesh.DataplaneResourceList{}
87
			err := globalStore.List(context.Background(), &actual)
88
			Expect(err).ToNot(HaveOccurred())
89
			return len(actual.Items)
90
		}, "3s", "100ms").Should(Equal(20))
91

92
		err := zoneStores[0].Delete(context.Background(), mesh.NewDataplaneResource(), store.DeleteByKey("dp-1-0", "mesh-1"))
93
		Expect(err).ToNot(HaveOccurred())
94

95
		err = zoneStores[0].Delete(context.Background(), mesh.NewDataplaneResource(), store.DeleteByKey("dp-1-1", "mesh-1"))
96
		Expect(err).ToNot(HaveOccurred())
97

98
		Eventually(func() int {
99
			actual := mesh.DataplaneResourceList{}
100
			err := globalStore.List(context.Background(), &actual)
101
			Expect(err).ToNot(HaveOccurred())
102
			return len(actual.Items)
103
		}, "5s", "100ms").Should(Equal(18))
104

105
		closeFunc()
106
	}
107

108
	VerifySupportForTheSameNameOfDataplanesInDifferentClusters := func() {
109
		dp1 := dataplaneFunc("kuma-cluster-1", "backend")
110
		err := zoneStores[0].Create(context.Background(), &mesh.DataplaneResource{Spec: dp1}, store.CreateByKey("dp-1", "mesh-1"))
111
		Expect(err).ToNot(HaveOccurred())
112

113
		dp2 := dataplaneFunc("kuma-cluster-2", "web")
114
		err = zoneStores[1].Create(context.Background(), &mesh.DataplaneResource{Spec: dp2}, store.CreateByKey("dp-1", "mesh-1"))
115
		Expect(err).ToNot(HaveOccurred())
116

117
		Eventually(func() int {
118
			actual := mesh.DataplaneResourceList{}
119
			err := globalStore.List(context.Background(), &actual)
120
			Expect(err).ToNot(HaveOccurred())
121
			return len(actual.Items)
122
		}, "3s", "100ms").Should(Equal(2))
123
	}
124

125
	VerifyUpToDateListOfProvidedType := func() {
126
		excludeTypes := map[model.ResourceType]bool{
127
			mesh.DataplaneInsightType:  true,
128
			mesh.DataplaneType:         true,
129
			mesh.DataplaneOverviewType: true,
130
			mesh.ServiceOverviewType:   true,
131
		}
132

133
		// take all mesh-scoped types and exclude types that won't be synced
134
		actualProvidedTypes := registry.Global().ObjectTypes(model.HasScope(model.ScopeMesh), model.TypeFilterFn(func(descriptor model.ResourceTypeDescriptor) bool {
135
			return !excludeTypes[descriptor.Name]
136
		}))
137

138
		// plus 4 global-scope types
139
		extraTypes := []model.ResourceType{
140
			mesh.MeshType,
141
			mesh.ZoneIngressType,
142
			system.ConfigType,
143
			system.GlobalSecretType,
144
		}
145

146
		actualProvidedTypes = append(actualProvidedTypes, extraTypes...)
147
		Expect(actualProvidedTypes).To(ConsistOf(registry.Global().ObjectTypes(model.HasKDSFlag(model.GlobalToZoneSelector))))
148
	}
149

150
	Context("KDS v1", func() {
151
		var globalSyncer sync_store.ResourceSyncer
152

153
		BeforeEach(func() {
154
			const numOfZones = 2
155
			const zoneName = "zone-%d"
156

157
			// Start `numOfZones` Kuma CP Zone
158
			serverStreams := []*grpc.MockServerStream{}
159
			wg := &sync.WaitGroup{}
160
			zoneStores = []store.ResourceStore{}
161
			for i := 0; i < numOfZones; i++ {
162
				zoneStore := memory.NewStore()
163
				srv, err := kds_setup.NewKdsServerBuilder(zoneStore).AsZone(fmt.Sprintf(zoneName, i)).Sotw()
164
				Expect(err).ToNot(HaveOccurred())
165
				serverStream := grpc.NewMockServerStream()
166
				wg.Add(1)
167
				go func() {
168
					defer func() {
169
						wg.Done()
170
						GinkgoRecover()
171
					}()
172
					Expect(srv.StreamKumaResources(serverStream)).To(Succeed())
173
				}()
174

175
				serverStreams = append(serverStreams, serverStream)
176
				zoneStores = append(zoneStores, zoneStore)
177
			}
178

179
			// Start 1 Kuma CP Global
180
			globalStore = memory.NewStore()
181
			globalSyncer = sync_store.NewResourceSyncer(core.Log, globalStore)
182
			stopCh := make(chan struct{})
183
			clientStreams := []*grpc.MockClientStream{}
184
			for _, ss := range serverStreams {
185
				clientStreams = append(clientStreams, ss.ClientStream(stopCh))
186
			}
187
			kds_setup.StartClient(clientStreams, []model.ResourceType{mesh.DataplaneType}, stopCh, global.Callbacks(globalSyncer, false, nil))
188

189
			// Create Zone resources for each Kuma CP Zone
190
			for i := 0; i < numOfZones; i++ {
191
				zone := &system.ZoneResource{Spec: &system_proto.Zone{Enabled: util_proto.Bool(true)}}
192
				err := globalStore.Create(context.Background(), zone, store.CreateByKey(fmt.Sprintf(zoneName, i), model.NoMesh))
193
				Expect(err).ToNot(HaveOccurred())
194
			}
195

196
			closeFunc = func() {
197
				close(stopCh)
198
				wg.Wait()
199
			}
200
		})
201

202
		It("should add resource to global store after adding it to Zone", func() {
203
			VerifyResourcesWereSynchronizedToGlobal()
204
		})
205

206
		It("should sync resources independently for each Zone", func() {
207
			VerifyResourcesWereSynchronizedIndependentlyForEachZone()
208
		})
209

210
		It("should support same dataplane names through clusters", func() {
211
			VerifySupportForTheSameNameOfDataplanesInDifferentClusters()
212
		})
213

214
		It("should have up to date list of provided types", func() {
215
			VerifyUpToDateListOfProvidedType()
216
		})
217
	})
218

219
	Context("Delta KDS", func() {
220
		var globalSyncer sync_store_v2.ResourceSyncer
221

222
		BeforeEach(func() {
223
			const numOfZones = 2
224
			const zoneName = "zone-%d"
225

226
			// Start `numOfZones` Kuma CP Zone
227
			serverStreams := []*grpc.MockDeltaServerStream{}
228
			wg := &sync.WaitGroup{}
229
			zoneStores = []store.ResourceStore{}
230
			for i := 0; i < numOfZones; i++ {
231
				zoneStore := memory.NewStore()
232
				srv, err := kds_setup.NewKdsServerBuilder(zoneStore).AsZone(fmt.Sprintf(zoneName, i)).Delta()
233
				Expect(err).ToNot(HaveOccurred())
234
				serverStream := grpc.NewMockDeltaServerStream()
235
				wg.Add(1)
236
				go func() {
237
					defer func() {
238
						wg.Done()
239
						GinkgoRecover()
240
					}()
241
					Expect(srv.ZoneToGlobal(serverStream)).To(Succeed())
242
				}()
243

244
				serverStreams = append(serverStreams, serverStream)
245
				zoneStores = append(zoneStores, zoneStore)
246
			}
247

248
			// Start 1 Kuma CP Global
249
			globalStore = memory.NewStore()
250
			metrics, err := core_metrics.NewMetrics("")
251
			Expect(err).ToNot(HaveOccurred())
252
			globalSyncer, err = sync_store_v2.NewResourceSyncer(core.Log, globalStore, store.NoTransactions{}, metrics, context.Background())
253
			Expect(err).ToNot(HaveOccurred())
254
			stopCh := make(chan struct{})
255
			clientStreams := []*grpc.MockDeltaClientStream{}
256
			for _, ss := range serverStreams {
257
				clientStreams = append(clientStreams, ss.ClientStream(stopCh))
258
			}
259
			kds_setup.StartDeltaClient(clientStreams, []model.ResourceType{mesh.DataplaneType}, stopCh, sync_store_v2.GlobalSyncCallback(context.Background(), globalSyncer, false, nil, "kuma-system"))
260

261
			// Create Zone resources for each Kuma CP Zone
262
			for i := 0; i < numOfZones; i++ {
263
				zone := &system.ZoneResource{Spec: &system_proto.Zone{Enabled: util_proto.Bool(true)}}
264
				err := globalStore.Create(context.Background(), zone, store.CreateByKey(fmt.Sprintf(zoneName, i), model.NoMesh))
265
				Expect(err).ToNot(HaveOccurred())
266
			}
267

268
			closeFunc = func() {
269
				close(stopCh)
270
				wg.Wait()
271
			}
272
		})
273

274
		It("should add resource to global store after adding it to Zone", func() {
275
			VerifyResourcesWereSynchronizedToGlobal()
276
		})
277

278
		It("should sync resources independently for each Zone", func() {
279
			VerifyResourcesWereSynchronizedIndependentlyForEachZone()
280
		})
281

282
		It("should support same dataplane names through clusters", func() {
283
			VerifySupportForTheSameNameOfDataplanesInDifferentClusters()
284
		})
285

286
		It("should have up to date list of provided types", func() {
287
			VerifyUpToDateListOfProvidedType()
288
		})
289
	})
290
})
291

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.