weaviate

Форк
0
/
store_test.go 
1000 строк · 29.9 Кб
1
//                           _       _
2
// __      _____  __ ___   ___  __ _| |_ ___
3
// \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \
4
//  \ V  V /  __/ (_| |\ V /| | (_| | ||  __/
5
//   \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___|
6
//
7
//  Copyright © 2016 - 2024 Weaviate B.V. All rights reserved.
8
//
9
//  CONTACT: hello@weaviate.io
10
//
11

12
package store
13

14
import (
15
	"bytes"
16
	"context"
17
	"encoding/json"
18
	"errors"
19
	"fmt"
20
	"reflect"
21
	"strconv"
22
	"strings"
23
	"testing"
24
	"time"
25

26
	"github.com/hashicorp/raft"
27
	"github.com/sirupsen/logrus"
28
	"github.com/stretchr/testify/assert"
29
	"github.com/stretchr/testify/mock"
30
	"github.com/weaviate/weaviate/cluster/proto/api"
31
	cmd "github.com/weaviate/weaviate/cluster/proto/api"
32
	command "github.com/weaviate/weaviate/cluster/proto/api"
33
	"github.com/weaviate/weaviate/cluster/utils"
34
	"github.com/weaviate/weaviate/entities/models"
35
	"github.com/weaviate/weaviate/usecases/sharding"
36
	gproto "google.golang.org/protobuf/proto"
37
	"google.golang.org/protobuf/reflect/protoreflect"
38
)
39

40
var (
41
	errAny   = errors.New("any error")
42
	Anything = mock.Anything
43
)
44

45
func TestServiceEndpoints(t *testing.T) {
46
	ctx := context.Background()
47
	m := NewMockStore(t, "Node-1", utils.MustGetFreeTCPPort())
48
	addr := fmt.Sprintf("%s:%d", m.cfg.Host, m.cfg.RaftPort)
49
	m.indexer.On("Open", Anything).Return(nil)
50
	m.indexer.On("Close", Anything).Return(nil)
51
	m.indexer.On("AddClass", Anything).Return(nil)
52
	m.indexer.On("RestoreClassDir", Anything).Return(nil)
53
	m.indexer.On("UpdateClass", Anything).Return(nil)
54
	m.indexer.On("DeleteClass", Anything).Return(nil)
55
	m.indexer.On("AddProperty", Anything, Anything).Return(nil)
56
	m.indexer.On("UpdateShardStatus", Anything).Return(nil)
57
	m.indexer.On("AddTenants", Anything, Anything).Return(nil)
58
	m.indexer.On("UpdateTenants", Anything, Anything).Return(nil)
59
	m.indexer.On("DeleteTenants", Anything, Anything).Return(nil)
60
	m.indexer.On("TriggerSchemaUpdateCallbacks").Return()
61

62
	m.parser.On("ParseClass", mock.Anything).Return(nil)
63
	m.parser.On("ParseClassUpdate", mock.Anything, mock.Anything).Return(mock.Anything, nil)
64

65
	srv := NewService(m.store, nil)
66

67
	// LeaderNotFound
68
	_, err := srv.Execute(&command.ApplyRequest{})
69
	assert.ErrorIs(t, err, ErrLeaderNotFound)
70
	assert.ErrorIs(t, srv.Join(ctx, m.store.nodeID, addr, true), ErrLeaderNotFound)
71
	assert.ErrorIs(t, srv.Remove(ctx, m.store.nodeID), ErrLeaderNotFound)
72

73
	// Deadline exceeded while waiting for DB to be restored
74
	func() {
75
		ctx, cancel := context.WithTimeout(ctx, time.Millisecond*30)
76
		defer cancel()
77
		assert.ErrorIs(t, srv.WaitUntilDBRestored(ctx, 5*time.Millisecond, make(chan struct{})), context.DeadlineExceeded)
78
	}()
79

80
	// Open
81
	defer srv.Close(ctx)
82
	assert.Nil(t, srv.Open(ctx, m.indexer))
83

84
	// node lose leadership after service call
85
	assert.ErrorIs(t, srv.store.Join(m.store.nodeID, addr, true), ErrNotLeader)
86
	assert.ErrorIs(t, srv.store.Remove(m.store.nodeID), ErrNotLeader)
87

88
	// Connect
89
	assert.Nil(t, srv.store.Notify(m.cfg.NodeID, addr))
90

91
	assert.Nil(t, srv.WaitUntilDBRestored(ctx, time.Second*1, make(chan struct{})))
92
	assert.True(t, tryNTimesWithWait(10, time.Millisecond*200, srv.Ready))
93
	tryNTimesWithWait(20, time.Millisecond*100, srv.store.IsLeader)
94
	assert.True(t, srv.store.IsLeader())
95
	schema := srv.SchemaReader()
96
	assert.Equal(t, schema.Len(), 0)
97

98
	// AddClass
99
	_, err = srv.AddClass(nil, nil)
100
	assert.ErrorIs(t, err, errBadRequest)
101
	assert.Equal(t, schema.ClassEqual("C"), "")
102

103
	cls := &models.Class{
104
		Class:              "C",
105
		MultiTenancyConfig: &models.MultiTenancyConfig{Enabled: true},
106
	}
107
	ss := &sharding.State{PartitioningEnabled: true, Physical: map[string]sharding.Physical{"T0": {Name: "T0"}}}
108
	version0, err := srv.AddClass(cls, ss)
109
	assert.Nil(t, err)
110
	assert.Equal(t, schema.ClassEqual("C"), "C")
111

112
	// Add same class again
113
	_, err = srv.AddClass(cls, ss)
114
	assert.Error(t, err)
115
	assert.Equal(t, "class name C already exists", err.Error())
116

117
	// Add similar class
118
	_, err = srv.AddClass(&models.Class{Class: "c"}, ss)
119
	assert.ErrorIs(t, err, errClassExists)
120

121
	// QueryReadOnlyClass
122
	readOnlyVClass, err := srv.QueryReadOnlyClasses(cls.Class)
123
	assert.NoError(t, err)
124
	assert.NotNil(t, readOnlyVClass[cls.Class].Class)
125
	assert.Equal(t, cls, readOnlyVClass[cls.Class].Class)
126

127
	// QuerySchema
128
	getSchema, err := srv.QuerySchema()
129
	assert.NoError(t, err)
130
	assert.NotNil(t, getSchema)
131
	assert.Equal(t, models.Schema{Classes: []*models.Class{readOnlyVClass[cls.Class].Class}}, getSchema)
132

133
	// QueryTenants all
134
	getTenantsAll, _, err := srv.QueryTenants(cls.Class, []string{})
135
	assert.NoError(t, err)
136
	assert.NotNil(t, getTenantsAll)
137
	assert.Equal(t, []*models.Tenant{{
138
		Name:           "T0",
139
		ActivityStatus: models.TenantActivityStatusHOT,
140
	}}, getTenantsAll)
141

142
	// QueryTenants one
143
	getTenantsOne, _, err := srv.QueryTenants(cls.Class, []string{"T0"})
144
	assert.NoError(t, err)
145
	assert.NotNil(t, getTenantsOne)
146
	assert.Equal(t, []*models.Tenant{{
147
		Name:           "T0",
148
		ActivityStatus: models.TenantActivityStatusHOT,
149
	}}, getTenantsOne)
150

151
	// QueryTenants one
152
	getTenantsNone, _, err := srv.QueryTenants(cls.Class, []string{"T"})
153
	assert.NoError(t, err)
154
	assert.NotNil(t, getTenantsNone)
155
	assert.Equal(t, []*models.Tenant{}, getTenantsNone)
156

157
	// Query ShardTenant
158
	getTenantShards, _, err := srv.QueryTenantsShards(cls.Class, "T0")
159
	for tenant, status := range getTenantShards {
160
		assert.Nil(t, err)
161
		assert.Equal(t, "T0", tenant)
162
		assert.Equal(t, models.TenantActivityStatusHOT, status)
163
	}
164

165
	// QueryShardOwner - Err
166
	_, _, err = srv.QueryShardOwner(cls.Class, "T0")
167
	assert.NotNil(t, err)
168

169
	// QueryShardOwner
170
	mc := srv.SchemaReader().metaClass(cls.Class)
171
	mc.Sharding = sharding.State{Physical: map[string]sharding.Physical{"T0": {BelongsToNodes: []string{"N0"}}}}
172
	getShardOwner, _, err := srv.QueryShardOwner(cls.Class, "T0")
173
	assert.Nil(t, err)
174
	assert.Equal(t, "N0", getShardOwner)
175

176
	// QueryShardingState
177
	mc.Sharding = sharding.State{Physical: map[string]sharding.Physical{"T0": {BelongsToNodes: []string{"N0"}}}}
178
	getShardingState, _, err := srv.QueryShardingState(cls.Class)
179
	assert.Nil(t, err)
180
	assert.Equal(t, &mc.Sharding, getShardingState)
181

182
	// UpdateClass
183
	info := ClassInfo{
184
		Exists:            true,
185
		MultiTenancy:      models.MultiTenancyConfig{Enabled: true},
186
		ReplicationFactor: 1,
187
		Tenants:           1,
188
	}
189
	_, err = srv.UpdateClass(nil, nil)
190
	assert.ErrorIs(t, err, errBadRequest)
191
	cls.MultiTenancyConfig = &models.MultiTenancyConfig{Enabled: true}
192
	cls.ReplicationConfig = &models.ReplicationConfig{Factor: 1}
193
	ss.Physical = map[string]sharding.Physical{"T0": {Name: "T0"}}
194
	version, err := srv.UpdateClass(cls, nil)
195
	info.ClassVersion = version
196
	info.ShardVersion = version0
197
	assert.Nil(t, err)
198
	assert.Nil(t, srv.store.WaitForAppliedIndex(ctx, time.Millisecond*10, version))
199
	assert.Equal(t, info, schema.ClassInfo("C"))
200
	assert.ErrorIs(t, srv.store.WaitForAppliedIndex(ctx, time.Millisecond*10, srv.store.lastAppliedIndex.Load()+1), ErrDeadlineExceeded)
201

202
	// DeleteClass
203
	_, err = srv.DeleteClass("X")
204
	assert.Nil(t, err)
205
	_, err = srv.DeleteClass("C")
206
	assert.Nil(t, err)
207
	assert.Equal(t, ClassInfo{}, schema.ClassInfo("C"))
208

209
	// RestoreClass
210
	_, err = srv.RestoreClass(nil, nil)
211
	assert.ErrorIs(t, err, errBadRequest)
212
	version, err = srv.RestoreClass(cls, ss)
213
	assert.Nil(t, err)
214
	info.ClassVersion = version
215
	info.ShardVersion = version
216
	assert.Equal(t, info, schema.ClassInfo("C"))
217

218
	// AddProperty
219
	_, err = srv.AddProperty("C", nil)
220
	assert.ErrorIs(t, err, errBadRequest)
221
	_, err = srv.AddProperty("", &models.Property{Name: "P1"})
222
	assert.ErrorIs(t, err, errBadRequest)
223
	version, err = srv.AddProperty("C", &models.Property{Name: "P1"})
224
	assert.Nil(t, err)
225
	info.ClassVersion = version
226
	info.Properties = 1
227
	assert.Equal(t, info, schema.ClassInfo("C"))
228

229
	// UpdateStatus
230
	_, err = srv.UpdateShardStatus("", "A", "ACTIVE")
231
	assert.ErrorIs(t, err, errBadRequest)
232
	_, err = srv.UpdateShardStatus("C", "", "ACTIVE")
233
	assert.ErrorIs(t, err, errBadRequest)
234
	_, err = srv.UpdateShardStatus("C", "A", "ACTIVE")
235
	assert.Nil(t, err)
236

237
	// AddTenants
238
	_, err = srv.AddTenants("", &command.AddTenantsRequest{})
239
	assert.ErrorIs(t, err, errBadRequest)
240
	version, err = srv.AddTenants("C", &command.AddTenantsRequest{
241
		ClusterNodes: []string{"Node-1"},
242
		Tenants:      []*command.Tenant{nil, {Name: "T2", Status: "S1"}, nil},
243
	})
244
	assert.Nil(t, err)
245
	info.ShardVersion = version
246
	info.Tenants += 1
247
	assert.Equal(t, schema.ClassInfo("C"), info)
248

249
	// UpdateTenants
250
	_, err = srv.UpdateTenants("", &command.UpdateTenantsRequest{})
251
	assert.ErrorIs(t, err, errBadRequest)
252
	_, err = srv.UpdateTenants("C", &command.UpdateTenantsRequest{Tenants: []*command.Tenant{{Name: "T2", Status: "S2"}}})
253
	assert.Nil(t, err)
254

255
	// DeleteTenants
256
	_, err = srv.DeleteTenants("", &command.DeleteTenantsRequest{})
257
	assert.ErrorIs(t, err, errBadRequest)
258
	version, err = srv.DeleteTenants("C", &command.DeleteTenantsRequest{Tenants: []string{"T0", "Tn"}})
259
	assert.Nil(t, err)
260
	info.Tenants -= 1
261
	info.ShardVersion = version
262
	assert.Equal(t, info, schema.ClassInfo("C"))
263
	assert.Equal(t, "S2", schema.CopyShardingState("C").Physical["T2"].Status)
264

265
	// Self Join
266
	assert.Nil(t, srv.Join(ctx, m.store.nodeID, addr, true))
267
	assert.True(t, srv.store.IsLeader())
268
	assert.Nil(t, srv.Join(ctx, m.store.nodeID, addr, false))
269
	assert.True(t, srv.store.IsLeader())
270
	assert.ErrorContains(t, srv.Remove(ctx, m.store.nodeID), "configuration")
271
	assert.True(t, srv.store.IsLeader())
272

273
	// Stats
274
	stats := srv.Stats()
275
	// stats:raft_state
276
	assert.Equal(t, "Leader", stats["raft"].(map[string]string)["state"])
277
	// stats:leader_address
278
	leaderAddress := string(stats["leader_address"].(raft.ServerAddress))
279
	splitAddress := strings.Split(leaderAddress, ":")
280
	assert.Len(t, splitAddress, 2)
281
	ipAddress, portStr := splitAddress[0], splitAddress[1]
282
	assert.Equal(t, "127.0.0.1", ipAddress)
283
	port, err := strconv.Atoi(portStr)
284
	if err != nil {
285
		t.Errorf("Port should have been parsable as an int but was: %v", portStr)
286
	}
287
	assert.GreaterOrEqual(t, port, 0)
288
	// stats:leader_id
289
	leaderID := string(stats["leader_id"].(raft.ServerID))
290
	assert.Equal(t, m.store.nodeID, leaderID)
291

292
	// create snapshot
293
	assert.Nil(t, srv.store.raft.Barrier(2*time.Second).Error())
294
	assert.Nil(t, srv.store.raft.Snapshot().Error())
295

296
	// restore from snapshot
297
	assert.Nil(t, srv.Close(ctx))
298
	srv.store.db.Schema.clear()
299

300
	s := New(m.cfg)
301
	m.store = &s
302
	srv = NewService(m.store, nil)
303
	assert.Nil(t, srv.Open(ctx, m.indexer))
304
	assert.Nil(t, srv.store.Notify(m.cfg.NodeID, addr))
305
	assert.Nil(t, srv.WaitUntilDBRestored(ctx, time.Second*1, make(chan struct{})))
306
	assert.True(t, tryNTimesWithWait(10, time.Millisecond*200, srv.Ready))
307
	tryNTimesWithWait(20, time.Millisecond*100, srv.store.IsLeader)
308
	clInfo := srv.store.db.Schema.ClassInfo("C")
309
	assert.Equal(t, info, clInfo)
310
}
311

312
// Runs the provided function `predicate` up to `n` times, sleeping `sleepDuration` between each
313
// function call until `f` returns true or returns false if all `n` calls return false.
314
// Useful in tests which require an unknown but bounded delay where the component under test has
315
// a way to indicate when it's ready to proceed.
316
func tryNTimesWithWait(n int, sleepDuration time.Duration, predicate func() bool) bool {
317
	for i := 0; i < n; i++ {
318
		if predicate() {
319
			return true
320
		}
321
		time.Sleep(sleepDuration)
322
	}
323
	return false
324
}
325

326
func TestServiceStoreInit(t *testing.T) {
327
	var (
328
		ctx   = context.Background()
329
		m     = NewMockStore(t, "Node-1", 9093)
330
		store = m.store
331
		addr  = fmt.Sprintf("%s:%d", m.cfg.Host, m.cfg.RaftPort)
332
	)
333

334
	// NotOpen
335
	assert.ErrorIs(t, store.Join(m.store.nodeID, addr, true), ErrNotOpen)
336
	assert.ErrorIs(t, store.Remove(m.store.nodeID), ErrNotOpen)
337
	assert.ErrorIs(t, store.Notify(m.store.nodeID, addr), ErrNotOpen)
338

339
	// Already Open
340
	store.open.Store(true)
341
	assert.Nil(t, store.Open(ctx))
342

343
	// notify non voter
344
	store.bootstrapExpect = 0
345
	assert.Nil(t, store.Notify("A", "localhost:123"))
346

347
	// not enough voter
348
	store.bootstrapExpect = 2
349
	assert.Nil(t, store.Notify("A", "localhost:123"))
350
}
351

352
func TestServiceClose(t *testing.T) {
353
	ctx := context.Background()
354
	m := NewMockStore(t, "Node-1", utils.MustGetFreeTCPPort())
355
	addr := fmt.Sprintf("%s:%d", m.cfg.Host, m.cfg.RaftPort)
356
	s := New(m.cfg)
357
	m.store = &s
358
	srv := NewService(m.store, nil)
359
	m.indexer.On("Open", mock.Anything).Return(nil)
360
	assert.Nil(t, srv.Open(ctx, m.indexer))
361
	assert.Nil(t, srv.store.Notify(m.cfg.NodeID, addr))
362
	close := make(chan struct{})
363
	go func() {
364
		time.Sleep(time.Second)
365
		close <- struct{}{}
366
	}()
367
	now := time.Now()
368
	assert.Nil(t, srv.WaitUntilDBRestored(ctx, time.Second*10, close))
369
	after := time.Now()
370
	assert.Less(t, after.Sub(now), 2*time.Second)
371
}
372

373
func TestServicePanics(t *testing.T) {
374
	m := NewMockStore(t, "Node-1", 9091)
375

376
	// Assert Correct Response Type
377
	ret := m.store.Apply(&raft.Log{Type: raft.LogNoop})
378
	resp, ok := ret.(Response)
379
	assert.True(t, ok)
380
	assert.Equal(t, resp, Response{})
381

382
	// Unknown Command
383
	assert.Panics(t, func() { m.store.Apply(&raft.Log{}) })
384

385
	// Not a Valid Payload
386
	assert.Panics(t, func() { m.store.Apply(&raft.Log{Data: []byte("a")}) })
387

388
	// Cannot Open File Store
389
	m.indexer.On("Open", mock.Anything).Return(errAny)
390
	assert.Panics(t, func() { m.store.loadDatabase(context.TODO()) })
391
}
392

393
func TestStoreApply(t *testing.T) {
394
	doFirst := func(m *MockStore) {
395
		m.indexer.On("Open", mock.Anything).Return(nil)
396
		m.parser.On("ParseClass", mock.Anything).Return(nil)
397
		m.indexer.On("TriggerSchemaUpdateCallbacks").Return()
398
	}
399

400
	cls := &models.Class{Class: "C1", MultiTenancyConfig: &models.MultiTenancyConfig{Enabled: true}}
401
	ss := &sharding.State{Physical: map[string]sharding.Physical{"T1": {
402
		Name:           "T1",
403
		BelongsToNodes: []string{"THIS"},
404
	}, "T2": {
405
		Name:           "T2",
406
		BelongsToNodes: []string{"THIS"},
407
	}}}
408

409
	tests := []struct {
410
		name     string
411
		req      raft.Log
412
		resp     Response
413
		doBefore func(*MockStore)
414
		doAfter  func(*MockStore) error
415
	}{
416
		{
417
			name: "AddClass/Unmarshal",
418
			req: raft.Log{Data: cmdAsBytes("C1", cmd.ApplyRequest_TYPE_ADD_CLASS,
419
				nil, &cmd.AddTenantsRequest{})},
420
			resp:     Response{Error: errBadRequest},
421
			doBefore: doFirst,
422
		},
423
		{
424
			name: "AddClass/StateIsNil",
425
			req: raft.Log{Data: cmdAsBytes("C2",
426
				cmd.ApplyRequest_TYPE_ADD_CLASS,
427
				cmd.AddClassRequest{Class: cls, State: nil},
428
				nil)},
429
			resp: Response{Error: errBadRequest},
430
			doBefore: func(m *MockStore) {
431
				m.indexer.On("Open", mock.Anything).Return(nil)
432
			},
433
		},
434
		{
435
			name: "AddClass/ParseClass",
436
			req: raft.Log{Data: cmdAsBytes("C2",
437
				cmd.ApplyRequest_TYPE_ADD_CLASS,
438
				cmd.AddClassRequest{Class: cls, State: ss},
439
				nil)},
440
			resp: Response{Error: errBadRequest},
441
			doBefore: func(m *MockStore) {
442
				m.indexer.On("Open", mock.Anything).Return(nil)
443
				m.parser.On("ParseClass", mock.Anything).Return(errAny)
444
			},
445
		},
446
		{
447
			name: "AddClass/Success",
448
			req: raft.Log{Data: cmdAsBytes("C1",
449
				cmd.ApplyRequest_TYPE_ADD_CLASS,
450
				cmd.AddClassRequest{Class: cls, State: ss},
451
				nil)},
452
			resp:     Response{Error: nil},
453
			doBefore: doFirst,
454
			doAfter: func(ms *MockStore) error {
455
				_, ok := ms.store.db.Schema.Classes["C1"]
456
				if !ok {
457
					return fmt.Errorf("class is missing")
458
				}
459
				return nil
460
			},
461
		},
462
		{
463
			name: "AddClass/DBError",
464
			req: raft.Log{
465
				Index: 3,
466
				Data: cmdAsBytes("C1",
467
					cmd.ApplyRequest_TYPE_ADD_CLASS,
468
					cmd.AddClassRequest{Class: cls, State: ss},
469
					nil),
470
			},
471
			resp: Response{Error: errAny},
472
			doBefore: func(ms *MockStore) {
473
				doFirst(ms)
474
				ms.indexer.On("AddClass", mock.Anything).Return(errAny)
475
			},
476
		},
477
		{
478
			name: "AddClass/AlreadyExists",
479
			req: raft.Log{Data: cmdAsBytes("C1",
480
				cmd.ApplyRequest_TYPE_ADD_CLASS,
481
				cmd.AddClassRequest{Class: cls, State: ss},
482
				nil)},
483
			resp: Response{Error: errSchema},
484
			doBefore: func(m *MockStore) {
485
				m.indexer.On("Open", mock.Anything).Return(nil)
486
				m.parser.On("ParseClass", mock.Anything).Return(nil)
487
				m.store.db.Schema.addClass(cls, ss, 1)
488
			},
489
		},
490
		{
491
			name: "RestoreClass/Success",
492
			req: raft.Log{Data: cmdAsBytes("C1",
493
				cmd.ApplyRequest_TYPE_RESTORE_CLASS,
494
				cmd.AddClassRequest{Class: cls, State: ss},
495
				nil)},
496
			resp: Response{Error: nil},
497
			doBefore: func(m *MockStore) {
498
				m.indexer.On("Open", mock.Anything).Return(nil)
499
				m.parser.On("ParseClass", mock.Anything).Return(nil)
500
				m.indexer.On("RestoreClassDir", cls.Class).Return(nil)
501
				m.indexer.On("TriggerSchemaUpdateCallbacks").Return()
502
			},
503
			doAfter: func(ms *MockStore) error {
504
				_, ok := ms.store.db.Schema.Classes["C1"]
505
				if !ok {
506
					return fmt.Errorf("class is missing")
507
				}
508
				return nil
509
			},
510
		},
511
		{
512
			name: "UpdateClass/Unmarshal",
513
			req: raft.Log{Data: cmdAsBytes("C1", cmd.ApplyRequest_TYPE_UPDATE_CLASS,
514
				nil, &cmd.AddTenantsRequest{})},
515
			resp:     Response{Error: errBadRequest},
516
			doBefore: doFirst,
517
		},
518
		{
519
			name: "UpdateClass/ClassNotFound",
520
			req: raft.Log{Data: cmdAsBytes("C1",
521
				cmd.ApplyRequest_TYPE_UPDATE_CLASS,
522
				cmd.UpdateClassRequest{Class: cls, State: nil},
523
				nil)},
524
			resp: Response{Error: errSchema},
525
			doBefore: func(m *MockStore) {
526
				m.indexer.On("Open", mock.Anything).Return(nil)
527
				m.parser.On("ParseClassUpdate", mock.Anything, mock.Anything).Return(mock.Anything, nil)
528
			},
529
		},
530
		{
531
			name: "UpdateClass/ParseUpdate",
532
			req: raft.Log{Data: cmdAsBytes("C2",
533
				cmd.ApplyRequest_TYPE_UPDATE_CLASS,
534
				cmd.UpdateClassRequest{Class: cls, State: nil},
535
				nil)},
536
			resp: Response{Error: errBadRequest},
537
			doBefore: func(m *MockStore) {
538
				m.indexer.On("Open", mock.Anything).Return(nil)
539
				m.store.db.Schema.addClass(cls, ss, 1)
540
				m.parser.On("ParseClassUpdate", mock.Anything, mock.Anything).Return(nil, errAny)
541
			},
542
		},
543
		{
544
			name: "UpdateClass/Success",
545
			req: raft.Log{Data: cmdAsBytes("C1",
546
				cmd.ApplyRequest_TYPE_UPDATE_CLASS,
547
				cmd.UpdateClassRequest{Class: cls, State: nil},
548
				nil)},
549
			resp: Response{Error: nil},
550
			doBefore: func(m *MockStore) {
551
				m.indexer.On("Open", mock.Anything).Return(nil)
552
				m.parser.On("ParseClassUpdate", mock.Anything, mock.Anything).Return(mock.Anything, nil)
553
				m.store.db.Schema.addClass(cls, ss, 1)
554
				m.indexer.On("TriggerSchemaUpdateCallbacks").Return()
555
			},
556
		},
557
		{
558
			name: "DeleteClass/Success",
559
			req: raft.Log{Data: cmdAsBytes("C1",
560
				cmd.ApplyRequest_TYPE_DELETE_CLASS, nil,
561
				nil)},
562
			resp: Response{Error: nil},
563
			doBefore: func(m *MockStore) {
564
				m.indexer.On("Open", mock.Anything).Return(nil)
565
				m.indexer.On("TriggerSchemaUpdateCallbacks").Return()
566
			},
567
			doAfter: func(ms *MockStore) error {
568
				if _, ok := ms.store.db.Schema.Classes["C1"]; ok {
569
					return fmt.Errorf("class still exits")
570
				}
571
				return nil
572
			},
573
		},
574
		{
575
			name: "AddProperty/Unmarshal",
576
			req: raft.Log{Data: cmdAsBytes("C1", cmd.ApplyRequest_TYPE_ADD_PROPERTY,
577
				nil, &cmd.AddTenantsRequest{})},
578
			resp:     Response{Error: errBadRequest},
579
			doBefore: doFirst,
580
		},
581
		{
582
			name: "AddProperty/ClassNotFound",
583
			req: raft.Log{Data: cmdAsBytes("C1", cmd.ApplyRequest_TYPE_ADD_PROPERTY,
584
				cmd.AddPropertyRequest{Properties: []*models.Property{{Name: "P1"}}}, nil)},
585
			resp:     Response{Error: errSchema},
586
			doBefore: doFirst,
587
		},
588
		{
589
			name: "AddProperty/Nil",
590
			req: raft.Log{
591
				Data: cmdAsBytes("C1", cmd.ApplyRequest_TYPE_ADD_PROPERTY,
592
					cmd.AddPropertyRequest{Properties: nil}, nil),
593
			},
594
			resp: Response{Error: errBadRequest},
595
			doBefore: func(m *MockStore) {
596
				doFirst(m)
597
				m.store.db.Schema.addClass(cls, ss, 1)
598
			},
599
		},
600
		{
601
			name: "AddProperty/Success",
602
			req: raft.Log{
603
				Data: cmdAsBytes("C1", cmd.ApplyRequest_TYPE_ADD_PROPERTY,
604
					cmd.AddPropertyRequest{Properties: []*models.Property{{Name: "P1"}}}, nil),
605
			},
606
			resp: Response{Error: nil},
607
			doBefore: func(m *MockStore) {
608
				m.indexer.On("Open", mock.Anything).Return(nil)
609
				m.store.db.Schema.addClass(cls, ss, 1)
610
				m.indexer.On("TriggerSchemaUpdateCallbacks").Return()
611
			},
612
			doAfter: func(ms *MockStore) error {
613
				ok := false
614
				for _, p := range ms.store.db.Schema.Classes["C1"].Class.Properties {
615
					if p.Name == "P1" {
616
						ok = true
617
						break
618
					}
619
				}
620
				if !ok {
621
					return fmt.Errorf("property is missing")
622
				}
623
				return nil
624
			},
625
		},
626
		{
627
			name: "UpdateShard/Unmarshal",
628
			req: raft.Log{Data: cmdAsBytes("C1", cmd.ApplyRequest_TYPE_UPDATE_SHARD_STATUS,
629
				nil, &cmd.AddTenantsRequest{})},
630
			resp:     Response{Error: errBadRequest},
631
			doBefore: doFirst,
632
		},
633
		{
634
			name: "UpdateShard/Success",
635
			req: raft.Log{Data: cmdAsBytes("C1", cmd.ApplyRequest_TYPE_UPDATE_SHARD_STATUS,
636
				cmd.UpdateShardStatusRequest{Class: "C1"}, nil)},
637
			resp:     Response{Error: nil},
638
			doBefore: doFirst,
639
		},
640
		{
641
			name:     "AddTenant/Unmarshal",
642
			req:      raft.Log{Data: cmdAsBytes("C1", cmd.ApplyRequest_TYPE_ADD_TENANT, cmd.AddClassRequest{}, nil)},
643
			resp:     Response{Error: errBadRequest},
644
			doBefore: doFirst,
645
		},
646
		{
647
			name: "AddTenant/ClassNotFound",
648
			req: raft.Log{Data: cmdAsBytes("C1", cmd.ApplyRequest_TYPE_ADD_TENANT, nil, &cmd.AddTenantsRequest{
649
				Tenants: []*command.Tenant{nil, {Name: "T1"}, nil},
650
			})},
651
			resp:     Response{Error: errSchema},
652
			doBefore: doFirst,
653
		},
654
		{
655
			name: "AddTenant/Success",
656
			req: raft.Log{Data: cmdAsBytes("C1", cmd.ApplyRequest_TYPE_ADD_TENANT, nil, &cmd.AddTenantsRequest{
657
				ClusterNodes: []string{"THIS"},
658
				Tenants:      []*command.Tenant{nil, {Name: "T1"}, nil},
659
			})},
660
			resp: Response{Error: nil},
661
			doBefore: func(m *MockStore) {
662
				m.indexer.On("Open", mock.Anything).Return(nil)
663
				m.store.db.Schema.addClass(cls, &sharding.State{
664
					Physical: map[string]sharding.Physical{"T1": {}},
665
				}, 1)
666
			},
667
			doAfter: func(ms *MockStore) error {
668
				if _, ok := ms.store.db.Schema.Classes["C1"].Sharding.Physical["T1"]; !ok {
669
					return fmt.Errorf("tenant is missing")
670
				}
671
				return nil
672
			},
673
		},
674
		{
675
			name:     "UpdateTenant/Unmarshal",
676
			req:      raft.Log{Data: cmdAsBytes("C1", cmd.ApplyRequest_TYPE_UPDATE_TENANT, cmd.AddClassRequest{}, nil)},
677
			resp:     Response{Error: errBadRequest},
678
			doBefore: doFirst,
679
		},
680
		{
681
			name: "UpdateTenant/ClassNotFound",
682
			req: raft.Log{Data: cmdAsBytes("C1", cmd.ApplyRequest_TYPE_UPDATE_TENANT,
683
				nil, &cmd.UpdateTenantsRequest{Tenants: []*command.Tenant{nil, {Name: "T1"}, nil}})},
684
			resp:     Response{Error: errSchema},
685
			doBefore: doFirst,
686
		},
687
		{
688
			name: "UpdateTenant/NoFound",
689
			req: raft.Log{Data: cmdAsBytes("C1", cmd.ApplyRequest_TYPE_UPDATE_TENANT,
690
				nil, &cmd.UpdateTenantsRequest{Tenants: []*command.Tenant{
691
					{Name: "T1", Status: models.TenantActivityStatusCOLD},
692
				}})},
693
			resp: Response{Error: errSchema},
694
			doBefore: func(m *MockStore) {
695
				ss := &sharding.State{Physical: map[string]sharding.Physical{}}
696
				doFirst(m)
697
				m.store.db.Schema.addClass(cls, ss, 1)
698
			},
699
		},
700
		{
701
			name: "UpdateTenant/Success",
702
			req: raft.Log{Data: cmdAsBytes("C1", cmd.ApplyRequest_TYPE_UPDATE_TENANT,
703
				nil, &cmd.UpdateTenantsRequest{Tenants: []*command.Tenant{
704
					{Name: "T1", Status: models.TenantActivityStatusCOLD},
705
					{Name: "T2", Status: models.TenantActivityStatusCOLD},
706
					{Name: "T3", Status: models.TenantActivityStatusCOLD},
707
				}})},
708
			resp: Response{Error: nil},
709
			doBefore: func(m *MockStore) {
710
				ss := &sharding.State{Physical: map[string]sharding.Physical{"T1": {
711
					Name:           "T1",
712
					BelongsToNodes: []string{"THIS"},
713
					Status:         models.TenantActivityStatusHOT,
714
				}, "T2": {
715
					Name:           "T2",
716
					BelongsToNodes: []string{"THIS"},
717
					Status:         models.TenantActivityStatusCOLD,
718
				}, "T3": {
719
					Name:           "T3",
720
					BelongsToNodes: []string{"NODE-2"},
721
					Status:         models.TenantActivityStatusHOT,
722
				}}}
723
				m.indexer.On("Open", mock.Anything).Return(nil)
724
				m.store.db.Schema.addClass(cls, ss, 1)
725
			},
726
			doAfter: func(ms *MockStore) error {
727
				want := map[string]sharding.Physical{"T1": {
728
					Name:           "T1",
729
					BelongsToNodes: []string{"THIS"},
730
					Status:         models.TenantActivityStatusCOLD,
731
				}, "T2": {
732
					Name:           "T2",
733
					BelongsToNodes: []string{"THIS"},
734
					Status:         models.TenantActivityStatusCOLD,
735
				}, "T3": {
736
					Name:           "T3",
737
					BelongsToNodes: []string{"NODE-2"},
738
					Status:         models.TenantActivityStatusCOLD,
739
				}}
740
				cls := ms.store.db.Schema.Classes["C1"]
741
				if got := cls.Sharding.Physical; !reflect.DeepEqual(got, want) {
742
					return fmt.Errorf("physical state want: %v got: %v", want, got)
743
				}
744
				return nil
745
			},
746
		},
747
		{
748
			name:     "DeleteTenant/Unmarshal",
749
			req:      raft.Log{Data: cmdAsBytes("C1", cmd.ApplyRequest_TYPE_DELETE_TENANT, cmd.AddClassRequest{}, nil)},
750
			resp:     Response{Error: errBadRequest},
751
			doBefore: doFirst,
752
		},
753
		{
754
			name: "DeleteTenant/ClassNotFound",
755
			req: raft.Log{Data: cmdAsBytes("C1", cmd.ApplyRequest_TYPE_DELETE_TENANT,
756
				nil, &cmd.DeleteTenantsRequest{Tenants: []string{"T1", "T2"}})},
757
			resp:     Response{Error: errSchema},
758
			doBefore: doFirst,
759
		},
760
		{
761
			name: "DeleteTenant/Success",
762
			req: raft.Log{Data: cmdAsBytes("C1", cmd.ApplyRequest_TYPE_DELETE_TENANT,
763
				nil, &cmd.DeleteTenantsRequest{Tenants: []string{"T1", "T2"}})},
764
			resp: Response{Error: nil},
765
			doBefore: func(m *MockStore) {
766
				m.indexer.On("Open", mock.Anything).Return(nil)
767
				m.store.db.Schema.addClass(cls, &sharding.State{Physical: map[string]sharding.Physical{"T1": {}}}, 1)
768
			},
769
			doAfter: func(ms *MockStore) error {
770
				if len(ms.store.db.Schema.Classes["C1"].Sharding.Physical) != 0 {
771
					return fmt.Errorf("sharding state mus be empty after deletion")
772
				}
773
				return nil
774
			},
775
		},
776
	}
777

778
	for _, tc := range tests {
779
		m := NewMockStore(t, "Node-1", 9091)
780
		store := m.Store(tc.doBefore)
781
		ret := store.Apply(&tc.req)
782
		resp, ok := ret.(Response)
783
		if !ok {
784
			t.Errorf("%s: response has wrong type", tc.name)
785
		}
786
		if got, want := resp.Error, tc.resp.Error; want != nil {
787
			if !errors.Is(resp.Error, tc.resp.Error) {
788
				t.Errorf("%s: error want: %v got: %v", tc.name, want, got)
789
			}
790
		} else if got != nil {
791
			t.Errorf("%s: error want: nil got: %v", tc.name, got)
792
		}
793
		if tc.doAfter != nil {
794
			if err := tc.doAfter(&m); err != nil {
795
				t.Errorf("%s check updates: %v", tc.name, err)
796
			}
797
			m.indexer.AssertExpectations(t)
798
			m.parser.AssertExpectations(t)
799
		}
800
	}
801
}
802

803
func cmdAsBytes(class string,
804
	cmdType cmd.ApplyRequest_Type,
805
	jsonSubCmd interface{},
806
	rpcSubCmd protoreflect.ProtoMessage,
807
) []byte {
808
	var (
809
		subData []byte
810
		err     error
811
	)
812
	if rpcSubCmd != nil {
813
		subData, err = gproto.Marshal(rpcSubCmd)
814
		if err != nil {
815
			panic("proto.Marshal: " + err.Error())
816
		}
817
	} else if jsonSubCmd != nil {
818
		subData, err = json.Marshal(jsonSubCmd)
819
		if err != nil {
820
			panic("json.Marshal( " + err.Error())
821
		}
822
	}
823

824
	cmd := command.ApplyRequest{
825
		Type:       cmdType,
826
		Class:      class,
827
		SubCommand: subData,
828
	}
829
	data, err := gproto.Marshal(&cmd)
830
	if err != nil {
831
		panic(err)
832
	}
833

834
	return data
835
}
836

837
type MockStore struct {
838
	indexer *MockIndexer
839
	parser  *MockParser
840
	logger  MockLogger
841
	cfg     Config
842
	store   *Store
843
}
844

845
func NewMockStore(t *testing.T, nodeID string, raftPort int) MockStore {
846
	indexer := &MockIndexer{}
847
	parser := &MockParser{}
848
	logger := NewMockLogger(t)
849
	ms := MockStore{
850
		indexer: indexer,
851
		parser:  parser,
852
		logger:  logger,
853

854
		cfg: Config{
855
			WorkDir:           t.TempDir(),
856
			NodeID:            nodeID,
857
			Host:              "localhost",
858
			RaftPort:          raftPort,
859
			Voter:             true,
860
			BootstrapExpect:   1,
861
			HeartbeatTimeout:  1 * time.Second,
862
			ElectionTimeout:   1 * time.Second,
863
			RecoveryTimeout:   500 * time.Millisecond,
864
			SnapshotInterval:  2 * time.Second,
865
			SnapshotThreshold: 125,
866
			DB:                indexer,
867
			Parser:            parser,
868
			AddrResolver:      &MockAddressResolver{},
869
			Logger:            logger.Logger,
870
			UpdateWaitTimeout: time.Millisecond * 50,
871
		},
872
	}
873
	s := New(ms.cfg)
874
	ms.store = &s
875
	return ms
876
}
877

878
func (m *MockStore) Store(doBefore func(*MockStore)) *Store {
879
	if doBefore != nil {
880
		doBefore(m)
881
	}
882
	return m.store
883
}
884

885
type MockLogger struct {
886
	buf    *bytes.Buffer
887
	Logger *logrus.Logger
888
}
889

890
func NewMockLogger(t *testing.T) MockLogger {
891
	buf := new(bytes.Buffer)
892
	m := MockLogger{
893
		buf: buf,
894
	}
895
	m.Logger = logrus.New()
896
	m.Logger.SetFormatter(&logrus.JSONFormatter{})
897
	return m
898
}
899

900
type MockAddressResolver struct {
901
	f func(id string) string
902
}
903

904
func (m *MockAddressResolver) NodeAddress(id string) string {
905
	if m.f != nil {
906
		return m.f(id)
907
	}
908
	return "127.0.0.1"
909
}
910

911
type MockIndexer struct {
912
	mock.Mock
913
}
914

915
func (m *MockIndexer) AddClass(req cmd.AddClassRequest) error {
916
	args := m.Called(req)
917
	return args.Error(0)
918
}
919

920
func (m *MockIndexer) RestoreClassDir(class string) error {
921
	args := m.Called(class)
922
	return args.Error(0)
923
}
924

925
func (m *MockIndexer) UpdateClass(req cmd.UpdateClassRequest) error {
926
	args := m.Called(req)
927
	return args.Error(0)
928
}
929

930
func (m *MockIndexer) UpdateIndex(req cmd.UpdateClassRequest) error {
931
	args := m.Called(req)
932
	return args.Error(0)
933
}
934

935
func (m *MockIndexer) ReloadLocalDB(ctx context.Context, all []api.UpdateClassRequest) error {
936
	return nil
937
}
938

939
func (m *MockIndexer) DeleteClass(name string) error {
940
	args := m.Called(name)
941
	return args.Error(0)
942
}
943

944
func (m *MockIndexer) AddProperty(class string, req cmd.AddPropertyRequest) error {
945
	args := m.Called(class, req)
946
	return args.Error(0)
947
}
948

949
func (m *MockIndexer) AddTenants(class string, req *cmd.AddTenantsRequest) error {
950
	args := m.Called(class, req)
951
	return args.Error(0)
952
}
953

954
func (m *MockIndexer) UpdateTenants(class string, req *cmd.UpdateTenantsRequest) error {
955
	args := m.Called(class, req)
956
	return args.Error(0)
957
}
958

959
func (m *MockIndexer) DeleteTenants(class string, req *cmd.DeleteTenantsRequest) error {
960
	args := m.Called(class, req)
961
	return args.Error(0)
962
}
963

964
func (m *MockIndexer) UpdateShardStatus(req *cmd.UpdateShardStatusRequest) error {
965
	args := m.Called(req)
966
	return args.Error(0)
967
}
968

969
func (m *MockIndexer) GetShardsStatus(class, tenant string) (models.ShardStatusList, error) {
970
	args := m.Called(class, tenant)
971
	return models.ShardStatusList{}, args.Error(1)
972
}
973

974
func (m *MockIndexer) Open(ctx context.Context) error {
975
	args := m.Called(ctx)
976
	return args.Error(0)
977
}
978

979
func (m *MockIndexer) Close(ctx context.Context) error {
980
	args := m.Called(ctx)
981
	return args.Error(0)
982
}
983

984
func (m *MockIndexer) TriggerSchemaUpdateCallbacks() {
985
	m.Called()
986
}
987

988
type MockParser struct {
989
	mock.Mock
990
}
991

992
func (m *MockParser) ParseClass(class *models.Class) error {
993
	args := m.Called(class)
994
	return args.Error(0)
995
}
996

997
func (m *MockParser) ParseClassUpdate(class, update *models.Class) (*models.Class, error) {
998
	args := m.Called(class)
999
	return update, args.Error(1)
1000
}
1001

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.