1
// Copyright 2022 The CubeFS Authors.
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
7
// http://www.apache.org/licenses/LICENSE-2.0
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12
// implied. See the License for the specific language governing
13
// permissions and limitations under the License.
18
api "github.com/cubefs/cubefs/blobstore/api/proxy"
19
errcode "github.com/cubefs/cubefs/blobstore/common/errors"
20
"github.com/cubefs/cubefs/blobstore/common/rpc"
21
"github.com/cubefs/cubefs/blobstore/common/trace"
24
// SendRepairMessage send repair message to kafka
25
// 1. message from access
26
// 2. message from scheduler
27
func (s *Service) SendRepairMessage(c *rpc.Context) {
28
span := trace.SpanFromContextSafe(c.Request.Context())
29
ctx := trace.ContextWithSpan(c.Request.Context(), span)
31
args := new(api.ShardRepairArgs)
32
if err := c.ParseArgs(args); err != nil {
36
span.Infof("accept SendRepairMessage request, args: %v", args)
37
if args.ClusterID != s.ClusterID {
38
span.Errorf("clusterID not match: info[%+v], self clusterID[%d]", args, s.ClusterID)
39
c.RespondError(errcode.ErrClusterIDNotMatch)
43
err := s.shardRepairMgr.SendShardRepairMsg(ctx, args)
45
span.Errorf("send shard repair message failed: %+v", err)
53
// SendDeleteMessage send delete message to kafka
54
// 1. message from access because of put object fail
55
// 2. message from access because of business side delete
56
func (s *Service) SendDeleteMessage(c *rpc.Context) {
57
span := trace.SpanFromContextSafe(c.Request.Context())
58
ctx := trace.ContextWithSpan(c.Request.Context(), span)
60
args := new(api.DeleteArgs)
61
if err := c.ParseArgs(args); err != nil {
66
span.Infof("accept SendDeleteMessage request, args: %v", args)
67
if args.ClusterID != s.ClusterID {
68
span.Errorf("clusterID not match: info[%+v], self clusterID[%d]", args, s.ClusterID)
69
c.RespondError(errcode.ErrClusterIDNotMatch)
73
err := s.blobDeleteMgr.SendDeleteMsg(ctx, args)
75
span.Errorf("send delete message failed: %+v", err)