cubefs

Форк
0
/
volume_inspector_test.go 
316 строк · 11.0 Кб
1
// Copyright 2022 The CubeFS Authors.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12
// implied. See the License for the specific language governing
13
// permissions and limitations under the License.
14

15
package scheduler
16

17
import (
18
	"context"
19
	"errors"
20
	"testing"
21
	"time"
22

23
	"github.com/golang/mock/gomock"
24
	"github.com/stretchr/testify/require"
25

26
	"github.com/cubefs/cubefs/blobstore/common/codemode"
27
	"github.com/cubefs/cubefs/blobstore/common/proto"
28
	"github.com/cubefs/cubefs/blobstore/scheduler/client"
29
	"github.com/cubefs/cubefs/blobstore/testing/mocks"
30
)
31

32
func genMockFailShards(vid proto.Vid, bids []proto.BlobID) []*proto.MissedShard {
33
	vuid, _ := proto.NewVuid(vid, 1, 1)
34
	var FailShards []*proto.MissedShard
35
	for _, bid := range bids {
36
		FailShards = append(FailShards, &proto.MissedShard{Vuid: vuid, Bid: bid})
37
	}
38
	return FailShards
39
}
40

41
func TestTaskTimeout(t *testing.T) {
42
	task := inspectTaskInfo{}
43
	require.NoError(t, task.tryAcquire())
44
	require.Equal(t, false, task.timeout(5))
45
	time.Sleep(10 * time.Millisecond)
46
	require.Equal(t, true, task.timeout(5))
47
	require.Equal(t, false, task.timeout(10000))
48
}
49

50
func TestBadShardDeduplicator(t *testing.T) {
51
	d := newBadShardDeduplicator(3)
52
	require.Equal(t, false, d.reduplicate(1, 1, []uint8{1, 2}))
53

54
	d.add(1, 1, []uint8{1, 2})
55
	require.Equal(t, true, d.reduplicate(1, 1, []uint8{1, 2}))
56

57
	require.Equal(t, false, d.reduplicate(2, 1, []uint8{1, 2}))
58
	d.add(2, 1, []uint8{1, 2})
59

60
	require.Equal(t, false, d.reduplicate(2, 1, []uint8{1, 2, 3}))
61
	d.add(2, 1, []uint8{1, 2, 3})
62

63
	d.add(2, 2, []uint8{1, 2, 3})
64
	require.Equal(t, false, d.reduplicate(2, 1, []uint8{1, 2, 3}))
65
}
66

67
func newInspector(t *testing.T) *VolumeInspectMgr {
68
	ctr := gomock.NewController(t)
69
	clusterMgr := NewMockClusterMgrAPI(ctr)
70
	taskSwitch := mocks.NewMockSwitcher(ctr)
71
	shardRepairSender := NewMockMqProxyAPI(ctr)
72
	conf := &VolumeInspectMgrCfg{InspectIntervalS: defaultInspectIntervalS, TimeoutMs: 1}
73
	return NewVolumeInspectMgr(clusterMgr, shardRepairSender, taskSwitch, conf)
74
}
75

76
func TestInspectorRun(t *testing.T) {
77
	mgr := newInspector(t)
78

79
	mgr.taskSwitch.(*mocks.MockSwitcher).EXPECT().WaitEnable().AnyTimes().Return()
80
	mgr.taskSwitch.(*mocks.MockSwitcher).EXPECT().Enabled().AnyTimes().Return(true)
81
	mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().GetVolumeInspectCheckPoint(any).AnyTimes().Return(nil, errMock)
82
	mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().SetVolumeInspectCheckPoint(any, any).AnyTimes().Return(errMock)
83

84
	require.True(t, mgr.Enabled())
85
	go mgr.Run()
86

87
	time.Sleep(defaultInspectIntervalS * time.Second)
88
	mgr.Close()
89
}
90

91
func TestInspectorPrepare(t *testing.T) {
92
	ctx := context.Background()
93
	{
94
		mgr := newInspector(t)
95
		mgr.cfg.InspectBatch = 2
96
		mgr.cfg.ListVolStep = 2
97

98
		mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().GetVolumeInspectCheckPoint(any).AnyTimes().Return(&proto.VolumeInspectCheckPoint{}, nil)
99
		mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().ListVolume(any, any, any).Return(nil, proto.Vid(0), nil)
100
		mgr.prepare(ctx)
101
	}
102
	{
103
		mgr := newInspector(t)
104
		mgr.cfg.InspectBatch = 2
105
		mgr.cfg.ListVolStep = 2
106

107
		volume1 := MockGenVolInfo(100012, codemode.EC6P6, proto.VolumeStatusIdle)
108
		volume2 := MockGenVolInfo(100012, codemode.EC6P6, proto.VolumeStatusActive)
109
		mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().GetVolumeInspectCheckPoint(any).AnyTimes().Return(&proto.VolumeInspectCheckPoint{}, nil)
110
		mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().ListVolume(any, any, any).Return([]*client.VolumeInfoSimple{volume1}, proto.Vid(0), nil)
111
		mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().ListVolume(any, any, any).Return([]*client.VolumeInfoSimple{volume2}, proto.Vid(0), nil)
112
		mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().ListVolume(any, any, any).Return(nil, proto.Vid(0), nil)
113

114
		mgr.prepare(ctx)
115
		require.Equal(t, 1, len(mgr.tasks))
116
	}
117
	{
118
		mgr := newInspector(t)
119
		mgr.firstPrepare = false
120
		mgr.cfg.InspectBatch = 2
121
		mgr.cfg.ListVolStep = 2
122

123
		volume1 := MockGenVolInfo(100012, codemode.EC6P6, proto.VolumeStatusIdle)
124
		volume2 := MockGenVolInfo(100012, codemode.EC6P6, proto.VolumeStatusActive)
125
		mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().GetVolumeInspectCheckPoint(any).AnyTimes().Return(&proto.VolumeInspectCheckPoint{}, nil)
126
		mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().ListVolume(any, any, any).Return([]*client.VolumeInfoSimple{volume1}, proto.Vid(0), nil)
127
		mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().ListVolume(any, any, any).Return([]*client.VolumeInfoSimple{volume2}, proto.Vid(0), nil)
128
		mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().ListVolume(any, any, any).Return(nil, proto.Vid(0), nil)
129

130
		mgr.prepare(ctx)
131
		require.Equal(t, 1, len(mgr.tasks))
132
	}
133
}
134

135
func TestInspectorWaitCompleted(t *testing.T) {
136
	ctx := context.Background()
137
	{
138
		mgr := newInspector(t)
139

140
		mgr.cfg.InspectBatch = 1
141
		mgr.cfg.ListVolStep = 1
142

143
		volume := MockGenVolInfo(100012, codemode.EC6P6, proto.VolumeStatusIdle)
144
		mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().GetVolumeInspectCheckPoint(any).AnyTimes().Return(&proto.VolumeInspectCheckPoint{}, nil)
145
		mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().ListVolume(any, any, any).Return([]*client.VolumeInfoSimple{volume}, proto.Vid(0), nil)
146

147
		mgr.prepare(ctx)
148
		require.Equal(t, 1, len(mgr.tasks))
149

150
		for _, task := range mgr.tasks {
151
			task.ret = &proto.VolumeInspectRet{}
152
		}
153
		mgr.waitCompleted(ctx)
154
	}
155
}
156

157
func TestInspectorFinish(t *testing.T) {
158
	ctx := context.Background()
159
	{
160
		mgr := newInspector(t)
161

162
		mgr.cfg.InspectBatch = 1
163
		mgr.cfg.ListVolStep = 1
164

165
		volume := MockGenVolInfo(100012, codemode.EC6P6, proto.VolumeStatusIdle)
166
		mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().GetVolumeInspectCheckPoint(any).AnyTimes().Return(&proto.VolumeInspectCheckPoint{}, nil)
167
		mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().ListVolume(any, any, any).Return([]*client.VolumeInfoSimple{volume}, proto.Vid(0), nil)
168

169
		mgr.prepare(ctx)
170
		require.Equal(t, 1, len(mgr.tasks))
171

172
		for _, task := range mgr.tasks {
173
			task.ret = &proto.VolumeInspectRet{MissedShards: genMockFailShards(100012, []proto.BlobID{3, 4})}
174
		}
175
		mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().GetVolumeInfo(any, any).Return(nil, errMock)
176
		mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().SetVolumeInspectCheckPoint(any, any).Return(nil)
177
		mgr.finish(ctx)
178
		require.Equal(t, 0, len(mgr.tasks))
179
	}
180
	{
181
		mgr := newInspector(t)
182

183
		mgr.cfg.InspectBatch = 1
184
		mgr.cfg.ListVolStep = 1
185

186
		volume := MockGenVolInfo(100012, codemode.EC6P6, proto.VolumeStatusIdle)
187
		mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().GetVolumeInspectCheckPoint(any).AnyTimes().Return(&proto.VolumeInspectCheckPoint{}, nil)
188
		mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().ListVolume(any, any, any).Return([]*client.VolumeInfoSimple{volume}, proto.Vid(0), nil)
189

190
		mgr.prepare(ctx)
191
		require.Equal(t, 1, len(mgr.tasks))
192

193
		for _, task := range mgr.tasks {
194
			task.ret = &proto.VolumeInspectRet{MissedShards: genMockFailShards(100012, []proto.BlobID{3, 4})}
195
		}
196
		volume = MockGenVolInfo(100012, codemode.EC6P6, proto.VolumeStatusActive)
197
		mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().GetVolumeInfo(any, any).Return(volume, nil)
198
		mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().SetVolumeInspectCheckPoint(any, any).Return(nil)
199
		mgr.finish(ctx)
200
		require.Equal(t, 0, len(mgr.tasks))
201
	}
202
	{
203
		mgr := newInspector(t)
204

205
		mgr.cfg.InspectBatch = 1
206
		mgr.cfg.ListVolStep = 1
207

208
		volume := MockGenVolInfo(100012, codemode.EC6P6, proto.VolumeStatusIdle)
209
		mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().GetVolumeInspectCheckPoint(any).AnyTimes().Return(&proto.VolumeInspectCheckPoint{}, nil)
210
		mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().ListVolume(any, any, any).Return([]*client.VolumeInfoSimple{volume}, proto.Vid(0), nil)
211

212
		mgr.prepare(ctx)
213
		require.Equal(t, 1, len(mgr.tasks))
214

215
		for _, task := range mgr.tasks {
216
			task.ret = &proto.VolumeInspectRet{MissedShards: genMockFailShards(100012, []proto.BlobID{3, 4})}
217
		}
218
		mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().GetVolumeInfo(any, any).Return(volume, nil)
219
		mgr.repairShardSender.(*MockMqProxyAPI).EXPECT().SendShardRepairMsg(any, any, any, any).Return(errMock)
220
		mgr.repairShardSender.(*MockMqProxyAPI).EXPECT().SendShardRepairMsg(any, any, any, any).AnyTimes().Return(nil)
221
		mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().SetVolumeInspectCheckPoint(any, any).Return(nil)
222

223
		mgr.finish(ctx)
224
		require.Equal(t, 0, len(mgr.tasks))
225
	}
226
}
227

228
func TestInspectorAcquire(t *testing.T) {
229
	ctx := context.Background()
230
	{
231
		mgr := newInspector(t)
232
		mgr.enableAcquire(false)
233
		_, err := mgr.AcquireInspect(ctx)
234
		require.True(t, errors.Is(err, errForbiddenAcquire))
235
	}
236
	{
237
		mgr := newInspector(t)
238
		mgr.enableAcquire(true)
239
		mgr.taskSwitch.(*mocks.MockSwitcher).EXPECT().Enabled().Return(false)
240
		_, err := mgr.AcquireInspect(ctx)
241
		require.True(t, errors.Is(err, proto.ErrTaskPaused))
242
	}
243
	{
244
		mgr := newInspector(t)
245
		mgr.enableAcquire(true)
246
		mgr.taskSwitch.(*mocks.MockSwitcher).EXPECT().Enabled().Return(true)
247
		_, err := mgr.AcquireInspect(ctx)
248
		require.True(t, errors.Is(err, proto.ErrTaskEmpty))
249
	}
250
	{
251
		mgr := newInspector(t)
252
		mgr.enableAcquire(true)
253
		mgr.taskSwitch.(*mocks.MockSwitcher).EXPECT().Enabled().Return(true)
254

255
		mgr.cfg.InspectBatch = 1
256
		mgr.cfg.ListVolStep = 1
257

258
		volume := MockGenVolInfo(100012, codemode.EC6P6, proto.VolumeStatusIdle)
259
		mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().GetVolumeInspectCheckPoint(any).AnyTimes().Return(&proto.VolumeInspectCheckPoint{}, nil)
260
		mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().ListVolume(any, any, any).Return([]*client.VolumeInfoSimple{volume}, proto.Vid(0), nil)
261

262
		mgr.prepare(ctx)
263
		require.Equal(t, 1, len(mgr.tasks))
264

265
		var taskID string
266
		for k := range mgr.tasks {
267
			taskID = k
268
		}
269

270
		task, err := mgr.AcquireInspect(ctx)
271
		require.NoError(t, err)
272
		require.Equal(t, mgr.tasks[taskID].t.TaskID, task.TaskID)
273
	}
274
}
275

276
func TestInspectorComplete(t *testing.T) {
277
	ctx := context.Background()
278
	{
279
		mgr := newInspector(t)
280
		mgr.enableAcquire(false)
281

282
		mgr.CompleteInspect(ctx, &proto.VolumeInspectRet{})
283
	}
284
	{
285
		mgr := newInspector(t)
286
		mgr.enableAcquire(true)
287

288
		mgr.CompleteInspect(ctx, &proto.VolumeInspectRet{})
289
	}
290
	{
291
		mgr := newInspector(t)
292
		mgr.enableAcquire(true)
293

294
		mgr.cfg.InspectBatch = 1
295
		mgr.cfg.ListVolStep = 1
296

297
		volume := MockGenVolInfo(100012, codemode.EC6P6, proto.VolumeStatusIdle)
298
		mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().GetVolumeInspectCheckPoint(any).AnyTimes().Return(&proto.VolumeInspectCheckPoint{}, nil)
299
		mgr.clusterMgrCli.(*MockClusterMgrAPI).EXPECT().ListVolume(any, any, any).Return([]*client.VolumeInfoSimple{volume}, proto.Vid(0), nil)
300

301
		mgr.prepare(ctx)
302
		require.Equal(t, 1, len(mgr.tasks))
303

304
		var taskID string
305
		for k := range mgr.tasks {
306
			taskID = k
307
		}
308

309
		mgr.CompleteInspect(ctx, &proto.VolumeInspectRet{TaskID: taskID})
310
	}
311
}
312

313
func TestInspectorGetTaskStats(t *testing.T) {
314
	mgr := newInspector(t)
315
	mgr.GetTaskStats()
316
}
317

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.