2
// __ _____ __ ___ ___ __ _| |_ ___
3
// \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \
4
// \ V V / __/ (_| |\ V /| | (_| | || __/
5
// \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___|
7
// Copyright © 2016 - 2024 Weaviate B.V. All rights reserved.
9
// CONTACT: hello@weaviate.io
20
"github.com/stretchr/testify/assert"
21
"github.com/stretchr/testify/mock"
22
"github.com/weaviate/weaviate/entities/models"
23
"github.com/weaviate/weaviate/usecases/sharding"
26
// TestVersionedSchemaReaderShardReplicas exercises ShardReplicas on the plain
// schema (sc) and on its versioned wrapper (vsc): an unknown class must yield
// errClassNotFound, a known class without the requested shard must yield
// errShardNotFound, and a registered shard must return its BelongsToNodes.
//
// NOTE(review): parts of this function are not visible in this view (the
// declaration block, some assertions and the closing lines appear to be
// missing, and bare numeric lines are interleaved) — confirm against the full
// file before relying on these notes.
func TestVersionedSchemaReaderShardReplicas(t *testing.T) {
28
ctx = context.Background()
30
Classes: make(map[string]*metaClass),
32
vsc = versionedSchema{
34
// Stubbed wait hook: always returns nil.
WaitForUpdate: func(ctx context.Context, version uint64) error { return nil },
38
// No class has been added yet, so the lookup must report errClassNotFound.
_, _, err := sc.ShardReplicas("C", "S")
39
assert.ErrorIs(t, err, errClassNotFound)
42
// Register class "C" with an empty Physical map: the class exists, shard "S" does not.
ss := &sharding.State{Physical: make(map[string]sharding.Physical)}
44
// NOTE(review): the result of addClass is discarded here, whereas
// TestVersionedSchemaReaderClass wraps the same call in assert.Nil.
sc.addClass(&models.Class{Class: "C"}, ss, 1)
46
_, err = vsc.ShardReplicas(ctx, "C", "S", 1)
47
assert.ErrorIs(t, err, errShardNotFound)
50
// Add shard "S" to the same State value that was passed to addClass; the
// final assertion expects the lookup to return exactly these nodes.
nodes := []string{"A", "B"}
51
ss.Physical["S"] = sharding.Physical{BelongsToNodes: nodes}
52
res, err := vsc.ShardReplicas(ctx, "C", "S", 1)
54
assert.Equal(t, nodes, res)
57
// TestVersionedSchemaReaderClass walks the versionedSchema read API
// (ReadOnlyClass, CopyShardingState, MultiTenancy, ClassInfo, ShardReplicas,
// ShardOwner, TenantsShards, ShardFromUUID, Read) through three stages: an
// empty schema, a schema holding class "C", and a schema that also holds the
// multi-tenant class "D". It ends by making the wait function fail and checking
// that Read surfaces that error.
//
// NOTE(review): parts of this function are not visible in this view (the
// declaration block, most error assertions and closing lines appear to be
// missing, and bare numeric lines are interleaved) — confirm against the full
// file.
// NOTE(review): testify's assert.Equal takes (t, expected, actual); several
// calls below pass the actual value first, which only affects the wording of
// failure messages.
func TestVersionedSchemaReaderClass(t *testing.T) {
59
ctx = context.Background()
61
// f returns whatever retErr currently holds, so the test can change the
// outcome of the wait step later by reassigning retErr.
f = func(ctx context.Context, version uint64) error { return retErr }
62
nodes = []string{"N1", "N2"}
64
Classes: make(map[string]*metaClass),
65
shardReader: &MockShardReader{},
68
// Positional literal: s is the underlying schema, f presumably the
// WaitForUpdate hook — verify against the versionedSchema definition.
sc = versionedSchema{s, f}
72
// Stage 1 — empty schema: lookups for class "C" return zero values or
// errClassNotFound.
cls, err := sc.ReadOnlyClass(ctx, "C", 1)
75
ss, err := sc.CopyShardingState(ctx, "C", 1)
78
mt, err := sc.MultiTenancy(ctx, "C", 1)
79
assert.Equal(t, mt, models.MultiTenancyConfig{})
82
info, err := sc.ClassInfo(ctx, "C", 1)
83
assert.Equal(t, ClassInfo{}, info)
86
_, err = sc.ShardReplicas(ctx, "C", "S", 1)
87
assert.ErrorIs(t, err, errClassNotFound)
88
_, err = sc.ShardOwner(ctx, "C", "S", 1)
89
assert.ErrorIs(t, err, errClassNotFound)
90
err = sc.Read(ctx, "C", 1, func(c *models.Class, s *sharding.State) error { return nil })
91
assert.ErrorIs(t, err, errClassNotFound)
94
// Stage 2 — register class "C" directly on the underlying schema.
cls1 := &models.Class{Class: "C"}
95
ss1 := &sharding.State{Physical: map[string]sharding.Physical{
97
"S2": {Status: "A", BelongsToNodes: nodes},
100
assert.Nil(t, sc.schema.addClass(cls1, ss1, 1))
101
info, err = sc.ClassInfo(ctx, "C", 1)
102
assert.Equal(t, ClassInfo{
103
ReplicationFactor: 1,
105
ShardVersion: 1, Exists: true, Tenants: len(ss1.Physical),
109
cls, err = sc.ReadOnlyClass(ctx, "C", 1)
110
assert.Equal(t, cls, cls1)
112
// Class "D" has not been added yet, so its multi-tenancy config is the zero value.
mt, err = sc.MultiTenancy(ctx, "D", 1)
113
assert.Equal(t, models.MultiTenancyConfig{}, mt)
117
// NOTE(review): the "S1" entry of ss1 is not visible in this view; the
// "node not found" expectation suggests it is declared without nodes — confirm.
_, err = sc.ShardOwner(ctx, "C", "S1", 1)
118
assert.ErrorContains(t, err, "node not found")
119
_, err = sc.ShardOwner(ctx, "C", "Sx", 1)
120
assert.ErrorIs(t, err, errShardNotFound)
121
// "C" was created without multi-tenancy settings; the tenant lookup is expected to be empty.
shards, _, err := sc.TenantsShards(ctx, 1, "C", "S2")
122
assert.Empty(t, shards)
124
// "Cx" is not a registered class; the result is expected to be empty.
shard, err := sc.ShardFromUUID(ctx, "Cx", nil, 1)
125
assert.Empty(t, shard)
129
// Stage 3 — add the multi-tenant class "D" with partitioning enabled.
cls2 := &models.Class{Class: "D", MultiTenancyConfig: &models.MultiTenancyConfig{Enabled: true}}
130
ss2 := &sharding.State{
131
PartitioningEnabled: true,
132
Physical: map[string]sharding.Physical{"S1": {Status: "A", BelongsToNodes: nodes}},
134
sc.schema.addClass(cls2, ss2, 1)
135
cls, err = sc.ReadOnlyClass(ctx, "D", 1)
136
// NOTE(review): the trailing 1 lands in assert.Equal's variadic msgAndArgs
// parameter; it looks unintended and has no effect on the comparison.
assert.Equal(t, cls, cls2, 1)
139
mt, err = sc.MultiTenancy(ctx, "D", 1)
140
assert.Equal(t, models.MultiTenancyConfig{Enabled: true}, mt)
144
// The expected owner "N1" is the first entry of nodes.
owner, err := sc.ShardOwner(ctx, "D", "S1", 1)
146
assert.Equal(t, owner, "N1")
149
shards, _, err = sc.TenantsShards(ctx, 1, "D", "S1")
150
assert.Equal(t, shards, map[string]string{"S1": "A"})
151
// NOTE(review): redundant — the whole-map comparison above already covers this key.
assert.Equal(t, shards["S1"], "A")
154
// Unknown tenant "Sx" yields an empty result.
shards, _, err = sc.TenantsShards(ctx, 1, "D", "Sx")
155
assert.Empty(t, shards)
158
reader := func(c *models.Class, s *sharding.State) error { return nil }
159
assert.Nil(t, sc.Read(ctx, "C", 1, reader))
160
// From here on f returns ErrDeadlineExceeded; Read is expected to propagate it.
retErr = ErrDeadlineExceeded
161
assert.ErrorIs(t, sc.Read(ctx, "C", 1, reader), ErrDeadlineExceeded)
165
// TestSchemaReaderShardReplicas checks ShardReplicas on the plain schema (sc)
// and on the retrySchema wrapper (rsc): unknown class -> errClassNotFound,
// known class without the shard -> errShardNotFound, registered shard -> its
// BelongsToNodes list.
//
// NOTE(review): parts of this function are not visible in this view (the
// schema construction, some assertions and the closing lines appear to be
// missing, and bare numeric lines are interleaved) — confirm against the full
// file.
func TestSchemaReaderShardReplicas(t *testing.T) {
167
Classes: make(map[string]*metaClass),
169
// The versioned part of the wrapper is left as its zero value.
rsc := retrySchema{sc, versionedSchema{}}
171
// class not found
_, _, err := sc.ShardReplicas("C", "S")
172
assert.ErrorIs(t, err, errClassNotFound)
175
// Register class "C" with an empty Physical map: the class exists, shard "S" does not.
ss := &sharding.State{Physical: make(map[string]sharding.Physical)}
177
// NOTE(review): the result of addClass is discarded here.
sc.addClass(&models.Class{Class: "C"}, ss, 1)
179
_, err = rsc.ShardReplicas("C", "S")
180
assert.ErrorIs(t, err, errShardNotFound)
182
// two replicas found
183
nodes := []string{"A", "B"}
184
ss.Physical["S"] = sharding.Physical{BelongsToNodes: nodes}
185
res, err := rsc.ShardReplicas("C", "S")
187
assert.Equal(t, nodes, res)
190
// TestSchemaReaderClass walks the retrySchema read API (ReadOnlyClass,
// CopyShardingState, ReadOnlySchema, MultiTenancy, ShardReplicas, ShardOwner,
// Read, TenantsShards, ShardFromUUID, GetShardsStatus) through three stages: an
// empty schema, a schema holding class "C", and a schema that also holds the
// multi-tenant class "D".
//
// NOTE(review): parts of this function are not visible in this view (the
// declaration block, some assertions and closing lines appear to be missing,
// and bare numeric lines are interleaved) — confirm against the full file.
// NOTE(review): testify's assert.Equal takes (t, expected, actual); most calls
// below pass the actual value first, which only affects failure messages.
func TestSchemaReaderClass(t *testing.T) {
192
nodes = []string{"N1", "N2"}
194
Classes: make(map[string]*metaClass),
195
shardReader: &MockShardReader{},
197
// The versioned part of the wrapper is left as its zero value.
sc = retrySchema{s, versionedSchema{}}
201
// Stage 1 — empty schema: lookups for class "C" return nil/zero values or
// errClassNotFound, and the schema lists no classes.
assert.Nil(t, sc.ReadOnlyClass("C"))
202
assert.Nil(t, sc.CopyShardingState("C"))
203
assert.Equal(t, sc.ReadOnlySchema(), models.Schema{Classes: make([]*models.Class, 0)})
204
assert.Equal(t, sc.MultiTenancy("C"), models.MultiTenancyConfig{})
206
_, err := sc.ShardReplicas("C", "S")
207
assert.ErrorIs(t, err, errClassNotFound)
208
_, err = sc.ShardOwner("C", "S")
209
assert.ErrorIs(t, err, errClassNotFound)
210
err = sc.Read("C", func(c *models.Class, s *sharding.State) error { return nil })
211
assert.ErrorIs(t, err, errClassNotFound)
214
// Stage 2 — register class "C" directly on the underlying schema.
cls1 := &models.Class{Class: "C"}
215
ss1 := &sharding.State{Physical: map[string]sharding.Physical{
217
"S2": {Status: "A", BelongsToNodes: nodes},
220
// NOTE(review): the result of addClass is discarded here.
sc.schema.addClass(cls1, ss1, 1)
221
assert.Equal(t, sc.ReadOnlyClass("C"), cls1)
222
// Class "D" has not been added yet, so its multi-tenancy config is the zero value.
assert.Equal(t, sc.MultiTenancy("D"), models.MultiTenancyConfig{})
223
assert.Nil(t, sc.Read("C", func(c *models.Class, s *sharding.State) error { return nil }))
226
// NOTE(review): the "S1" entry of ss1 is not visible in this view; the
// "node not found" expectation suggests it is declared without nodes — confirm.
_, err = sc.ShardOwner("C", "S1")
227
assert.ErrorContains(t, err, "node not found")
228
_, err = sc.ShardOwner("C", "Sx")
229
assert.ErrorIs(t, err, errShardNotFound)
230
// "C" was created without multi-tenancy settings; the tenant lookup is expected to be empty.
shard, _ := sc.TenantsShards("C", "S2")
231
assert.Empty(t, shard)
232
// "Cx" is not a registered class; the result is expected to be empty.
assert.Empty(t, sc.ShardFromUUID("Cx", nil))
234
// NOTE(review): no assertion on this err is visible in this view — confirm
// that the original file checks it.
_, err = sc.GetShardsStatus("C", "")
238
// Stage 3 — add the multi-tenant class "D" with partitioning enabled.
cls2 := &models.Class{Class: "D", MultiTenancyConfig: &models.MultiTenancyConfig{Enabled: true}}
239
ss2 := &sharding.State{
240
PartitioningEnabled: true,
241
Physical: map[string]sharding.Physical{"S1": {Status: "A", BelongsToNodes: nodes}},
243
sc.schema.addClass(cls2, ss2, 1)
244
assert.Equal(t, sc.ReadOnlyClass("D"), cls2)
245
assert.Equal(t, sc.MultiTenancy("D"), models.MultiTenancyConfig{Enabled: true})
247
// ElementsMatch ignores ordering, so the order in which the schema lists
// classes does not matter here.
assert.ElementsMatch(t, sc.ReadOnlySchema().Classes, []*models.Class{cls1, cls2})
250
// The expected owner "N1" is the first entry of nodes.
owner, err := sc.ShardOwner("D", "S1")
252
assert.Equal(t, owner, "N1")
255
shards, _ := sc.TenantsShards("D", "S1")
256
assert.Equal(t, shards["S1"], "A")
257
// Unknown tenant "Sx" yields an empty result.
shards, _ = sc.TenantsShards("D", "Sx")
258
assert.Empty(t, shards)
261
// TestSchemaSnapshot covers the Persist/Restore round trip of the schema:
//   - a successful persist followed by a restore into a fresh schema must yield
//     equal Classes;
//   - a sink configured with write/read errors must surface "encode" and
//     "decode" errors respectively;
//   - a parser whose ParseClass fails must make Restore report a parse error.
//
// NOTE(review): parts of this function are not visible in this view (the
// declaration block, including the value of node, and closing lines appear to
// be missing, and bare numeric lines are interleaved) — confirm against the
// full file.
func TestSchemaSnapshot(t *testing.T) {
264
sc = NewSchema(node, &MockIndexer{})
265
parser = &MockParser{}
267
cls = &models.Class{Class: "C"}
268
ss = &sharding.State{
269
Physical: map[string]sharding.Physical{
271
"S2": {Status: "A", BelongsToNodes: []string{"A", "B"}},
275
ss.SetLocalName(node)
276
assert.Nil(t, sc.addClass(cls, ss, 1))
277
// The first parser accepts every class it is asked to parse.
parser.On("ParseClass", mock.Anything).Return(nil)
280
// Happy path: persist into an error-free sink.
sink := &MockSnapshotSink{}
281
assert.Nil(t, sc.Persist(sink))
284
// Restore into a second schema and compare the class maps.
// NOTE(review): sc is built with the variable node while sc2 uses the literal
// "N1"; node's value is not visible here — presumably they match, confirm.
sc2 := NewSchema("N1", &MockIndexer{})
285
assert.Nil(t, sc2.Restore(sink, parser))
286
assert.Equal(t, sc.Classes, sc2.Classes)
289
// Failing sink: wErr and rErr are both set, and the test expects Persist and
// Restore to fail with "encode" and "decode" in their messages.
sink2 := &MockSnapshotSink{wErr: errAny, rErr: errAny}
290
assert.ErrorContains(t, sc.Persist(sink2), "encode")
293
assert.ErrorContains(t, sc.Restore(sink2, parser), "decode")
296
// Failing parser: persisting succeeds, but Restore must report the parse error.
parser2 := &MockParser{}
297
parser2.On("ParseClass", mock.Anything).Return(errAny)
299
sink3 := &MockSnapshotSink{}
300
assert.Nil(t, sc.Persist(sink3))
301
// "pars" matches any message containing that substring (e.g. "parse", "parsing").
assert.ErrorContains(t, sc.Restore(sink3, parser2), "pars")
304
// MockShardReader is a test double plugged into the schema's shardReader field
// by the tests in this file.
// NOTE(review): only the lst field is visible in this view; any further fields
// and the closing brace are missing — confirm against the full file.
type MockShardReader struct {
305
// lst is a shard status list; presumably what GetShardsStatus returns — confirm.
lst models.ShardStatusList
309
// GetShardsStatus takes a class and tenant name and returns a shard status
// list together with an error.
// NOTE(review): the method body is not visible in this view; presumably it
// returns the values stored on the mock — confirm against the full file.
func (m *MockShardReader) GetShardsStatus(class, tenant string) (models.ShardStatusList, error) {
313
// MockSnapshotSink is the snapshot sink test double passed to Persist and
// Restore in TestSchemaSnapshot. Code elsewhere in this file references its buf,
// wErr and rErr fields.
// NOTE(review): the field declarations are not visible in this view; the
// ID/Cancel/Write/Close method set suggests it stands in for a raft snapshot
// sink — confirm against the full file.
type MockSnapshotSink struct {
320
// ID returns the constant string "ID".
func (MockSnapshotSink) ID() string {
	return "ID"
}
321
// Cancel does nothing and always returns a nil error.
func (MockSnapshotSink) Cancel() error {
	return nil
}
323
// Write forwards p to the sink's internal buffer and returns that buffer's
// result.
// NOTE(review): the lines between the signature and the return are not visible
// in this view; since TestSchemaSnapshot sets wErr to force an "encode"
// failure, there is presumably an early return of wErr here — confirm against
// the full file.
func (m *MockSnapshotSink) Write(p []byte) (n int, err error) {
327
return m.buf.Write(p)
330
// Close does nothing and always returns a nil error.
func (m *MockSnapshotSink) Close() error {
	return nil
}
332
func (m *MockSnapshotSink) Read(p []byte) (n int, err error) {