2
// __ _____ __ ___ ___ __ _| |_ ___
3
// \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \
4
// \ V V / __/ (_| |\ V /| | (_| | || __/
5
// \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___|
7
// Copyright © 2016 - 2024 Weaviate B.V. All rights reserved.
9
// CONTACT: hello@weaviate.io
26
"github.com/hashicorp/raft"
27
"github.com/sirupsen/logrus"
28
"github.com/stretchr/testify/assert"
29
"github.com/stretchr/testify/mock"
30
"github.com/weaviate/weaviate/cluster/proto/api"
31
cmd "github.com/weaviate/weaviate/cluster/proto/api"
32
command "github.com/weaviate/weaviate/cluster/proto/api"
33
"github.com/weaviate/weaviate/cluster/utils"
34
"github.com/weaviate/weaviate/entities/models"
35
"github.com/weaviate/weaviate/usecases/sharding"
36
gproto "google.golang.org/protobuf/proto"
37
"google.golang.org/protobuf/reflect/protoreflect"
41
errAny = errors.New("any error")
42
Anything = mock.Anything
45
func TestServiceEndpoints(t *testing.T) {
46
ctx := context.Background()
47
m := NewMockStore(t, "Node-1", utils.MustGetFreeTCPPort())
48
addr := fmt.Sprintf("%s:%d", m.cfg.Host, m.cfg.RaftPort)
49
m.indexer.On("Open", Anything).Return(nil)
50
m.indexer.On("Close", Anything).Return(nil)
51
m.indexer.On("AddClass", Anything).Return(nil)
52
m.indexer.On("RestoreClassDir", Anything).Return(nil)
53
m.indexer.On("UpdateClass", Anything).Return(nil)
54
m.indexer.On("DeleteClass", Anything).Return(nil)
55
m.indexer.On("AddProperty", Anything, Anything).Return(nil)
56
m.indexer.On("UpdateShardStatus", Anything).Return(nil)
57
m.indexer.On("AddTenants", Anything, Anything).Return(nil)
58
m.indexer.On("UpdateTenants", Anything, Anything).Return(nil)
59
m.indexer.On("DeleteTenants", Anything, Anything).Return(nil)
60
m.indexer.On("TriggerSchemaUpdateCallbacks").Return()
62
m.parser.On("ParseClass", mock.Anything).Return(nil)
63
m.parser.On("ParseClassUpdate", mock.Anything, mock.Anything).Return(mock.Anything, nil)
65
srv := NewService(m.store, nil)
68
_, err := srv.Execute(&command.ApplyRequest{})
69
assert.ErrorIs(t, err, ErrLeaderNotFound)
70
assert.ErrorIs(t, srv.Join(ctx, m.store.nodeID, addr, true), ErrLeaderNotFound)
71
assert.ErrorIs(t, srv.Remove(ctx, m.store.nodeID), ErrLeaderNotFound)
73
// Deadline exceeded while waiting for DB to be restored
75
ctx, cancel := context.WithTimeout(ctx, time.Millisecond*30)
77
assert.ErrorIs(t, srv.WaitUntilDBRestored(ctx, 5*time.Millisecond, make(chan struct{})), context.DeadlineExceeded)
82
assert.Nil(t, srv.Open(ctx, m.indexer))
84
// node lose leadership after service call
85
assert.ErrorIs(t, srv.store.Join(m.store.nodeID, addr, true), ErrNotLeader)
86
assert.ErrorIs(t, srv.store.Remove(m.store.nodeID), ErrNotLeader)
89
assert.Nil(t, srv.store.Notify(m.cfg.NodeID, addr))
91
assert.Nil(t, srv.WaitUntilDBRestored(ctx, time.Second*1, make(chan struct{})))
92
assert.True(t, tryNTimesWithWait(10, time.Millisecond*200, srv.Ready))
93
tryNTimesWithWait(20, time.Millisecond*100, srv.store.IsLeader)
94
assert.True(t, srv.store.IsLeader())
95
schema := srv.SchemaReader()
96
assert.Equal(t, schema.Len(), 0)
99
_, err = srv.AddClass(nil, nil)
100
assert.ErrorIs(t, err, errBadRequest)
101
assert.Equal(t, schema.ClassEqual("C"), "")
103
cls := &models.Class{
105
MultiTenancyConfig: &models.MultiTenancyConfig{Enabled: true},
107
ss := &sharding.State{PartitioningEnabled: true, Physical: map[string]sharding.Physical{"T0": {Name: "T0"}}}
108
version0, err := srv.AddClass(cls, ss)
110
assert.Equal(t, schema.ClassEqual("C"), "C")
112
// Add same class again
113
_, err = srv.AddClass(cls, ss)
115
assert.Equal(t, "class name C already exists", err.Error())
118
_, err = srv.AddClass(&models.Class{Class: "c"}, ss)
119
assert.ErrorIs(t, err, errClassExists)
121
// QueryReadOnlyClass
122
readOnlyVClass, err := srv.QueryReadOnlyClasses(cls.Class)
123
assert.NoError(t, err)
124
assert.NotNil(t, readOnlyVClass[cls.Class].Class)
125
assert.Equal(t, cls, readOnlyVClass[cls.Class].Class)
128
getSchema, err := srv.QuerySchema()
129
assert.NoError(t, err)
130
assert.NotNil(t, getSchema)
131
assert.Equal(t, models.Schema{Classes: []*models.Class{readOnlyVClass[cls.Class].Class}}, getSchema)
134
getTenantsAll, _, err := srv.QueryTenants(cls.Class, []string{})
135
assert.NoError(t, err)
136
assert.NotNil(t, getTenantsAll)
137
assert.Equal(t, []*models.Tenant{{
139
ActivityStatus: models.TenantActivityStatusHOT,
143
getTenantsOne, _, err := srv.QueryTenants(cls.Class, []string{"T0"})
144
assert.NoError(t, err)
145
assert.NotNil(t, getTenantsOne)
146
assert.Equal(t, []*models.Tenant{{
148
ActivityStatus: models.TenantActivityStatusHOT,
152
getTenantsNone, _, err := srv.QueryTenants(cls.Class, []string{"T"})
153
assert.NoError(t, err)
154
assert.NotNil(t, getTenantsNone)
155
assert.Equal(t, []*models.Tenant{}, getTenantsNone)
158
getTenantShards, _, err := srv.QueryTenantsShards(cls.Class, "T0")
159
for tenant, status := range getTenantShards {
161
assert.Equal(t, "T0", tenant)
162
assert.Equal(t, models.TenantActivityStatusHOT, status)
165
// QueryShardOwner - Err
166
_, _, err = srv.QueryShardOwner(cls.Class, "T0")
167
assert.NotNil(t, err)
170
mc := srv.SchemaReader().metaClass(cls.Class)
171
mc.Sharding = sharding.State{Physical: map[string]sharding.Physical{"T0": {BelongsToNodes: []string{"N0"}}}}
172
getShardOwner, _, err := srv.QueryShardOwner(cls.Class, "T0")
174
assert.Equal(t, "N0", getShardOwner)
176
// QueryShardingState
177
mc.Sharding = sharding.State{Physical: map[string]sharding.Physical{"T0": {BelongsToNodes: []string{"N0"}}}}
178
getShardingState, _, err := srv.QueryShardingState(cls.Class)
180
assert.Equal(t, &mc.Sharding, getShardingState)
185
MultiTenancy: models.MultiTenancyConfig{Enabled: true},
186
ReplicationFactor: 1,
189
_, err = srv.UpdateClass(nil, nil)
190
assert.ErrorIs(t, err, errBadRequest)
191
cls.MultiTenancyConfig = &models.MultiTenancyConfig{Enabled: true}
192
cls.ReplicationConfig = &models.ReplicationConfig{Factor: 1}
193
ss.Physical = map[string]sharding.Physical{"T0": {Name: "T0"}}
194
version, err := srv.UpdateClass(cls, nil)
195
info.ClassVersion = version
196
info.ShardVersion = version0
198
assert.Nil(t, srv.store.WaitForAppliedIndex(ctx, time.Millisecond*10, version))
199
assert.Equal(t, info, schema.ClassInfo("C"))
200
assert.ErrorIs(t, srv.store.WaitForAppliedIndex(ctx, time.Millisecond*10, srv.store.lastAppliedIndex.Load()+1), ErrDeadlineExceeded)
203
_, err = srv.DeleteClass("X")
205
_, err = srv.DeleteClass("C")
207
assert.Equal(t, ClassInfo{}, schema.ClassInfo("C"))
210
_, err = srv.RestoreClass(nil, nil)
211
assert.ErrorIs(t, err, errBadRequest)
212
version, err = srv.RestoreClass(cls, ss)
214
info.ClassVersion = version
215
info.ShardVersion = version
216
assert.Equal(t, info, schema.ClassInfo("C"))
219
_, err = srv.AddProperty("C", nil)
220
assert.ErrorIs(t, err, errBadRequest)
221
_, err = srv.AddProperty("", &models.Property{Name: "P1"})
222
assert.ErrorIs(t, err, errBadRequest)
223
version, err = srv.AddProperty("C", &models.Property{Name: "P1"})
225
info.ClassVersion = version
227
assert.Equal(t, info, schema.ClassInfo("C"))
230
_, err = srv.UpdateShardStatus("", "A", "ACTIVE")
231
assert.ErrorIs(t, err, errBadRequest)
232
_, err = srv.UpdateShardStatus("C", "", "ACTIVE")
233
assert.ErrorIs(t, err, errBadRequest)
234
_, err = srv.UpdateShardStatus("C", "A", "ACTIVE")
238
_, err = srv.AddTenants("", &command.AddTenantsRequest{})
239
assert.ErrorIs(t, err, errBadRequest)
240
version, err = srv.AddTenants("C", &command.AddTenantsRequest{
241
ClusterNodes: []string{"Node-1"},
242
Tenants: []*command.Tenant{nil, {Name: "T2", Status: "S1"}, nil},
245
info.ShardVersion = version
247
assert.Equal(t, schema.ClassInfo("C"), info)
250
_, err = srv.UpdateTenants("", &command.UpdateTenantsRequest{})
251
assert.ErrorIs(t, err, errBadRequest)
252
_, err = srv.UpdateTenants("C", &command.UpdateTenantsRequest{Tenants: []*command.Tenant{{Name: "T2", Status: "S2"}}})
256
_, err = srv.DeleteTenants("", &command.DeleteTenantsRequest{})
257
assert.ErrorIs(t, err, errBadRequest)
258
version, err = srv.DeleteTenants("C", &command.DeleteTenantsRequest{Tenants: []string{"T0", "Tn"}})
261
info.ShardVersion = version
262
assert.Equal(t, info, schema.ClassInfo("C"))
263
assert.Equal(t, "S2", schema.CopyShardingState("C").Physical["T2"].Status)
266
assert.Nil(t, srv.Join(ctx, m.store.nodeID, addr, true))
267
assert.True(t, srv.store.IsLeader())
268
assert.Nil(t, srv.Join(ctx, m.store.nodeID, addr, false))
269
assert.True(t, srv.store.IsLeader())
270
assert.ErrorContains(t, srv.Remove(ctx, m.store.nodeID), "configuration")
271
assert.True(t, srv.store.IsLeader())
276
assert.Equal(t, "Leader", stats["raft"].(map[string]string)["state"])
277
// stats:leader_address
278
leaderAddress := string(stats["leader_address"].(raft.ServerAddress))
279
splitAddress := strings.Split(leaderAddress, ":")
280
assert.Len(t, splitAddress, 2)
281
ipAddress, portStr := splitAddress[0], splitAddress[1]
282
assert.Equal(t, "127.0.0.1", ipAddress)
283
port, err := strconv.Atoi(portStr)
285
t.Errorf("Port should have been parsable as an int but was: %v", portStr)
287
assert.GreaterOrEqual(t, port, 0)
289
leaderID := string(stats["leader_id"].(raft.ServerID))
290
assert.Equal(t, m.store.nodeID, leaderID)
293
assert.Nil(t, srv.store.raft.Barrier(2*time.Second).Error())
294
assert.Nil(t, srv.store.raft.Snapshot().Error())
296
// restore from snapshot
297
assert.Nil(t, srv.Close(ctx))
298
srv.store.db.Schema.clear()
302
srv = NewService(m.store, nil)
303
assert.Nil(t, srv.Open(ctx, m.indexer))
304
assert.Nil(t, srv.store.Notify(m.cfg.NodeID, addr))
305
assert.Nil(t, srv.WaitUntilDBRestored(ctx, time.Second*1, make(chan struct{})))
306
assert.True(t, tryNTimesWithWait(10, time.Millisecond*200, srv.Ready))
307
tryNTimesWithWait(20, time.Millisecond*100, srv.store.IsLeader)
308
clInfo := srv.store.db.Schema.ClassInfo("C")
309
assert.Equal(t, info, clInfo)
312
// Runs the provided function `predicate` up to `n` times, sleeping `sleepDuration` between each
313
// function call until `f` returns true or returns false if all `n` calls return false.
314
// Useful in tests which require an unknown but bounded delay where the component under test has
315
// a way to indicate when it's ready to proceed.
316
func tryNTimesWithWait(n int, sleepDuration time.Duration, predicate func() bool) bool {
317
for i := 0; i < n; i++ {
321
time.Sleep(sleepDuration)
326
func TestServiceStoreInit(t *testing.T) {
328
ctx = context.Background()
329
m = NewMockStore(t, "Node-1", 9093)
331
addr = fmt.Sprintf("%s:%d", m.cfg.Host, m.cfg.RaftPort)
335
assert.ErrorIs(t, store.Join(m.store.nodeID, addr, true), ErrNotOpen)
336
assert.ErrorIs(t, store.Remove(m.store.nodeID), ErrNotOpen)
337
assert.ErrorIs(t, store.Notify(m.store.nodeID, addr), ErrNotOpen)
340
store.open.Store(true)
341
assert.Nil(t, store.Open(ctx))
344
store.bootstrapExpect = 0
345
assert.Nil(t, store.Notify("A", "localhost:123"))
348
store.bootstrapExpect = 2
349
assert.Nil(t, store.Notify("A", "localhost:123"))
352
func TestServiceClose(t *testing.T) {
353
ctx := context.Background()
354
m := NewMockStore(t, "Node-1", utils.MustGetFreeTCPPort())
355
addr := fmt.Sprintf("%s:%d", m.cfg.Host, m.cfg.RaftPort)
358
srv := NewService(m.store, nil)
359
m.indexer.On("Open", mock.Anything).Return(nil)
360
assert.Nil(t, srv.Open(ctx, m.indexer))
361
assert.Nil(t, srv.store.Notify(m.cfg.NodeID, addr))
362
close := make(chan struct{})
364
time.Sleep(time.Second)
368
assert.Nil(t, srv.WaitUntilDBRestored(ctx, time.Second*10, close))
370
assert.Less(t, after.Sub(now), 2*time.Second)
373
func TestServicePanics(t *testing.T) {
374
m := NewMockStore(t, "Node-1", 9091)
376
// Assert Correct Response Type
377
ret := m.store.Apply(&raft.Log{Type: raft.LogNoop})
378
resp, ok := ret.(Response)
380
assert.Equal(t, resp, Response{})
383
assert.Panics(t, func() { m.store.Apply(&raft.Log{}) })
385
// Not a Valid Payload
386
assert.Panics(t, func() { m.store.Apply(&raft.Log{Data: []byte("a")}) })
388
// Cannot Open File Store
389
m.indexer.On("Open", mock.Anything).Return(errAny)
390
assert.Panics(t, func() { m.store.loadDatabase(context.TODO()) })
393
func TestStoreApply(t *testing.T) {
394
doFirst := func(m *MockStore) {
395
m.indexer.On("Open", mock.Anything).Return(nil)
396
m.parser.On("ParseClass", mock.Anything).Return(nil)
397
m.indexer.On("TriggerSchemaUpdateCallbacks").Return()
400
cls := &models.Class{Class: "C1", MultiTenancyConfig: &models.MultiTenancyConfig{Enabled: true}}
401
ss := &sharding.State{Physical: map[string]sharding.Physical{"T1": {
403
BelongsToNodes: []string{"THIS"},
406
BelongsToNodes: []string{"THIS"},
413
doBefore func(*MockStore)
414
doAfter func(*MockStore) error
417
name: "AddClass/Unmarshal",
418
req: raft.Log{Data: cmdAsBytes("C1", cmd.ApplyRequest_TYPE_ADD_CLASS,
419
nil, &cmd.AddTenantsRequest{})},
420
resp: Response{Error: errBadRequest},
424
name: "AddClass/StateIsNil",
425
req: raft.Log{Data: cmdAsBytes("C2",
426
cmd.ApplyRequest_TYPE_ADD_CLASS,
427
cmd.AddClassRequest{Class: cls, State: nil},
429
resp: Response{Error: errBadRequest},
430
doBefore: func(m *MockStore) {
431
m.indexer.On("Open", mock.Anything).Return(nil)
435
name: "AddClass/ParseClass",
436
req: raft.Log{Data: cmdAsBytes("C2",
437
cmd.ApplyRequest_TYPE_ADD_CLASS,
438
cmd.AddClassRequest{Class: cls, State: ss},
440
resp: Response{Error: errBadRequest},
441
doBefore: func(m *MockStore) {
442
m.indexer.On("Open", mock.Anything).Return(nil)
443
m.parser.On("ParseClass", mock.Anything).Return(errAny)
447
name: "AddClass/Success",
448
req: raft.Log{Data: cmdAsBytes("C1",
449
cmd.ApplyRequest_TYPE_ADD_CLASS,
450
cmd.AddClassRequest{Class: cls, State: ss},
452
resp: Response{Error: nil},
454
doAfter: func(ms *MockStore) error {
455
_, ok := ms.store.db.Schema.Classes["C1"]
457
return fmt.Errorf("class is missing")
463
name: "AddClass/DBError",
466
Data: cmdAsBytes("C1",
467
cmd.ApplyRequest_TYPE_ADD_CLASS,
468
cmd.AddClassRequest{Class: cls, State: ss},
471
resp: Response{Error: errAny},
472
doBefore: func(ms *MockStore) {
474
ms.indexer.On("AddClass", mock.Anything).Return(errAny)
478
name: "AddClass/AlreadyExists",
479
req: raft.Log{Data: cmdAsBytes("C1",
480
cmd.ApplyRequest_TYPE_ADD_CLASS,
481
cmd.AddClassRequest{Class: cls, State: ss},
483
resp: Response{Error: errSchema},
484
doBefore: func(m *MockStore) {
485
m.indexer.On("Open", mock.Anything).Return(nil)
486
m.parser.On("ParseClass", mock.Anything).Return(nil)
487
m.store.db.Schema.addClass(cls, ss, 1)
491
name: "RestoreClass/Success",
492
req: raft.Log{Data: cmdAsBytes("C1",
493
cmd.ApplyRequest_TYPE_RESTORE_CLASS,
494
cmd.AddClassRequest{Class: cls, State: ss},
496
resp: Response{Error: nil},
497
doBefore: func(m *MockStore) {
498
m.indexer.On("Open", mock.Anything).Return(nil)
499
m.parser.On("ParseClass", mock.Anything).Return(nil)
500
m.indexer.On("RestoreClassDir", cls.Class).Return(nil)
501
m.indexer.On("TriggerSchemaUpdateCallbacks").Return()
503
doAfter: func(ms *MockStore) error {
504
_, ok := ms.store.db.Schema.Classes["C1"]
506
return fmt.Errorf("class is missing")
512
name: "UpdateClass/Unmarshal",
513
req: raft.Log{Data: cmdAsBytes("C1", cmd.ApplyRequest_TYPE_UPDATE_CLASS,
514
nil, &cmd.AddTenantsRequest{})},
515
resp: Response{Error: errBadRequest},
519
name: "UpdateClass/ClassNotFound",
520
req: raft.Log{Data: cmdAsBytes("C1",
521
cmd.ApplyRequest_TYPE_UPDATE_CLASS,
522
cmd.UpdateClassRequest{Class: cls, State: nil},
524
resp: Response{Error: errSchema},
525
doBefore: func(m *MockStore) {
526
m.indexer.On("Open", mock.Anything).Return(nil)
527
m.parser.On("ParseClassUpdate", mock.Anything, mock.Anything).Return(mock.Anything, nil)
531
name: "UpdateClass/ParseUpdate",
532
req: raft.Log{Data: cmdAsBytes("C2",
533
cmd.ApplyRequest_TYPE_UPDATE_CLASS,
534
cmd.UpdateClassRequest{Class: cls, State: nil},
536
resp: Response{Error: errBadRequest},
537
doBefore: func(m *MockStore) {
538
m.indexer.On("Open", mock.Anything).Return(nil)
539
m.store.db.Schema.addClass(cls, ss, 1)
540
m.parser.On("ParseClassUpdate", mock.Anything, mock.Anything).Return(nil, errAny)
544
name: "UpdateClass/Success",
545
req: raft.Log{Data: cmdAsBytes("C1",
546
cmd.ApplyRequest_TYPE_UPDATE_CLASS,
547
cmd.UpdateClassRequest{Class: cls, State: nil},
549
resp: Response{Error: nil},
550
doBefore: func(m *MockStore) {
551
m.indexer.On("Open", mock.Anything).Return(nil)
552
m.parser.On("ParseClassUpdate", mock.Anything, mock.Anything).Return(mock.Anything, nil)
553
m.store.db.Schema.addClass(cls, ss, 1)
554
m.indexer.On("TriggerSchemaUpdateCallbacks").Return()
558
name: "DeleteClass/Success",
559
req: raft.Log{Data: cmdAsBytes("C1",
560
cmd.ApplyRequest_TYPE_DELETE_CLASS, nil,
562
resp: Response{Error: nil},
563
doBefore: func(m *MockStore) {
564
m.indexer.On("Open", mock.Anything).Return(nil)
565
m.indexer.On("TriggerSchemaUpdateCallbacks").Return()
567
doAfter: func(ms *MockStore) error {
568
if _, ok := ms.store.db.Schema.Classes["C1"]; ok {
569
return fmt.Errorf("class still exits")
575
name: "AddProperty/Unmarshal",
576
req: raft.Log{Data: cmdAsBytes("C1", cmd.ApplyRequest_TYPE_ADD_PROPERTY,
577
nil, &cmd.AddTenantsRequest{})},
578
resp: Response{Error: errBadRequest},
582
name: "AddProperty/ClassNotFound",
583
req: raft.Log{Data: cmdAsBytes("C1", cmd.ApplyRequest_TYPE_ADD_PROPERTY,
584
cmd.AddPropertyRequest{Properties: []*models.Property{{Name: "P1"}}}, nil)},
585
resp: Response{Error: errSchema},
589
name: "AddProperty/Nil",
591
Data: cmdAsBytes("C1", cmd.ApplyRequest_TYPE_ADD_PROPERTY,
592
cmd.AddPropertyRequest{Properties: nil}, nil),
594
resp: Response{Error: errBadRequest},
595
doBefore: func(m *MockStore) {
597
m.store.db.Schema.addClass(cls, ss, 1)
601
name: "AddProperty/Success",
603
Data: cmdAsBytes("C1", cmd.ApplyRequest_TYPE_ADD_PROPERTY,
604
cmd.AddPropertyRequest{Properties: []*models.Property{{Name: "P1"}}}, nil),
606
resp: Response{Error: nil},
607
doBefore: func(m *MockStore) {
608
m.indexer.On("Open", mock.Anything).Return(nil)
609
m.store.db.Schema.addClass(cls, ss, 1)
610
m.indexer.On("TriggerSchemaUpdateCallbacks").Return()
612
doAfter: func(ms *MockStore) error {
614
for _, p := range ms.store.db.Schema.Classes["C1"].Class.Properties {
621
return fmt.Errorf("property is missing")
627
name: "UpdateShard/Unmarshal",
628
req: raft.Log{Data: cmdAsBytes("C1", cmd.ApplyRequest_TYPE_UPDATE_SHARD_STATUS,
629
nil, &cmd.AddTenantsRequest{})},
630
resp: Response{Error: errBadRequest},
634
name: "UpdateShard/Success",
635
req: raft.Log{Data: cmdAsBytes("C1", cmd.ApplyRequest_TYPE_UPDATE_SHARD_STATUS,
636
cmd.UpdateShardStatusRequest{Class: "C1"}, nil)},
637
resp: Response{Error: nil},
641
name: "AddTenant/Unmarshal",
642
req: raft.Log{Data: cmdAsBytes("C1", cmd.ApplyRequest_TYPE_ADD_TENANT, cmd.AddClassRequest{}, nil)},
643
resp: Response{Error: errBadRequest},
647
name: "AddTenant/ClassNotFound",
648
req: raft.Log{Data: cmdAsBytes("C1", cmd.ApplyRequest_TYPE_ADD_TENANT, nil, &cmd.AddTenantsRequest{
649
Tenants: []*command.Tenant{nil, {Name: "T1"}, nil},
651
resp: Response{Error: errSchema},
655
name: "AddTenant/Success",
656
req: raft.Log{Data: cmdAsBytes("C1", cmd.ApplyRequest_TYPE_ADD_TENANT, nil, &cmd.AddTenantsRequest{
657
ClusterNodes: []string{"THIS"},
658
Tenants: []*command.Tenant{nil, {Name: "T1"}, nil},
660
resp: Response{Error: nil},
661
doBefore: func(m *MockStore) {
662
m.indexer.On("Open", mock.Anything).Return(nil)
663
m.store.db.Schema.addClass(cls, &sharding.State{
664
Physical: map[string]sharding.Physical{"T1": {}},
667
doAfter: func(ms *MockStore) error {
668
if _, ok := ms.store.db.Schema.Classes["C1"].Sharding.Physical["T1"]; !ok {
669
return fmt.Errorf("tenant is missing")
675
name: "UpdateTenant/Unmarshal",
676
req: raft.Log{Data: cmdAsBytes("C1", cmd.ApplyRequest_TYPE_UPDATE_TENANT, cmd.AddClassRequest{}, nil)},
677
resp: Response{Error: errBadRequest},
681
name: "UpdateTenant/ClassNotFound",
682
req: raft.Log{Data: cmdAsBytes("C1", cmd.ApplyRequest_TYPE_UPDATE_TENANT,
683
nil, &cmd.UpdateTenantsRequest{Tenants: []*command.Tenant{nil, {Name: "T1"}, nil}})},
684
resp: Response{Error: errSchema},
688
name: "UpdateTenant/NoFound",
689
req: raft.Log{Data: cmdAsBytes("C1", cmd.ApplyRequest_TYPE_UPDATE_TENANT,
690
nil, &cmd.UpdateTenantsRequest{Tenants: []*command.Tenant{
691
{Name: "T1", Status: models.TenantActivityStatusCOLD},
693
resp: Response{Error: errSchema},
694
doBefore: func(m *MockStore) {
695
ss := &sharding.State{Physical: map[string]sharding.Physical{}}
697
m.store.db.Schema.addClass(cls, ss, 1)
701
name: "UpdateTenant/Success",
702
req: raft.Log{Data: cmdAsBytes("C1", cmd.ApplyRequest_TYPE_UPDATE_TENANT,
703
nil, &cmd.UpdateTenantsRequest{Tenants: []*command.Tenant{
704
{Name: "T1", Status: models.TenantActivityStatusCOLD},
705
{Name: "T2", Status: models.TenantActivityStatusCOLD},
706
{Name: "T3", Status: models.TenantActivityStatusCOLD},
708
resp: Response{Error: nil},
709
doBefore: func(m *MockStore) {
710
ss := &sharding.State{Physical: map[string]sharding.Physical{"T1": {
712
BelongsToNodes: []string{"THIS"},
713
Status: models.TenantActivityStatusHOT,
716
BelongsToNodes: []string{"THIS"},
717
Status: models.TenantActivityStatusCOLD,
720
BelongsToNodes: []string{"NODE-2"},
721
Status: models.TenantActivityStatusHOT,
723
m.indexer.On("Open", mock.Anything).Return(nil)
724
m.store.db.Schema.addClass(cls, ss, 1)
726
doAfter: func(ms *MockStore) error {
727
want := map[string]sharding.Physical{"T1": {
729
BelongsToNodes: []string{"THIS"},
730
Status: models.TenantActivityStatusCOLD,
733
BelongsToNodes: []string{"THIS"},
734
Status: models.TenantActivityStatusCOLD,
737
BelongsToNodes: []string{"NODE-2"},
738
Status: models.TenantActivityStatusCOLD,
740
cls := ms.store.db.Schema.Classes["C1"]
741
if got := cls.Sharding.Physical; !reflect.DeepEqual(got, want) {
742
return fmt.Errorf("physical state want: %v got: %v", want, got)
748
name: "DeleteTenant/Unmarshal",
749
req: raft.Log{Data: cmdAsBytes("C1", cmd.ApplyRequest_TYPE_DELETE_TENANT, cmd.AddClassRequest{}, nil)},
750
resp: Response{Error: errBadRequest},
754
name: "DeleteTenant/ClassNotFound",
755
req: raft.Log{Data: cmdAsBytes("C1", cmd.ApplyRequest_TYPE_DELETE_TENANT,
756
nil, &cmd.DeleteTenantsRequest{Tenants: []string{"T1", "T2"}})},
757
resp: Response{Error: errSchema},
761
name: "DeleteTenant/Success",
762
req: raft.Log{Data: cmdAsBytes("C1", cmd.ApplyRequest_TYPE_DELETE_TENANT,
763
nil, &cmd.DeleteTenantsRequest{Tenants: []string{"T1", "T2"}})},
764
resp: Response{Error: nil},
765
doBefore: func(m *MockStore) {
766
m.indexer.On("Open", mock.Anything).Return(nil)
767
m.store.db.Schema.addClass(cls, &sharding.State{Physical: map[string]sharding.Physical{"T1": {}}}, 1)
769
doAfter: func(ms *MockStore) error {
770
if len(ms.store.db.Schema.Classes["C1"].Sharding.Physical) != 0 {
771
return fmt.Errorf("sharding state mus be empty after deletion")
778
for _, tc := range tests {
779
m := NewMockStore(t, "Node-1", 9091)
780
store := m.Store(tc.doBefore)
781
ret := store.Apply(&tc.req)
782
resp, ok := ret.(Response)
784
t.Errorf("%s: response has wrong type", tc.name)
786
if got, want := resp.Error, tc.resp.Error; want != nil {
787
if !errors.Is(resp.Error, tc.resp.Error) {
788
t.Errorf("%s: error want: %v got: %v", tc.name, want, got)
790
} else if got != nil {
791
t.Errorf("%s: error want: nil got: %v", tc.name, got)
793
if tc.doAfter != nil {
794
if err := tc.doAfter(&m); err != nil {
795
t.Errorf("%s check updates: %v", tc.name, err)
797
m.indexer.AssertExpectations(t)
798
m.parser.AssertExpectations(t)
803
func cmdAsBytes(class string,
804
cmdType cmd.ApplyRequest_Type,
805
jsonSubCmd interface{},
806
rpcSubCmd protoreflect.ProtoMessage,
812
if rpcSubCmd != nil {
813
subData, err = gproto.Marshal(rpcSubCmd)
815
panic("proto.Marshal: " + err.Error())
817
} else if jsonSubCmd != nil {
818
subData, err = json.Marshal(jsonSubCmd)
820
panic("json.Marshal( " + err.Error())
824
cmd := command.ApplyRequest{
829
data, err := gproto.Marshal(&cmd)
837
type MockStore struct {
845
func NewMockStore(t *testing.T, nodeID string, raftPort int) MockStore {
846
indexer := &MockIndexer{}
847
parser := &MockParser{}
848
logger := NewMockLogger(t)
855
WorkDir: t.TempDir(),
861
HeartbeatTimeout: 1 * time.Second,
862
ElectionTimeout: 1 * time.Second,
863
RecoveryTimeout: 500 * time.Millisecond,
864
SnapshotInterval: 2 * time.Second,
865
SnapshotThreshold: 125,
868
AddrResolver: &MockAddressResolver{},
869
Logger: logger.Logger,
870
UpdateWaitTimeout: time.Millisecond * 50,
878
func (m *MockStore) Store(doBefore func(*MockStore)) *Store {
885
type MockLogger struct {
887
Logger *logrus.Logger
890
func NewMockLogger(t *testing.T) MockLogger {
891
buf := new(bytes.Buffer)
895
m.Logger = logrus.New()
896
m.Logger.SetFormatter(&logrus.JSONFormatter{})
900
type MockAddressResolver struct {
901
f func(id string) string
904
func (m *MockAddressResolver) NodeAddress(id string) string {
911
type MockIndexer struct {
915
func (m *MockIndexer) AddClass(req cmd.AddClassRequest) error {
916
args := m.Called(req)
920
func (m *MockIndexer) RestoreClassDir(class string) error {
921
args := m.Called(class)
925
func (m *MockIndexer) UpdateClass(req cmd.UpdateClassRequest) error {
926
args := m.Called(req)
930
func (m *MockIndexer) UpdateIndex(req cmd.UpdateClassRequest) error {
931
args := m.Called(req)
935
func (m *MockIndexer) ReloadLocalDB(ctx context.Context, all []api.UpdateClassRequest) error {
939
func (m *MockIndexer) DeleteClass(name string) error {
940
args := m.Called(name)
944
func (m *MockIndexer) AddProperty(class string, req cmd.AddPropertyRequest) error {
945
args := m.Called(class, req)
949
func (m *MockIndexer) AddTenants(class string, req *cmd.AddTenantsRequest) error {
950
args := m.Called(class, req)
954
func (m *MockIndexer) UpdateTenants(class string, req *cmd.UpdateTenantsRequest) error {
955
args := m.Called(class, req)
959
func (m *MockIndexer) DeleteTenants(class string, req *cmd.DeleteTenantsRequest) error {
960
args := m.Called(class, req)
964
func (m *MockIndexer) UpdateShardStatus(req *cmd.UpdateShardStatusRequest) error {
965
args := m.Called(req)
969
func (m *MockIndexer) GetShardsStatus(class, tenant string) (models.ShardStatusList, error) {
970
args := m.Called(class, tenant)
971
return models.ShardStatusList{}, args.Error(1)
974
func (m *MockIndexer) Open(ctx context.Context) error {
975
args := m.Called(ctx)
979
func (m *MockIndexer) Close(ctx context.Context) error {
980
args := m.Called(ctx)
984
func (m *MockIndexer) TriggerSchemaUpdateCallbacks() {
988
type MockParser struct {
992
func (m *MockParser) ParseClass(class *models.Class) error {
993
args := m.Called(class)
997
func (m *MockParser) ParseClassUpdate(class, update *models.Class) (*models.Class, error) {
998
args := m.Called(class)
999
return update, args.Error(1)