1
// Copyright 2022 The CubeFS Authors.
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
7
// http://www.apache.org/licenses/LICENSE-2.0
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12
// implied. See the License for the specific language governing
13
// permissions and limitations under the License.
22
"github.com/golang/mock/gomock"
23
"github.com/stretchr/testify/require"
25
"github.com/cubefs/cubefs/blobstore/common/proto"
26
"github.com/cubefs/cubefs/blobstore/scheduler/base"
27
"github.com/cubefs/cubefs/blobstore/scheduler/client"
31
// Test fixtures: six disks used to build a fake cluster topology.
// NOTE(review): this excerpt is truncated — the enclosing `var (` block,
// most struct fields (IDC, DiskID, free-chunk counts, ...) and the closing
// braces are missing from view. From the assertions in
// TestNewClusterTopologyMgr, the disks presumably span IDCs z0/z1/z2
// (2 + 2 + 1 reachable disks) — confirm against the full file.
topoDisk1 = &client.DiskInfoSimple{
35
Host: "127.0.0.1:8000",
40
topoDisk2 = &client.DiskInfoSimple{
44
Host: "127.0.0.2:8000",
49
topoDisk3 = &client.DiskInfoSimple{
53
Host: "127.0.0.3:8000",
58
topoDisk4 = &client.DiskInfoSimple{
62
Host: "127.0.0.4:8000",
67
// NOTE(review): disks 5 and 6 share host 127.0.0.4 with disk 4 — likely
// intentional (same-host disks), but the distinguishing fields are not
// visible here.
topoDisk5 = &client.DiskInfoSimple{
71
Host: "127.0.0.4:8000",
76
topoDisk6 = &client.DiskInfoSimple{
80
Host: "127.0.0.4:8000",
86
// topoDisks is the full fixture set fed to buildClusterTopology.
topoDisks = []*client.DiskInfoSimple{topoDisk1, topoDisk2, topoDisk3, topoDisk4, topoDisk5, topoDisk6}
89
// TestNewClusterTopologyMgr covers two areas:
//  1. buildClusterTopology: grouping the topoDisks fixtures by IDC and
//     querying per-IDC disk lists / the max-free-chunks disk.
//  2. A mocked ClusterMgrAPI driving disk loading (normal/broken) and the
//     volume-cache error paths of a freshly constructed ClusterTopologyMgr.
// NOTE(review): excerpt is truncated — closing braces of the struct
// literals and several statements between the visible lines are missing.
func TestNewClusterTopologyMgr(t *testing.T) {
90
clusterTopMgr := &ClusterTopologyMgr{
91
taskStatsMgr: base.NewClusterTopologyStatisticsMgr(1, []float64{}),
93
// Build the topology for cluster 1 from the six fixture disks.
clusterTopMgr.buildClusterTopology(topoDisks, 1)
94
// The fixtures span exactly three IDCs.
require.Equal(t, 3, len(clusterTopMgr.GetIDCs()))
95
disks := clusterTopMgr.GetIDCDisks("z0")
96
require.Equal(t, 2, len(disks))
97
disks = clusterTopMgr.GetIDCDisks("z1")
98
require.Equal(t, 2, len(disks))
99
disks = clusterTopMgr.GetIDCDisks("z2")
100
require.Equal(t, 1, len(disks))
101
// Unknown IDC yields nil, not an empty slice.
disks = clusterTopMgr.GetIDCDisks("z3")
102
require.Nil(t, disks)
103
// Within z1 the disk with the most free chunks is topoDisk3.
disk := clusterTopMgr.MaxFreeChunksDisk("z1")
104
require.Equal(t, topoDisk3.DiskID, disk.DiskID)
105
// NOTE(review): the assertion on the z3 result (presumably nil) is in a
// line missing from this view.
disk = clusterTopMgr.MaxFreeChunksDisk("z3")
108
// Part 2: mocked cluster-manager client.
ctr := gomock.NewController(t)
109
clusterMgrCli := NewMockClusterMgrAPI(ctr)
110
// testDisk1 is the only "normal" disk; testDisk2 is both broken and repairing.
clusterMgrCli.EXPECT().ListClusterDisks(any).AnyTimes().Return([]*client.DiskInfoSimple{testDisk1}, nil)
111
clusterMgrCli.EXPECT().ListBrokenDisks(any).AnyTimes().Return([]*client.DiskInfoSimple{testDisk2}, nil)
112
clusterMgrCli.EXPECT().ListRepairingDisks(any).AnyTimes().Return([]*client.DiskInfoSimple{testDisk2}, nil)
113
// Volume listing always fails -> LoadVolumes must surface errMock.
clusterMgrCli.EXPECT().ListVolume(any, any, any).Times(3).Return(nil, defaultMarker, errMock)
114
// First GetVolumeInfo fails (UpdateVolume path), second succeeds (GetVolume path).
clusterMgrCli.EXPECT().GetVolumeInfo(any, any).Return(nil, errMock)
115
clusterMgrCli.EXPECT().GetVolumeInfo(any, any).DoAndReturn(
116
func(_ context.Context, vid proto.Vid) (*client.VolumeInfoSimple, error) {
117
return &client.VolumeInfoSimple{Vid: vid}, nil
120
// Tiny intervals so background refresh logic is exercised immediately.
conf := &clusterTopologyConfig{
122
UpdateInterval: time.Microsecond,
123
VolumeUpdateInterval: time.Microsecond,
126
mgr := NewClusterTopologyMgr(clusterMgrCli, conf)
129
topology := mgr.(*ClusterTopologyMgr)
130
topology.loadNormalDisks()
131
topology.loadBrokenDisks()
133
require.True(t, mgr.IsBrokenDisk(testDisk2.DiskID))
134
require.False(t, mgr.IsBrokenDisk(testDisk1.DiskID))
136
// Reloading broken disks is idempotent: same answers afterwards.
topology.loadBrokenDisks()
137
require.True(t, mgr.IsBrokenDisk(testDisk2.DiskID))
138
require.False(t, mgr.IsBrokenDisk(testDisk1.DiskID))
141
// Volume paths: load and update fail via the mocked errors; a direct
// GetVolume succeeds through the second GetVolumeInfo expectation.
err := mgr.LoadVolumes()
142
require.ErrorIs(t, err, errMock)
143
_, err = mgr.UpdateVolume(proto.Vid(1))
144
require.ErrorIs(t, err, errMock)
145
_, err = mgr.GetVolume(proto.Vid(1))
146
require.NoError(t, err)
149
// TestVolumeCache exercises the volume cache happy path (paginated initial
// load, lazy per-volume fetch, update rate-limiting) and its failure path
// when the underlying client errors.
// NOTE(review): excerpt is truncated — closing braces/parens of the mock
// closures are missing from this view.
func TestVolumeCache(t *testing.T) {
150
cmClient := NewMockClusterMgrAPI(gomock.NewController(t))
151
// Two-page listing: first call (marker == defaultMarker) returns vid 4
// with next-marker 10; second call returns vid 9 and terminates the scan
// by handing back defaultMarker.
cmClient.EXPECT().ListVolume(any, any, any).Times(2).DoAndReturn(
152
func(_ context.Context, marker proto.Vid, _ int) ([]*client.VolumeInfoSimple, proto.Vid, error) {
153
if marker == defaultMarker {
154
return []*client.VolumeInfoSimple{{Vid: 4}}, proto.Vid(10), nil
156
return []*client.VolumeInfoSimple{{Vid: 9}}, defaultMarker, nil
159
// Cache misses fall through to GetVolumeInfo, which echoes the vid back.
cmClient.EXPECT().GetVolumeInfo(any, any).DoAndReturn(
160
func(_ context.Context, vid proto.Vid) (*client.VolumeInfoSimple, error) {
161
return &client.VolumeInfoSimple{Vid: vid}, nil
165
// 10s update interval: long enough that the explicit UpdateVolume below
// is rejected as too frequent.
volCache := NewVolumeCache(cmClient, 10*time.Second)
166
err := volCache.LoadVolumes()
167
require.NoError(t, err)
169
// no cache will update
170
_, err = volCache.GetVolume(1)
171
require.NoError(t, err)
173
// Second lookup of the same vid is served without error (cached).
_, err = volCache.GetVolume(1)
174
require.NoError(t, err)
176
// update ErrFrequentlyUpdate
177
_, err = volCache.UpdateVolume(1)
178
require.ErrorIs(t, err, ErrFrequentlyUpdate)
180
// list and get failed
181
cmClient.EXPECT().ListVolume(any, any, any).AnyTimes().Return(nil, proto.Vid(0), errMock)
182
cmClient.EXPECT().GetVolumeInfo(any, any).Return(&client.VolumeInfoSimple{}, errMock)
183
// Negative interval disables rate-limiting so the error paths are hit.
volCache = NewVolumeCache(cmClient, -1)
184
_, err = volCache.GetVolume(1)
185
require.ErrorIs(t, err, errMock)
186
err = volCache.LoadVolumes()
187
require.ErrorIs(t, err, errMock)
190
// TestDoubleCheckedRun walks DoubleCheckedRun through each of its outcomes:
// pre-check failure, task failure, post-check failure, re-run after the
// volume changed, early-exit when the change makes the task unnecessary,
// and the bounded-retry exhaustion error.
// NOTE(review): excerpt is truncated — `c`/`err` are re-declared with `:=`
// repeatedly, so each scenario is presumably wrapped in its own brace scope
// in the full file; those braces are among the missing lines. Do not
// "deduplicate" these declarations without the full source.
func TestDoubleCheckedRun(t *testing.T) {
191
ctr := gomock.NewController(t)
192
ctx := context.Background()
193
volume := &client.VolumeInfoSimple{Vid: proto.Vid(1)}
195
// get volume failed and return
196
c := NewMockClusterTopology(ctr)
197
c.EXPECT().GetVolume(any).Return(nil, errMock)
198
err := DoubleCheckedRun(ctx, c, 1, func(*client.VolumeInfoSimple) (*client.VolumeInfoSimple, error) { return volume, nil })
199
require.ErrorIs(t, err, errMock)
202
// do task failed and not check again
203
c := NewMockClusterTopology(ctr)
204
c.EXPECT().GetVolume(any).Return(&client.VolumeInfoSimple{}, nil)
205
err := DoubleCheckedRun(ctx, c, 1, func(*client.VolumeInfoSimple) (*client.VolumeInfoSimple, error) { return volume, errMock })
206
require.ErrorIs(t, err, errMock)
209
// do task success and check task: get volume failed in check phase
210
c := NewMockClusterTopology(ctr)
211
c.EXPECT().GetVolume(any).Return(&client.VolumeInfoSimple{}, nil)
212
c.EXPECT().GetVolume(any).Return(nil, errMock)
213
err := DoubleCheckedRun(ctx, c, 1, func(*client.VolumeInfoSimple) (*client.VolumeInfoSimple, error) { return volume, nil })
214
require.ErrorIs(t, err, errMock)
217
// do task success and check task: volume change after task done and do it again
218
c := NewMockClusterTopology(ctr)
219
c.EXPECT().GetVolume(any).Return(&client.VolumeInfoSimple{}, nil)
220
c.EXPECT().GetVolume(any).Return(&client.VolumeInfoSimple{}, nil)
221
c.EXPECT().GetVolume(any).Return(volume, nil)
222
err := DoubleCheckedRun(ctx, c, 1, func(*client.VolumeInfoSimple) (*client.VolumeInfoSimple, error) { return volume, nil })
223
require.NoError(t, err)
226
// do task success and check task: volume change when doing task, and no need to do it
227
c := NewMockClusterTopology(ctr)
228
c.EXPECT().GetVolume(any).Return(&client.VolumeInfoSimple{}, nil)
229
c.EXPECT().GetVolume(any).Return(volume, nil)
230
err := DoubleCheckedRun(ctx, c, 1, func(*client.VolumeInfoSimple) (*client.VolumeInfoSimple, error) { return volume, nil })
231
require.NoError(t, err)
234
// do task success and check task: max times retry
235
c := NewMockClusterTopology(ctr)
237
// Location set changes between check rounds (vuid 1 -> 2 -> 2), forcing
// the re-check loop; the task's result finally matches vuid 2, so it
// converges without error.
c.EXPECT().GetVolume(any).Return(&client.VolumeInfoSimple{Vid: 1, VunitLocations: []proto.VunitLocation{{Vuid: 1}}}, nil)
238
c.EXPECT().GetVolume(any).Return(&client.VolumeInfoSimple{Vid: 1, VunitLocations: []proto.VunitLocation{{Vuid: 2}}}, nil)
239
c.EXPECT().GetVolume(any).Return(&client.VolumeInfoSimple{Vid: 1, VunitLocations: []proto.VunitLocation{{Vuid: 2}}}, nil)
240
err := DoubleCheckedRun(context.Background(), c, 1, func(*client.VolumeInfoSimple) (*client.VolumeInfoSimple, error) {
241
return &client.VolumeInfoSimple{Vid: 1, VunitLocations: []proto.VunitLocation{{Vuid: 2}}}, nil
247
require.NoError(t, err)
250
// do task success and check task: max times retry and do task failed
251
c := NewMockClusterTopology(ctr)
252
// The volume keeps changing on every check (vuid 1 -> 2 -> 3 -> 4), so
// the retry budget is exhausted and errVolumeMissmatch is returned.
c.EXPECT().GetVolume(any).Return(&client.VolumeInfoSimple{Vid: 1, VunitLocations: []proto.VunitLocation{{Vuid: 1}}}, nil)
253
c.EXPECT().GetVolume(any).Return(&client.VolumeInfoSimple{Vid: 1, VunitLocations: []proto.VunitLocation{{Vuid: 2}}}, nil)
254
c.EXPECT().GetVolume(any).Return(&client.VolumeInfoSimple{Vid: 1, VunitLocations: []proto.VunitLocation{{Vuid: 3}}}, nil)
255
c.EXPECT().GetVolume(any).Return(&client.VolumeInfoSimple{Vid: 1, VunitLocations: []proto.VunitLocation{{Vuid: 4}}}, nil)
256
err := DoubleCheckedRun(context.Background(), c, 1, func(*client.VolumeInfoSimple) (*client.VolumeInfoSimple, error) { return volume, nil })
257
require.ErrorIs(t, err, errVolumeMissmatch)
261
// BenchmarkVolumeCache measures GetVolume lookup throughput against a cache
// pre-loaded with `items` volumes.
// NOTE(review): excerpt is truncated — the anonymous-struct case table
// (which defines cs.Name and presumably `items` per case) and several
// closing braces are missing from this view; confirm against the full file.
func BenchmarkVolumeCache(b *testing.B) {
262
for _, cs := range []struct {
271
b.Run(cs.Name, func(b *testing.B) {
272
// Pre-build `items` volumes with sequential vids.
vols := make([]*client.VolumeInfoSimple, items)
273
for idx := range vols {
274
vols[idx] = &client.VolumeInfoSimple{Vid: proto.Vid(idx)}
276
cmCli := NewMockClusterMgrAPI(gomock.NewController(b))
277
// Single-page listing: all volumes returned at once, scan terminated
// by defaultMarker.
cmCli.EXPECT().ListVolume(any, any, any).Return(vols, defaultMarker, nil)
279
// Negative interval disables update rate-limiting for the benchmark.
cacher := NewVolumeCache(cmCli, -1)
280
require.NoError(b, cacher.LoadVolumes())
283
// Timed loop: cache hits only, cycling through the preloaded vids.
for ii := 0; ii < b.N; ii++ {
284
cacher.GetVolume(proto.Vid(ii % items))