1
// Copyright 2022 The CubeFS Authors.
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
7
// http://www.apache.org/licenses/LICENSE-2.0
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12
// implied. See the License for the specific language governing
13
// permissions and limitations under the License.
23
"github.com/golang/mock/gomock"
24
"github.com/stretchr/testify/require"
26
api "github.com/cubefs/cubefs/blobstore/api/scheduler"
27
"github.com/cubefs/cubefs/blobstore/common/codemode"
28
errcode "github.com/cubefs/cubefs/blobstore/common/errors"
29
"github.com/cubefs/cubefs/blobstore/common/proto"
30
"github.com/cubefs/cubefs/blobstore/scheduler/base"
31
"github.com/cubefs/cubefs/blobstore/scheduler/client"
32
"github.com/cubefs/cubefs/blobstore/testing/mocks"
35
// MockMigrateVolInfoMap is a shared fixture of volume infos keyed by Vid.
// Every entry is built with MockGenVolInfo from the same vid as its key, a
// code mode and a volume status. The tests in this file pass the map to
// mockGenMigrateTask and also read entries from it directly.
var MockMigrateVolInfoMap = map[proto.Vid]*client.VolumeInfoSimple{
36
100: MockGenVolInfo(100, codemode.EC6P6, proto.VolumeStatusIdle),
37
101: MockGenVolInfo(101, codemode.EC6P10L2, proto.VolumeStatusIdle),
38
102: MockGenVolInfo(102, codemode.EC6P10L2, proto.VolumeStatusActive),
39
103: MockGenVolInfo(103, codemode.EC6P6, proto.VolumeStatusLock),
40
104: MockGenVolInfo(104, codemode.EC6P6, proto.VolumeStatusLock),
41
105: MockGenVolInfo(105, codemode.EC6P6, proto.VolumeStatusActive),
43
110: MockGenVolInfo(110, codemode.EC6P6, proto.VolumeStatusIdle),
44
111: MockGenVolInfo(111, codemode.EC6P6, proto.VolumeStatusIdle),
45
112: MockGenVolInfo(112, codemode.EC6P6, proto.VolumeStatusIdle),
46
113: MockGenVolInfo(113, codemode.EC6P6, proto.VolumeStatusIdle),
47
114: MockGenVolInfo(114, codemode.EC6P6, proto.VolumeStatusIdle),
49
300: MockGenVolInfo(300, codemode.EC6P6, proto.VolumeStatusIdle),
50
301: MockGenVolInfo(301, codemode.EC6P10L2, proto.VolumeStatusIdle),
51
302: MockGenVolInfo(302, codemode.EC6P10L2, proto.VolumeStatusActive),
53
400: MockGenVolInfo(400, codemode.EC6P6, proto.VolumeStatusIdle),
54
401: MockGenVolInfo(401, codemode.EC6P10L2, proto.VolumeStatusIdle),
55
402: MockGenVolInfo(402, codemode.EC6P10L2, proto.VolumeStatusActive),
58
// newMigrateMgr builds a MigrateMgr for tests. The cluster manager client, task
// switch, task logger and volume updater are all gomock mocks created from one
// controller bound to t. The retry and punish delays are set to 0, the task
// type is proto.TaskTypeBalance, and FinishTaskInAdvanceWhenLockFail is
// registered as the lock-fail handler.
func newMigrateMgr(t *testing.T) *MigrateMgr {
59
ctr := gomock.NewController(t)
60
clusterMgr := NewMockClusterMgrAPI(ctr)
61
taskSwitch := mocks.NewMockSwitcher(ctr)
63
taskLogger := mocks.NewMockRecordLogEncoder(ctr)
64
volumeUpdater := NewMockVolumeUpdater(ctr)
65
conf := &MigrateConfig{
67
TaskCommonConfig: base.TaskCommonConfig{
68
PrepareQueueRetryDelayS: 0,
69
FinishQueueRetryDelayS: 0,
70
CancelPunishDurationS: 0,
74
mgr := NewMigrateMgr(clusterMgr, volumeUpdater, taskSwitch, taskLogger, conf, proto.TaskTypeBalance)
75
mgr.SetLockFailHandleFunc(mgr.FinishTaskInAdvanceWhenLockFail)
79
// TestMigrateMigrateLoad drives task loading against the mocked cluster
// manager: a ListAllMigrateTasks failure, a listing of tasks in several states
// that loads cleanly, listings that are expected to yield an error, and a
// disk-drop manager that additionally lists migrating disks.
// NOTE(review): the statements that assign err are not visible in this view —
// presumably a load call on mgr; confirm against the full file.
func TestMigrateMigrateLoad(t *testing.T) {
80
mgr := newMigrateMgr(t)
84
// listing tasks fails with errMock; the resulting error must match it
mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().ListAllMigrateTasks(any, any).Return(nil, errMock)
86
require.True(t, errors.Is(err, errMock))
89
// tasks in inited, prepared and work-completed states: no error expected
t1 := mockGenMigrateTask(proto.TaskTypeManualMigrate, "z0", 4, 100, proto.MigrateStateInited, MockMigrateVolInfoMap)
90
t2 := mockGenMigrateTask(proto.TaskTypeManualMigrate, "z0", 5, 101, proto.MigrateStatePrepared, MockMigrateVolInfoMap)
91
t3 := mockGenMigrateTask(proto.TaskTypeManualMigrate, "z1", 6, 102, proto.MigrateStateWorkCompleted, MockMigrateVolInfoMap)
92
t4 := mockGenMigrateTask(proto.TaskTypeManualMigrate, "z0", 4, 105, proto.MigrateStateInited, MockMigrateVolInfoMap)
93
mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().ListAllMigrateTasks(any, any).Return([]*proto.MigrateTask{t1, t2, t3, t4}, nil)
95
require.NoError(t, err)
98
// a finished task, and then a finished-in-advance task, each listed alone:
// an error is expected in both cases
t1 := mockGenMigrateTask(proto.TaskTypeManualMigrate, "z2", 8, 104, proto.MigrateStateFinished, MockMigrateVolInfoMap)
99
t2 := mockGenMigrateTask(proto.TaskTypeManualMigrate, "z0", 4, 105, proto.MigrateStateFinishedInAdvance, MockMigrateVolInfoMap)
100
mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().ListAllMigrateTasks(any, any).Return([]*proto.MigrateTask{t1}, nil)
102
require.Error(t, err)
104
mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().ListAllMigrateTasks(any, any).Return([]*proto.MigrateTask{t2}, nil)
106
require.Error(t, err)
109
// both tasks pass 101 as the fourth argument (presumably the volume id —
// confirm against mockGenMigrateTask): an error is expected
t2 := mockGenMigrateTask(proto.TaskTypeManualMigrate, "z0", 5, 101, proto.MigrateStatePrepared, MockMigrateVolInfoMap)
110
t3 := mockGenMigrateTask(proto.TaskTypeManualMigrate, "z1", 6, 101, proto.MigrateStateWorkCompleted, MockMigrateVolInfoMap)
111
mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().ListAllMigrateTasks(any, any).Return([]*proto.MigrateTask{t2, t3}, nil)
113
require.Error(t, err)
115
// the state argument is the raw value 100 rather than a named
// proto.MigrateState constant: an error is expected
t4 := mockGenMigrateTask(proto.TaskTypeManualMigrate, "z2", 7, 103, 100, MockMigrateVolInfoMap)
116
mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().ListAllMigrateTasks(any, any).Return([]*proto.MigrateTask{t4}, nil)
118
require.Error(t, err)
121
// fresh manager switched to the disk-drop task type; here ListMigratingDisks
// is expected in addition to ListAllMigrateTasks
mgr := newMigrateMgr(t)
122
mgr.taskType = proto.TaskTypeDiskDrop
124
t1 := mockGenMigrateTask(proto.TaskTypeDiskDrop, "z0", 1, 110, proto.MigrateStateInited, MockMigrateVolInfoMap)
125
t2 := mockGenMigrateTask(proto.TaskTypeDiskDrop, "z0", 2, 111, proto.MigrateStatePrepared, MockMigrateVolInfoMap)
126
t3 := mockGenMigrateTask(proto.TaskTypeDiskDrop, "z1", 3, 112, proto.MigrateStateWorkCompleted, MockMigrateVolInfoMap)
127
t4 := mockGenMigrateTask(proto.TaskTypeDiskDrop, "z0", 4, 113, proto.MigrateStateInited, MockMigrateVolInfoMap)
129
mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().ListAllMigrateTasks(any, any).Return([]*proto.MigrateTask{t1, t2, t3, t4}, nil)
130
mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().ListMigratingDisks(any, any).Return([]*client.MigratingDiskMeta{{Disk: testDisk1}, {Disk: testDisk2}}, nil)
132
require.NoError(t, err)
136
// TestPrepareMigrateTask exercises mgr.prepareTask: an empty queue, a volume
// already held by the task locker, GetVolumeInfo failures, finishing in advance
// when the source vuid no longer matches the volume, cluster-manager lock
// failures, and the normal path where a new volume unit is allocated.
// NOTE(review): the statements that enqueue t1 after the AddMigrateTask
// expectation are not visible in this view.
func TestPrepareMigrateTask(t *testing.T) {
137
ctx := context.Background()
140
// no task queued: prepareTask reports base.ErrNoTaskInQueue
mgr := newMigrateMgr(t)
141
err := mgr.prepareTask()
142
require.True(t, errors.Is(err, base.ErrNoTaskInQueue))
145
// one task and finish in advance
146
mgr := newMigrateMgr(t)
147
t1 := mockGenMigrateTask(proto.TaskTypeManualMigrate, "z0", 4, 100, proto.MigrateStateInited, MockMigrateVolInfoMap)
148
mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().AddMigrateTask(any, any).Return(nil)
151
// lock failed and send task to queue
152
err := base.VolTaskLockerInst().TryLock(ctx, 100)
153
require.NoError(t, err)
154
err = mgr.prepareTask()
155
require.True(t, errors.Is(err, base.ErrVolNotOnlyOneTask))
156
base.VolTaskLockerInst().Unlock(ctx, 100)
158
// get volume info failed
159
mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().GetVolumeInfo(any, any).Return(nil, errMock)
160
err = mgr.prepareTask()
161
require.True(t, errors.Is(err, errMock))
163
// finish task in advance because source chunk has moved
165
// NOTE(review): volume is the pointer stored in MockMigrateVolInfoMap, so the
// Vuid bump below appears to modify the shared fixture entry for volume 100.
volume := MockMigrateVolInfoMap[100]
166
volume.VunitLocations[int(t1.SourceVuid.Index())].Vuid = volume.VunitLocations[int(t1.SourceVuid.Index())].Vuid + 1
167
mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().GetVolumeInfo(any, any).Return(volume, nil)
168
// unlocking the volume fails: the error is surfaced
mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().UnlockVolume(any, any).Return(errMock)
169
err = mgr.prepareTask()
170
require.True(t, errors.Is(err, errMock))
172
// unlock, delete and record-log encode all succeed: no error
mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().GetVolumeInfo(any, any).Return(volume, nil)
173
mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().UnlockVolume(any, any).Return(nil)
174
mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().DeleteMigrateTask(any, any).Return(nil)
175
mgr.taskLogger.(*mocks.MockRecordLogEncoder).EXPECT().Encode(any).Return(nil)
176
err = mgr.prepareTask()
177
require.NoError(t, err)
180
// one task and finish in advance because other migrate task is doing on this volume
181
mgr := newMigrateMgr(t)
182
t1 := mockGenMigrateTask(proto.TaskTypeManualMigrate, "z0", 4, 100, proto.MigrateStateInited, MockMigrateVolInfoMap)
183
mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().AddMigrateTask(any, any).Return(nil)
186
// lock cm volume failed
187
volume := MockMigrateVolInfoMap[100]
188
mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().GetVolumeInfo(any, any).Return(volume, nil)
189
mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().LockVolume(any, any).Return(errMock)
190
mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().DeleteMigrateTask(any, any).Return(nil)
191
mgr.taskLogger.(*mocks.MockRecordLogEncoder).EXPECT().Encode(any).Return(errMock)
192
err := mgr.prepareTask()
193
require.True(t, errors.Is(err, errMock))
195
// lock failed and call lockFailHandleFunc
196
mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().GetVolumeInfo(any, any).Return(volume, nil)
197
mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().LockVolume(any, any).Return(errcode.ErrLockNotAllow)
198
err = mgr.prepareTask()
199
require.NoError(t, err)
202
// one task and normal finish
203
mgr := newMigrateMgr(t)
204
t1 := mockGenMigrateTask(proto.TaskTypeManualMigrate, "z0", 4, 100, proto.MigrateStateInited, MockMigrateVolInfoMap)
205
mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().AddMigrateTask(any, any).Return(nil)
208
// lock cm volume succeeds
209
volume := MockMigrateVolInfoMap[100]
210
mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().GetVolumeInfo(any, any).Return(volume, nil)
211
mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().LockVolume(any, any).Return(nil)
213
// alloc volume failed
214
mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().AllocVolumeUnit(any, any).Return(nil, errMock)
215
err := mgr.prepareTask()
216
require.True(t, errors.Is(err, errMock))
219
// lock, alloc and task update all succeed: no error
mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().GetVolumeInfo(any, any).Return(volume, nil)
220
mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().LockVolume(any, any).Return(nil)
221
mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().UpdateMigrateTask(any, any).Return(nil)
222
mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().AllocVolumeUnit(any, any).DoAndReturn(
223
func(ctx context.Context, vuid proto.Vuid) (*client.AllocVunitInfo, error) {
226
// NOTE(review): vid, idx and any adjustment to epoch are set on lines not
// visible in this view.
epoch := vuid.Epoch()
228
newVuid, _ := proto.NewVuid(vid, idx, epoch)
229
return &client.AllocVunitInfo{
230
VunitLocation: proto.VunitLocation{Vuid: newVuid},
233
err = mgr.prepareTask()
234
require.NoError(t, err)
238
// TestFinishMigrateTask exercises mgr.finishTask: an empty queue, a task in
// the wrong state (panic expected), UpdateVolume failures including the
// vuid-mismatch error codes, and the normal path with release, cache-update
// and unlock outcomes.
func TestFinishMigrateTask(t *testing.T) {
241
// no task queued: finishTask reports base.ErrNoTaskInQueue
mgr := newMigrateMgr(t)
242
err := mgr.finishTask()
243
require.True(t, errors.Is(err, base.ErrNoTaskInQueue))
246
// panics: task state is not proto.MigrateStateWorkCompleted
247
mgr := newMigrateMgr(t)
248
t1 := mockGenMigrateTask(proto.TaskTypeManualMigrate, "z0", 4, 100, proto.MigrateStatePrepared, MockMigrateVolInfoMap)
249
mgr.finishQueue.PushTask(t1.TaskID, t1)
250
require.Panics(t, func() {
256
// one task and redo success finally
257
mgr := newMigrateMgr(t)
258
t1 := mockGenMigrateTask(proto.TaskTypeManualMigrate, "z0", 4, 100, proto.MigrateStateWorkCompleted, MockMigrateVolInfoMap)
259
mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().UpdateMigrateTask(any, any).Return(nil)
260
mgr.finishQueue.PushTask(t1.TaskID, t1)
262
// update relationship failed
263
mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().UpdateVolume(any, any, any, any).Return(errMock)
264
err := mgr.finishTask()
265
require.True(t, errors.Is(err, errMock))
267
// update relationship failed and need redo
268
mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().UpdateVolume(any, any, any, any).Return(errcode.ErrNewVuidNotMatch)
269
mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().AllocVolumeUnit(any, any).Return(nil, errMock)
270
mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().UpdateMigrateTask(any, any).Return(nil)
272
err = mgr.finishTask()
273
require.True(t, errors.Is(err, errMock))
276
// UpdateVolume reports ErrOldVuidNotMatch: a panic is expected
mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().UpdateMigrateTask(any, any).Return(nil)
277
mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().UpdateVolume(any, any, any, any).Return(errcode.ErrOldVuidNotMatch)
278
require.Panics(t, func() {
283
// UpdateVolume reports ErrNewVuidNotMatch and the re-allocation succeeds:
// finishTask returns no error
mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().UpdateVolume(any, any, any, any).Return(errcode.ErrNewVuidNotMatch)
284
mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().AllocVolumeUnit(any, any).DoAndReturn(
285
func(ctx context.Context, vuid proto.Vuid) (*client.AllocVunitInfo, error) {
288
// NOTE(review): vid, idx and any adjustment to epoch are set on lines not
// visible in this view.
epoch := vuid.Epoch()
290
newVuid, _ := proto.NewVuid(vid, idx, epoch)
291
return &client.AllocVunitInfo{
292
VunitLocation: proto.VunitLocation{Vuid: newVuid},
295
mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().UpdateMigrateTask(any, any).Times(2).Return(nil)
296
err = mgr.finishTask()
297
require.NoError(t, err)
300
// one task and success normal
301
mgr := newMigrateMgr(t)
302
t1 := mockGenMigrateTask(proto.TaskTypeManualMigrate, "z0", 4, 100, proto.MigrateStateWorkCompleted, MockMigrateVolInfoMap)
303
mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().UpdateMigrateTask(any, any).Return(nil)
304
mgr.finishQueue.PushTask(t1.TaskID, t1)
305
mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().UpdateVolume(any, any, any, any).Return(nil)
306
// release failed and update volume cache failed
307
mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().ReleaseVolumeUnit(any, any, any).Return(errMock)
308
mgr.volumeUpdater.(*MockVolumeUpdater).EXPECT().UpdateLeaderVolumeCache(any, any).Return(errMock)
309
err := mgr.finishTask()
310
require.True(t, errors.Is(err, base.ErrUpdateVolumeCache))
312
// release failed and update volume cache success
313
mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().UpdateMigrateTask(any, any).Return(nil)
314
mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().UpdateVolume(any, any, any, any).Return(nil)
315
mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().ReleaseVolumeUnit(any, any, any).Return(errMock)
316
mgr.volumeUpdater.(*MockVolumeUpdater).EXPECT().UpdateLeaderVolumeCache(any, any).Return(nil)
317
// unlock volume failed
318
mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().UnlockVolume(any, any).Return(errMock)
319
err = mgr.finishTask()
320
require.True(t, errors.Is(err, errMock))
322
// update volume success
323
mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().UpdateMigrateTask(any, any).Return(nil)
324
mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().DeleteMigrateTask(any, any).Return(nil)
325
mgr.taskLogger.(*mocks.MockRecordLogEncoder).EXPECT().Encode(any).Return(nil)
326
mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().UpdateVolume(any, any, any, any).Return(nil)
327
mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().ReleaseVolumeUnit(any, any, any).Return(nil)
328
mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().UnlockVolume(any, any).Return(nil)
329
err = mgr.finishTask()
330
require.NoError(t, err)
335
// TestAcquireMigrateTask covers AcquireTask when the task switch is disabled
// (proto.ErrTaskPaused), when it is enabled but no task is available
// (proto.ErrTaskEmpty), and when a prepared task sits in the work queue (that
// task is returned).
// NOTE(review): idc is defined on a line not visible in this view.
func TestAcquireMigrateTask(t *testing.T) {
336
ctx := context.Background()
339
// task switch is closed
340
mgr := newMigrateMgr(t)
341
mgr.taskSwitch.(*mocks.MockSwitcher).EXPECT().Enabled().Return(false)
342
_, err := mgr.AcquireTask(ctx, idc)
343
require.True(t, errors.Is(err, proto.ErrTaskPaused))
347
// switch enabled, nothing queued
mgr := newMigrateMgr(t)
348
mgr.taskSwitch.(*mocks.MockSwitcher).EXPECT().Enabled().Return(true)
349
_, err := mgr.AcquireTask(ctx, idc)
350
require.True(t, errors.Is(err, proto.ErrTaskEmpty))
354
// switch enabled, one prepared task queued for idc
mgr := newMigrateMgr(t)
355
mgr.taskSwitch.(*mocks.MockSwitcher).EXPECT().Enabled().Return(true)
356
t1 := mockGenMigrateTask(proto.TaskTypeManualMigrate, idc, 4, 100, proto.MigrateStatePrepared, MockMigrateVolInfoMap)
357
mgr.workQueue.AddPreparedTask(idc, t1.TaskID, t1)
358
task, err := mgr.AcquireTask(ctx, idc)
359
require.NoError(t, err)
360
require.Equal(t, t1.TaskID, task.TaskID)
364
// TestCancelMigrateTask covers CancelTask: it must fail when nothing is queued
// and when the args carry only the IDC, and succeed when the args carry the
// queued task's ID, sources and destination.
// NOTE(review): idc is defined on a line not visible in this view.
func TestCancelMigrateTask(t *testing.T) {
365
ctx := context.Background()
368
// nothing queued
mgr := newMigrateMgr(t)
369
err := mgr.CancelTask(ctx, &api.OperateTaskArgs{IDC: idc})
370
require.Error(t, err)
373
mgr := newMigrateMgr(t)
374
t1 := mockGenMigrateTask(proto.TaskTypeManualMigrate, idc, 4, 100, proto.MigrateStatePrepared, MockMigrateVolInfoMap)
375
mgr.workQueue.AddPreparedTask(idc, t1.TaskID, t1)
378
// args without task id, sources or destination: error expected
err := mgr.CancelTask(ctx, &api.OperateTaskArgs{IDC: idc})
379
require.Error(t, err)
381
// args that identify the queued task: no error
err = mgr.CancelTask(ctx, &api.OperateTaskArgs{IDC: idc, TaskID: t1.TaskID, Src: t1.Sources, Dest: t1.Destination})
382
require.NoError(t, err)
386
// TestReclaimMigrateTask covers ReclaimTask: it must fail when nothing is
// queued, surface an UpdateMigrateTask failure, and succeed once the update
// succeeds.
// NOTE(review): idc is defined on a line not visible in this view.
func TestReclaimMigrateTask(t *testing.T) {
387
ctx := context.Background()
391
// nothing queued, empty task id
mgr := newMigrateMgr(t)
392
err := mgr.ReclaimTask(ctx, idc, "", nil, proto.VunitLocation{}, &client.AllocVunitInfo{})
393
require.Error(t, err)
396
mgr := newMigrateMgr(t)
397
t1 := mockGenMigrateTask(proto.TaskTypeManualMigrate, idc, 4, 100, proto.MigrateStatePrepared, MockMigrateVolInfoMap)
398
mgr.workQueue.AddPreparedTask(idc, t1.TaskID, t1)
401
// persisting the reclaimed task fails: the error must match errMock
mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().UpdateMigrateTask(any, any).Return(errMock)
402
err := mgr.ReclaimTask(ctx, idc, t1.TaskID, t1.Sources, t1.Destination, &client.AllocVunitInfo{})
403
require.True(t, errors.Is(err, errMock))
406
// persisting succeeds: no error
mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().UpdateMigrateTask(any, any).Return(nil)
407
err = mgr.ReclaimTask(ctx, idc, t1.TaskID, t1.Sources, t1.Destination, &client.AllocVunitInfo{})
408
require.NoError(t, err)
412
// TestCompleteMigrateTask covers CompleteTask: it must fail when nothing is
// queued, return no error for a queued task even though UpdateMigrateTask
// returns errMock, fail when the same task is completed again, and succeed for
// a second queued task whose update succeeds.
// NOTE(review): idc is defined on a line not visible in this view.
func TestCompleteMigrateTask(t *testing.T) {
413
ctx := context.Background()
417
// nothing queued
mgr := newMigrateMgr(t)
418
err := mgr.CompleteTask(ctx, &api.OperateTaskArgs{IDC: idc})
419
require.Error(t, err)
422
mgr := newMigrateMgr(t)
423
t1 := mockGenMigrateTask(proto.TaskTypeManualMigrate, idc, 4, 100, proto.MigrateStatePrepared, MockMigrateVolInfoMap)
424
mgr.workQueue.AddPreparedTask(idc, t1.TaskID, t1)
427
// the update returns errMock, yet the test asserts that CompleteTask itself
// returns no error
mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().UpdateMigrateTask(any, any).Return(errMock)
428
err := mgr.CompleteTask(ctx, &api.OperateTaskArgs{IDC: idc, TaskID: t1.TaskID, Src: t1.Sources, Dest: t1.Destination})
429
require.NoError(t, err)
432
// completing the same task a second time: error expected
err = mgr.CompleteTask(ctx, &api.OperateTaskArgs{IDC: idc, TaskID: t1.TaskID, Src: t1.Sources, Dest: t1.Destination})
433
require.Error(t, err)
436
// a second task whose update succeeds: no error
mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().UpdateMigrateTask(any, any).Return(nil)
437
t2 := mockGenMigrateTask(proto.TaskTypeManualMigrate, idc, 4, 100, proto.MigrateStatePrepared, MockMigrateVolInfoMap)
438
mgr.workQueue.AddPreparedTask(idc, t2.TaskID, t2)
439
err = mgr.CompleteTask(ctx, &api.OperateTaskArgs{IDC: idc, TaskID: t2.TaskID, Src: t2.Sources, Dest: t2.Destination})
440
require.NoError(t, err)
444
// TestRenewalMigrateTask covers RenewalTask when the task switch is disabled
// (proto.ErrTaskPaused), when it is enabled but the task id is empty (error),
// and when it is enabled with a prepared task in the work queue (no error).
// NOTE(review): idc is defined on a line not visible in this view.
func TestRenewalMigrateTask(t *testing.T) {
445
ctx := context.Background()
448
// task switch is closed
449
mgr := newMigrateMgr(t)
450
mgr.taskSwitch.(*mocks.MockSwitcher).EXPECT().Enabled().Return(false)
451
err := mgr.RenewalTask(ctx, idc, "")
452
require.True(t, errors.Is(err, proto.ErrTaskPaused))
456
// switch enabled, nothing queued and empty task id
mgr := newMigrateMgr(t)
457
mgr.taskSwitch.(*mocks.MockSwitcher).EXPECT().Enabled().Return(true)
458
err := mgr.RenewalTask(ctx, idc, "")
459
require.Error(t, err)
462
// switch enabled, renewing a queued prepared task
mgr := newMigrateMgr(t)
463
mgr.taskSwitch.(*mocks.MockSwitcher).EXPECT().Enabled().Return(true)
464
t1 := mockGenMigrateTask(proto.TaskTypeManualMigrate, idc, 4, 100, proto.MigrateStatePrepared, MockMigrateVolInfoMap)
465
mgr.workQueue.AddPreparedTask(idc, t1.TaskID, t1)
466
err := mgr.RenewalTask(ctx, idc, t1.TaskID)
467
require.NoError(t, err)
471
// TestAddMigrateTask checks migrating-disk bookkeeping for different manager
// task types, then the queue counters and the task query helpers that forward
// to the cluster manager client.
// NOTE(review): the calls that add t1 to the manager after each AddMigrateTask
// expectation are not visible in this view.
func TestAddMigrateTask(t *testing.T) {
473
ctx := context.Background()
474
mgr := newMigrateMgr(t)
475
// disk-drop manager: disk 4 is not reported as migrating
mgr.taskType = proto.TaskTypeDiskDrop
476
t1 := mockGenMigrateTask(proto.TaskTypeDiskDrop, "z0", 4, 100, proto.MigrateStateInited, MockMigrateVolInfoMap)
477
mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().AddMigrateTask(any, any).Return(nil)
479
require.False(t, mgr.IsMigratingDisk(proto.DiskID(4)))
481
// manual-migrate manager: disk 4 is not reported as migrating either
mgr.taskType = proto.TaskTypeManualMigrate
482
t1 = mockGenMigrateTask(proto.TaskTypeManualMigrate, "z0", 4, 100, proto.MigrateStateInited, MockMigrateVolInfoMap)
483
mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().AddMigrateTask(any, any).Return(nil)
485
require.False(t, mgr.IsMigratingDisk(proto.DiskID(4)))
488
// fresh manager with the balance task type set by newMigrateMgr: disk 4 is
// reported as migrating
ctx := context.Background()
489
mgr := newMigrateMgr(t)
490
t1 := mockGenMigrateTask(proto.TaskTypeBalance, "z0", 4, 100, proto.MigrateStateInited, MockMigrateVolInfoMap)
491
mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().AddMigrateTask(any, any).Return(nil)
493
require.True(t, mgr.IsMigratingDisk(proto.DiskID(4)))
494
require.False(t, mgr.IsMigratingDisk(proto.DiskID(5)))
495
require.Equal(t, 1, mgr.GetMigratingDiskNum())
497
// exactly one task is counted, in the inited bucket
inited, prepared, completed := mgr.StatQueueTaskCnt()
498
require.Equal(t, 1, inited)
499
require.Equal(t, 0, prepared)
500
require.Equal(t, 0, completed)
502
// ListAllTask returns what ListAllMigrateTasks yields
mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().ListAllMigrateTasks(any, any).Return([]*proto.MigrateTask{t1}, nil)
503
tasks, err := mgr.ListAllTask(ctx)
504
require.NoError(t, err)
505
require.Equal(t, 1, len(tasks))
507
// GetTask returns the task yielded by GetMigrateTask
mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().GetMigrateTask(any, any, any).Return(t1, nil)
508
task, err := mgr.GetTask(ctx, t1.TaskID)
509
require.NoError(t, err)
510
require.Equal(t, t1.TaskID, task.TaskID)
512
// ListAllTaskByDiskID with an empty result: no error
mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().ListAllMigrateTasksByDiskID(any, any, any).Return([]*proto.MigrateTask{}, nil)
513
_, err = mgr.ListAllTaskByDiskID(ctx, proto.DiskID(1))
514
require.NoError(t, err)
518
// TestDeletedTasks checks the deleted-task bookkeeping: tasks added with
// addDeletedTask are reported by IsDeletedTask, ClearDeletedTasks removes the
// entries of one disk only, and ClearDeletedTaskByID removes a single entry.
// NOTE(review): the remaining fields of each proto.MigrateTask literal
// (presumably TaskID) are on lines not visible in this view.
func TestDeletedTasks(t *testing.T) {
520
mgr := newMigrateMgr(t)
521
mgr.taskType = proto.TaskTypeDiskDrop
522
diskID1 := proto.DiskID(1)
523
diskID2 := proto.DiskID(2)
524
task1 := &proto.MigrateTask{
526
SourceDiskID: diskID1,
528
task2 := &proto.MigrateTask{
530
SourceDiskID: diskID1,
532
task3 := &proto.MigrateTask{
534
SourceDiskID: diskID2,
536
mgr.addDeletedTask(task1)
537
mgr.addDeletedTask(task2)
538
mgr.addDeletedTask(task3)
540
require.True(t, mgr.IsDeletedTask(task1))
541
require.True(t, mgr.IsDeletedTask(task2))
542
require.True(t, mgr.IsDeletedTask(task3))
544
// clearing disk 1 drops task1 but keeps task3, which belongs to disk 2
mgr.ClearDeletedTasks(diskID1)
545
require.False(t, mgr.IsDeletedTask(task1))
546
require.True(t, mgr.IsDeletedTask(task3))
549
// single-task case on a fresh manager
mgr := newMigrateMgr(t)
550
diskID1 := proto.DiskID(1)
551
task1 := &proto.MigrateTask{
553
SourceDiskID: diskID1,
555
mgr.addDeletedTask(task1)
556
require.True(t, mgr.IsDeletedTask(task1))
557
require.Equal(t, 1, len(mgr.DeletedTasks()))
559
mgr.ClearDeletedTaskByID(task1.SourceDiskID, task1.TaskID)
560
require.Equal(t, 0, len(mgr.DeletedTasks()))
561
require.False(t, mgr.IsDeletedTask(task1))
563
// clearing an entry that was already removed; no assertion follows in the
// visible lines
mgr.ClearDeletedTaskByID(task1.SourceDiskID, task1.TaskID)
567
// TestMigrateRun lets the mocked task switch answer WaitEnable and Enabled any
// number of times (Enabled returns true) and then sleeps for 2ms.
// NOTE(review): the statement that starts the manager is not visible in this
// view — presumably a Run call between the expectations and the sleep.
func TestMigrateRun(t *testing.T) {
568
mgr := newMigrateMgr(t)
569
mgr.taskSwitch.(*mocks.MockSwitcher).EXPECT().WaitEnable().AnyTimes().Return()
570
mgr.taskSwitch.(*mocks.MockSwitcher).EXPECT().Enabled().AnyTimes().Return(true)
574
time.Sleep(2 * time.Millisecond)
577
// TestMigrateQueryTask checks that QueryTask surfaces a GetMigrateTask failure
// and returns no error when the cluster manager client yields a task.
// NOTE(review): taskID is defined on a line not visible in this view.
func TestMigrateQueryTask(t *testing.T) {
578
ctx := context.Background()
580
mgr := newMigrateMgr(t)
582
// lookup fails with errMock
mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().GetMigrateTask(any, any, any).Return(nil, errMock)
583
_, err := mgr.QueryTask(ctx, taskID)
584
// require.ErrorIs takes (t, err, target): the error under test comes first,
// so the check also holds if QueryTask wraps errMock.
require.ErrorIs(t, err, errMock)
586
// lookup succeeds with an empty task
mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().GetMigrateTask(any, any, any).Return(&proto.MigrateTask{}, nil)
587
_, err = mgr.QueryTask(ctx, taskID)
588
require.NoError(t, err)
591
// TestMigrateReportWorkerTaskStats calls ReportWorkerTaskStats with a report
// whose IncreaseDataSizeByte is 1.
// NOTE(review): the other report fields and any assertions are on lines not
// visible in this view.
func TestMigrateReportWorkerTaskStats(t *testing.T) {
592
mgr := newMigrateMgr(t)
593
mgr.ReportWorkerTaskStats(&api.TaskReportArgs{
595
IncreaseDataSizeByte: 1,
600
// TestMigrateStatQueueTaskCnt checks that a freshly built manager reports zero
// inited, prepared and completed tasks.
func TestMigrateStatQueueTaskCnt(t *testing.T) {
601
mgr := newMigrateMgr(t)
602
inited, prepared, completed := mgr.StatQueueTaskCnt()
603
require.Equal(t, 0, inited)
604
require.Equal(t, 0, prepared)
605
require.Equal(t, 0, completed)
608
// TestMigrateStats builds a manager with newMigrateMgr.
// NOTE(review): the call under test and any assertions are on lines not
// visible in this view.
func TestMigrateStats(t *testing.T) {
609
mgr := newMigrateMgr(t)
613
// TestMigrateAction expects exactly one WaitEnable call and one Enabled call
// (returning true) on the mocked task switch, and asserts that mgr.Enabled()
// is true.
// NOTE(review): the statements around the two require.Fail calls are not
// visible in this view; they presumably sit in branches that must not be
// reached.
func TestMigrateAction(t *testing.T) {
614
mgr := newMigrateMgr(t)
615
mgr.taskSwitch.(*mocks.MockSwitcher).EXPECT().WaitEnable().Return()
616
mgr.taskSwitch.(*mocks.MockSwitcher).EXPECT().Enabled().Return(true)
619
require.True(t, mgr.Enabled())
623
require.Fail(t, "cannot be there")
632
require.Fail(t, "cannot be there")