weaviate

Форк
0
/
service.go 
527 строк · 16.3 Кб
1
//                           _       _
2
// __      _____  __ ___   ___  __ _| |_ ___
3
// \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \
4
//  \ V  V /  __/ (_| |\ V /| | (_| | ||  __/
5
//   \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___|
6
//
7
//  Copyright © 2016 - 2024 Weaviate B.V. All rights reserved.
8
//
9
//  CONTACT: hello@weaviate.io
10
//
11

12
package store
13

14
import (
15
	"context"
16
	"encoding/json"
17
	"fmt"
18
	"slices"
19
	"strings"
20
	"time"
21

22
	"github.com/sirupsen/logrus"
23
	cmd "github.com/weaviate/weaviate/cluster/proto/api"
24
	"github.com/weaviate/weaviate/entities/models"
25
	"github.com/weaviate/weaviate/entities/versioned"
26
	"github.com/weaviate/weaviate/usecases/sharding"
27
	"google.golang.org/protobuf/proto"
28
)
29

30
// Service abstracts away the Raft store, providing clients with an interface that encompasses all write operations.
31
// It ensures that these operations are executed on the current leader, regardless of the specific leader in the cluster.
32
type Service struct {
33
	store *Store
34
	cl    client
35
	log   *logrus.Logger
36
}
37

38
// client to communicate with remote services
39
type client interface {
40
	Apply(leaderAddr string, req *cmd.ApplyRequest) (*cmd.ApplyResponse, error)
41
	Query(ctx context.Context, leaderAddr string, req *cmd.QueryRequest) (*cmd.QueryResponse, error)
42
	Remove(ctx context.Context, leaderAddress string, req *cmd.RemovePeerRequest) (*cmd.RemovePeerResponse, error)
43
	Join(ctx context.Context, leaderAddr string, req *cmd.JoinPeerRequest) (*cmd.JoinPeerResponse, error)
44
}
45

46
// NewService builds a Service on top of store and uses client to reach remote
// nodes. The service reuses the store's logger, so store must not be nil.
func NewService(store *Store, client client) *Service {
	svc := &Service{
		store: store,
		cl:    client,
		log:   store.log,
	}
	return svc
}
49

50
// Open opens this store service and marked as such.
51
// It constructs a new Raft node using the provided configuration.
52
// If there is any old state, such as snapshots, logs, peers, etc., all of those will be restored
53
func (s *Service) Open(ctx context.Context, db Indexer) error {
54
	s.log.Info("starting raft sub-system ...")
55
	s.store.SetDB(db)
56
	return s.store.Open(ctx)
57
}
58

59
// Close shuts the service down by closing the underlying store.
// A node that is not a voter first tries to remove itself from the cluster; a
// failure to do so is only logged and does not prevent the store from closing.
// The returned error is the one reported by the store's Close.
func (s *Service) Close(ctx context.Context) (err error) {
	s.log.Info("shutting down raft sub-system ...")

	// Voters stay in the cluster configuration; only non-voters, which do not
	// take part in Raft elections, are removed before shutdown.
	if s.store.IsVoter() {
		return s.store.Close(ctx)
	}

	s.log.Info("removing this node from cluster prior to shutdown ...")
	removeErr := s.Remove(ctx, s.store.ID())
	if removeErr == nil {
		s.log.Info("successfully removed this node from the cluster.")
	} else {
		s.log.WithError(removeErr).Error("remove this node from cluster")
	}
	return s.store.Close(ctx)
}
73

74
// Ready reports the readiness of the underlying store.
func (s *Service) Ready() bool {
	ready := s.store.Ready()
	return ready
}
77

78
// SchemaReader returns the schema reader exposed by the underlying store.
func (s *Service) SchemaReader() retrySchema {
	reader := s.store.SchemaReader()
	return reader
}
81

82
func (s *Service) AddClass(cls *models.Class, ss *sharding.State) (uint64, error) {
83
	if cls == nil || cls.Class == "" {
84
		return 0, fmt.Errorf("nil class or empty class name : %w", errBadRequest)
85
	}
86

87
	req := cmd.AddClassRequest{Class: cls, State: ss}
88
	subCommand, err := json.Marshal(&req)
89
	if err != nil {
90
		return 0, fmt.Errorf("marshal request: %w", err)
91
	}
92
	command := &cmd.ApplyRequest{
93
		Type:       cmd.ApplyRequest_TYPE_ADD_CLASS,
94
		Class:      cls.Class,
95
		SubCommand: subCommand,
96
	}
97
	return s.Execute(command)
98
}
99

100
func (s *Service) UpdateClass(cls *models.Class, ss *sharding.State) (uint64, error) {
101
	if cls == nil || cls.Class == "" {
102
		return 0, fmt.Errorf("nil class or empty class name : %w", errBadRequest)
103
	}
104
	req := cmd.UpdateClassRequest{Class: cls, State: ss}
105
	subCommand, err := json.Marshal(&req)
106
	if err != nil {
107
		return 0, fmt.Errorf("marshal request: %w", err)
108
	}
109
	command := &cmd.ApplyRequest{
110
		Type:       cmd.ApplyRequest_TYPE_UPDATE_CLASS,
111
		Class:      cls.Class,
112
		SubCommand: subCommand,
113
	}
114
	return s.Execute(command)
115
}
116

117
func (s *Service) DeleteClass(name string) (uint64, error) {
118
	command := &cmd.ApplyRequest{
119
		Type:  cmd.ApplyRequest_TYPE_DELETE_CLASS,
120
		Class: name,
121
	}
122
	return s.Execute(command)
123
}
124

125
func (s *Service) RestoreClass(cls *models.Class, ss *sharding.State) (uint64, error) {
126
	if cls == nil || cls.Class == "" {
127
		return 0, fmt.Errorf("nil class or empty class name : %w", errBadRequest)
128
	}
129
	req := cmd.AddClassRequest{Class: cls, State: ss}
130
	subCommand, err := json.Marshal(&req)
131
	if err != nil {
132
		return 0, fmt.Errorf("marshal request: %w", err)
133
	}
134
	command := &cmd.ApplyRequest{
135
		Type:       cmd.ApplyRequest_TYPE_RESTORE_CLASS,
136
		Class:      cls.Class,
137
		SubCommand: subCommand,
138
	}
139
	return s.Execute(command)
140
}
141

142
func (s *Service) AddProperty(class string, props ...*models.Property) (uint64, error) {
143
	for _, p := range props {
144
		if p == nil || p.Name == "" || class == "" {
145
			return 0, fmt.Errorf("empty property or empty class name : %w", errBadRequest)
146
		}
147
	}
148
	req := cmd.AddPropertyRequest{Properties: props}
149
	subCommand, err := json.Marshal(&req)
150
	if err != nil {
151
		return 0, fmt.Errorf("marshal request: %w", err)
152
	}
153
	command := &cmd.ApplyRequest{
154
		Type:       cmd.ApplyRequest_TYPE_ADD_PROPERTY,
155
		Class:      class,
156
		SubCommand: subCommand,
157
	}
158
	return s.Execute(command)
159
}
160

161
func (s *Service) UpdateShardStatus(class, shard, status string) (uint64, error) {
162
	if class == "" || shard == "" {
163
		return 0, fmt.Errorf("empty class or shard : %w", errBadRequest)
164
	}
165
	req := cmd.UpdateShardStatusRequest{Class: class, Shard: shard, Status: status}
166
	subCommand, err := json.Marshal(&req)
167
	if err != nil {
168
		return 0, fmt.Errorf("marshal request: %w", err)
169
	}
170
	command := &cmd.ApplyRequest{
171
		Type:       cmd.ApplyRequest_TYPE_UPDATE_SHARD_STATUS,
172
		Class:      req.Class,
173
		SubCommand: subCommand,
174
	}
175
	return s.Execute(command)
176
}
177

178
func (s *Service) AddTenants(class string, req *cmd.AddTenantsRequest) (uint64, error) {
179
	if class == "" || req == nil {
180
		return 0, fmt.Errorf("empty class name or nil request : %w", errBadRequest)
181
	}
182
	subCommand, err := proto.Marshal(req)
183
	if err != nil {
184
		return 0, fmt.Errorf("marshal request: %w", err)
185
	}
186
	command := &cmd.ApplyRequest{
187
		Type:       cmd.ApplyRequest_TYPE_ADD_TENANT,
188
		Class:      class,
189
		SubCommand: subCommand,
190
	}
191
	return s.Execute(command)
192
}
193

194
func (s *Service) UpdateTenants(class string, req *cmd.UpdateTenantsRequest) (uint64, error) {
195
	if class == "" || req == nil {
196
		return 0, fmt.Errorf("empty class name or nil request : %w", errBadRequest)
197
	}
198
	subCommand, err := proto.Marshal(req)
199
	if err != nil {
200
		return 0, fmt.Errorf("marshal request: %w", err)
201
	}
202
	command := &cmd.ApplyRequest{
203
		Type:       cmd.ApplyRequest_TYPE_UPDATE_TENANT,
204
		Class:      class,
205
		SubCommand: subCommand,
206
	}
207
	return s.Execute(command)
208
}
209

210
func (s *Service) DeleteTenants(class string, req *cmd.DeleteTenantsRequest) (uint64, error) {
211
	if class == "" || req == nil {
212
		return 0, fmt.Errorf("empty class name or nil request : %w", errBadRequest)
213
	}
214
	subCommand, err := proto.Marshal(req)
215
	if err != nil {
216
		return 0, fmt.Errorf("marshal request: %w", err)
217
	}
218
	command := &cmd.ApplyRequest{
219
		Type:       cmd.ApplyRequest_TYPE_DELETE_TENANT,
220
		Class:      class,
221
		SubCommand: subCommand,
222
	}
223
	return s.Execute(command)
224
}
225

226
func (s *Service) StoreSchemaV1() error {
227
	command := &cmd.ApplyRequest{
228
		Type: cmd.ApplyRequest_TYPE_STORE_SCHEMA_V1,
229
	}
230
	_, err := s.Execute(command)
231
	return err
232
}
233

234
// Execute runs req on the leader.
// When this node is the leader the request goes straight to the local store.
// Otherwise the command type is checked against the known apply types
// (ErrUnknownCommand if it has no name), the leader address is looked up (a
// leader-not-found error if it is empty) and the request is forwarded through
// the client; in that case the version from the leader's response is returned.
// Note that the command-type check only happens on the forwarding path.
func (s *Service) Execute(req *cmd.ApplyRequest) (uint64, error) {
	if s.store.IsLeader() {
		return s.store.Execute(req)
	}

	typeName := cmd.ApplyRequest_Type_name[int32(req.Type.Number())]
	if typeName == "" {
		return 0, ErrUnknownCommand
	}

	leaderAddr := s.store.Leader()
	if leaderAddr == "" {
		return 0, s.leaderErr()
	}

	resp, err := s.cl.Apply(leaderAddr, req)
	if err != nil {
		return 0, err
	}
	return resp.Version, nil
}
253

254
func (s *Service) Join(ctx context.Context, id, addr string, voter bool) error {
255
	s.log.WithFields(logrus.Fields{
256
		"id":      id,
257
		"address": addr,
258
		"voter":   voter,
259
	}).Debug("membership.join")
260
	if s.store.IsLeader() {
261
		return s.store.Join(id, addr, voter)
262
	}
263
	leader := s.store.Leader()
264
	if leader == "" {
265
		return s.leaderErr()
266
	}
267
	req := &cmd.JoinPeerRequest{Id: id, Address: addr, Voter: voter}
268
	_, err := s.cl.Join(ctx, leader, req)
269
	return err
270
}
271

272
// Remove removes the node identified by id from the cluster.
// The leader handles the request locally; any other node forwards it to the
// leader through the client and returns a leader-not-found error when no leader
// address is known.
func (s *Service) Remove(ctx context.Context, id string) error {
	s.log.WithField("id", id).Debug("membership.remove")

	if s.store.IsLeader() {
		return s.store.Remove(id)
	}

	leaderAddr := s.store.Leader()
	if leaderAddr == "" {
		return s.leaderErr()
	}

	// Only the error matters to the caller; the response body is dropped.
	_, err := s.cl.Remove(ctx, leaderAddr, &cmd.RemovePeerRequest{Id: id})
	return err
}
285

286
// Stats returns the statistics map exposed by the underlying store.
func (s *Service) Stats() map[string]any {
	s.log.Debug("membership.stats")
	stats := s.store.Stats()
	return stats
}
290

291
// LeaderWithID is used to return the current leader address and ID of the cluster.
292
// It may return empty strings if there is no current leader or the leader is unknown.
293
func (s *Service) LeaderWithID() (string, string) {
294
	addr, id := s.store.LeaderWithID()
295
	return string(addr), string(id)
296
}
297

298
// WaitUntilDBRestored delegates to the store's WaitToRestoreDB with the same
// arguments and returns its error.
// NOTE(review): period looks like a polling interval and close like a stop
// signal; confirm against Store.WaitToRestoreDB.
func (s *Service) WaitUntilDBRestored(ctx context.Context, period time.Duration, close chan struct{}) error {
	err := s.store.WaitToRestoreDB(ctx, period, close)
	return err
}
301

302
// QueryReadOnlyClass will verify that class is non empty and then build a Query that will be directed to the leader to
303
// ensure we will read the class with strong consistency
304
func (s *Service) QueryReadOnlyClasses(classes ...string) (map[string]versioned.Class, error) {
305
	if len(classes) == 0 {
306
		return nil, fmt.Errorf("empty classes names: %w", errBadRequest)
307
	}
308

309
	// remove dedup and empty
310
	slices.Sort(classes)
311
	classes = slices.Compact(classes)
312
	if len(classes) == 0 {
313
		return map[string]versioned.Class{}, fmt.Errorf("empty classes names: %w", errBadRequest)
314
	}
315

316
	if len(classes) > 1 && classes[0] == "" {
317
		classes = classes[1:]
318
	}
319

320
	// Build the query and execute it
321
	req := cmd.QueryReadOnlyClassesRequest{Classes: classes}
322
	subCommand, err := json.Marshal(&req)
323
	if err != nil {
324
		return map[string]versioned.Class{}, fmt.Errorf("marshal request: %w", err)
325
	}
326
	command := &cmd.QueryRequest{
327
		Type:       cmd.QueryRequest_TYPE_GET_CLASSES,
328
		SubCommand: subCommand,
329
	}
330
	queryResp, err := s.Query(context.Background(), command)
331
	if err != nil {
332
		return map[string]versioned.Class{}, fmt.Errorf("failed to execute query: %w", err)
333
	}
334

335
	// Empty payload doesn't unmarshal to an empty struct and will instead result in an error.
336
	// We have an empty payload when the requested class if not present in the schema.
337
	// In that case return a nil pointer and no error.
338
	if len(queryResp.Payload) == 0 {
339
		return nil, nil
340
	}
341

342
	// Unmarshal the response
343
	resp := cmd.QueryReadOnlyClassResponse{}
344
	err = json.Unmarshal(queryResp.Payload, &resp)
345
	if err != nil {
346
		return map[string]versioned.Class{}, fmt.Errorf("failed to unmarshal query result: %w", err)
347
	}
348
	return resp.Classes, nil
349
}
350

351
// QuerySchema build a Query to read the schema that will be directed to the leader to ensure we will read the class
352
// with strong consistency
353
func (s *Service) QuerySchema() (models.Schema, error) {
354
	command := &cmd.QueryRequest{
355
		Type: cmd.QueryRequest_TYPE_GET_SCHEMA,
356
	}
357
	queryResp, err := s.Query(context.Background(), command)
358
	if err != nil {
359
		return models.Schema{}, fmt.Errorf("failed to execute query: %w", err)
360
	}
361

362
	// Unmarshal the response
363
	resp := cmd.QuerySchemaResponse{}
364
	err = json.Unmarshal(queryResp.Payload, &resp)
365
	if err != nil {
366
		return models.Schema{}, fmt.Errorf("failed to unmarshal query result: %w", err)
367
	}
368
	return resp.Schema, nil
369
}
370

371
// QueryTenants build a Query to read the tenants of a given class that will be directed to the leader to ensure we
372
// will read the class with strong consistency
373
func (s *Service) QueryTenants(class string, tenants []string) ([]*models.Tenant, uint64, error) {
374
	// Build the query and execute it
375
	req := cmd.QueryTenantsRequest{Class: class, Tenants: tenants}
376
	subCommand, err := json.Marshal(&req)
377
	if err != nil {
378
		return []*models.Tenant{}, 0, fmt.Errorf("marshal request: %w", err)
379
	}
380
	command := &cmd.QueryRequest{
381
		Type:       cmd.QueryRequest_TYPE_GET_TENANTS,
382
		SubCommand: subCommand,
383
	}
384
	queryResp, err := s.Query(context.Background(), command)
385
	if err != nil {
386
		return []*models.Tenant{}, 0, fmt.Errorf("failed to execute query: %w", err)
387
	}
388

389
	// Unmarshal the response
390
	resp := cmd.QueryTenantsResponse{}
391
	err = json.Unmarshal(queryResp.Payload, &resp)
392
	if err != nil {
393
		return []*models.Tenant{}, 0, fmt.Errorf("failed to unmarshal query result: %w", err)
394
	}
395

396
	return resp.Tenants, resp.ShardVersion, nil
397
}
398

399
// QueryShardOwner build a Query to read the tenants of a given class that will be directed to the leader to ensure we
400
// will read the tenant with strong consistency and return the shard owner node
401
func (s *Service) QueryShardOwner(class, shard string) (string, uint64, error) {
402
	// Build the query and execute it
403
	req := cmd.QueryShardOwnerRequest{Class: class, Shard: shard}
404
	subCommand, err := json.Marshal(&req)
405
	if err != nil {
406
		return "", 0, fmt.Errorf("marshal request: %w", err)
407
	}
408
	command := &cmd.QueryRequest{
409
		Type:       cmd.QueryRequest_TYPE_GET_SHARD_OWNER,
410
		SubCommand: subCommand,
411
	}
412
	queryResp, err := s.Query(context.Background(), command)
413
	if err != nil {
414
		return "", 0, fmt.Errorf("failed to execute query: %w", err)
415
	}
416

417
	// Unmarshal the response
418
	resp := cmd.QueryShardOwnerResponse{}
419
	err = json.Unmarshal(queryResp.Payload, &resp)
420
	if err != nil {
421
		return "", 0, fmt.Errorf("failed to unmarshal query result: %w", err)
422
	}
423

424
	return resp.Owner, resp.ShardVersion, nil
425
}
426

427
// QueryTenantsShards build a Query to read the tenants and their activity status of a given class.
428
// The request will be directed to the leader to ensure we  will read the tenant with strong consistency and return the
429
// shard owner node
430
func (s *Service) QueryTenantsShards(class string, tenants ...string) (map[string]string, uint64, error) {
431
	// Build the query and execute it
432
	req := cmd.QueryTenantsShardsRequest{Class: class, Tenants: tenants}
433
	subCommand, err := json.Marshal(&req)
434
	if err != nil {
435
		return nil, 0, fmt.Errorf("marshal request: %w", err)
436
	}
437
	command := &cmd.QueryRequest{
438
		Type:       cmd.QueryRequest_TYPE_GET_TENANTS_SHARDS,
439
		SubCommand: subCommand,
440
	}
441
	queryResp, err := s.Query(context.Background(), command)
442
	if err != nil {
443
		return nil, 0, fmt.Errorf("failed to execute query: %w", err)
444
	}
445

446
	// Unmarshal the response
447
	resp := cmd.QueryTenantsShardsResponse{}
448
	err = json.Unmarshal(queryResp.Payload, &resp)
449
	if err != nil {
450
		return nil, 0, fmt.Errorf("failed to unmarshal query result: %w", err)
451
	}
452

453
	return resp.TenantsActivityStatus, resp.SchemaVersion, nil
454
}
455

456
// QueryShardingState build a Query to read the sharding state of a given class.
457
// The request will be directed to the leader to ensure we  will read the shard state with strong consistency and return the
458
// state and it's version.
459
func (s *Service) QueryShardingState(class string) (*sharding.State, uint64, error) {
460
	// Build the query and execute it
461
	req := cmd.QueryShardingStateRequest{Class: class}
462
	subCommand, err := json.Marshal(&req)
463
	if err != nil {
464
		return nil, 0, fmt.Errorf("marshal request: %w", err)
465
	}
466
	command := &cmd.QueryRequest{
467
		Type:       cmd.QueryRequest_TYPE_GET_SHARDING_STATE,
468
		SubCommand: subCommand,
469
	}
470
	queryResp, err := s.Query(context.Background(), command)
471
	if err != nil {
472
		return nil, 0, fmt.Errorf("failed to execute query: %w", err)
473
	}
474

475
	// Unmarshal the response
476
	resp := cmd.QueryShardingStateResponse{}
477
	err = json.Unmarshal(queryResp.Payload, &resp)
478
	if err != nil {
479
		return nil, 0, fmt.Errorf("failed to unmarshal query result: %w", err)
480
	}
481

482
	return resp.State, resp.Version, nil
483
}
484

485
// Query receives a QueryRequest and ensure it is executed on the leader and returns the related QueryResponse
486
// If any error happens it returns it
487
func (s *Service) Query(ctx context.Context, req *cmd.QueryRequest) (*cmd.QueryResponse, error) {
488
	if s.store.IsLeader() {
489
		return s.store.Query(req)
490
	}
491

492
	leader := s.store.Leader()
493
	if leader == "" {
494
		return &cmd.QueryResponse{}, s.leaderErr()
495
	}
496

497
	return s.cl.Query(ctx, leader, req)
498
}
499

500
// removeNilTenants filters tenants in place, keeping only the entries that are
// non-nil and have a non-empty name, and returns the shortened slice.
// The result shares its backing array with the input, so the input's leading
// elements are overwritten.
func removeNilTenants(tenants []*cmd.Tenant) []*cmd.Tenant {
	kept := tenants[:0]
	for _, tenant := range tenants {
		if tenant == nil || tenant.Name == "" {
			continue
		}
		kept = append(kept, tenant)
	}
	return kept
}
510

511
// leaderErr decorates ErrLeaderNotFound by distinguishing between
512
// normal election happening and there is no leader been chosen yet
513
// and if it can't reach the other nodes either for intercluster
514
// communication issues or other nodes were down.
515
func (s *Service) leaderErr() error {
516
	if s.store.addResolver != nil && len(s.store.addResolver.notResolvedNodes) > 0 {
517
		var nodes []string
518
		for n := range s.store.addResolver.notResolvedNodes {
519
			nodes = append(nodes, string(n))
520
		}
521

522
		return fmt.Errorf("%w, can not resolve nodes [%s]",
523
			ErrLeaderNotFound,
524
			strings.Join(nodes, ","))
525
	}
526
	return ErrLeaderNotFound
527
}
528

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.