1
// Copyright 2022 The CubeFS Authors.
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
7
// http://www.apache.org/licenses/LICENSE-2.0
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12
// implied. See the License for the specific language governing
13
// permissions and limitations under the License.
23
"github.com/golang/mock/gomock"
24
"github.com/stretchr/testify/require"
26
"github.com/cubefs/cubefs/blobstore/common/codemode"
27
"github.com/cubefs/cubefs/blobstore/common/proto"
28
"github.com/cubefs/cubefs/blobstore/scheduler/client"
29
"github.com/cubefs/cubefs/blobstore/testing/mocks"
32
func genMockFailShards(vid proto.Vid, bids []proto.BlobID) []*proto.MissedShard {
33
vuid, _ := proto.NewVuid(vid, 1, 1)
34
var FailShards []*proto.MissedShard
35
for _, bid := range bids {
36
FailShards = append(FailShards, &proto.MissedShard{Vuid: vuid, Bid: bid})
41
func TestTaskTimeout(t *testing.T) {
42
task := inspectTaskInfo{}
43
require.NoError(t, task.tryAcquire())
44
require.Equal(t, false, task.timeout(5))
45
time.Sleep(10 * time.Millisecond)
46
require.Equal(t, true, task.timeout(5))
47
require.Equal(t, false, task.timeout(10000))
50
func TestBadShardDeduplicator(t *testing.T) {
51
d := newBadShardDeduplicator(3)
52
require.Equal(t, false, d.reduplicate(1, 1, []uint8{1, 2}))
54
d.add(1, 1, []uint8{1, 2})
55
require.Equal(t, true, d.reduplicate(1, 1, []uint8{1, 2}))
57
require.Equal(t, false, d.reduplicate(2, 1, []uint8{1, 2}))
58
d.add(2, 1, []uint8{1, 2})
60
require.Equal(t, false, d.reduplicate(2, 1, []uint8{1, 2, 3}))
61
d.add(2, 1, []uint8{1, 2, 3})
63
d.add(2, 2, []uint8{1, 2, 3})
64
require.Equal(t, false, d.reduplicate(2, 1, []uint8{1, 2, 3}))
67
func newInspector(t *testing.T) *VolumeInspectMgr {
68
ctr := gomock.NewController(t)
69
clusterMgr := NewMockClusterMgrAPI(ctr)
70
taskSwitch := mocks.NewMockSwitcher(ctr)
71
shardRepairSender := NewMockMqProxyAPI(ctr)
72
conf := &VolumeInspectMgrCfg{InspectIntervalS: defaultInspectIntervalS, TimeoutMs: 1}
73
return NewVolumeInspectMgr(clusterMgr, shardRepairSender, taskSwitch, conf)
76
func TestInspectorRun(t *testing.T) {
77
mgr := newInspector(t)
79
mgr.taskSwitch.(*mocks.MockSwitcher).EXPECT().WaitEnable().AnyTimes().Return()
80
mgr.taskSwitch.(*mocks.MockSwitcher).EXPECT().Enabled().AnyTimes().Return(true)
81
mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().GetVolumeInspectCheckPoint(any).AnyTimes().Return(nil, errMock)
82
mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().SetVolumeInspectCheckPoint(any, any).AnyTimes().Return(errMock)
84
require.True(t, mgr.Enabled())
87
time.Sleep(defaultInspectIntervalS * time.Second)
91
func TestInspectorPrepare(t *testing.T) {
92
ctx := context.Background()
94
mgr := newInspector(t)
95
mgr.cfg.InspectBatch = 2
96
mgr.cfg.ListVolStep = 2
98
mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().GetVolumeInspectCheckPoint(any).AnyTimes().Return(&proto.VolumeInspectCheckPoint{}, nil)
99
mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().ListVolume(any, any, any).Return(nil, proto.Vid(0), nil)
103
mgr := newInspector(t)
104
mgr.cfg.InspectBatch = 2
105
mgr.cfg.ListVolStep = 2
107
volume1 := MockGenVolInfo(100012, codemode.EC6P6, proto.VolumeStatusIdle)
108
volume2 := MockGenVolInfo(100012, codemode.EC6P6, proto.VolumeStatusActive)
109
mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().GetVolumeInspectCheckPoint(any).AnyTimes().Return(&proto.VolumeInspectCheckPoint{}, nil)
110
mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().ListVolume(any, any, any).Return([]*client.VolumeInfoSimple{volume1}, proto.Vid(0), nil)
111
mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().ListVolume(any, any, any).Return([]*client.VolumeInfoSimple{volume2}, proto.Vid(0), nil)
112
mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().ListVolume(any, any, any).Return(nil, proto.Vid(0), nil)
115
require.Equal(t, 1, len(mgr.tasks))
118
mgr := newInspector(t)
119
mgr.firstPrepare = false
120
mgr.cfg.InspectBatch = 2
121
mgr.cfg.ListVolStep = 2
123
volume1 := MockGenVolInfo(100012, codemode.EC6P6, proto.VolumeStatusIdle)
124
volume2 := MockGenVolInfo(100012, codemode.EC6P6, proto.VolumeStatusActive)
125
mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().GetVolumeInspectCheckPoint(any).AnyTimes().Return(&proto.VolumeInspectCheckPoint{}, nil)
126
mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().ListVolume(any, any, any).Return([]*client.VolumeInfoSimple{volume1}, proto.Vid(0), nil)
127
mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().ListVolume(any, any, any).Return([]*client.VolumeInfoSimple{volume2}, proto.Vid(0), nil)
128
mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().ListVolume(any, any, any).Return(nil, proto.Vid(0), nil)
131
require.Equal(t, 1, len(mgr.tasks))
135
func TestInspectorWaitCompleted(t *testing.T) {
136
ctx := context.Background()
138
mgr := newInspector(t)
140
mgr.cfg.InspectBatch = 1
141
mgr.cfg.ListVolStep = 1
143
volume := MockGenVolInfo(100012, codemode.EC6P6, proto.VolumeStatusIdle)
144
mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().GetVolumeInspectCheckPoint(any).AnyTimes().Return(&proto.VolumeInspectCheckPoint{}, nil)
145
mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().ListVolume(any, any, any).Return([]*client.VolumeInfoSimple{volume}, proto.Vid(0), nil)
148
require.Equal(t, 1, len(mgr.tasks))
150
for _, task := range mgr.tasks {
151
task.ret = &proto.VolumeInspectRet{}
153
mgr.waitCompleted(ctx)
157
func TestInspectorFinish(t *testing.T) {
158
ctx := context.Background()
160
mgr := newInspector(t)
162
mgr.cfg.InspectBatch = 1
163
mgr.cfg.ListVolStep = 1
165
volume := MockGenVolInfo(100012, codemode.EC6P6, proto.VolumeStatusIdle)
166
mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().GetVolumeInspectCheckPoint(any).AnyTimes().Return(&proto.VolumeInspectCheckPoint{}, nil)
167
mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().ListVolume(any, any, any).Return([]*client.VolumeInfoSimple{volume}, proto.Vid(0), nil)
170
require.Equal(t, 1, len(mgr.tasks))
172
for _, task := range mgr.tasks {
173
task.ret = &proto.VolumeInspectRet{MissedShards: genMockFailShards(100012, []proto.BlobID{3, 4})}
175
mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().GetVolumeInfo(any, any).Return(nil, errMock)
176
mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().SetVolumeInspectCheckPoint(any, any).Return(nil)
178
require.Equal(t, 0, len(mgr.tasks))
181
mgr := newInspector(t)
183
mgr.cfg.InspectBatch = 1
184
mgr.cfg.ListVolStep = 1
186
volume := MockGenVolInfo(100012, codemode.EC6P6, proto.VolumeStatusIdle)
187
mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().GetVolumeInspectCheckPoint(any).AnyTimes().Return(&proto.VolumeInspectCheckPoint{}, nil)
188
mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().ListVolume(any, any, any).Return([]*client.VolumeInfoSimple{volume}, proto.Vid(0), nil)
191
require.Equal(t, 1, len(mgr.tasks))
193
for _, task := range mgr.tasks {
194
task.ret = &proto.VolumeInspectRet{MissedShards: genMockFailShards(100012, []proto.BlobID{3, 4})}
196
volume = MockGenVolInfo(100012, codemode.EC6P6, proto.VolumeStatusActive)
197
mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().GetVolumeInfo(any, any).Return(volume, nil)
198
mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().SetVolumeInspectCheckPoint(any, any).Return(nil)
200
require.Equal(t, 0, len(mgr.tasks))
203
mgr := newInspector(t)
205
mgr.cfg.InspectBatch = 1
206
mgr.cfg.ListVolStep = 1
208
volume := MockGenVolInfo(100012, codemode.EC6P6, proto.VolumeStatusIdle)
209
mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().GetVolumeInspectCheckPoint(any).AnyTimes().Return(&proto.VolumeInspectCheckPoint{}, nil)
210
mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().ListVolume(any, any, any).Return([]*client.VolumeInfoSimple{volume}, proto.Vid(0), nil)
213
require.Equal(t, 1, len(mgr.tasks))
215
for _, task := range mgr.tasks {
216
task.ret = &proto.VolumeInspectRet{MissedShards: genMockFailShards(100012, []proto.BlobID{3, 4})}
218
mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().GetVolumeInfo(any, any).Return(volume, nil)
219
mgr.repairShardSender.(*MockMqProxyAPI).EXPECT().SendShardRepairMsg(any, any, any, any).Return(errMock)
220
mgr.repairShardSender.(*MockMqProxyAPI).EXPECT().SendShardRepairMsg(any, any, any, any).AnyTimes().Return(nil)
221
mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().SetVolumeInspectCheckPoint(any, any).Return(nil)
224
require.Equal(t, 0, len(mgr.tasks))
228
func TestInspectorAcquire(t *testing.T) {
229
ctx := context.Background()
231
mgr := newInspector(t)
232
mgr.enableAcquire(false)
233
_, err := mgr.AcquireInspect(ctx)
234
require.True(t, errors.Is(err, errForbiddenAcquire))
237
mgr := newInspector(t)
238
mgr.enableAcquire(true)
239
mgr.taskSwitch.(*mocks.MockSwitcher).EXPECT().Enabled().Return(false)
240
_, err := mgr.AcquireInspect(ctx)
241
require.True(t, errors.Is(err, proto.ErrTaskPaused))
244
mgr := newInspector(t)
245
mgr.enableAcquire(true)
246
mgr.taskSwitch.(*mocks.MockSwitcher).EXPECT().Enabled().Return(true)
247
_, err := mgr.AcquireInspect(ctx)
248
require.True(t, errors.Is(err, proto.ErrTaskEmpty))
251
mgr := newInspector(t)
252
mgr.enableAcquire(true)
253
mgr.taskSwitch.(*mocks.MockSwitcher).EXPECT().Enabled().Return(true)
255
mgr.cfg.InspectBatch = 1
256
mgr.cfg.ListVolStep = 1
258
volume := MockGenVolInfo(100012, codemode.EC6P6, proto.VolumeStatusIdle)
259
mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().GetVolumeInspectCheckPoint(any).AnyTimes().Return(&proto.VolumeInspectCheckPoint{}, nil)
260
mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().ListVolume(any, any, any).Return([]*client.VolumeInfoSimple{volume}, proto.Vid(0), nil)
263
require.Equal(t, 1, len(mgr.tasks))
266
for k := range mgr.tasks {
270
task, err := mgr.AcquireInspect(ctx)
271
require.NoError(t, err)
272
require.Equal(t, mgr.tasks[taskID].t.TaskID, task.TaskID)
276
func TestInspectorComplete(t *testing.T) {
277
ctx := context.Background()
279
mgr := newInspector(t)
280
mgr.enableAcquire(false)
282
mgr.CompleteInspect(ctx, &proto.VolumeInspectRet{})
285
mgr := newInspector(t)
286
mgr.enableAcquire(true)
288
mgr.CompleteInspect(ctx, &proto.VolumeInspectRet{})
291
mgr := newInspector(t)
292
mgr.enableAcquire(true)
294
mgr.cfg.InspectBatch = 1
295
mgr.cfg.ListVolStep = 1
297
volume := MockGenVolInfo(100012, codemode.EC6P6, proto.VolumeStatusIdle)
298
mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().GetVolumeInspectCheckPoint(any).AnyTimes().Return(&proto.VolumeInspectCheckPoint{}, nil)
299
mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().ListVolume(any, any, any).Return([]*client.VolumeInfoSimple{volume}, proto.Vid(0), nil)
302
require.Equal(t, 1, len(mgr.tasks))
305
for k := range mgr.tasks {
309
mgr.CompleteInspect(ctx, &proto.VolumeInspectRet{TaskID: taskID})
313
func TestInspectorGetTaskStats(t *testing.T) {
314
mgr := newInspector(t)