// Copyright 2022 The CubeFS Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.

package scheduler

import (
	"context"
	"errors"
	"fmt"
	"time"

	api "github.com/cubefs/cubefs/blobstore/api/scheduler"
	"github.com/cubefs/cubefs/blobstore/common/counter"
	errcode "github.com/cubefs/cubefs/blobstore/common/errors"
	"github.com/cubefs/cubefs/blobstore/common/proto"
	"github.com/cubefs/cubefs/blobstore/common/recordlog"
	"github.com/cubefs/cubefs/blobstore/common/rpc"
	"github.com/cubefs/cubefs/blobstore/common/taskswitch"
	"github.com/cubefs/cubefs/blobstore/common/trace"
	"github.com/cubefs/cubefs/blobstore/scheduler/base"
	"github.com/cubefs/cubefs/blobstore/scheduler/client"
	"github.com/cubefs/cubefs/blobstore/util/closer"
)

// DiskRepairMgr manages disk repair tasks
type DiskRepairMgr struct {
	closer.Closer

	prepareQueue   *base.TaskQueue
	workQueue      *base.WorkerTaskQueue
	finishQueue    *base.TaskQueue
	deletedTasks   *diskMigratedTasks
	repairedDisks  *migratedDisks
	repairingDisks *migratingDisks

	clusterMgrCli client.ClusterMgrAPI

	taskSwitch taskswitch.ISwitcher

	// for stats
	finishTaskCounter counter.Counter
	taskStatsMgr      *base.TaskStatsMgr

	hasRevised bool
	taskLogger recordlog.Encoder
	cfg        *MigrateConfig
}
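
// Tasks move through three in-memory queues: prepareQueue (inited tasks waiting
// for destination allocation), workQueue (prepared tasks handed out to workers
// per IDC) and finishQueue (work-completed tasks waiting for the volume mapping
// update). Disks under repair are tracked in repairingDisks and moved to
// repairedDisks once all of their tasks are done.
//
// Illustrative lifecycle (a sketch only; constructing the dependencies such as
// the clustermgr client, task switch, task logger and config is outside the
// scope of this file):
//
//	mgr := NewDiskRepairMgr(clusterMgrCli, taskSwitch, taskLogger, cfg)
//	if err := mgr.Load(); err != nil {
//		// handle error
//	}
//	mgr.Run()
//	defer mgr.Close()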

// NewDiskRepairMgr returns a disk repair task manager
func NewDiskRepairMgr(clusterMgrCli client.ClusterMgrAPI, taskSwitch taskswitch.ISwitcher, taskLogger recordlog.Encoder, cfg *MigrateConfig) *DiskRepairMgr {
	mgr := &DiskRepairMgr{
		Closer:         closer.New(),
		prepareQueue:   base.NewTaskQueue(time.Duration(cfg.PrepareQueueRetryDelayS) * time.Second),
		workQueue:      base.NewWorkerTaskQueue(time.Duration(cfg.CancelPunishDurationS) * time.Second),
		finishQueue:    base.NewTaskQueue(time.Duration(cfg.FinishQueueRetryDelayS) * time.Second),
		deletedTasks:   newDiskMigratedTasks(),
		repairedDisks:  newMigratedDisks(),
		repairingDisks: newMigratingDisks(),

		clusterMgrCli: clusterMgrCli,
		taskSwitch:    taskSwitch,
		cfg:           cfg,
		taskLogger:    taskLogger,

		hasRevised: false,
	}
	mgr.taskStatsMgr = base.NewTaskStatsMgrAndRun(cfg.ClusterID, proto.TaskTypeDiskRepair, mgr)
	return mgr
}

// Load loads repair tasks from the database
func (mgr *DiskRepairMgr) Load() error {
	span, ctx := trace.StartSpanFromContext(context.Background(), "Load")

	repairingDisks, err := mgr.clusterMgrCli.ListMigratingDisks(ctx, proto.TaskTypeDiskRepair)
	if err != nil {
		return err
	}
	for _, disk := range repairingDisks {
		mgr.repairingDisks.add(disk.Disk.DiskID, disk.Disk)
	}

	tasks, err := mgr.clusterMgrCli.ListAllMigrateTasks(ctx, proto.TaskTypeDiskRepair)
	if err != nil {
		span.Errorf("find all tasks failed: err[%+v]", err)
		return err
	}
	if len(tasks) == 0 {
		return nil
	}

	var junkTasks []*proto.MigrateTask
	for _, t := range tasks {
		if _, ok := mgr.repairingDisks.get(t.SourceDiskID); !ok {
			junkTasks = append(junkTasks, t)
			continue
		}
		if t.Running() {
			err = base.VolTaskLockerInst().TryLock(ctx, t.Vid())
			if err != nil {
				return fmt.Errorf("repair task conflict: task[%+v], err[%+v]",
					t, err.Error())
			}
		}

		span.Infof("load task success: task_id[%s], state[%d]", t.TaskID, t.State)
		switch t.State {
		case proto.MigrateStateInited:
			mgr.prepareQueue.PushTask(t.TaskID, t)
		case proto.MigrateStatePrepared:
			mgr.workQueue.AddPreparedTask(t.SourceIDC, t.TaskID, t)
		case proto.MigrateStateWorkCompleted:
			mgr.finishQueue.PushTask(t.TaskID, t)
		case proto.MigrateStateFinished, proto.MigrateStateFinishedInAdvance:
			return fmt.Errorf("task should be deleted from db: task[%+v]", t)
		default:
			return fmt.Errorf("unexpected migrate state: task[%+v]", t)
		}
	}

	return mgr.clearJunkTasksWhenLoading(ctx, junkTasks)
}

func (mgr *DiskRepairMgr) clearJunkTasksWhenLoading(ctx context.Context, tasks []*proto.MigrateTask) error {
	span := trace.SpanFromContextSafe(ctx)

	disks := make(map[proto.DiskID]bool)
	for _, task := range tasks {
		if _, ok := disks[task.SourceDiskID]; !ok {
			diskInfo, err := mgr.clusterMgrCli.GetDiskInfo(ctx, task.SourceDiskID)
			if err != nil {
				return err
			}
			disks[task.SourceDiskID] = diskInfo.IsRepaired()
		}
		if !disks[task.SourceDiskID] {
			span.Errorf("has junk task but the disk is not repaired: disk_id[%d], task_id[%s]", task.SourceDiskID, task.TaskID)
			return errcode.ErrUnexpectMigrationTask
		}
		span.Warnf("loading delete junk task: task_id[%s]", task.TaskID)
		base.InsistOn(ctx, "loading delete junk task", func() error {
			return mgr.clusterMgrCli.DeleteMigrateTask(ctx, task.TaskID)
		})
	}
	return nil
}

// Run starts the repair loops, covering the collect/prepare/finish/check phases
func (mgr *DiskRepairMgr) Run() {
	go mgr.collectTaskLoop()
	go mgr.prepareTaskLoop()
	go mgr.finishTaskLoop()
	go mgr.checkRepairedAndClearLoop()
	go mgr.checkAndClearJunkTasksLoop()
}

func (mgr *DiskRepairMgr) Enabled() bool {
	return mgr.taskSwitch.Enabled()
}

func (mgr *DiskRepairMgr) WaitEnable() {
	mgr.taskSwitch.WaitEnable()
}
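
// collectTaskLoop periodically collects broken disks and generates repair tasks
// for them while the task switch is enabled, until the manager is closed.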
func (mgr *DiskRepairMgr) collectTaskLoop() {
	t := time.NewTicker(time.Duration(mgr.cfg.CollectTaskIntervalS) * time.Second)
	defer t.Stop()

	for {
		select {
		case <-t.C:
			mgr.WaitEnable()
			mgr.collectTask()
		case <-mgr.Closer.Done():
			return
		}
	}
}

func (mgr *DiskRepairMgr) collectTask() {
	span, ctx := trace.StartSpanFromContext(context.Background(), "disk_repair.collectTask")
	defer span.Finish()

	// revise repair tasks to ensure data consistency when the service starts
	if !mgr.hasRevised {
		if err := mgr.reviseRepairDisks(ctx); err != nil {
			return
		}
		mgr.hasRevised = true
	}

	if mgr.repairingDisks.size() >= mgr.cfg.DiskConcurrency {
		return
	}

	brokenDisk, err := mgr.acquireBrokenDisk(ctx)
	if err != nil {
		span.Infof("acquire broken disk failed: err[%+v]", err)
		return
	}

	if brokenDisk == nil {
		return
	}

	err = mgr.genDiskRepairTasks(ctx, brokenDisk, true)
	if err != nil {
		span.Errorf("generate disk repair tasks failed: err[%+v]", err)
		return
	}

	base.InsistOn(ctx, fmt.Sprintf("set disk %d repairing", brokenDisk.DiskID), func() error {
		return mgr.clusterMgrCli.SetDiskRepairing(ctx, brokenDisk.DiskID)
	})

	mgr.repairingDisks.add(brokenDisk.DiskID, brokenDisk)
}

func (mgr *DiskRepairMgr) reviseRepairDisks(ctx context.Context) error {
	span := trace.SpanFromContextSafe(ctx)

	for _, disk := range mgr.repairingDisks.list() {
		if err := mgr.reviseRepairDisk(ctx, disk.DiskID); err != nil {
			span.Errorf("revise repair tasks failed: disk_id[%d]", disk.DiskID)
			return err
		}
	}
	return nil
}

func (mgr *DiskRepairMgr) reviseRepairDisk(ctx context.Context, diskID proto.DiskID) error {
	span := trace.SpanFromContextSafe(ctx)

	diskInfo, err := mgr.clusterMgrCli.GetDiskInfo(ctx, diskID)
	if err != nil {
		span.Errorf("get disk info failed: err[%+v]", err)
		return err
	}

	if err = mgr.genDiskRepairTasks(ctx, diskInfo, false); err != nil {
		span.Errorf("generate disk repair tasks failed: err[%+v]", err)
		return err
	}

	if diskInfo.IsBroken() {
		execMsg := fmt.Sprintf("set disk %d repairing", diskID)
		base.InsistOn(ctx, execMsg, func() error {
			return mgr.clusterMgrCli.SetDiskRepairing(ctx, diskID)
		})
	}
	return nil
}

func (mgr *DiskRepairMgr) acquireBrokenDisk(ctx context.Context) (*client.DiskInfoSimple, error) {
	brokenDisks, err := mgr.clusterMgrCli.ListBrokenDisks(ctx)
	if err != nil {
		return nil, err
	}
	if len(brokenDisks) == 0 {
		return nil, nil
	}
	return mgr.getUnRepairingDisk(brokenDisks), nil
}

func (mgr *DiskRepairMgr) getUnRepairingDisk(disks []*client.DiskInfoSimple) *client.DiskInfoSimple {
	for _, v := range disks {
		if _, ok := mgr.repairingDisks.get(v.DiskID); !ok {
			return v
		}
	}
	return nil
}

func (mgr *DiskRepairMgr) genDiskRepairTasks(ctx context.Context, disk *client.DiskInfoSimple, newRepairDisk bool) error {
	span := trace.SpanFromContextSafe(ctx)
	span.Infof("start generate disk repair tasks: disk_id[%d], disk_idc[%s]", disk.DiskID, disk.Idc)

	migratingVuids, err := mgr.listMigratingVuid(ctx, disk.DiskID)
	if err != nil {
		span.Errorf("list repairing vuids failed: err[%+v]", err)
		return err
	}

	unmigratedvuids, err := mgr.listUnmigratedVuid(ctx, disk.DiskID)
	if err != nil {
		span.Errorf("list unrepaired vuids failed: err[%+v]", err)
		return err
	}

	remain := base.Subtraction(unmigratedvuids, migratingVuids)
	span.Infof("should gen tasks remain: len[%d]", len(remain))
	if newRepairDisk {
		meta := &client.MigratingDiskMeta{
			TaskType: proto.TaskTypeDiskRepair,
			Disk:     disk,
		}
		if err := mgr.clusterMgrCli.AddMigratingDisk(ctx, meta); err != nil {
			return err
		}
	}
	for _, vuid := range remain {
		mgr.initOneTask(ctx, vuid, disk.DiskID, disk.Idc)
	}
	return nil
}

func (mgr *DiskRepairMgr) listMigratingVuid(ctx context.Context, diskID proto.DiskID) (bads []proto.Vuid, err error) {
	tasks, err := mgr.clusterMgrCli.ListAllMigrateTasksByDiskID(ctx, proto.TaskTypeDiskRepair, diskID)
	if err != nil {
		return nil, err
	}

	for _, t := range tasks {
		bads = append(bads, t.SourceVuid)
	}
	return bads, nil
}

func (mgr *DiskRepairMgr) listUnmigratedVuid(ctx context.Context, diskID proto.DiskID) (bads []proto.Vuid, err error) {
	vunits, err := mgr.clusterMgrCli.ListDiskVolumeUnits(ctx, diskID)
	if err != nil {
		return nil, err
	}

	for _, vunit := range vunits {
		bads = append(bads, vunit.Vuid)
	}
	return bads, nil
}
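
// initOneTask builds an inited repair task for one bad vuid, persists it to
// clustermgr and pushes it into the prepare queue.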
func (mgr *DiskRepairMgr) initOneTask(ctx context.Context, badVuid proto.Vuid, brokenDiskID proto.DiskID, brokenDiskIdc string) {
	span := trace.SpanFromContextSafe(ctx)

	t := proto.MigrateTask{
		TaskID:                  client.GenMigrateTaskID(proto.TaskTypeDiskRepair, brokenDiskID, badVuid.Vid()),
		TaskType:                proto.TaskTypeDiskRepair,
		State:                   proto.MigrateStateInited,
		SourceDiskID:            brokenDiskID,
		SourceVuid:              badVuid,
		SourceIDC:               brokenDiskIdc,
		ForbiddenDirectDownload: true,
	}
	base.InsistOn(ctx, "repair init one task insert task to tbl", func() error {
		return mgr.clusterMgrCli.AddMigrateTask(ctx, &t)
	})

	mgr.prepareQueue.PushTask(t.TaskID, &t)
	span.Infof("init repair task success %+v", t)
}
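
// prepareTaskLoop keeps preparing inited tasks while there are repairing disks
// and the work queue still has free capacity.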
func (mgr *DiskRepairMgr) prepareTaskLoop() {
	for {
		mgr.WaitEnable()
		todo, doing := mgr.workQueue.StatsTasks()
		if mgr.repairingDisks.size() == 0 || todo+doing >= mgr.cfg.WorkQueueSize {
			time.Sleep(time.Second)
			continue
		}

		err := mgr.popTaskAndPrepare()
		if err == base.ErrNoTaskInQueue {
			time.Sleep(time.Second)
		}
	}
}

func (mgr *DiskRepairMgr) popTaskAndPrepare() error {
	_, task, exist := mgr.prepareQueue.PopTask()
	if !exist {
		return base.ErrNoTaskInQueue
	}

	var err error
	span, ctx := trace.StartSpanFromContext(context.Background(), "disk_repair.popTaskAndPrepare")
	defer span.Finish()

	defer func() {
		if err != nil {
			span.Errorf("prepare task failed and retry task: task_id[%s], err[%+v]", task.(*proto.MigrateTask).TaskID, err)
			mgr.prepareQueue.RetryTask(task.(*proto.MigrateTask).TaskID)
		}
	}()

	// copy the task to avoid modifying the one still held in the queue
	t := task.(*proto.MigrateTask).Copy()
	span.Infof("pop task: task_id[%s], task[%+v]", t.TaskID, t)
	// check whether the vid already has another running task
	err = base.VolTaskLockerInst().TryLock(ctx, t.Vid())
	if err != nil {
		span.Warnf("tryLock failed: vid[%d]", t.Vid())
		return base.ErrVolNotOnlyOneTask
	}
	defer func() {
		if err != nil {
			span.Errorf("prepare task failed: task_id[%s], err[%+v]", t.TaskID, err)
			base.VolTaskLockerInst().Unlock(ctx, t.Vid())
		}
	}()

	err = mgr.prepareTask(t)
	if err != nil {
		span.Errorf("prepare task failed: task_id[%s], err[%+v]", t.TaskID, err)
		return err
	}

	span.Infof("prepare task success: task_id[%s]", t.TaskID)
	return nil
}
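
// prepareTask checks whether the repair is still necessary, allocates a
// destination volume unit, persists the prepared task and hands it to the
// work queue.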
func (mgr *DiskRepairMgr) prepareTask(t *proto.MigrateTask) error {
	span, ctx := trace.StartSpanFromContext(
		context.Background(),
		"DiskRepairMgr.prepareTask")
	defer span.Finish()

	span.Infof("start prepare repair task: task_id[%s], task[%+v]", t.TaskID, t)

	volInfo, err := mgr.clusterMgrCli.GetVolumeInfo(ctx, t.Vid())
	if err != nil {
		span.Errorf("prepare task get volume info failed: err[%+v]", err)
		return err
	}

	// 1.check the necessity of generating the current task
	badVuid := t.SourceVuid
	if volInfo.VunitLocations[badVuid.Index()].Vuid != badVuid {
		span.Infof("repair task finish in advance: task_id[%s]", t.TaskID)
		mgr.finishTaskInAdvance(ctx, t, "volume has migrated")
		return nil
	}

	// 2.generate the source and destination of the task & persist the task
	allocDstVunit, err := base.AllocVunitSafe(ctx, mgr.clusterMgrCli, badVuid, t.Sources)
	if err != nil {
		span.Errorf("repair alloc volume unit failed: err[%+v]", err)
		return err
	}

	t.CodeMode = volInfo.CodeMode
	t.Sources = volInfo.VunitLocations
	t.Destination = allocDstVunit.Location()
	t.State = proto.MigrateStatePrepared
	base.InsistOn(ctx, "repair prepare task update task tbl", func() error {
		return mgr.clusterMgrCli.UpdateMigrateTask(ctx, t)
	})

	mgr.sendToWorkQueue(t)
	return nil
}

func (mgr *DiskRepairMgr) sendToWorkQueue(t *proto.MigrateTask) {
	mgr.workQueue.AddPreparedTask(t.SourceIDC, t.TaskID, t)
	mgr.prepareQueue.RemoveTask(t.TaskID)
}
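
// finishTaskInAdvance ends a task early (for example when the volume has
// already been migrated), deletes it from clustermgr, records it in the task
// log and releases the volume lock.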
func (mgr *DiskRepairMgr) finishTaskInAdvance(ctx context.Context, task *proto.MigrateTask, reason string) {
	task.State = proto.MigrateStateFinishedInAdvance
	task.FinishAdvanceReason = reason
	base.InsistOn(ctx, "repair finish task in advance update task tbl", func() error {
		return mgr.clusterMgrCli.DeleteMigrateTask(ctx, task.TaskID)
	})

	if recordErr := mgr.taskLogger.Encode(task); recordErr != nil {
		trace.SpanFromContextSafe(ctx).Errorf("record repair task failed: task[%+v], err[%+v]", task, recordErr)
	}

	mgr.finishTaskCounter.Add()
	mgr.prepareQueue.RemoveTask(task.TaskID)
	mgr.deletedTasks.add(task.SourceDiskID, task.TaskID)
	base.VolTaskLockerInst().Unlock(ctx, task.Vid())
}
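
// finishTaskLoop keeps finishing work-completed tasks while the task switch is
// enabled.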
func (mgr *DiskRepairMgr) finishTaskLoop() {
	for {
		mgr.WaitEnable()
		err := mgr.popTaskAndFinish()
		if err == base.ErrNoTaskInQueue {
			time.Sleep(5 * time.Second)
		}
	}
}

func (mgr *DiskRepairMgr) popTaskAndFinish() error {
	_, task, exist := mgr.finishQueue.PopTask()
	if !exist {
		return base.ErrNoTaskInQueue
	}

	span, ctx := trace.StartSpanFromContext(context.Background(), "disk_repair.popTaskAndFinish")

	t := task.(*proto.MigrateTask).Copy()
	err := mgr.finishTask(ctx, t)
	if err != nil {
		span.Errorf("finish task failed: err[%+v]", err)
		return err
	}

	span.Infof("finish task success: task_id[%s]", t.TaskID)
	return nil
}
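
// finishTask persists the work-completed state, switches the volume mapping to
// the new volume unit, deletes the task from clustermgr and releases the volume
// lock; on failure the task is retried via the finish queue.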
func (mgr *DiskRepairMgr) finishTask(ctx context.Context, task *proto.MigrateTask) (retErr error) {
	span := trace.SpanFromContextSafe(ctx)

	defer func() {
		if retErr != nil {
			mgr.finishQueue.RetryTask(task.TaskID)
		}
	}()

	if task.State != proto.MigrateStateWorkCompleted {
		span.Panicf("unexpected task state: task_id[%s], expect state[%d], actual state[%d]", task.TaskID, proto.MigrateStateWorkCompleted, task.State)
	}
	// The complete stage cannot guarantee that the task info was saved to the db,
	// so the finish stage must make sure it is saved. Updating the volume mapping
	// without having saved the task in the completed state is dangerous: if the
	// process restarts, the task will be reloaded and redone by a worker, which
	// would then write data to a chunk that is already online.
	base.InsistOn(ctx, "repair finish task update task state completed", func() error {
		return mgr.clusterMgrCli.UpdateMigrateTask(ctx, task)
	})

	newVuid := task.Destination.Vuid
	oldVuid := task.SourceVuid
	err := mgr.clusterMgrCli.UpdateVolume(ctx, newVuid, oldVuid, task.DestinationDiskID())
	if err != nil {
		span.Errorf("update volume failed: err[%+v]", err)
		return mgr.handleUpdateVolMappingFail(ctx, task, err)
	}

	task.State = proto.MigrateStateFinished
	base.InsistOn(ctx, "repair finish task update task state finished", func() error {
		return mgr.clusterMgrCli.DeleteMigrateTask(ctx, task.TaskID)
	})

	if recordErr := mgr.taskLogger.Encode(task); recordErr != nil {
		span.Errorf("record repair task failed: task[%+v], err[%+v]", task, recordErr)
	}

	mgr.finishTaskCounter.Add()
	// 1.remove task in memory
	// 2.release lock of volume task
	mgr.finishQueue.RemoveTask(task.TaskID)

	// record the deleted task so it can be checked again later
	mgr.deletedTasks.add(task.SourceDiskID, task.TaskID)

	base.VolTaskLockerInst().Unlock(ctx, task.Vid())

	return nil
}
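
// handleUpdateVolMappingFail handles a failed volume mapping update: for
// retriable status codes it reallocates a destination volume unit and pushes
// the task back to the work queue to be redone; otherwise the error is returned
// and the finish is retried.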
func (mgr *DiskRepairMgr) handleUpdateVolMappingFail(ctx context.Context, task *proto.MigrateTask, err error) error {
	span := trace.SpanFromContextSafe(ctx)
	span.Infof("handle update vol mapping failed: task_id[%s], state[%d], dest vuid[%d]", task.TaskID, task.State, task.Destination.Vuid)

	code := rpc.DetectStatusCode(err)
	if code == errcode.CodeOldVuidNotMatch {
		span.Panicf("change volume unit relationship got unexpected err")
	}

	if base.ShouldAllocAndRedo(code) {
		span.Infof("realloc vunit and redo: task_id[%s]", task.TaskID)

		newVunit, err := base.AllocVunitSafe(ctx, mgr.clusterMgrCli, task.SourceVuid, task.Sources)
		if err != nil {
			span.Errorf("realloc failed: vuid[%d], err[%+v]", task.SourceVuid, err)
			return err
		}
		task.SetDestination(newVunit.Location())
		task.State = proto.MigrateStatePrepared
		task.WorkerRedoCnt++

		base.InsistOn(ctx, "repair redo task update task tbl", func() error {
			return mgr.clusterMgrCli.UpdateMigrateTask(ctx, task)
		})

		mgr.finishQueue.RemoveTask(task.TaskID)
		mgr.workQueue.AddPreparedTask(task.SourceIDC, task.TaskID, task)
		span.Infof("task redo again: task_id[%v]", task.TaskID)
		return nil
	}

	return err
}
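
// checkRepairedAndClearLoop periodically checks whether repairing disks have
// been fully repaired and clears their bookkeeping when they have.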
func (mgr *DiskRepairMgr) checkRepairedAndClearLoop() {
	t := time.NewTicker(time.Duration(mgr.cfg.CheckTaskIntervalS) * time.Second)
	defer t.Stop()

	for {
		select {
		case <-t.C:
			mgr.WaitEnable()
			mgr.checkRepairedAndClear()
		case <-mgr.Closer.Done():
			return
		}
	}
}

func (mgr *DiskRepairMgr) checkRepairedAndClear() {
	span, ctx := trace.StartSpanFromContext(context.Background(), "disk_repair.checkRepairedAndClear")

	for _, disk := range mgr.repairingDisks.list() {
		if !mgr.checkDiskRepaired(ctx, disk.DiskID) {
			continue
		}
		err := mgr.clusterMgrCli.SetDiskRepaired(ctx, disk.DiskID)
		if err != nil {
			return
		}
		span.Infof("disk repaired, start to clear: disk_id[%d]", disk.DiskID)
		mgr.clearTasksByDiskID(ctx, disk.DiskID)
	}
}

func (mgr *DiskRepairMgr) clearTasksByDiskID(ctx context.Context, diskID proto.DiskID) {
	base.InsistOn(ctx, "delete migrating disk", func() error {
		return mgr.clusterMgrCli.DeleteMigratingDisk(ctx, proto.TaskTypeDiskRepair, diskID)
	})
	mgr.deletedTasks.delete(diskID)
	mgr.repairedDisks.add(diskID, time.Now())
	mgr.repairingDisks.delete(diskID)
}
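
// checkDiskRepaired returns true only when the disk has neither remaining
// volume units nor remaining migrate tasks; the mixed cases are treated either
// as junk tasks to clear or as units that still need tasks generated.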
func (mgr *DiskRepairMgr) checkDiskRepaired(ctx context.Context, diskID proto.DiskID) bool {
	span := trace.SpanFromContextSafe(ctx)
	span.Infof("check repaired: disk_id[%d]", diskID)

	tasks, err := mgr.clusterMgrCli.ListAllMigrateTasksByDiskID(ctx, proto.TaskTypeDiskRepair, diskID)
	if err != nil {
		span.Errorf("check repaired and find task failed: disk_id[%d], err[%+v]", diskID, err)
		return false
	}
	vunitInfos, err := mgr.clusterMgrCli.ListDiskVolumeUnits(ctx, diskID)
	if err != nil {
		span.Errorf("check repaired list disk volume units failed: disk_id[%d], err[%+v]", diskID, err)
		return false
	}
	if len(vunitInfos) == 0 && len(tasks) != 0 {
		// a network timeout may cause deleted tasks to be inserted again, so delete them once more
		mgr.clearJunkTasks(ctx, diskID, tasks)
		return false
	}
	if len(vunitInfos) != 0 && len(tasks) == 0 {
		// this can happen when the migration finishes while repair tasks are generated concurrently:
		// listing volume units may not return the migrated unit
		span.Warnf("clustermgr still has volume units that are not repaired, revise again: disk_id[%d], volume units len[%d]", diskID, len(vunitInfos))
		if err = mgr.reviseRepairDisk(ctx, diskID); err != nil {
			span.Errorf("revise repair task failed: err[%+v]", err)
		}
		return false
	}
	return len(tasks) == 0 && len(vunitInfos) == 0
}

func (mgr *DiskRepairMgr) clearJunkTasks(ctx context.Context, diskID proto.DiskID, tasks []*proto.MigrateTask) {
	span := trace.SpanFromContextSafe(ctx)
	for _, task := range tasks {
		if !mgr.deletedTasks.exits(diskID, task.TaskID) {
			continue
		}
		span.Warnf("delete junk task: task_id[%s]", task.TaskID)
		base.InsistOn(ctx, "delete junk task", func() error {
			return mgr.clusterMgrCli.DeleteMigrateTask(ctx, task.TaskID)
		})
	}
}

// checkAndClearJunkTasksLoop periodically clears junk migrate tasks: due to network timeouts,
// a repaired disk may still have junk migrate tasks left in clustermgr, and those need to be
// cleared later
func (mgr *DiskRepairMgr) checkAndClearJunkTasksLoop() {
	t := time.NewTicker(clearJunkMigrationTaskInterval)
	defer t.Stop()

	for {
		select {
		case <-t.C:
			mgr.checkAndClearJunkTasks()
		case <-mgr.Closer.Done():
			return
		}
	}
}

func (mgr *DiskRepairMgr) checkAndClearJunkTasks() {
	span, ctx := trace.StartSpanFromContext(context.Background(), "disk_repair.clearJunkTasks")

	for _, disk := range mgr.repairedDisks.list() {
		if time.Since(disk.finishedTime) < junkMigrationTaskProtectionWindow {
			continue
		}
		span.Debugf("check repaired disk: disk_id[%d], repaired time[%v]", disk.diskID, disk.finishedTime)
		diskInfo, err := mgr.clusterMgrCli.GetDiskInfo(ctx, disk.diskID)
		if err != nil {
			span.Errorf("get disk info failed: disk_id[%d], err[%+v]", disk.diskID, err)
			continue
		}
		if !diskInfo.IsRepaired() {
			continue
		}
		tasks, err := mgr.clusterMgrCli.ListAllMigrateTasksByDiskID(ctx, proto.TaskTypeDiskRepair, disk.diskID)
		if err != nil {
			continue
		}
		if len(tasks) != 0 {
			span.Warnf("clear junk tasks of repaired disk: disk_id[%d], tasks size[%d]", disk.diskID, len(tasks))
			for _, task := range tasks {
				span.Warnf("check and delete junk task: task_id[%s]", task.TaskID)
				base.InsistOn(ctx, "check and delete junk task", func() error {
					return mgr.clusterMgrCli.DeleteMigrateTask(ctx, task.TaskID)
				})
			}
		}
		mgr.repairedDisks.delete(disk.diskID)
	}
}

// AcquireTask acquires a repair task
func (mgr *DiskRepairMgr) AcquireTask(ctx context.Context, idc string) (task proto.MigrateTask, err error) {
	if !mgr.taskSwitch.Enabled() {
		return task, proto.ErrTaskPaused
	}

	_, repairTask, _ := mgr.workQueue.Acquire(idc)
	if repairTask != nil {
		task = *repairTask.(*proto.MigrateTask)
		return task, nil
	}
	return task, proto.ErrTaskEmpty
}

// CancelTask cancels a repair task
func (mgr *DiskRepairMgr) CancelTask(ctx context.Context, args *api.OperateTaskArgs) error {
	span := trace.SpanFromContextSafe(ctx)

	err := mgr.workQueue.Cancel(args.IDC, args.TaskID, args.Src, args.Dest)
	if err != nil {
		span.Errorf("cancel repair failed: task_id[%s], err[%+v]", args.TaskID, err)
	}

	mgr.taskStatsMgr.CancelTask()

	return err
}

// ReclaimTask reclaims a repair task
func (mgr *DiskRepairMgr) ReclaimTask(ctx context.Context,
	idc, taskID string,
	src []proto.VunitLocation,
	oldDst proto.VunitLocation,
	newDst *client.AllocVunitInfo) error {
	span := trace.SpanFromContextSafe(ctx)

	err := mgr.workQueue.Reclaim(idc, taskID, src, oldDst, newDst.Location(), newDst.DiskID)
	if err != nil {
		// the task has finished, because only Complete removes a task from the queue
		span.Errorf("reclaim repair task failed: task_id[%s], err[%+v]", taskID, err)
		return err
	}

	task, err := mgr.workQueue.Query(idc, taskID)
	if err != nil {
		span.Errorf("find task in workQueue failed: idc[%s], task_id[%s], err[%+v]", idc, taskID, err)
		return err
	}

	err = mgr.clusterMgrCli.UpdateMigrateTask(ctx, task.(*proto.MigrateTask))
	if err != nil {
		span.Warnf("update reclaim task failed: task_id[%s], err[%+v]", taskID, err)
	}

	mgr.taskStatsMgr.ReclaimTask()
	return nil
}

// CompleteTask completes a repair task
func (mgr *DiskRepairMgr) CompleteTask(ctx context.Context, args *api.OperateTaskArgs) error {
	span := trace.SpanFromContextSafe(ctx)

	completeTask, err := mgr.workQueue.Complete(args.IDC, args.TaskID, args.Src, args.Dest)
	if err != nil {
		span.Errorf("complete repair task failed: task_id[%s], err[%+v]", args.TaskID, err)
		return err
	}

	t := completeTask.(*proto.MigrateTask)
	t.State = proto.MigrateStateWorkCompleted

	mgr.finishQueue.PushTask(args.TaskID, t)
	// Complete serves the server API, so it cannot loop here until the task is
	// saved to the db; deferring persistence to the finish stage is the simpler way
	return nil
}

// RenewalTask renews a repair task
func (mgr *DiskRepairMgr) RenewalTask(ctx context.Context, idc, taskID string) error {
	if !mgr.taskSwitch.Enabled() {
		// rejecting the renewal triggers the worker to stop the task
		return proto.ErrTaskPaused
	}

	span := trace.SpanFromContextSafe(ctx)
	err := mgr.workQueue.Renewal(idc, taskID)
	if err != nil {
		span.Warnf("renewal repair task failed: task_id[%s], err[%+v]", taskID, err)
	}

	return err
}

// ReportWorkerTaskStats reports task stats
func (mgr *DiskRepairMgr) ReportWorkerTaskStats(st *api.TaskReportArgs) {
	mgr.taskStatsMgr.ReportWorkerTaskStats(st.TaskID, st.TaskStats, st.IncreaseDataSizeByte, st.IncreaseShardCnt)
}

// QueryTask returns task details and statistics
func (mgr *DiskRepairMgr) QueryTask(ctx context.Context, taskID string) (*api.MigrateTaskDetail, error) {
	detail := &api.MigrateTaskDetail{}
	taskInfo, err := mgr.clusterMgrCli.GetMigrateTask(ctx, proto.TaskTypeDiskRepair, taskID)
	if err != nil {
		return detail, err
	}
	detail.Task = *taskInfo

	detailRunInfo, err := mgr.taskStatsMgr.QueryTaskDetail(taskID)
	if err != nil {
		return detail, nil
	}
	detail.Stat = detailRunInfo.Statistics
	return detail, nil
}

// StatQueueTaskCnt returns task queue stats
func (mgr *DiskRepairMgr) StatQueueTaskCnt() (inited, prepared, completed int) {
	todo, doing := mgr.prepareQueue.StatsTasks()
	inited = todo + doing

	todo, doing = mgr.workQueue.StatsTasks()
	prepared = todo + doing

	todo, doing = mgr.finishQueue.StatsTasks()
	completed = todo + doing
	return
}

// Stats returns task stats
func (mgr *DiskRepairMgr) Stats() api.MigrateTasksStat {
	preparing, workerDoing, finishing := mgr.StatQueueTaskCnt()
	finishedCnt := mgr.finishTaskCounter.Show()
	increaseDataSize, increaseShardCnt := mgr.taskStatsMgr.Counters()
	return api.MigrateTasksStat{
		PreparingCnt:   preparing,
		WorkerDoingCnt: workerDoing,
		FinishingCnt:   finishing,
		StatsPerMin: api.PerMinStats{
			FinishedCnt:    fmt.Sprint(finishedCnt),
			DataAmountByte: base.DataMountFormat(increaseDataSize),
			ShardCnt:       fmt.Sprint(increaseShardCnt),
		},
	}
}

// Progress returns the repair progress of all repairing disks
func (mgr *DiskRepairMgr) Progress(ctx context.Context) (migratingDisks []proto.DiskID, total, migrated int) {
	span := trace.SpanFromContextSafe(ctx)
	migratingDisks = make([]proto.DiskID, 0)

	for _, disk := range mgr.repairingDisks.list() {
		total += int(disk.UsedChunkCnt)
		remainTasks, err := mgr.clusterMgrCli.ListAllMigrateTasksByDiskID(ctx, proto.TaskTypeDiskRepair, disk.DiskID)
		if err != nil {
			span.Errorf("find all tasks failed: err[%+v]", err)
			return migratingDisks, 0, 0
		}
		migrated += int(disk.UsedChunkCnt) - len(remainTasks)
		migratingDisks = append(migratingDisks, disk.DiskID)
	}
	return
}
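
// DiskProgress returns the migrating stats of one repairing disk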
func (mgr *DiskRepairMgr) DiskProgress(ctx context.Context, diskID proto.DiskID) (stats *api.DiskMigratingStats, err error) {
	span := trace.SpanFromContextSafe(ctx)

	migratingDisk, ok := mgr.repairingDisks.get(diskID)
	if !ok {
		err = errors.New("not repairing disk")
		return
	}
	remainTasks, err := mgr.clusterMgrCli.ListAllMigrateTasksByDiskID(ctx, proto.TaskTypeDiskRepair, diskID)
	if err != nil {
		span.Errorf("find all tasks failed: err[%+v]", err)
		return
	}
	stats = &api.DiskMigratingStats{}
	stats.TotalTasksCnt = int(migratingDisk.UsedChunkCnt)
	stats.MigratedTasksCnt = stats.TotalTasksCnt - len(remainTasks)
	return
}