// Copyright 2022 The CubeFS Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.
package scheduler

import (
	"context"
	"errors"
	"fmt"
	"time"

	api "github.com/cubefs/cubefs/blobstore/api/scheduler"
	"github.com/cubefs/cubefs/blobstore/common/counter"
	errcode "github.com/cubefs/cubefs/blobstore/common/errors"
	"github.com/cubefs/cubefs/blobstore/common/proto"
	"github.com/cubefs/cubefs/blobstore/common/recordlog"
	"github.com/cubefs/cubefs/blobstore/common/rpc"
	"github.com/cubefs/cubefs/blobstore/common/taskswitch"
	"github.com/cubefs/cubefs/blobstore/common/trace"
	"github.com/cubefs/cubefs/blobstore/scheduler/base"
	"github.com/cubefs/cubefs/blobstore/scheduler/client"
	"github.com/cubefs/cubefs/blobstore/util/closer"
)

// DiskRepairMgr repair task manager
type DiskRepairMgr struct {
	closer.Closer

	prepareQueue   *base.TaskQueue
	workQueue      *base.WorkerTaskQueue
	finishQueue    *base.TaskQueue
	deletedTasks   *diskMigratedTasks
	repairedDisks  *migratedDisks
	repairingDisks *migratingDisks

	clusterMgrCli client.ClusterMgrAPI

	taskSwitch taskswitch.ISwitcher

	finishTaskCounter counter.Counter
	taskStatsMgr      *base.TaskStatsMgr

	hasRevised bool
	taskLogger recordlog.Encoder
	cfg        *MigrateConfig
}

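// A typical wiring of the repair manager, as an illustrative sketch only (the
// scheduler service assembles the real dependencies elsewhere):
//
//	mgr := NewDiskRepairMgr(clusterMgrCli, taskSwitch, taskLogger, cfg)
//	if err := mgr.Load(); err != nil { // reload unfinished tasks from clustermgr
//		return err
//	}
//	mgr.Run() // start the background collect/prepare/finish/check loops
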
// NewDiskRepairMgr returns repair manager
func NewDiskRepairMgr(clusterMgrCli client.ClusterMgrAPI, taskSwitch taskswitch.ISwitcher, taskLogger recordlog.Encoder, cfg *MigrateConfig) *DiskRepairMgr {
	mgr := &DiskRepairMgr{
		Closer: closer.New(),

		prepareQueue: base.NewTaskQueue(time.Duration(cfg.PrepareQueueRetryDelayS) * time.Second),
		workQueue:    base.NewWorkerTaskQueue(time.Duration(cfg.CancelPunishDurationS) * time.Second),
		finishQueue:  base.NewTaskQueue(time.Duration(cfg.FinishQueueRetryDelayS) * time.Second),

		deletedTasks:   newDiskMigratedTasks(),
		repairedDisks:  newMigratedDisks(),
		repairingDisks: newMigratingDisks(),

		clusterMgrCli: clusterMgrCli,
		taskSwitch:    taskSwitch,
		taskLogger:    taskLogger,
		cfg:           cfg,
	}
	mgr.taskStatsMgr = base.NewTaskStatsMgrAndRun(cfg.ClusterID, proto.TaskTypeDiskRepair, mgr)
	return mgr
}

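// Task states observed by Load and the stage loops below form a simple pipeline
// (sketched from the switch in Load):
//
//	Inited --(prepareQueue)--> Prepared --(workQueue)--> WorkCompleted --(finishQueue)--> Finished
//	Inited --(volume no longer references the bad vuid)--> FinishedInAdvance
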
// Load load repair task from database
func (mgr *DiskRepairMgr) Load() error {
	span, ctx := trace.StartSpanFromContext(context.Background(), "Load")

	repairingDisks, err := mgr.clusterMgrCli.ListMigratingDisks(ctx, proto.TaskTypeDiskRepair)
	if err != nil {
		return err
	}
	for _, disk := range repairingDisks {
		mgr.repairingDisks.add(disk.Disk.DiskID, disk.Disk)
	}

	tasks, err := mgr.clusterMgrCli.ListAllMigrateTasks(ctx, proto.TaskTypeDiskRepair)
	if err != nil {
		span.Errorf("find all tasks failed: err[%+v]", err)
		return err
	}

	var junkTasks []*proto.MigrateTask
	for _, t := range tasks {
		if _, ok := mgr.repairingDisks.get(t.SourceDiskID); !ok {
			junkTasks = append(junkTasks, t)
			continue
		}
		err = base.VolTaskLockerInst().TryLock(ctx, t.Vid())
		if err != nil {
			return fmt.Errorf("repair task conflict: task[%+v], err[%+v]", t, err)
		}

		span.Infof("load task success: task_id[%s], state[%d]", t.TaskID, t.State)
		switch t.State {
		case proto.MigrateStateInited:
			mgr.prepareQueue.PushTask(t.TaskID, t)
		case proto.MigrateStatePrepared:
			mgr.workQueue.AddPreparedTask(t.SourceIDC, t.TaskID, t)
		case proto.MigrateStateWorkCompleted:
			mgr.finishQueue.PushTask(t.TaskID, t)
		case proto.MigrateStateFinished, proto.MigrateStateFinishedInAdvance:
			return fmt.Errorf("task should be deleted from db: task[%+v]", t)
		default:
			return fmt.Errorf("unexpected migrate state: task[%+v]", t)
		}
	}
	return mgr.clearJunkTasksWhenLoading(ctx, junkTasks)
}

func (mgr *DiskRepairMgr) clearJunkTasksWhenLoading(ctx context.Context, tasks []*proto.MigrateTask) error {
	span := trace.SpanFromContextSafe(ctx)

	disks := make(map[proto.DiskID]bool)
	for _, task := range tasks {
		if _, ok := disks[task.SourceDiskID]; !ok {
			diskInfo, err := mgr.clusterMgrCli.GetDiskInfo(ctx, task.SourceDiskID)
			if err != nil {
				return err
			}
			disks[task.SourceDiskID] = diskInfo.IsRepaired()
		}
		if !disks[task.SourceDiskID] {
			span.Errorf("has junk task but the disk is not repaired: disk_id[%d], task_id[%s]", task.SourceDiskID, task.TaskID)
			return errcode.ErrUnexpectMigrationTask
		}
		span.Warnf("loading delete junk task: task_id[%s]", task.TaskID)
		base.InsistOn(ctx, "loading delete junk task", func() error {
			return mgr.clusterMgrCli.DeleteMigrateTask(ctx, task.TaskID)
		})
	}
	return nil
}

// Run run repair task includes collect/prepare/finish/check phase
func (mgr *DiskRepairMgr) Run() {
	go mgr.collectTaskLoop()
	go mgr.prepareTaskLoop()
	go mgr.finishTaskLoop()
	go mgr.checkRepairedAndClearLoop()
	go mgr.checkAndClearJunkTasksLoop()
}

func (mgr *DiskRepairMgr) Enabled() bool {
	return mgr.taskSwitch.Enabled()
}

func (mgr *DiskRepairMgr) WaitEnable() {
	mgr.taskSwitch.WaitEnable()
}

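// The collect stage, roughly (a summary of collectTask below): on the first run
// it revises tasks for disks that were already repairing, then, while the number
// of repairing disks stays under cfg.DiskConcurrency, it acquires one broken disk
// that is not yet being repaired, generates a repair task per remaining volume
// unit, marks the disk as repairing in clustermgr, and tracks it in memory.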
func (mgr *DiskRepairMgr) collectTaskLoop() {
	t := time.NewTicker(time.Duration(mgr.cfg.CollectTaskIntervalS) * time.Second)
	defer t.Stop()

	for {
		select {
		case <-t.C:
			mgr.collectTask()
		case <-mgr.Closer.Done():
			return
		}
	}
}

func (mgr *DiskRepairMgr) collectTask() {
	span, ctx := trace.StartSpanFromContext(context.Background(), "disk_repair.collectTask")

	// revise repair tasks to keep data consistent when the service starts
	if !mgr.hasRevised {
		if err := mgr.reviseRepairDisks(ctx); err != nil {
			return
		}
		mgr.hasRevised = true
	}

	if mgr.repairingDisks.size() >= mgr.cfg.DiskConcurrency {
		return
	}

	brokenDisk, err := mgr.acquireBrokenDisk(ctx)
	if err != nil {
		span.Infof("acquire broken disk failed: err[%+v]", err)
		return
	}
	if brokenDisk == nil {
		return
	}

	err = mgr.genDiskRepairTasks(ctx, brokenDisk, true)
	if err != nil {
		span.Errorf("generate disk repair tasks failed: err[%+v]", err)
		return
	}

	execMsg := fmt.Sprintf("set disk diskId %d repairing", brokenDisk.DiskID)
	base.InsistOn(ctx, execMsg, func() error {
		return mgr.clusterMgrCli.SetDiskRepairing(ctx, brokenDisk.DiskID)
	})

	mgr.repairingDisks.add(brokenDisk.DiskID, brokenDisk)
}

func (mgr *DiskRepairMgr) reviseRepairDisks(ctx context.Context) error {
	span := trace.SpanFromContextSafe(ctx)

	for _, disk := range mgr.repairingDisks.list() {
		if err := mgr.reviseRepairDisk(ctx, disk.DiskID); err != nil {
			span.Errorf("revise repair tasks failed: disk_id[%d]", disk.DiskID)
			return err
		}
	}
	return nil
}

func (mgr *DiskRepairMgr) reviseRepairDisk(ctx context.Context, diskID proto.DiskID) error {
	span := trace.SpanFromContextSafe(ctx)

	diskInfo, err := mgr.clusterMgrCli.GetDiskInfo(ctx, diskID)
	if err != nil {
		span.Errorf("get disk info failed: err[%+v]", err)
		return err
	}

	if err = mgr.genDiskRepairTasks(ctx, diskInfo, false); err != nil {
		span.Errorf("generate disk repair tasks failed: err[%+v]", err)
		return err
	}

	if diskInfo.IsBroken() {
		execMsg := fmt.Sprintf("set disk diskId %d repairing", diskID)
		base.InsistOn(ctx, execMsg, func() error {
			return mgr.clusterMgrCli.SetDiskRepairing(ctx, diskID)
		})
	}
	return nil
}

func (mgr *DiskRepairMgr) acquireBrokenDisk(ctx context.Context) (*client.DiskInfoSimple, error) {
	brokenDisks, err := mgr.clusterMgrCli.ListBrokenDisks(ctx)
	if err != nil {
		return nil, err
	}
	if len(brokenDisks) == 0 {
		return nil, nil
	}
	return mgr.getUnRepairingDisk(brokenDisks), nil
}

func (mgr *DiskRepairMgr) getUnRepairingDisk(disks []*client.DiskInfoSimple) *client.DiskInfoSimple {
	for _, v := range disks {
		if _, ok := mgr.repairingDisks.get(v.DiskID); !ok {
			return v
		}
	}
	return nil
}

func (mgr *DiskRepairMgr) genDiskRepairTasks(ctx context.Context, disk *client.DiskInfoSimple, newRepairDisk bool) error {
	span := trace.SpanFromContextSafe(ctx)
	span.Infof("start generate disk repair tasks: disk_id[%d], disk_idc[%s]", disk.DiskID, disk.Idc)

	migratingVuids, err := mgr.listMigratingVuid(ctx, disk.DiskID)
	if err != nil {
		span.Errorf("list repairing vuids failed: err[%+v]", err)
		return err
	}

	unmigratedVuids, err := mgr.listUnmigratedVuid(ctx, disk.DiskID)
	if err != nil {
		span.Errorf("list unrepaired vuids failed: err[%+v]", err)
		return err
	}

	remain := base.Subtraction(unmigratedVuids, migratingVuids)
	span.Infof("should gen tasks remain: len[%d]", len(remain))

	if newRepairDisk {
		meta := &client.MigratingDiskMeta{
			TaskType: proto.TaskTypeDiskRepair,
			Disk:     disk,
		}
		if err := mgr.clusterMgrCli.AddMigratingDisk(ctx, meta); err != nil {
			return err
		}
	}

	for _, vuid := range remain {
		mgr.initOneTask(ctx, vuid, disk.DiskID, disk.Idc)
	}
	return nil
}

func (mgr *DiskRepairMgr) listMigratingVuid(ctx context.Context, diskID proto.DiskID) (bads []proto.Vuid, err error) {
	tasks, err := mgr.clusterMgrCli.ListAllMigrateTasksByDiskID(ctx, proto.TaskTypeDiskRepair, diskID)
	if err != nil {
		return nil, err
	}

	for _, t := range tasks {
		bads = append(bads, t.SourceVuid)
	}
	return bads, nil
}

func (mgr *DiskRepairMgr) listUnmigratedVuid(ctx context.Context, diskID proto.DiskID) (bads []proto.Vuid, err error) {
	vunits, err := mgr.clusterMgrCli.ListDiskVolumeUnits(ctx, diskID)
	if err != nil {
		return nil, err
	}

	for _, vunit := range vunits {
		bads = append(bads, vunit.Vuid)
	}
	return bads, nil
}

func (mgr *DiskRepairMgr) initOneTask(ctx context.Context, badVuid proto.Vuid, brokenDiskID proto.DiskID, brokenDiskIdc string) {
	span := trace.SpanFromContextSafe(ctx)

	t := proto.MigrateTask{
		TaskID:                  client.GenMigrateTaskID(proto.TaskTypeDiskRepair, brokenDiskID, badVuid.Vid()),
		TaskType:                proto.TaskTypeDiskRepair,
		State:                   proto.MigrateStateInited,
		SourceDiskID:            brokenDiskID,
		SourceVuid:              badVuid,
		SourceIDC:               brokenDiskIdc,
		ForbiddenDirectDownload: true,
	}
	base.InsistOn(ctx, "repair init one task insert task to tbl", func() error {
		return mgr.clusterMgrCli.AddMigrateTask(ctx, &t)
	})

	mgr.prepareQueue.PushTask(t.TaskID, &t)
	span.Infof("init repair task success %+v", t)
}

func (mgr *DiskRepairMgr) prepareTaskLoop() {
	for {
		mgr.WaitEnable()
		todo, doing := mgr.workQueue.StatsTasks()
		if mgr.repairingDisks.size() == 0 || todo+doing >= mgr.cfg.WorkQueueSize {
			time.Sleep(time.Second)
			continue
		}
		err := mgr.popTaskAndPrepare()
		if err == base.ErrNoTaskInQueue {
			time.Sleep(time.Second)
		}
	}
}

func (mgr *DiskRepairMgr) popTaskAndPrepare() error {
	_, task, exist := mgr.prepareQueue.PopTask()
	if !exist {
		return base.ErrNoTaskInQueue
	}

	var err error
	span, ctx := trace.StartSpanFromContext(context.Background(), "disk_repair.popTaskAndPrepare")
	defer func() {
		if err != nil {
			span.Errorf("prepare task failed and retry task: task_id[%s], err[%+v]", task.(*proto.MigrateTask).TaskID, err)
			mgr.prepareQueue.RetryTask(task.(*proto.MigrateTask).TaskID)
		}
	}()

	// copy the task to avoid changing the task kept in the queue
	t := task.(*proto.MigrateTask).Copy()
	span.Infof("pop task: task_id[%s], task[%+v]", t.TaskID, t)

	// check whether the vid already has another running task
	err = base.VolTaskLockerInst().TryLock(ctx, t.Vid())
	if err != nil {
		span.Warnf("tryLock failed: vid[%d]", t.Vid())
		return base.ErrVolNotOnlyOneTask
	}
	defer func() {
		if err != nil {
			span.Errorf("prepare task failed: task_id[%s], err[%+v]", t.TaskID, err)
			base.VolTaskLockerInst().Unlock(ctx, t.Vid())
		}
	}()

	err = mgr.prepareTask(t)
	if err != nil {
		span.Errorf("prepare task failed: task_id[%s], err[%+v]", t.TaskID, err)
		return err
	}

	span.Infof("prepare task success: task_id[%s]", t.TaskID)
	return nil
}

func (mgr *DiskRepairMgr) prepareTask(t *proto.MigrateTask) error {
	span, ctx := trace.StartSpanFromContext(
		context.Background(),
		"DiskRepairMgr.prepareTask")
	span.Infof("start prepare repair task: task_id[%s], task[%+v]", t.TaskID, t)

	volInfo, err := mgr.clusterMgrCli.GetVolumeInfo(ctx, t.Vid())
	if err != nil {
		span.Errorf("prepare task get volume info failed: err[%+v]", err)
		return err
	}

	// 1. check whether the task still needs to run
	badVuid := t.SourceVuid
	if volInfo.VunitLocations[badVuid.Index()].Vuid != badVuid {
		span.Infof("repair task finish in advance: task_id[%s]", t.TaskID)
		mgr.finishTaskInAdvance(ctx, t, "volume has migrated")
		return nil
	}

	// 2. generate source and destination for the task and persist it
	allocDstVunit, err := base.AllocVunitSafe(ctx, mgr.clusterMgrCli, badVuid, t.Sources)
	if err != nil {
		span.Errorf("repair alloc volume unit failed: err[%+v]", err)
		return err
	}

	t.CodeMode = volInfo.CodeMode
	t.Sources = volInfo.VunitLocations
	t.Destination = allocDstVunit.Location()
	t.State = proto.MigrateStatePrepared
	base.InsistOn(ctx, "repair prepare task update task tbl", func() error {
		return mgr.clusterMgrCli.UpdateMigrateTask(ctx, t)
	})

	mgr.sendToWorkQueue(t)
	return nil
}

func (mgr *DiskRepairMgr) sendToWorkQueue(t *proto.MigrateTask) {
	mgr.workQueue.AddPreparedTask(t.SourceIDC, t.TaskID, t)
	mgr.prepareQueue.RemoveTask(t.TaskID)
}

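// finishTaskInAdvance is taken when no data movement is needed anymore (the
// volume no longer references the source vuid): the task record is deleted,
// written to the task logger, counted as finished, removed from prepareQueue,
// remembered in deletedTasks for later junk-task checks, and the volume lock
// is released.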
func (mgr *DiskRepairMgr) finishTaskInAdvance(ctx context.Context, task *proto.MigrateTask, reason string) {
	task.State = proto.MigrateStateFinishedInAdvance
	task.FinishAdvanceReason = reason
	base.InsistOn(ctx, "repair finish task in advance update task tbl", func() error {
		return mgr.clusterMgrCli.DeleteMigrateTask(ctx, task.TaskID)
	})

	if recordErr := mgr.taskLogger.Encode(task); recordErr != nil {
		trace.SpanFromContextSafe(ctx).Errorf("record repair task failed: task[%+v], err[%+v]", task, recordErr)
	}

	mgr.finishTaskCounter.Add()
	mgr.prepareQueue.RemoveTask(task.TaskID)
	mgr.deletedTasks.add(task.SourceDiskID, task.TaskID)
	base.VolTaskLockerInst().Unlock(ctx, task.Vid())
}

func (mgr *DiskRepairMgr) finishTaskLoop() {
	for {
		mgr.WaitEnable()
		err := mgr.popTaskAndFinish()
		if err == base.ErrNoTaskInQueue {
			time.Sleep(5 * time.Second)
		}
	}
}

func (mgr *DiskRepairMgr) popTaskAndFinish() error {
	_, task, exist := mgr.finishQueue.PopTask()
	if !exist {
		return base.ErrNoTaskInQueue
	}

	span, ctx := trace.StartSpanFromContext(context.Background(), "disk_repair.popTaskAndFinish")

	t := task.(*proto.MigrateTask).Copy()
	err := mgr.finishTask(ctx, t)
	if err != nil {
		span.Errorf("finish task failed: err[%+v]", err)
		return err
	}

	span.Infof("finish task success: task_id[%s]", t.TaskID)
	return nil
}

func (mgr *DiskRepairMgr) finishTask(ctx context.Context, task *proto.MigrateTask) (retErr error) {
	span := trace.SpanFromContextSafe(ctx)

	defer func() {
		if retErr != nil {
			mgr.finishQueue.RetryTask(task.TaskID)
		}
	}()

	if task.State != proto.MigrateStateWorkCompleted {
		span.Panicf("task state not expect: task_id[%s], expect state[%d], actual state[%d]", task.TaskID, proto.MigrateStateWorkCompleted, task.State)
	}
	// The complete stage cannot guarantee that the task info is saved to the db,
	// so the finish stage must persist it first. Updating the volume mapping
	// before the completed state is saved is dangerous: if the process restarts,
	// the task would be reloaded and redone by a worker, and the worker would
	// write data to a chunk that is already online.
	base.InsistOn(ctx, "repair finish task update task state completed", func() error {
		return mgr.clusterMgrCli.UpdateMigrateTask(ctx, task)
	})

	newVuid := task.Destination.Vuid
	oldVuid := task.SourceVuid
	err := mgr.clusterMgrCli.UpdateVolume(ctx, newVuid, oldVuid, task.DestinationDiskID())
	if err != nil {
		span.Errorf("update volume failed: err[%+v]", err)
		return mgr.handleUpdateVolMappingFail(ctx, task, err)
	}

	task.State = proto.MigrateStateFinished
	base.InsistOn(ctx, "repair finish task update task state finished", func() error {
		return mgr.clusterMgrCli.DeleteMigrateTask(ctx, task.TaskID)
	})

	if recordErr := mgr.taskLogger.Encode(task); recordErr != nil {
		span.Errorf("record repair task failed: task[%+v], err[%+v]", task, recordErr)
	}

	mgr.finishTaskCounter.Add()
	// 1. remove the task from memory
	// 2. release the lock of the volume task
	mgr.finishQueue.RemoveTask(task.TaskID)

	// record the deleted task so it can be checked again later
	mgr.deletedTasks.add(task.SourceDiskID, task.TaskID)

	base.VolTaskLockerInst().Unlock(ctx, task.Vid())
	return nil
}

func (mgr *DiskRepairMgr) handleUpdateVolMappingFail(ctx context.Context, task *proto.MigrateTask, err error) error {
	span := trace.SpanFromContextSafe(ctx)
	span.Infof("handle update vol mapping failed: task_id[%s], state[%d], dest vuid[%d]", task.TaskID, task.State, task.Destination.Vuid)

	code := rpc.DetectStatusCode(err)
	if code == errcode.CodeOldVuidNotMatch {
		span.Panicf("change volume unit relationship got unexpected err")
	}

	if base.ShouldAllocAndRedo(code) {
		span.Infof("realloc vunit and redo: task_id[%s]", task.TaskID)

		newVunit, err := base.AllocVunitSafe(ctx, mgr.clusterMgrCli, task.SourceVuid, task.Sources)
		if err != nil {
			span.Errorf("realloc failed: vuid[%d], err[%+v]", task.SourceVuid, err)
			return err
		}
		task.SetDestination(newVunit.Location())
		task.State = proto.MigrateStatePrepared

		base.InsistOn(ctx, "repair redo task update task tbl", func() error {
			return mgr.clusterMgrCli.UpdateMigrateTask(ctx, task)
		})

		mgr.finishQueue.RemoveTask(task.TaskID)
		mgr.workQueue.AddPreparedTask(task.SourceIDC, task.TaskID, task)
		span.Infof("task redo again: task_id[%v]", task.TaskID)
		return nil
	}

	return err
}

func (mgr *DiskRepairMgr) checkRepairedAndClearLoop() {
	t := time.NewTicker(time.Duration(mgr.cfg.CheckTaskIntervalS) * time.Second)
	defer t.Stop()

	for {
		select {
		case <-t.C:
			mgr.checkRepairedAndClear()
		case <-mgr.Closer.Done():
			return
		}
	}
}

func (mgr *DiskRepairMgr) checkRepairedAndClear() {
	span, ctx := trace.StartSpanFromContext(context.Background(), "disk_repair.checkRepairedAndClear")

	for _, disk := range mgr.repairingDisks.list() {
		if !mgr.checkDiskRepaired(ctx, disk.DiskID) {
			continue
		}
		err := mgr.clusterMgrCli.SetDiskRepaired(ctx, disk.DiskID)
		if err != nil {
			return
		}
		span.Infof("disk repaired will start clear: disk_id[%d]", disk.DiskID)
		mgr.clearTasksByDiskID(ctx, disk.DiskID)
	}
}

func (mgr *DiskRepairMgr) clearTasksByDiskID(ctx context.Context, diskID proto.DiskID) {
	base.InsistOn(ctx, "delete migrating disk fail", func() error {
		return mgr.clusterMgrCli.DeleteMigratingDisk(ctx, proto.TaskTypeDiskRepair, diskID)
	})
	mgr.deletedTasks.delete(diskID)
	mgr.repairedDisks.add(diskID, time.Now())
	mgr.repairingDisks.delete(diskID)
}

func (mgr *DiskRepairMgr) checkDiskRepaired(ctx context.Context, diskID proto.DiskID) bool {
	span := trace.SpanFromContextSafe(ctx)
	span.Infof("check repaired: disk_id[%d]", diskID)

	tasks, err := mgr.clusterMgrCli.ListAllMigrateTasksByDiskID(ctx, proto.TaskTypeDiskRepair, diskID)
	if err != nil {
		span.Errorf("check repaired and find task failed: disk_id[%d], err[%+v]", diskID, err)
		return false
	}
	vunitInfos, err := mgr.clusterMgrCli.ListDiskVolumeUnits(ctx, diskID)
	if err != nil {
		span.Errorf("check repaired list disk volume units failed: disk_id[%d], err[%+v]", diskID, err)
		return false
	}
	if len(vunitInfos) == 0 && len(tasks) != 0 {
		// a network timeout may cause a deleted task to be inserted again; delete it once more
		mgr.clearJunkTasks(ctx, diskID, tasks)
		return true
	}
	if len(vunitInfos) != 0 && len(tasks) == 0 {
		// this can happen when a migration finishes while repair tasks are generated concurrently:
		// listing the volume units may still return units without a corresponding migrate task
		span.Warnf("clustermgr has some volume unit not repair and revise again: disk_id[%d], volume units len[%d]", diskID, len(vunitInfos))
		if err = mgr.reviseRepairDisk(ctx, diskID); err != nil {
			span.Errorf("revise repair task failed: err[%+v]", err)
		}
		return false
	}
	return len(tasks) == 0 && len(vunitInfos) == 0
}

func (mgr *DiskRepairMgr) clearJunkTasks(ctx context.Context, diskID proto.DiskID, tasks []*proto.MigrateTask) {
	span := trace.SpanFromContextSafe(ctx)
	for _, task := range tasks {
		if !mgr.deletedTasks.exits(diskID, task.TaskID) {
			continue
		}
		span.Warnf("delete junk task: task_id[%s]", task.TaskID)
		base.InsistOn(ctx, "delete junk task", func() error {
			return mgr.clusterMgrCli.DeleteMigrateTask(ctx, task.TaskID)
		})
	}
}

// checkAndClearJunkTasksLoop due to network timeouts, a repaired disk may still have some junk migrate tasks in clustermgr,
// so those tasks need to be cleared later
func (mgr *DiskRepairMgr) checkAndClearJunkTasksLoop() {
	t := time.NewTicker(clearJunkMigrationTaskInterval)
	defer t.Stop()

	for {
		select {
		case <-t.C:
			mgr.checkAndClearJunkTasks()
		case <-mgr.Closer.Done():
			return
		}
	}
}

func (mgr *DiskRepairMgr) checkAndClearJunkTasks() {
	span, ctx := trace.StartSpanFromContext(context.Background(), "disk_repair.clearJunkTasks")

	for _, disk := range mgr.repairedDisks.list() {
		if time.Since(disk.finishedTime) < junkMigrationTaskProtectionWindow {
			continue
		}
		span.Debugf("check repaired disk: disk_id[%d], repaired time[%v]", disk.diskID, disk.finishedTime)
		diskInfo, err := mgr.clusterMgrCli.GetDiskInfo(ctx, disk.diskID)
		if err != nil {
			span.Errorf("get disk info failed: disk_id[%d], err[%+v]", disk.diskID, err)
			continue
		}
		if !diskInfo.IsRepaired() {
			continue
		}
		tasks, err := mgr.clusterMgrCli.ListAllMigrateTasksByDiskID(ctx, proto.TaskTypeDiskRepair, disk.diskID)
		if err != nil {
			continue
		}
		if len(tasks) != 0 {
			span.Warnf("clear junk tasks of repaired disk: disk_id[%d], tasks size[%d]", disk.diskID, len(tasks))
			for _, task := range tasks {
				span.Warnf("check and delete junk task: task_id[%s]", task.TaskID)
				base.InsistOn(ctx, "check and delete junk task", func() error {
					return mgr.clusterMgrCli.DeleteMigrateTask(ctx, task.TaskID)
				})
			}
		}
		mgr.repairedDisks.delete(disk.diskID)
	}
}

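// The methods below are the worker-facing task API: Acquire hands out a prepared
// task for an IDC, Renewal keeps a task's lease alive while the switch is enabled,
// Cancel and Reclaim return or re-target a task, and Complete moves it into
// finishQueue for the finish stage.
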
// AcquireTask acquire repair task
func (mgr *DiskRepairMgr) AcquireTask(ctx context.Context, idc string) (task proto.MigrateTask, err error) {
	if !mgr.taskSwitch.Enabled() {
		return task, proto.ErrTaskPaused
	}

	_, repairTask, _ := mgr.workQueue.Acquire(idc)
	if repairTask != nil {
		task = *repairTask.(*proto.MigrateTask)
		return task, nil
	}
	return task, proto.ErrTaskEmpty
}

// CancelTask cancel repair task
func (mgr *DiskRepairMgr) CancelTask(ctx context.Context, args *api.OperateTaskArgs) error {
	span := trace.SpanFromContextSafe(ctx)

	err := mgr.workQueue.Cancel(args.IDC, args.TaskID, args.Src, args.Dest)
	if err != nil {
		span.Errorf("cancel repair failed: task_id[%s], err[%+v]", args.TaskID, err)
	}

	mgr.taskStatsMgr.CancelTask()
	return err
}

// ReclaimTask reclaim repair task
func (mgr *DiskRepairMgr) ReclaimTask(ctx context.Context,
	idc, taskID string,
	src []proto.VunitLocation,
	oldDst proto.VunitLocation,
	newDst *client.AllocVunitInfo) error {
	span := trace.SpanFromContextSafe(ctx)

	err := mgr.workQueue.Reclaim(idc, taskID, src, oldDst, newDst.Location(), newDst.DiskID)
	if err != nil {
		// the task may already be finished: only a Complete call removes a task from the queue
		span.Errorf("reclaim repair task failed: task_id[%s], err[%+v]", taskID, err)
		return err
	}

	task, err := mgr.workQueue.Query(idc, taskID)
	if err != nil {
		span.Errorf("found task in workQueue failed: idc[%s], task_id[%s], err[%+v]", idc, taskID, err)
		return err
	}

	err = mgr.clusterMgrCli.UpdateMigrateTask(ctx, task.(*proto.MigrateTask))
	if err != nil {
		span.Warnf("update reclaim task failed: task_id[%s], err[%+v]", taskID, err)
	}

	mgr.taskStatsMgr.ReclaimTask()
	return nil
}

// CompleteTask complete repair task
func (mgr *DiskRepairMgr) CompleteTask(ctx context.Context, args *api.OperateTaskArgs) error {
	span := trace.SpanFromContextSafe(ctx)

	completeTask, err := mgr.workQueue.Complete(args.IDC, args.TaskID, args.Src, args.Dest)
	if err != nil {
		span.Errorf("complete repair task failed: task_id[%s], err[%+v]", args.TaskID, err)
		return err
	}

	t := completeTask.(*proto.MigrateTask)
	t.State = proto.MigrateStateWorkCompleted

	mgr.finishQueue.PushTask(args.TaskID, t)
	// Complete serves the worker API, so it cannot loop here until the task is
	// saved to the db; that would make persisting the task info difficult.
	// Deferring the save to the finish stage is the simple way out.
	return nil
}

// RenewalTask renewal repair task
func (mgr *DiskRepairMgr) RenewalTask(ctx context.Context, idc, taskID string) error {
	if !mgr.taskSwitch.Enabled() {
		// stopping renewal tells the worker to stop the task
		return proto.ErrTaskPaused
	}

	span := trace.SpanFromContextSafe(ctx)
	err := mgr.workQueue.Renewal(idc, taskID)
	if err != nil {
		span.Warnf("renewal repair task failed: task_id[%s], err[%+v]", taskID, err)
	}
	return err
}

// ReportWorkerTaskStats reports task stats
func (mgr *DiskRepairMgr) ReportWorkerTaskStats(st *api.TaskReportArgs) {
	mgr.taskStatsMgr.ReportWorkerTaskStats(st.TaskID, st.TaskStats, st.IncreaseDataSizeByte, st.IncreaseShardCnt)
}

// QueryTask return task statistics
func (mgr *DiskRepairMgr) QueryTask(ctx context.Context, taskID string) (*api.MigrateTaskDetail, error) {
	detail := &api.MigrateTaskDetail{}
	taskInfo, err := mgr.clusterMgrCli.GetMigrateTask(ctx, proto.TaskTypeDiskRepair, taskID)
	if err != nil {
		return detail, err
	}
	detail.Task = *taskInfo

	detailRunInfo, err := mgr.taskStatsMgr.QueryTaskDetail(taskID)
	if err != nil {
		return detail, nil
	}
	detail.Stat = detailRunInfo.Statistics
	return detail, nil
}

// StatQueueTaskCnt returns task queue stats
func (mgr *DiskRepairMgr) StatQueueTaskCnt() (inited, prepared, completed int) {
	todo, doing := mgr.prepareQueue.StatsTasks()
	inited = todo + doing

	todo, doing = mgr.workQueue.StatsTasks()
	prepared = todo + doing

	todo, doing = mgr.finishQueue.StatsTasks()
	completed = todo + doing
	return
}

// Stats returns task stats
func (mgr *DiskRepairMgr) Stats() api.MigrateTasksStat {
	preparing, workerDoing, finishing := mgr.StatQueueTaskCnt()
	finishedCnt := mgr.finishTaskCounter.Show()
	increaseDataSize, increaseShardCnt := mgr.taskStatsMgr.Counters()
	return api.MigrateTasksStat{
		PreparingCnt:   preparing,
		WorkerDoingCnt: workerDoing,
		FinishingCnt:   finishing,
		StatsPerMin: api.PerMinStats{
			FinishedCnt:    fmt.Sprint(finishedCnt),
			DataAmountByte: base.DataMountFormat(increaseDataSize),
			ShardCnt:       fmt.Sprint(increaseShardCnt),
		},
	}
}

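// Progress counts a disk's UsedChunkCnt as its total and treats every chunk
// without a remaining migrate task as migrated. For example (hypothetical
// numbers): a repairing disk with UsedChunkCnt=100 and 40 tasks still listed in
// clustermgr contributes 100 to total and 60 to migrated.
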
// Progress repair manager progress
func (mgr *DiskRepairMgr) Progress(ctx context.Context) (migratingDisks []proto.DiskID, total, migrated int) {
	span := trace.SpanFromContextSafe(ctx)
	migratingDisks = make([]proto.DiskID, 0)

	for _, disk := range mgr.repairingDisks.list() {
		total += int(disk.UsedChunkCnt)
		remainTasks, err := mgr.clusterMgrCli.ListAllMigrateTasksByDiskID(ctx, proto.TaskTypeDiskRepair, disk.DiskID)
		if err != nil {
			span.Errorf("find all task failed: err[%+v]", err)
			return migratingDisks, 0, 0
		}
		migrated += int(disk.UsedChunkCnt) - len(remainTasks)
		migratingDisks = append(migratingDisks, disk.DiskID)
	}
	return migratingDisks, total, migrated
}

func (mgr *DiskRepairMgr) DiskProgress(ctx context.Context, diskID proto.DiskID) (stats *api.DiskMigratingStats, err error) {
	span := trace.SpanFromContextSafe(ctx)

	migratingDisk, ok := mgr.repairingDisks.get(diskID)
	if !ok {
		err = errors.New("not repairing disk")
		return stats, err
	}
	remainTasks, err := mgr.clusterMgrCli.ListAllMigrateTasksByDiskID(ctx, proto.TaskTypeDiskRepair, diskID)
	if err != nil {
		span.Errorf("find all task failed: err[%+v]", err)
		return stats, err
	}
	stats = &api.DiskMigratingStats{}
	stats.TotalTasksCnt = int(migratingDisk.UsedChunkCnt)
	stats.MigratedTasksCnt = stats.TotalTasksCnt - len(remainTasks)