cubefs

Форк
0
/
migrate_test.go 
636 строк · 24.1 Кб
1
// Copyright 2022 The CubeFS Authors.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12
// implied. See the License for the specific language governing
13
// permissions and limitations under the License.
14

15
package scheduler
16

17
import (
18
	"context"
19
	"errors"
20
	"testing"
21
	"time"
22

23
	"github.com/golang/mock/gomock"
24
	"github.com/stretchr/testify/require"
25

26
	api "github.com/cubefs/cubefs/blobstore/api/scheduler"
27
	"github.com/cubefs/cubefs/blobstore/common/codemode"
28
	errcode "github.com/cubefs/cubefs/blobstore/common/errors"
29
	"github.com/cubefs/cubefs/blobstore/common/proto"
30
	"github.com/cubefs/cubefs/blobstore/scheduler/base"
31
	"github.com/cubefs/cubefs/blobstore/scheduler/client"
32
	"github.com/cubefs/cubefs/blobstore/testing/mocks"
33
)
34

35
var MockMigrateVolInfoMap = map[proto.Vid]*client.VolumeInfoSimple{
36
	100: MockGenVolInfo(100, codemode.EC6P6, proto.VolumeStatusIdle),
37
	101: MockGenVolInfo(101, codemode.EC6P10L2, proto.VolumeStatusIdle),
38
	102: MockGenVolInfo(102, codemode.EC6P10L2, proto.VolumeStatusActive),
39
	103: MockGenVolInfo(103, codemode.EC6P6, proto.VolumeStatusLock),
40
	104: MockGenVolInfo(104, codemode.EC6P6, proto.VolumeStatusLock),
41
	105: MockGenVolInfo(105, codemode.EC6P6, proto.VolumeStatusActive),
42

43
	110: MockGenVolInfo(110, codemode.EC6P6, proto.VolumeStatusIdle),
44
	111: MockGenVolInfo(111, codemode.EC6P6, proto.VolumeStatusIdle),
45
	112: MockGenVolInfo(112, codemode.EC6P6, proto.VolumeStatusIdle),
46
	113: MockGenVolInfo(113, codemode.EC6P6, proto.VolumeStatusIdle),
47
	114: MockGenVolInfo(114, codemode.EC6P6, proto.VolumeStatusIdle),
48

49
	300: MockGenVolInfo(300, codemode.EC6P6, proto.VolumeStatusIdle),
50
	301: MockGenVolInfo(301, codemode.EC6P10L2, proto.VolumeStatusIdle),
51
	302: MockGenVolInfo(302, codemode.EC6P10L2, proto.VolumeStatusActive),
52

53
	400: MockGenVolInfo(400, codemode.EC6P6, proto.VolumeStatusIdle),
54
	401: MockGenVolInfo(401, codemode.EC6P10L2, proto.VolumeStatusIdle),
55
	402: MockGenVolInfo(402, codemode.EC6P10L2, proto.VolumeStatusActive),
56
}
57

58
func newMigrateMgr(t *testing.T) *MigrateMgr {
59
	ctr := gomock.NewController(t)
60
	clusterMgr := NewMockClusterMgrAPI(ctr)
61
	taskSwitch := mocks.NewMockSwitcher(ctr)
62

63
	taskLogger := mocks.NewMockRecordLogEncoder(ctr)
64
	volumeUpdater := NewMockVolumeUpdater(ctr)
65
	conf := &MigrateConfig{
66
		ClusterID: 0,
67
		TaskCommonConfig: base.TaskCommonConfig{
68
			PrepareQueueRetryDelayS: 0,
69
			FinishQueueRetryDelayS:  0,
70
			CancelPunishDurationS:   0,
71
			WorkQueueSize:           3,
72
		},
73
	}
74
	mgr := NewMigrateMgr(clusterMgr, volumeUpdater, taskSwitch, taskLogger, conf, proto.TaskTypeBalance)
75
	mgr.SetLockFailHandleFunc(mgr.FinishTaskInAdvanceWhenLockFail)
76
	return mgr
77
}
78

79
func TestMigrateMigrateLoad(t *testing.T) {
80
	mgr := newMigrateMgr(t)
81

82
	{
83
		// load failed
84
		mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().ListAllMigrateTasks(any, any).Return(nil, errMock)
85
		err := mgr.Load()
86
		require.True(t, errors.Is(err, errMock))
87
	}
88
	{
89
		t1 := mockGenMigrateTask(proto.TaskTypeManualMigrate, "z0", 4, 100, proto.MigrateStateInited, MockMigrateVolInfoMap)
90
		t2 := mockGenMigrateTask(proto.TaskTypeManualMigrate, "z0", 5, 101, proto.MigrateStatePrepared, MockMigrateVolInfoMap)
91
		t3 := mockGenMigrateTask(proto.TaskTypeManualMigrate, "z1", 6, 102, proto.MigrateStateWorkCompleted, MockMigrateVolInfoMap)
92
		t4 := mockGenMigrateTask(proto.TaskTypeManualMigrate, "z0", 4, 105, proto.MigrateStateInited, MockMigrateVolInfoMap)
93
		mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().ListAllMigrateTasks(any, any).Return([]*proto.MigrateTask{t1, t2, t3, t4}, nil)
94
		err := mgr.Load()
95
		require.NoError(t, err)
96
	}
97
	{
98
		t1 := mockGenMigrateTask(proto.TaskTypeManualMigrate, "z2", 8, 104, proto.MigrateStateFinished, MockMigrateVolInfoMap)
99
		t2 := mockGenMigrateTask(proto.TaskTypeManualMigrate, "z0", 4, 105, proto.MigrateStateFinishedInAdvance, MockMigrateVolInfoMap)
100
		mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().ListAllMigrateTasks(any, any).Return([]*proto.MigrateTask{t1}, nil)
101
		err := mgr.Load()
102
		require.Error(t, err)
103

104
		mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().ListAllMigrateTasks(any, any).Return([]*proto.MigrateTask{t2}, nil)
105
		err = mgr.Load()
106
		require.Error(t, err)
107
	}
108
	{
109
		t2 := mockGenMigrateTask(proto.TaskTypeManualMigrate, "z0", 5, 101, proto.MigrateStatePrepared, MockMigrateVolInfoMap)
110
		t3 := mockGenMigrateTask(proto.TaskTypeManualMigrate, "z1", 6, 101, proto.MigrateStateWorkCompleted, MockMigrateVolInfoMap)
111
		mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().ListAllMigrateTasks(any, any).Return([]*proto.MigrateTask{t2, t3}, nil)
112
		err := mgr.Load()
113
		require.Error(t, err)
114

115
		t4 := mockGenMigrateTask(proto.TaskTypeManualMigrate, "z2", 7, 103, 100, MockMigrateVolInfoMap)
116
		mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().ListAllMigrateTasks(any, any).Return([]*proto.MigrateTask{t4}, nil)
117
		err = mgr.Load()
118
		require.Error(t, err)
119
	}
120
	{
121
		mgr := newMigrateMgr(t)
122
		mgr.taskType = proto.TaskTypeDiskDrop
123

124
		t1 := mockGenMigrateTask(proto.TaskTypeDiskDrop, "z0", 1, 110, proto.MigrateStateInited, MockMigrateVolInfoMap)
125
		t2 := mockGenMigrateTask(proto.TaskTypeDiskDrop, "z0", 2, 111, proto.MigrateStatePrepared, MockMigrateVolInfoMap)
126
		t3 := mockGenMigrateTask(proto.TaskTypeDiskDrop, "z1", 3, 112, proto.MigrateStateWorkCompleted, MockMigrateVolInfoMap)
127
		t4 := mockGenMigrateTask(proto.TaskTypeDiskDrop, "z0", 4, 113, proto.MigrateStateInited, MockMigrateVolInfoMap)
128

129
		mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().ListAllMigrateTasks(any, any).Return([]*proto.MigrateTask{t1, t2, t3, t4}, nil)
130
		mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().ListMigratingDisks(any, any).Return([]*client.MigratingDiskMeta{{Disk: testDisk1}, {Disk: testDisk2}}, nil)
131
		err := mgr.Load()
132
		require.NoError(t, err)
133
	}
134
}
135

136
func TestPrepareMigrateTask(t *testing.T) {
137
	ctx := context.Background()
138
	{
139
		// no task
140
		mgr := newMigrateMgr(t)
141
		err := mgr.prepareTask()
142
		require.True(t, errors.Is(err, base.ErrNoTaskInQueue))
143
	}
144
	{
145
		// one task and finish in advance
146
		mgr := newMigrateMgr(t)
147
		t1 := mockGenMigrateTask(proto.TaskTypeManualMigrate, "z0", 4, 100, proto.MigrateStateInited, MockMigrateVolInfoMap)
148
		mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().AddMigrateTask(any, any).Return(nil)
149
		mgr.AddTask(ctx, t1)
150

151
		// lock failed and send task to queue
152
		err := base.VolTaskLockerInst().TryLock(ctx, 100)
153
		require.NoError(t, err)
154
		err = mgr.prepareTask()
155
		require.True(t, errors.Is(err, base.ErrVolNotOnlyOneTask))
156
		base.VolTaskLockerInst().Unlock(ctx, 100)
157

158
		// get volume info failed
159
		mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().GetVolumeInfo(any, any).Return(nil, errMock)
160
		err = mgr.prepareTask()
161
		require.True(t, errors.Is(err, errMock))
162

163
		// finish task in advance because source chunk has moved
164
		// unlock failed
165
		volume := MockMigrateVolInfoMap[100]
166
		volume.VunitLocations[int(t1.SourceVuid.Index())].Vuid = volume.VunitLocations[int(t1.SourceVuid.Index())].Vuid + 1
167
		mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().GetVolumeInfo(any, any).Return(volume, nil)
168
		mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().UnlockVolume(any, any).Return(errMock)
169
		err = mgr.prepareTask()
170
		require.True(t, errors.Is(err, errMock))
171
		// unlock success
172
		mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().GetVolumeInfo(any, any).Return(volume, nil)
173
		mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().UnlockVolume(any, any).Return(nil)
174
		mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().DeleteMigrateTask(any, any).Return(nil)
175
		mgr.taskLogger.(*mocks.MockRecordLogEncoder).EXPECT().Encode(any).Return(nil)
176
		err = mgr.prepareTask()
177
		require.NoError(t, err)
178
	}
179
	{
180
		// one task and finish in advance because  other migrate task is doing on this volume
181
		mgr := newMigrateMgr(t)
182
		t1 := mockGenMigrateTask(proto.TaskTypeManualMigrate, "z0", 4, 100, proto.MigrateStateInited, MockMigrateVolInfoMap)
183
		mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().AddMigrateTask(any, any).Return(nil)
184
		mgr.AddTask(ctx, t1)
185

186
		// lock cm volume failed
187
		volume := MockMigrateVolInfoMap[100]
188
		mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().GetVolumeInfo(any, any).Return(volume, nil)
189
		mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().LockVolume(any, any).Return(errMock)
190
		mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().DeleteMigrateTask(any, any).Return(nil)
191
		mgr.taskLogger.(*mocks.MockRecordLogEncoder).EXPECT().Encode(any).Return(errMock)
192
		err := mgr.prepareTask()
193
		require.True(t, errors.Is(err, errMock))
194

195
		// lock failed and call lockFailHandleFunc
196
		mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().GetVolumeInfo(any, any).Return(volume, nil)
197
		mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().LockVolume(any, any).Return(errcode.ErrLockNotAllow)
198
		err = mgr.prepareTask()
199
		require.NoError(t, err)
200
	}
201
	{
202
		// one task and normal finish
203
		mgr := newMigrateMgr(t)
204
		t1 := mockGenMigrateTask(proto.TaskTypeManualMigrate, "z0", 4, 100, proto.MigrateStateInited, MockMigrateVolInfoMap)
205
		mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().AddMigrateTask(any, any).Return(nil)
206
		mgr.AddTask(ctx, t1)
207

208
		// lock cm volume failed
209
		volume := MockMigrateVolInfoMap[100]
210
		mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().GetVolumeInfo(any, any).Return(volume, nil)
211
		mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().LockVolume(any, any).Return(nil)
212

213
		// alloc volume failed
214
		mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().AllocVolumeUnit(any, any).Return(nil, errMock)
215
		err := mgr.prepareTask()
216
		require.True(t, errors.Is(err, errMock))
217

218
		// alloc success
219
		mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().GetVolumeInfo(any, any).Return(volume, nil)
220
		mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().LockVolume(any, any).Return(nil)
221
		mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().UpdateMigrateTask(any, any).Return(nil)
222
		mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().AllocVolumeUnit(any, any).DoAndReturn(
223
			func(ctx context.Context, vuid proto.Vuid) (*client.AllocVunitInfo, error) {
224
				vid := vuid.Vid()
225
				idx := vuid.Index()
226
				epoch := vuid.Epoch()
227
				epoch++
228
				newVuid, _ := proto.NewVuid(vid, idx, epoch)
229
				return &client.AllocVunitInfo{
230
					VunitLocation: proto.VunitLocation{Vuid: newVuid},
231
				}, nil
232
			})
233
		err = mgr.prepareTask()
234
		require.NoError(t, err)
235
	}
236
}
237

238
func TestFinishMigrateTask(t *testing.T) {
239
	{
240
		// no task
241
		mgr := newMigrateMgr(t)
242
		err := mgr.finishTask()
243
		require.True(t, errors.Is(err, base.ErrNoTaskInQueue))
244
	}
245
	{
246
		// panic :status not eql proto.MigrateStateWorkCompleted
247
		mgr := newMigrateMgr(t)
248
		t1 := mockGenMigrateTask(proto.TaskTypeManualMigrate, "z0", 4, 100, proto.MigrateStatePrepared, MockMigrateVolInfoMap)
249
		mgr.finishQueue.PushTask(t1.TaskID, t1)
250
		require.Panics(t, func() {
251
			mgr.finishTask()
252
		})
253
	}
254
	{
255
		{
256
			// one task and redo success finally
257
			mgr := newMigrateMgr(t)
258
			t1 := mockGenMigrateTask(proto.TaskTypeManualMigrate, "z0", 4, 100, proto.MigrateStateWorkCompleted, MockMigrateVolInfoMap)
259
			mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().UpdateMigrateTask(any, any).Return(nil)
260
			mgr.finishQueue.PushTask(t1.TaskID, t1)
261

262
			// update relationship failed
263
			mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().UpdateVolume(any, any, any, any).Return(errMock)
264
			err := mgr.finishTask()
265
			require.True(t, errors.Is(err, errMock))
266

267
			// update relationship failed and need redo
268
			mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().UpdateVolume(any, any, any, any).Return(errcode.ErrNewVuidNotMatch)
269
			mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().AllocVolumeUnit(any, any).Return(nil, errMock)
270
			mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().UpdateMigrateTask(any, any).Return(nil)
271
			// alloc failed
272
			err = mgr.finishTask()
273
			require.True(t, errors.Is(err, errMock))
274

275
			// panic
276
			mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().UpdateMigrateTask(any, any).Return(nil)
277
			mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().UpdateVolume(any, any, any, any).Return(errcode.ErrOldVuidNotMatch)
278
			require.Panics(t, func() {
279
				mgr.finishTask()
280
			})
281

282
			// redo success
283
			mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().UpdateVolume(any, any, any, any).Return(errcode.ErrNewVuidNotMatch)
284
			mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().AllocVolumeUnit(any, any).DoAndReturn(
285
				func(ctx context.Context, vuid proto.Vuid) (*client.AllocVunitInfo, error) {
286
					vid := vuid.Vid()
287
					idx := vuid.Index()
288
					epoch := vuid.Epoch()
289
					epoch++
290
					newVuid, _ := proto.NewVuid(vid, idx, epoch)
291
					return &client.AllocVunitInfo{
292
						VunitLocation: proto.VunitLocation{Vuid: newVuid},
293
					}, nil
294
				})
295
			mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().UpdateMigrateTask(any, any).Times(2).Return(nil)
296
			err = mgr.finishTask()
297
			require.NoError(t, err)
298
		}
299
		{
300
			// one task and success normal
301
			mgr := newMigrateMgr(t)
302
			t1 := mockGenMigrateTask(proto.TaskTypeManualMigrate, "z0", 4, 100, proto.MigrateStateWorkCompleted, MockMigrateVolInfoMap)
303
			mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().UpdateMigrateTask(any, any).Return(nil)
304
			mgr.finishQueue.PushTask(t1.TaskID, t1)
305
			mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().UpdateVolume(any, any, any, any).Return(nil)
306
			// release failed and update volume cache failed
307
			mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().ReleaseVolumeUnit(any, any, any).Return(errMock)
308
			mgr.volumeUpdater.(*MockVolumeUpdater).EXPECT().UpdateLeaderVolumeCache(any, any).Return(errMock)
309
			err := mgr.finishTask()
310
			require.True(t, errors.Is(err, base.ErrUpdateVolumeCache))
311

312
			// release failed and update volume cache success
313
			mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().UpdateMigrateTask(any, any).Return(nil)
314
			mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().UpdateVolume(any, any, any, any).Return(nil)
315
			mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().ReleaseVolumeUnit(any, any, any).Return(errMock)
316
			mgr.volumeUpdater.(*MockVolumeUpdater).EXPECT().UpdateLeaderVolumeCache(any, any).Return(nil)
317
			// unlock volume failed
318
			mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().UnlockVolume(any, any).Return(errMock)
319
			err = mgr.finishTask()
320
			require.True(t, errors.Is(err, errMock))
321

322
			// update volume success
323
			mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().UpdateMigrateTask(any, any).Return(nil)
324
			mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().DeleteMigrateTask(any, any).Return(nil)
325
			mgr.taskLogger.(*mocks.MockRecordLogEncoder).EXPECT().Encode(any).Return(nil)
326
			mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().UpdateVolume(any, any, any, any).Return(nil)
327
			mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().ReleaseVolumeUnit(any, any, any).Return(nil)
328
			mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().UnlockVolume(any, any).Return(nil)
329
			err = mgr.finishTask()
330
			require.NoError(t, err)
331
		}
332
	}
333
}
334

335
func TestAcquireMigrateTask(t *testing.T) {
336
	ctx := context.Background()
337
	idc := "z0"
338
	{
339
		// task switch is close
340
		mgr := newMigrateMgr(t)
341
		mgr.taskSwitch.(*mocks.MockSwitcher).EXPECT().Enabled().Return(false)
342
		_, err := mgr.AcquireTask(ctx, idc)
343
		require.True(t, errors.Is(err, proto.ErrTaskPaused))
344
	}
345
	{
346
		// no task in queue
347
		mgr := newMigrateMgr(t)
348
		mgr.taskSwitch.(*mocks.MockSwitcher).EXPECT().Enabled().Return(true)
349
		_, err := mgr.AcquireTask(ctx, idc)
350
		require.True(t, errors.Is(err, proto.ErrTaskEmpty))
351
	}
352
	{
353
		// one task in queue
354
		mgr := newMigrateMgr(t)
355
		mgr.taskSwitch.(*mocks.MockSwitcher).EXPECT().Enabled().Return(true)
356
		t1 := mockGenMigrateTask(proto.TaskTypeManualMigrate, idc, 4, 100, proto.MigrateStatePrepared, MockMigrateVolInfoMap)
357
		mgr.workQueue.AddPreparedTask(idc, t1.TaskID, t1)
358
		task, err := mgr.AcquireTask(ctx, idc)
359
		require.NoError(t, err)
360
		require.Equal(t, t1.TaskID, task.TaskID)
361
	}
362
}
363

364
func TestCancelMigrateTask(t *testing.T) {
365
	ctx := context.Background()
366
	idc := "z0"
367
	{
368
		mgr := newMigrateMgr(t)
369
		err := mgr.CancelTask(ctx, &api.OperateTaskArgs{IDC: idc})
370
		require.Error(t, err)
371
	}
372
	{
373
		mgr := newMigrateMgr(t)
374
		t1 := mockGenMigrateTask(proto.TaskTypeManualMigrate, idc, 4, 100, proto.MigrateStatePrepared, MockMigrateVolInfoMap)
375
		mgr.workQueue.AddPreparedTask(idc, t1.TaskID, t1)
376

377
		// no such task
378
		err := mgr.CancelTask(ctx, &api.OperateTaskArgs{IDC: idc})
379
		require.Error(t, err)
380

381
		err = mgr.CancelTask(ctx, &api.OperateTaskArgs{IDC: idc, TaskID: t1.TaskID, Src: t1.Sources, Dest: t1.Destination})
382
		require.NoError(t, err)
383
	}
384
}
385

386
func TestReclaimMigrateTask(t *testing.T) {
387
	ctx := context.Background()
388
	idc := "z0"
389
	{
390
		// no task
391
		mgr := newMigrateMgr(t)
392
		err := mgr.ReclaimTask(ctx, idc, "", nil, proto.VunitLocation{}, &client.AllocVunitInfo{})
393
		require.Error(t, err)
394
	}
395
	{
396
		mgr := newMigrateMgr(t)
397
		t1 := mockGenMigrateTask(proto.TaskTypeManualMigrate, idc, 4, 100, proto.MigrateStatePrepared, MockMigrateVolInfoMap)
398
		mgr.workQueue.AddPreparedTask(idc, t1.TaskID, t1)
399

400
		// update failed
401
		mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().UpdateMigrateTask(any, any).Return(errMock)
402
		err := mgr.ReclaimTask(ctx, idc, t1.TaskID, t1.Sources, t1.Destination, &client.AllocVunitInfo{})
403
		require.True(t, errors.Is(err, errMock))
404

405
		// update success
406
		mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().UpdateMigrateTask(any, any).Return(nil)
407
		err = mgr.ReclaimTask(ctx, idc, t1.TaskID, t1.Sources, t1.Destination, &client.AllocVunitInfo{})
408
		require.NoError(t, err)
409
	}
410
}
411

412
func TestCompleteMigrateTask(t *testing.T) {
413
	ctx := context.Background()
414
	idc := "z0"
415
	{
416
		// no task
417
		mgr := newMigrateMgr(t)
418
		err := mgr.CompleteTask(ctx, &api.OperateTaskArgs{IDC: idc})
419
		require.Error(t, err)
420
	}
421
	{
422
		mgr := newMigrateMgr(t)
423
		t1 := mockGenMigrateTask(proto.TaskTypeManualMigrate, idc, 4, 100, proto.MigrateStatePrepared, MockMigrateVolInfoMap)
424
		mgr.workQueue.AddPreparedTask(idc, t1.TaskID, t1)
425

426
		// update failed
427
		mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().UpdateMigrateTask(any, any).Return(errMock)
428
		err := mgr.CompleteTask(ctx, &api.OperateTaskArgs{IDC: idc, TaskID: t1.TaskID, Src: t1.Sources, Dest: t1.Destination})
429
		require.NoError(t, err)
430

431
		// no task in queue
432
		err = mgr.CompleteTask(ctx, &api.OperateTaskArgs{IDC: idc, TaskID: t1.TaskID, Src: t1.Sources, Dest: t1.Destination})
433
		require.Error(t, err)
434

435
		// update success
436
		mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().UpdateMigrateTask(any, any).Return(nil)
437
		t2 := mockGenMigrateTask(proto.TaskTypeManualMigrate, idc, 4, 100, proto.MigrateStatePrepared, MockMigrateVolInfoMap)
438
		mgr.workQueue.AddPreparedTask(idc, t2.TaskID, t2)
439
		err = mgr.CompleteTask(ctx, &api.OperateTaskArgs{IDC: idc, TaskID: t2.TaskID, Src: t2.Sources, Dest: t2.Destination})
440
		require.NoError(t, err)
441
	}
442
}
443

444
func TestRenewalMigrateTask(t *testing.T) {
445
	ctx := context.Background()
446
	idc := "z0"
447
	{
448
		// task switch is close
449
		mgr := newMigrateMgr(t)
450
		mgr.taskSwitch.(*mocks.MockSwitcher).EXPECT().Enabled().Return(false)
451
		err := mgr.RenewalTask(ctx, idc, "")
452
		require.True(t, errors.Is(err, proto.ErrTaskPaused))
453
	}
454
	{
455
		// no task
456
		mgr := newMigrateMgr(t)
457
		mgr.taskSwitch.(*mocks.MockSwitcher).EXPECT().Enabled().Return(true)
458
		err := mgr.RenewalTask(ctx, idc, "")
459
		require.Error(t, err)
460
	}
461
	{
462
		mgr := newMigrateMgr(t)
463
		mgr.taskSwitch.(*mocks.MockSwitcher).EXPECT().Enabled().Return(true)
464
		t1 := mockGenMigrateTask(proto.TaskTypeManualMigrate, idc, 4, 100, proto.MigrateStatePrepared, MockMigrateVolInfoMap)
465
		mgr.workQueue.AddPreparedTask(idc, t1.TaskID, t1)
466
		err := mgr.RenewalTask(ctx, idc, t1.TaskID)
467
		require.NoError(t, err)
468
	}
469
}
470

471
func TestAddMigrateTask(t *testing.T) {
472
	{
473
		ctx := context.Background()
474
		mgr := newMigrateMgr(t)
475
		mgr.taskType = proto.TaskTypeDiskDrop
476
		t1 := mockGenMigrateTask(proto.TaskTypeDiskDrop, "z0", 4, 100, proto.MigrateStateInited, MockMigrateVolInfoMap)
477
		mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().AddMigrateTask(any, any).Return(nil)
478
		mgr.AddTask(ctx, t1)
479
		require.False(t, mgr.IsMigratingDisk(proto.DiskID(4)))
480

481
		mgr.taskType = proto.TaskTypeManualMigrate
482
		t1 = mockGenMigrateTask(proto.TaskTypeManualMigrate, "z0", 4, 100, proto.MigrateStateInited, MockMigrateVolInfoMap)
483
		mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().AddMigrateTask(any, any).Return(nil)
484
		mgr.AddTask(ctx, t1)
485
		require.False(t, mgr.IsMigratingDisk(proto.DiskID(4)))
486
	}
487
	{
488
		ctx := context.Background()
489
		mgr := newMigrateMgr(t)
490
		t1 := mockGenMigrateTask(proto.TaskTypeBalance, "z0", 4, 100, proto.MigrateStateInited, MockMigrateVolInfoMap)
491
		mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().AddMigrateTask(any, any).Return(nil)
492
		mgr.AddTask(ctx, t1)
493
		require.True(t, mgr.IsMigratingDisk(proto.DiskID(4)))
494
		require.False(t, mgr.IsMigratingDisk(proto.DiskID(5)))
495
		require.Equal(t, 1, mgr.GetMigratingDiskNum())
496

497
		inited, prepared, completed := mgr.StatQueueTaskCnt()
498
		require.Equal(t, 1, inited)
499
		require.Equal(t, 0, prepared)
500
		require.Equal(t, 0, completed)
501

502
		mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().ListAllMigrateTasks(any, any).Return([]*proto.MigrateTask{t1}, nil)
503
		tasks, err := mgr.ListAllTask(ctx)
504
		require.NoError(t, err)
505
		require.Equal(t, 1, len(tasks))
506

507
		mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().GetMigrateTask(any, any, any).Return(t1, nil)
508
		task, err := mgr.GetTask(ctx, t1.TaskID)
509
		require.NoError(t, err)
510
		require.Equal(t, t1.TaskID, task.TaskID)
511

512
		mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().ListAllMigrateTasksByDiskID(any, any, any).Return([]*proto.MigrateTask{}, nil)
513
		_, err = mgr.ListAllTaskByDiskID(ctx, proto.DiskID(1))
514
		require.NoError(t, err)
515
	}
516
}
517

518
func TestDeletedTasks(t *testing.T) {
519
	{
520
		mgr := newMigrateMgr(t)
521
		mgr.taskType = proto.TaskTypeDiskDrop
522
		diskID1 := proto.DiskID(1)
523
		diskID2 := proto.DiskID(2)
524
		task1 := &proto.MigrateTask{
525
			TaskID:       "task1",
526
			SourceDiskID: diskID1,
527
		}
528
		task2 := &proto.MigrateTask{
529
			TaskID:       "task2",
530
			SourceDiskID: diskID1,
531
		}
532
		task3 := &proto.MigrateTask{
533
			TaskID:       "task3",
534
			SourceDiskID: diskID2,
535
		}
536
		mgr.addDeletedTask(task1)
537
		mgr.addDeletedTask(task2)
538
		mgr.addDeletedTask(task3)
539

540
		require.True(t, mgr.IsDeletedTask(task1))
541
		require.True(t, mgr.IsDeletedTask(task2))
542
		require.True(t, mgr.IsDeletedTask(task3))
543

544
		mgr.ClearDeletedTasks(diskID1)
545
		require.False(t, mgr.IsDeletedTask(task1))
546
		require.True(t, mgr.IsDeletedTask(task3))
547
	}
548
	{
549
		mgr := newMigrateMgr(t)
550
		diskID1 := proto.DiskID(1)
551
		task1 := &proto.MigrateTask{
552
			TaskID:       "task1",
553
			SourceDiskID: diskID1,
554
		}
555
		mgr.addDeletedTask(task1)
556
		require.True(t, mgr.IsDeletedTask(task1))
557
		require.Equal(t, 1, len(mgr.DeletedTasks()))
558

559
		mgr.ClearDeletedTaskByID(task1.SourceDiskID, task1.TaskID)
560
		require.Equal(t, 0, len(mgr.DeletedTasks()))
561
		require.False(t, mgr.IsDeletedTask(task1))
562

563
		mgr.ClearDeletedTaskByID(task1.SourceDiskID, task1.TaskID)
564
	}
565
}
566

567
func TestMigrateRun(t *testing.T) {
568
	mgr := newMigrateMgr(t)
569
	mgr.taskSwitch.(*mocks.MockSwitcher).EXPECT().WaitEnable().AnyTimes().Return()
570
	mgr.taskSwitch.(*mocks.MockSwitcher).EXPECT().Enabled().AnyTimes().Return(true)
571
	mgr.Run()
572

573
	// wait to run
574
	time.Sleep(2 * time.Millisecond)
575
}
576

577
func TestMigrateQueryTask(t *testing.T) {
578
	ctx := context.Background()
579
	taskID := "task_id"
580
	mgr := newMigrateMgr(t)
581

582
	mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().GetMigrateTask(any, any, any).Return(nil, errMock)
583
	_, err := mgr.QueryTask(ctx, taskID)
584
	require.ErrorIs(t, errMock, err)
585

586
	mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().GetMigrateTask(any, any, any).Return(&proto.MigrateTask{}, nil)
587
	_, err = mgr.QueryTask(ctx, taskID)
588
	require.NoError(t, err)
589
}
590

591
func TestMigrateReportWorkerTaskStats(t *testing.T) {
592
	mgr := newMigrateMgr(t)
593
	mgr.ReportWorkerTaskStats(&api.TaskReportArgs{
594
		TaskID:               "task_id",
595
		IncreaseDataSizeByte: 1,
596
		IncreaseShardCnt:     1,
597
	})
598
}
599

600
func TestMigrateStatQueueTaskCnt(t *testing.T) {
601
	mgr := newMigrateMgr(t)
602
	inited, prepared, completed := mgr.StatQueueTaskCnt()
603
	require.Equal(t, 0, inited)
604
	require.Equal(t, 0, prepared)
605
	require.Equal(t, 0, completed)
606
}
607

608
func TestMigrateStats(t *testing.T) {
609
	mgr := newMigrateMgr(t)
610
	mgr.Stats()
611
}
612

613
func TestMigrateAction(t *testing.T) {
614
	mgr := newMigrateMgr(t)
615
	mgr.taskSwitch.(*mocks.MockSwitcher).EXPECT().WaitEnable().Return()
616
	mgr.taskSwitch.(*mocks.MockSwitcher).EXPECT().Enabled().Return(true)
617

618
	mgr.WaitEnable()
619
	require.True(t, mgr.Enabled())
620

621
	select {
622
	case <-mgr.Done():
623
		require.Fail(t, "cannot be there")
624
	default:
625
	}
626

627
	mgr.Close()
628

629
	select {
630
	case <-mgr.Done():
631
	default:
632
		require.Fail(t, "cannot be there")
633
	}
634

635
	mgr.Close()
636
}
637

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.