22
"github.com/sirupsen/logrus"
23
cmd "github.com/weaviate/weaviate/cluster/proto/api"
24
"github.com/weaviate/weaviate/entities/models"
25
"github.com/weaviate/weaviate/entities/versioned"
26
"github.com/weaviate/weaviate/usecases/sharding"
27
"google.golang.org/protobuf/proto"
39
type client interface {
40
Apply(leaderAddr string, req *cmd.ApplyRequest) (*cmd.ApplyResponse, error)
41
Query(ctx context.Context, leaderAddr string, req *cmd.QueryRequest) (*cmd.QueryResponse, error)
42
Remove(ctx context.Context, leaderAddress string, req *cmd.RemovePeerRequest) (*cmd.RemovePeerResponse, error)
43
Join(ctx context.Context, leaderAddr string, req *cmd.JoinPeerRequest) (*cmd.JoinPeerResponse, error)
46
func NewService(store *Store, client client) *Service {
47
return &Service{store: store, cl: client, log: store.log}
53
func (s *Service) Open(ctx context.Context, db Indexer) error {
54
s.log.Info("starting raft sub-system ...")
56
return s.store.Open(ctx)
59
func (s *Service) Close(ctx context.Context) (err error) {
60
s.log.Info("shutting down raft sub-system ...")
63
if !s.store.IsVoter() {
64
s.log.Info("removing this node from cluster prior to shutdown ...")
65
if err := s.Remove(ctx, s.store.ID()); err != nil {
66
s.log.WithError(err).Error("remove this node from cluster")
68
s.log.Info("successfully removed this node from the cluster.")
71
return s.store.Close(ctx)
74
func (s *Service) Ready() bool {
75
return s.store.Ready()
78
func (s *Service) SchemaReader() retrySchema {
79
return s.store.SchemaReader()
82
func (s *Service) AddClass(cls *models.Class, ss *sharding.State) (uint64, error) {
83
if cls == nil || cls.Class == "" {
84
return 0, fmt.Errorf("nil class or empty class name : %w", errBadRequest)
87
req := cmd.AddClassRequest{Class: cls, State: ss}
88
subCommand, err := json.Marshal(&req)
90
return 0, fmt.Errorf("marshal request: %w", err)
92
command := &cmd.ApplyRequest{
93
Type: cmd.ApplyRequest_TYPE_ADD_CLASS,
95
SubCommand: subCommand,
97
return s.Execute(command)
100
func (s *Service) UpdateClass(cls *models.Class, ss *sharding.State) (uint64, error) {
101
if cls == nil || cls.Class == "" {
102
return 0, fmt.Errorf("nil class or empty class name : %w", errBadRequest)
104
req := cmd.UpdateClassRequest{Class: cls, State: ss}
105
subCommand, err := json.Marshal(&req)
107
return 0, fmt.Errorf("marshal request: %w", err)
109
command := &cmd.ApplyRequest{
110
Type: cmd.ApplyRequest_TYPE_UPDATE_CLASS,
112
SubCommand: subCommand,
114
return s.Execute(command)
117
func (s *Service) DeleteClass(name string) (uint64, error) {
118
command := &cmd.ApplyRequest{
119
Type: cmd.ApplyRequest_TYPE_DELETE_CLASS,
122
return s.Execute(command)
125
func (s *Service) RestoreClass(cls *models.Class, ss *sharding.State) (uint64, error) {
126
if cls == nil || cls.Class == "" {
127
return 0, fmt.Errorf("nil class or empty class name : %w", errBadRequest)
129
req := cmd.AddClassRequest{Class: cls, State: ss}
130
subCommand, err := json.Marshal(&req)
132
return 0, fmt.Errorf("marshal request: %w", err)
134
command := &cmd.ApplyRequest{
135
Type: cmd.ApplyRequest_TYPE_RESTORE_CLASS,
137
SubCommand: subCommand,
139
return s.Execute(command)
142
func (s *Service) AddProperty(class string, props ...*models.Property) (uint64, error) {
143
for _, p := range props {
144
if p == nil || p.Name == "" || class == "" {
145
return 0, fmt.Errorf("empty property or empty class name : %w", errBadRequest)
148
req := cmd.AddPropertyRequest{Properties: props}
149
subCommand, err := json.Marshal(&req)
151
return 0, fmt.Errorf("marshal request: %w", err)
153
command := &cmd.ApplyRequest{
154
Type: cmd.ApplyRequest_TYPE_ADD_PROPERTY,
156
SubCommand: subCommand,
158
return s.Execute(command)
161
func (s *Service) UpdateShardStatus(class, shard, status string) (uint64, error) {
162
if class == "" || shard == "" {
163
return 0, fmt.Errorf("empty class or shard : %w", errBadRequest)
165
req := cmd.UpdateShardStatusRequest{Class: class, Shard: shard, Status: status}
166
subCommand, err := json.Marshal(&req)
168
return 0, fmt.Errorf("marshal request: %w", err)
170
command := &cmd.ApplyRequest{
171
Type: cmd.ApplyRequest_TYPE_UPDATE_SHARD_STATUS,
173
SubCommand: subCommand,
175
return s.Execute(command)
178
func (s *Service) AddTenants(class string, req *cmd.AddTenantsRequest) (uint64, error) {
179
if class == "" || req == nil {
180
return 0, fmt.Errorf("empty class name or nil request : %w", errBadRequest)
182
subCommand, err := proto.Marshal(req)
184
return 0, fmt.Errorf("marshal request: %w", err)
186
command := &cmd.ApplyRequest{
187
Type: cmd.ApplyRequest_TYPE_ADD_TENANT,
189
SubCommand: subCommand,
191
return s.Execute(command)
194
func (s *Service) UpdateTenants(class string, req *cmd.UpdateTenantsRequest) (uint64, error) {
195
if class == "" || req == nil {
196
return 0, fmt.Errorf("empty class name or nil request : %w", errBadRequest)
198
subCommand, err := proto.Marshal(req)
200
return 0, fmt.Errorf("marshal request: %w", err)
202
command := &cmd.ApplyRequest{
203
Type: cmd.ApplyRequest_TYPE_UPDATE_TENANT,
205
SubCommand: subCommand,
207
return s.Execute(command)
210
func (s *Service) DeleteTenants(class string, req *cmd.DeleteTenantsRequest) (uint64, error) {
211
if class == "" || req == nil {
212
return 0, fmt.Errorf("empty class name or nil request : %w", errBadRequest)
214
subCommand, err := proto.Marshal(req)
216
return 0, fmt.Errorf("marshal request: %w", err)
218
command := &cmd.ApplyRequest{
219
Type: cmd.ApplyRequest_TYPE_DELETE_TENANT,
221
SubCommand: subCommand,
223
return s.Execute(command)
226
func (s *Service) StoreSchemaV1() error {
227
command := &cmd.ApplyRequest{
228
Type: cmd.ApplyRequest_TYPE_STORE_SCHEMA_V1,
230
_, err := s.Execute(command)
234
func (s *Service) Execute(req *cmd.ApplyRequest) (uint64, error) {
235
if s.store.IsLeader() {
236
return s.store.Execute(req)
238
if cmd.ApplyRequest_Type_name[int32(req.Type.Number())] == "" {
239
return 0, ErrUnknownCommand
242
leader := s.store.Leader()
244
return 0, s.leaderErr()
246
resp, err := s.cl.Apply(leader, req)
251
return resp.Version, err
254
func (s *Service) Join(ctx context.Context, id, addr string, voter bool) error {
255
s.log.WithFields(logrus.Fields{
259
}).Debug("membership.join")
260
if s.store.IsLeader() {
261
return s.store.Join(id, addr, voter)
263
leader := s.store.Leader()
267
req := &cmd.JoinPeerRequest{Id: id, Address: addr, Voter: voter}
268
_, err := s.cl.Join(ctx, leader, req)
272
func (s *Service) Remove(ctx context.Context, id string) error {
273
s.log.WithField("id", id).Debug("membership.remove")
274
if s.store.IsLeader() {
275
return s.store.Remove(id)
277
leader := s.store.Leader()
281
req := &cmd.RemovePeerRequest{Id: id}
282
_, err := s.cl.Remove(ctx, leader, req)
286
func (s *Service) Stats() map[string]any {
287
s.log.Debug("membership.stats")
288
return s.store.Stats()
293
func (s *Service) LeaderWithID() (string, string) {
294
addr, id := s.store.LeaderWithID()
295
return string(addr), string(id)
298
func (s *Service) WaitUntilDBRestored(ctx context.Context, period time.Duration, close chan struct{}) error {
299
return s.store.WaitToRestoreDB(ctx, period, close)
304
func (s *Service) QueryReadOnlyClasses(classes ...string) (map[string]versioned.Class, error) {
305
if len(classes) == 0 {
306
return nil, fmt.Errorf("empty classes names: %w", errBadRequest)
311
classes = slices.Compact(classes)
312
if len(classes) == 0 {
313
return map[string]versioned.Class{}, fmt.Errorf("empty classes names: %w", errBadRequest)
316
if len(classes) > 1 && classes[0] == "" {
317
classes = classes[1:]
321
req := cmd.QueryReadOnlyClassesRequest{Classes: classes}
322
subCommand, err := json.Marshal(&req)
324
return map[string]versioned.Class{}, fmt.Errorf("marshal request: %w", err)
326
command := &cmd.QueryRequest{
327
Type: cmd.QueryRequest_TYPE_GET_CLASSES,
328
SubCommand: subCommand,
330
queryResp, err := s.Query(context.Background(), command)
332
return map[string]versioned.Class{}, fmt.Errorf("failed to execute query: %w", err)
338
if len(queryResp.Payload) == 0 {
343
resp := cmd.QueryReadOnlyClassResponse{}
344
err = json.Unmarshal(queryResp.Payload, &resp)
346
return map[string]versioned.Class{}, fmt.Errorf("failed to unmarshal query result: %w", err)
348
return resp.Classes, nil
353
func (s *Service) QuerySchema() (models.Schema, error) {
354
command := &cmd.QueryRequest{
355
Type: cmd.QueryRequest_TYPE_GET_SCHEMA,
357
queryResp, err := s.Query(context.Background(), command)
359
return models.Schema{}, fmt.Errorf("failed to execute query: %w", err)
363
resp := cmd.QuerySchemaResponse{}
364
err = json.Unmarshal(queryResp.Payload, &resp)
366
return models.Schema{}, fmt.Errorf("failed to unmarshal query result: %w", err)
368
return resp.Schema, nil
373
func (s *Service) QueryTenants(class string, tenants []string) ([]*models.Tenant, uint64, error) {
375
req := cmd.QueryTenantsRequest{Class: class, Tenants: tenants}
376
subCommand, err := json.Marshal(&req)
378
return []*models.Tenant{}, 0, fmt.Errorf("marshal request: %w", err)
380
command := &cmd.QueryRequest{
381
Type: cmd.QueryRequest_TYPE_GET_TENANTS,
382
SubCommand: subCommand,
384
queryResp, err := s.Query(context.Background(), command)
386
return []*models.Tenant{}, 0, fmt.Errorf("failed to execute query: %w", err)
390
resp := cmd.QueryTenantsResponse{}
391
err = json.Unmarshal(queryResp.Payload, &resp)
393
return []*models.Tenant{}, 0, fmt.Errorf("failed to unmarshal query result: %w", err)
396
return resp.Tenants, resp.ShardVersion, nil
401
func (s *Service) QueryShardOwner(class, shard string) (string, uint64, error) {
403
req := cmd.QueryShardOwnerRequest{Class: class, Shard: shard}
404
subCommand, err := json.Marshal(&req)
406
return "", 0, fmt.Errorf("marshal request: %w", err)
408
command := &cmd.QueryRequest{
409
Type: cmd.QueryRequest_TYPE_GET_SHARD_OWNER,
410
SubCommand: subCommand,
412
queryResp, err := s.Query(context.Background(), command)
414
return "", 0, fmt.Errorf("failed to execute query: %w", err)
418
resp := cmd.QueryShardOwnerResponse{}
419
err = json.Unmarshal(queryResp.Payload, &resp)
421
return "", 0, fmt.Errorf("failed to unmarshal query result: %w", err)
424
return resp.Owner, resp.ShardVersion, nil
430
func (s *Service) QueryTenantsShards(class string, tenants ...string) (map[string]string, uint64, error) {
432
req := cmd.QueryTenantsShardsRequest{Class: class, Tenants: tenants}
433
subCommand, err := json.Marshal(&req)
435
return nil, 0, fmt.Errorf("marshal request: %w", err)
437
command := &cmd.QueryRequest{
438
Type: cmd.QueryRequest_TYPE_GET_TENANTS_SHARDS,
439
SubCommand: subCommand,
441
queryResp, err := s.Query(context.Background(), command)
443
return nil, 0, fmt.Errorf("failed to execute query: %w", err)
447
resp := cmd.QueryTenantsShardsResponse{}
448
err = json.Unmarshal(queryResp.Payload, &resp)
450
return nil, 0, fmt.Errorf("failed to unmarshal query result: %w", err)
453
return resp.TenantsActivityStatus, resp.SchemaVersion, nil
459
func (s *Service) QueryShardingState(class string) (*sharding.State, uint64, error) {
461
req := cmd.QueryShardingStateRequest{Class: class}
462
subCommand, err := json.Marshal(&req)
464
return nil, 0, fmt.Errorf("marshal request: %w", err)
466
command := &cmd.QueryRequest{
467
Type: cmd.QueryRequest_TYPE_GET_SHARDING_STATE,
468
SubCommand: subCommand,
470
queryResp, err := s.Query(context.Background(), command)
472
return nil, 0, fmt.Errorf("failed to execute query: %w", err)
476
resp := cmd.QueryShardingStateResponse{}
477
err = json.Unmarshal(queryResp.Payload, &resp)
479
return nil, 0, fmt.Errorf("failed to unmarshal query result: %w", err)
482
return resp.State, resp.Version, nil
487
func (s *Service) Query(ctx context.Context, req *cmd.QueryRequest) (*cmd.QueryResponse, error) {
488
if s.store.IsLeader() {
489
return s.store.Query(req)
492
leader := s.store.Leader()
494
return &cmd.QueryResponse{}, s.leaderErr()
497
return s.cl.Query(ctx, leader, req)
500
func removeNilTenants(tenants []*cmd.Tenant) []*cmd.Tenant {
502
for i := range tenants {
503
if tenants[i] != nil && tenants[i].Name != "" {
504
tenants[n] = tenants[i]
515
func (s *Service) leaderErr() error {
516
if s.store.addResolver != nil && len(s.store.addResolver.notResolvedNodes) > 0 {
518
for n := range s.store.addResolver.notResolvedNodes {
519
nodes = append(nodes, string(n))
522
return fmt.Errorf("%w, can not resolve nodes [%s]",
524
strings.Join(nodes, ","))
526
return ErrLeaderNotFound