weaviate

Форк
0
/
schema_test.go 
337 строк · 9.3 Кб
1
//                           _       _
// __      _____  __ ___   ___  __ _| |_ ___
// \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \
//  \ V  V /  __/ (_| |\ V /| | (_| | ||  __/
//   \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___|
//
//  Copyright © 2016 - 2024 Weaviate B.V. All rights reserved.
//
//  CONTACT: hello@weaviate.io
//
11

12
package store
13

14
import (
15
	"bytes"
16
	"context"
17
	"io"
18
	"testing"
19

20
	"github.com/stretchr/testify/assert"
21
	"github.com/stretchr/testify/mock"
22
	"github.com/weaviate/weaviate/entities/models"
23
	"github.com/weaviate/weaviate/usecases/sharding"
24
)
25

26
func TestVersionedSchemaReaderShardReplicas(t *testing.T) {
27
	var (
28
		ctx = context.Background()
29
		sc  = &schema{
30
			Classes: make(map[string]*metaClass),
31
		}
32
		vsc = versionedSchema{
33
			schema:        sc,
34
			WaitForUpdate: func(ctx context.Context, version uint64) error { return nil },
35
		}
36
	)
37
	// class not found
38
	_, _, err := sc.ShardReplicas("C", "S")
39
	assert.ErrorIs(t, err, errClassNotFound)
40

41
	// shard not found
42
	ss := &sharding.State{Physical: make(map[string]sharding.Physical)}
43

44
	sc.addClass(&models.Class{Class: "C"}, ss, 1)
45

46
	_, err = vsc.ShardReplicas(ctx, "C", "S", 1)
47
	assert.ErrorIs(t, err, errShardNotFound)
48

49
	// two replicas found
50
	nodes := []string{"A", "B"}
51
	ss.Physical["S"] = sharding.Physical{BelongsToNodes: nodes}
52
	res, err := vsc.ShardReplicas(ctx, "C", "S", 1)
53
	assert.Nil(t, err)
54
	assert.Equal(t, nodes, res)
55
}
56

57
func TestVersionedSchemaReaderClass(t *testing.T) {
58
	var (
59
		ctx    = context.Background()
60
		retErr error
61
		f      = func(ctx context.Context, version uint64) error { return retErr }
62
		nodes  = []string{"N1", "N2"}
63
		s      = &schema{
64
			Classes:     make(map[string]*metaClass),
65
			shardReader: &MockShardReader{},
66
		}
67

68
		sc = versionedSchema{s, f}
69
	)
70

71
	// class not found
72
	cls, err := sc.ReadOnlyClass(ctx, "C", 1)
73
	assert.Nil(t, cls)
74
	assert.Nil(t, err)
75
	ss, err := sc.CopyShardingState(ctx, "C", 1)
76
	assert.Nil(t, ss)
77
	assert.Nil(t, err)
78
	mt, err := sc.MultiTenancy(ctx, "C", 1)
79
	assert.Equal(t, mt, models.MultiTenancyConfig{})
80
	assert.Nil(t, err)
81

82
	info, err := sc.ClassInfo(ctx, "C", 1)
83
	assert.Equal(t, ClassInfo{}, info)
84
	assert.Nil(t, err)
85

86
	_, err = sc.ShardReplicas(ctx, "C", "S", 1)
87
	assert.ErrorIs(t, err, errClassNotFound)
88
	_, err = sc.ShardOwner(ctx, "C", "S", 1)
89
	assert.ErrorIs(t, err, errClassNotFound)
90
	err = sc.Read(ctx, "C", 1, func(c *models.Class, s *sharding.State) error { return nil })
91
	assert.ErrorIs(t, err, errClassNotFound)
92

93
	// Add Simple class
94
	cls1 := &models.Class{Class: "C"}
95
	ss1 := &sharding.State{Physical: map[string]sharding.Physical{
96
		"S1": {Status: "A"},
97
		"S2": {Status: "A", BelongsToNodes: nodes},
98
	}}
99

100
	assert.Nil(t, sc.schema.addClass(cls1, ss1, 1))
101
	info, err = sc.ClassInfo(ctx, "C", 1)
102
	assert.Equal(t, ClassInfo{
103
		ReplicationFactor: 1,
104
		ClassVersion:      1,
105
		ShardVersion:      1, Exists: true, Tenants: len(ss1.Physical),
106
	}, info)
107
	assert.Nil(t, err)
108

109
	cls, err = sc.ReadOnlyClass(ctx, "C", 1)
110
	assert.Equal(t, cls, cls1)
111
	assert.Nil(t, err)
112
	mt, err = sc.MultiTenancy(ctx, "D", 1)
113
	assert.Equal(t, models.MultiTenancyConfig{}, mt)
114
	assert.Nil(t, err)
115

116
	// Shards
117
	_, err = sc.ShardOwner(ctx, "C", "S1", 1)
118
	assert.ErrorContains(t, err, "node not found")
119
	_, err = sc.ShardOwner(ctx, "C", "Sx", 1)
120
	assert.ErrorIs(t, err, errShardNotFound)
121
	shards, _, err := sc.TenantsShards(ctx, 1, "C", "S2")
122
	assert.Empty(t, shards)
123
	assert.Nil(t, err)
124
	shard, err := sc.ShardFromUUID(ctx, "Cx", nil, 1)
125
	assert.Empty(t, shard)
126
	assert.Nil(t, err)
127

128
	// Add MT Class
129
	cls2 := &models.Class{Class: "D", MultiTenancyConfig: &models.MultiTenancyConfig{Enabled: true}}
130
	ss2 := &sharding.State{
131
		PartitioningEnabled: true,
132
		Physical:            map[string]sharding.Physical{"S1": {Status: "A", BelongsToNodes: nodes}},
133
	}
134
	sc.schema.addClass(cls2, ss2, 1)
135
	cls, err = sc.ReadOnlyClass(ctx, "D", 1)
136
	assert.Equal(t, cls, cls2, 1)
137
	assert.Nil(t, err)
138

139
	mt, err = sc.MultiTenancy(ctx, "D", 1)
140
	assert.Equal(t, models.MultiTenancyConfig{Enabled: true}, mt)
141
	assert.Nil(t, err)
142

143
	// ShardOwner
144
	owner, err := sc.ShardOwner(ctx, "D", "S1", 1)
145
	assert.Nil(t, err)
146
	assert.Equal(t, owner, "N1")
147

148
	// TenantShard
149
	shards, _, err = sc.TenantsShards(ctx, 1, "D", "S1")
150
	assert.Equal(t, shards, map[string]string{"S1": "A"})
151
	assert.Equal(t, shards["S1"], "A")
152
	assert.Nil(t, err)
153

154
	shards, _, err = sc.TenantsShards(ctx, 1, "D", "Sx")
155
	assert.Empty(t, shards)
156
	assert.Nil(t, err)
157

158
	reader := func(c *models.Class, s *sharding.State) error { return nil }
159
	assert.Nil(t, sc.Read(ctx, "C", 1, reader))
160
	retErr = ErrDeadlineExceeded
161
	assert.ErrorIs(t, sc.Read(ctx, "C", 1, reader), ErrDeadlineExceeded)
162
	retErr = nil
163
}
164

165
func TestSchemaReaderShardReplicas(t *testing.T) {
166
	sc := &schema{
167
		Classes: make(map[string]*metaClass),
168
	}
169
	rsc := retrySchema{sc, versionedSchema{}}
170
	// class not found
171
	_, _, err := sc.ShardReplicas("C", "S")
172
	assert.ErrorIs(t, err, errClassNotFound)
173

174
	// shard not found
175
	ss := &sharding.State{Physical: make(map[string]sharding.Physical)}
176

177
	sc.addClass(&models.Class{Class: "C"}, ss, 1)
178

179
	_, err = rsc.ShardReplicas("C", "S")
180
	assert.ErrorIs(t, err, errShardNotFound)
181

182
	// two replicas found
183
	nodes := []string{"A", "B"}
184
	ss.Physical["S"] = sharding.Physical{BelongsToNodes: nodes}
185
	res, err := rsc.ShardReplicas("C", "S")
186
	assert.Nil(t, err)
187
	assert.Equal(t, nodes, res)
188
}
189

190
func TestSchemaReaderClass(t *testing.T) {
191
	var (
192
		nodes = []string{"N1", "N2"}
193
		s     = &schema{
194
			Classes:     make(map[string]*metaClass),
195
			shardReader: &MockShardReader{},
196
		}
197
		sc = retrySchema{s, versionedSchema{}}
198
	)
199

200
	// class not found
201
	assert.Nil(t, sc.ReadOnlyClass("C"))
202
	assert.Nil(t, sc.CopyShardingState("C"))
203
	assert.Equal(t, sc.ReadOnlySchema(), models.Schema{Classes: make([]*models.Class, 0)})
204
	assert.Equal(t, sc.MultiTenancy("C"), models.MultiTenancyConfig{})
205

206
	_, err := sc.ShardReplicas("C", "S")
207
	assert.ErrorIs(t, err, errClassNotFound)
208
	_, err = sc.ShardOwner("C", "S")
209
	assert.ErrorIs(t, err, errClassNotFound)
210
	err = sc.Read("C", func(c *models.Class, s *sharding.State) error { return nil })
211
	assert.ErrorIs(t, err, errClassNotFound)
212

213
	// Add Simple class
214
	cls1 := &models.Class{Class: "C"}
215
	ss1 := &sharding.State{Physical: map[string]sharding.Physical{
216
		"S1": {Status: "A"},
217
		"S2": {Status: "A", BelongsToNodes: nodes},
218
	}}
219

220
	sc.schema.addClass(cls1, ss1, 1)
221
	assert.Equal(t, sc.ReadOnlyClass("C"), cls1)
222
	assert.Equal(t, sc.MultiTenancy("D"), models.MultiTenancyConfig{})
223
	assert.Nil(t, sc.Read("C", func(c *models.Class, s *sharding.State) error { return nil }))
224

225
	// Shards
226
	_, err = sc.ShardOwner("C", "S1")
227
	assert.ErrorContains(t, err, "node not found")
228
	_, err = sc.ShardOwner("C", "Sx")
229
	assert.ErrorIs(t, err, errShardNotFound)
230
	shard, _ := sc.TenantsShards("C", "S2")
231
	assert.Empty(t, shard)
232
	assert.Empty(t, sc.ShardFromUUID("Cx", nil))
233

234
	_, err = sc.GetShardsStatus("C", "")
235
	assert.Nil(t, err)
236

237
	// Add MT Class
238
	cls2 := &models.Class{Class: "D", MultiTenancyConfig: &models.MultiTenancyConfig{Enabled: true}}
239
	ss2 := &sharding.State{
240
		PartitioningEnabled: true,
241
		Physical:            map[string]sharding.Physical{"S1": {Status: "A", BelongsToNodes: nodes}},
242
	}
243
	sc.schema.addClass(cls2, ss2, 1)
244
	assert.Equal(t, sc.ReadOnlyClass("D"), cls2)
245
	assert.Equal(t, sc.MultiTenancy("D"), models.MultiTenancyConfig{Enabled: true})
246

247
	assert.ElementsMatch(t, sc.ReadOnlySchema().Classes, []*models.Class{cls1, cls2})
248

249
	// ShardOwner
250
	owner, err := sc.ShardOwner("D", "S1")
251
	assert.Nil(t, err)
252
	assert.Equal(t, owner, "N1")
253

254
	// TenantShard
255
	shards, _ := sc.TenantsShards("D", "S1")
256
	assert.Equal(t, shards["S1"], "A")
257
	shards, _ = sc.TenantsShards("D", "Sx")
258
	assert.Empty(t, shards)
259
}
260

261
func TestSchemaSnapshot(t *testing.T) {
262
	var (
263
		node   = "N1"
264
		sc     = NewSchema(node, &MockIndexer{})
265
		parser = &MockParser{}
266

267
		cls = &models.Class{Class: "C"}
268
		ss  = &sharding.State{
269
			Physical: map[string]sharding.Physical{
270
				"S1": {Status: "A"},
271
				"S2": {Status: "A", BelongsToNodes: []string{"A", "B"}},
272
			},
273
		}
274
	)
275
	ss.SetLocalName(node)
276
	assert.Nil(t, sc.addClass(cls, ss, 1))
277
	parser.On("ParseClass", mock.Anything).Return(nil)
278

279
	// Create Snapshot
280
	sink := &MockSnapshotSink{}
281
	assert.Nil(t, sc.Persist(sink))
282

283
	// restore snapshot
284
	sc2 := NewSchema("N1", &MockIndexer{})
285
	assert.Nil(t, sc2.Restore(sink, parser))
286
	assert.Equal(t, sc.Classes, sc2.Classes)
287

288
	// Encoding error
289
	sink2 := &MockSnapshotSink{wErr: errAny, rErr: errAny}
290
	assert.ErrorContains(t, sc.Persist(sink2), "encode")
291

292
	// Decoding Error
293
	assert.ErrorContains(t, sc.Restore(sink2, parser), "decode")
294

295
	// Parsing Error
296
	parser2 := &MockParser{}
297
	parser2.On("ParseClass", mock.Anything).Return(errAny)
298

299
	sink3 := &MockSnapshotSink{}
300
	assert.Nil(t, sc.Persist(sink3))
301
	assert.ErrorContains(t, sc.Restore(sink3, parser2), "pars")
302
}
303

304
type MockShardReader struct {
305
	lst models.ShardStatusList
306
	err error
307
}
308

309
func (m *MockShardReader) GetShardsStatus(class, tenant string) (models.ShardStatusList, error) {
310
	return m.lst, m.err
311
}
312

313
// MockSnapshotSink is an in-memory snapshot sink for tests. Bytes passed to
// Write are appended to an internal buffer and handed back by Read, so one
// instance can be used first as the target of a persist step and then as the
// source of a restore step.
//
// Setting wErr or rErr makes Write or Read fail with that error without
// touching the buffer.
type MockSnapshotSink struct {
	buf bytes.Buffer
	// The embedded interface is left nil by the tests in this file; the
	// pointer-receiver Write and Close below take precedence over the
	// promoted methods when the sink is used through a pointer.
	io.WriteCloser
	wErr error // when non-nil, returned by every Write
	rErr error // when non-nil, returned by every Read
}

// ID returns the fixed identifier "ID".
func (MockSnapshotSink) ID() string { return "ID" }

// Cancel does nothing and always returns nil.
func (MockSnapshotSink) Cancel() error { return nil }

// Write appends p to the internal buffer, or returns (0, wErr) when wErr is
// set.
func (m *MockSnapshotSink) Write(p []byte) (n int, err error) {
	if m.wErr != nil {
		return 0, m.wErr
	}
	return m.buf.Write(p)
}

// Close does nothing and always returns nil.
func (m *MockSnapshotSink) Close() error { return nil }

// Read drains previously written bytes from the internal buffer into p, or
// returns (0, rErr) when rErr is set.
func (m *MockSnapshotSink) Read(p []byte) (n int, err error) {
	if m.rErr != nil {
		return 0, m.rErr
	}
	return m.buf.Read(p)
}
338

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.