22
api "github.com/cubefs/cubefs/blobstore/api/scheduler"
23
errcode "github.com/cubefs/cubefs/blobstore/common/errors"
24
"github.com/cubefs/cubefs/blobstore/common/proto"
25
"github.com/cubefs/cubefs/blobstore/common/rpc"
26
"github.com/cubefs/cubefs/blobstore/common/trace"
27
"github.com/cubefs/cubefs/blobstore/scheduler/base"
28
"github.com/cubefs/cubefs/blobstore/scheduler/client"
29
"github.com/cubefs/cubefs/blobstore/util/task"
32
var errIllegalTaskType = rpc.NewError(http.StatusBadRequest, "illegal_type", errcode.ErrIllegalTaskType)
36
ClusterID proto.ClusterID
39
followerHosts []string
42
diskDropMgr IDisKMigrator
43
diskRepairMgr IDisKMigrator
44
manualMigMgr IManualMigrator
45
inspectMgr IVolumeInspector
47
shardRepairMgr ITaskRunner
48
blobDeleteMgr ITaskRunner
49
clusterTopology IClusterTopology
50
volumeUpdater client.IVolumeUpdater
51
kafkaMonitors []*base.KafkaTopicMonitor
53
clusterMgrCli client.ClusterMgrAPI
56
func (svr *Service) mgrByType(typ proto.TaskType) (Migrator, error) {
58
case proto.TaskTypeDiskRepair:
59
return svr.diskRepairMgr, nil
60
case proto.TaskTypeBalance:
61
return svr.balanceMgr, nil
62
case proto.TaskTypeDiskDrop:
63
return svr.diskDropMgr, nil
64
case proto.TaskTypeManualMigrate:
65
return svr.manualMigMgr, nil
67
return nil, errIllegalTaskType
71
func (svr *Service) diskMgrByType(typ proto.TaskType) (IDisKMigrator, error) {
73
case proto.TaskTypeDiskDrop:
74
return svr.diskDropMgr, nil
75
case proto.TaskTypeDiskRepair:
76
return svr.diskRepairMgr, nil
78
return nil, errIllegalTaskType
83
func (svr *Service) HTTPTaskAcquire(c *rpc.Context) {
84
args := new(api.AcquireArgs)
85
if err := c.ParseArgs(args); err != nil {
91
ctx := c.Request.Context()
92
migrators := []Migrator{svr.diskRepairMgr, svr.manualMigMgr, svr.diskDropMgr, svr.balanceMgr}
93
shuffledMigrators := migrators[1:]
94
rand.Shuffle(len(shuffledMigrators), func(i, j int) {
95
shuffledMigrators[i], shuffledMigrators[j] = shuffledMigrators[j], shuffledMigrators[i]
97
for _, acquire := range migrators {
98
if migrateTask, err := acquire.AcquireTask(ctx, args.IDC); err == nil {
99
c.RespondJSON(migrateTask)
103
c.RespondError(errcode.ErrNothingTodo)
107
func (svr *Service) HTTPTaskReclaim(c *rpc.Context) {
108
args := new(api.OperateTaskArgs)
109
if err := c.ParseArgs(args); err != nil {
113
if !client.ValidMigrateTask(args.TaskType, args.TaskID) {
114
c.RespondError(errcode.ErrIllegalArguments)
117
ctx := c.Request.Context()
118
reclaimer, err := svr.mgrByType(args.TaskType)
124
newDst, err := base.AllocVunitSafe(ctx, svr.clusterMgrCli, args.Dest.Vuid, args.Src)
129
c.RespondError(reclaimer.ReclaimTask(ctx, args.IDC, args.TaskID, args.Src, args.Dest, newDst))
133
func (svr *Service) HTTPTaskCancel(c *rpc.Context) {
134
args := new(api.OperateTaskArgs)
135
if err := c.ParseArgs(args); err != nil {
139
if !client.ValidMigrateTask(args.TaskType, args.TaskID) {
140
c.RespondError(errcode.ErrIllegalArguments)
143
ctx := c.Request.Context()
144
canceler, err := svr.mgrByType(args.TaskType)
149
c.RespondError(canceler.CancelTask(ctx, args))
153
func (svr *Service) HTTPTaskComplete(c *rpc.Context) {
154
args := new(api.OperateTaskArgs)
155
if err := c.ParseArgs(args); err != nil {
159
if !client.ValidMigrateTask(args.TaskType, args.TaskID) {
160
c.RespondError(errcode.ErrIllegalArguments)
163
ctx := c.Request.Context()
164
completer, err := svr.mgrByType(args.TaskType)
169
c.RespondError(completer.CompleteTask(ctx, args))
173
func (svr *Service) HTTPInspectAcquire(c *rpc.Context) {
174
ctx := c.Request.Context()
176
task, _ := svr.inspectMgr.AcquireInspect(ctx)
181
c.RespondError(errcode.ErrNothingTodo)
185
func (svr *Service) HTTPInspectComplete(c *rpc.Context) {
186
args := new(proto.VolumeInspectRet)
187
if err := c.ParseArgs(args); err != nil {
192
ctx := c.Request.Context()
193
svr.inspectMgr.CompleteInspect(ctx, args)
198
func (svr *Service) HTTPTaskRenewal(c *rpc.Context) {
199
args := new(api.TaskRenewalArgs)
200
if err := c.ParseArgs(args); err != nil {
205
ctx := c.Request.Context()
206
typeErrors := make(map[proto.TaskType]map[string]string)
207
for typ, ids := range args.IDs {
208
renewaler, err := svr.mgrByType(typ)
214
errors := make(map[string]string)
215
for _, id := range ids {
216
if !client.ValidMigrateTask(typ, id) {
217
errors[id] = errcode.ErrIllegalArguments.Error()
220
if err := renewaler.RenewalTask(ctx, args.IDC, id); err != nil {
221
errors[id] = err.Error()
226
typeErrors[typ] = errors
229
c.RespondJSON(api.TaskRenewalRet{Errors: typeErrors})
233
func (svr *Service) HTTPTaskReport(c *rpc.Context) {
234
args := new(api.TaskReportArgs)
235
if err := c.ParseArgs(args); err != nil {
239
if !client.ValidMigrateTask(args.TaskType, args.TaskID) {
240
c.RespondError(errcode.ErrIllegalArguments)
243
reporter, err := svr.mgrByType(args.TaskType)
248
reporter.ReportWorkerTaskStats(args)
253
func (svr *Service) HTTPMigrateTaskDetail(c *rpc.Context) {
254
args := new(api.MigrateTaskDetailArgs)
255
if err := c.ParseArgs(args); err != nil {
259
if args.ID == "" || !client.ValidMigrateTask(args.Type, args.ID) {
260
c.RespondError(errcode.ErrIllegalArguments)
264
querier, err := svr.mgrByType(args.Type)
269
detail, err := querier.QueryTask(c.Request.Context(), args.ID)
271
c.RespondError(rpc.NewError(http.StatusNotFound, "NotFound", err))
274
c.RespondJSON(detail)
278
func (svr *Service) HTTPDiskMigratingStats(c *rpc.Context) {
279
args := new(api.DiskMigratingStatsArgs)
280
if err := c.ParseArgs(args); err != nil {
284
querier, err := svr.diskMgrByType(args.TaskType)
289
stats, err := querier.DiskProgress(c.Request.Context(), args.DiskID)
298
func (svr *Service) HTTPStats(c *rpc.Context) {
299
ctx := c.Request.Context()
300
taskStats := api.TasksStat{}
303
deleteSuccessCounter, deleteFailedCounter := svr.blobDeleteMgr.GetTaskStats()
304
delErrStats, delTotalErrCnt := svr.blobDeleteMgr.GetErrorStats()
305
taskStats.BlobDelete = &api.RunnerStat{
306
Enable: svr.blobDeleteMgr.Enabled(),
307
SuccessPerMin: fmt.Sprint(deleteSuccessCounter),
308
FailedPerMin: fmt.Sprint(deleteFailedCounter),
309
TotalErrCnt: delTotalErrCnt,
310
ErrStats: delErrStats,
314
repairSuccessCounter, repairFailedCounter := svr.shardRepairMgr.GetTaskStats()
315
repairErrStats, repairTotalErrCnt := svr.shardRepairMgr.GetErrorStats()
316
taskStats.ShardRepair = &api.RunnerStat{
317
Enable: svr.shardRepairMgr.Enabled(),
318
SuccessPerMin: fmt.Sprint(repairSuccessCounter),
319
FailedPerMin: fmt.Sprint(repairFailedCounter),
320
TotalErrCnt: repairTotalErrCnt,
321
ErrStats: repairErrStats,
325
c.RespondJSON(taskStats)
330
repairDisks, totalTasksCnt, repairedTasksCnt := svr.diskRepairMgr.Progress(ctx)
331
taskStats.DiskRepair = &api.DiskRepairTasksStat{
332
Enable: svr.diskRepairMgr.Enabled(),
333
RepairingDisks: repairDisks,
334
TotalTasksCnt: totalTasksCnt,
335
RepairedTasksCnt: repairedTasksCnt,
336
MigrateTasksStat: svr.diskRepairMgr.Stats(),
340
dropDisks, totalTasksCnt, droppedTasksCnt := svr.diskDropMgr.Progress(ctx)
341
taskStats.DiskDrop = &api.DiskDropTasksStat{
342
Enable: svr.diskDropMgr.Enabled(),
343
DroppingDisks: dropDisks,
344
TotalTasksCnt: totalTasksCnt,
345
DroppedTasksCnt: droppedTasksCnt,
346
MigrateTasksStat: svr.diskDropMgr.Stats(),
350
taskStats.Balance = &api.BalanceTasksStat{
351
Enable: svr.balanceMgr.Enabled(),
352
MigrateTasksStat: svr.balanceMgr.Stats(),
356
taskStats.ManualMigrate = &api.ManualMigrateTasksStat{
357
MigrateTasksStat: svr.manualMigMgr.Stats(),
361
finished, timeout := svr.inspectMgr.GetTaskStats()
362
taskStats.VolumeInspect = &api.VolumeInspectTasksStat{
363
Enable: svr.inspectMgr.Enabled(),
364
FinishedPerMin: fmt.Sprint(finished),
365
TimeOutPerMin: fmt.Sprint(timeout),
368
c.RespondJSON(taskStats)
372
func (svr *Service) HTTPManualMigrateTaskAdd(c *rpc.Context) {
373
ctx := c.Request.Context()
375
args := new(api.AddManualMigrateArgs)
376
if err := c.ParseArgs(args); err != nil {
382
c.RespondError(errcode.ErrIllegalArguments)
386
err := svr.manualMigMgr.AddManualTask(ctx, args.Vuid, !args.DirectDownload)
387
c.RespondError(rpc.Error2HTTPError(err))
391
func (svr *Service) HTTPUpdateVolume(c *rpc.Context) {
392
args := new(api.UpdateVolumeArgs)
393
if err := c.ParseArgs(args); err != nil {
398
ctx := c.Request.Context()
399
span := trace.SpanFromContextSafe(ctx)
402
_, err := svr.clusterTopology.UpdateVolume(args.Vid)
404
span.Errorf("local volume cache update failed: vid[%d], err[%+v]", args.Vid, err)
414
tasks := make([]func() error, 0, len(svr.followerHosts))
415
for _, host := range svr.followerHosts {
417
tasks = append(tasks, func() error {
418
return svr.volumeUpdater.UpdateFollowerVolumeCache(ctx, host, args.Vid)
422
span.Debug("to update follower volume cache")
423
if err := task.Run(ctx, tasks...); err != nil {
424
span.Errorf("notify follower to update cache err[%+v]", err)