cluster_topology_test.go
// Copyright 2022 The CubeFS Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.

package scheduler

import (
	"context"
	"testing"
	"time"

	"github.com/golang/mock/gomock"
	"github.com/stretchr/testify/require"

	"github.com/cubefs/cubefs/blobstore/common/proto"
	"github.com/cubefs/cubefs/blobstore/scheduler/base"
	"github.com/cubefs/cubefs/blobstore/scheduler/client"
)
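
// Fixture disks: two in IDC z0 on one rack, two in z1 on different racks, one
// in z2, plus a z2 disk in another cluster (ClusterID 123) that building the
// topology for cluster 1 is expected to filter out.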
var (
	topoDisk1 = &client.DiskInfoSimple{
		ClusterID:    1,
		Idc:          "z0",
		Rack:         "rack1",
		Host:         "127.0.0.1:8000",
		DiskID:       1,
		FreeChunkCnt: 10,
		MaxChunkCnt:  700,
	}
	topoDisk2 = &client.DiskInfoSimple{
		ClusterID:    1,
		Idc:          "z0",
		Rack:         "rack1",
		Host:         "127.0.0.2:8000",
		DiskID:       2,
		FreeChunkCnt: 100,
		MaxChunkCnt:  700,
	}
	topoDisk3 = &client.DiskInfoSimple{
		ClusterID:    1,
		Idc:          "z1",
		Rack:         "rack1",
		Host:         "127.0.0.3:8000",
		DiskID:       3,
		FreeChunkCnt: 20,
		MaxChunkCnt:  700,
	}
	topoDisk4 = &client.DiskInfoSimple{
		ClusterID:    1,
		Idc:          "z1",
		Rack:         "rack2",
		Host:         "127.0.0.4:8000",
		DiskID:       4,
		FreeChunkCnt: 5,
		MaxChunkCnt:  700,
	}
	topoDisk5 = &client.DiskInfoSimple{
		ClusterID:    1,
		Idc:          "z2",
		Rack:         "rack2",
		Host:         "127.0.0.4:8000",
		DiskID:       5,
		FreeChunkCnt: 200,
		MaxChunkCnt:  700,
	}
	topoDisk6 = &client.DiskInfoSimple{
		ClusterID:    123,
		Idc:          "z2",
		Rack:         "rack2",
		Host:         "127.0.0.4:8000",
		DiskID:       5,
		FreeChunkCnt: 200,
		MaxChunkCnt:  700,
	}

	topoDisks = []*client.DiskInfoSimple{topoDisk1, topoDisk2, topoDisk3, topoDisk4, topoDisk5, topoDisk6}
)

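// TestNewClusterTopologyMgr builds a topology from the fixture disks and checks
// IDC grouping, per-IDC disk lookup, and MaxFreeChunksDisk, then drives a
// manager backed by a mocked ClusterMgrAPI through disk loading, broken-disk
// checks, and the volume update paths.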
func TestNewClusterTopologyMgr(t *testing.T) {
	clusterTopMgr := &ClusterTopologyMgr{
		taskStatsMgr: base.NewClusterTopologyStatisticsMgr(1, []float64{}),
	}
	clusterTopMgr.buildClusterTopology(topoDisks, 1)
	require.Equal(t, 3, len(clusterTopMgr.GetIDCs()))
	disks := clusterTopMgr.GetIDCDisks("z0")
	require.Equal(t, 2, len(disks))
	disks = clusterTopMgr.GetIDCDisks("z1")
	require.Equal(t, 2, len(disks))
	disks = clusterTopMgr.GetIDCDisks("z2")
	require.Equal(t, 1, len(disks))
	disks = clusterTopMgr.GetIDCDisks("z3")
	require.Nil(t, disks)
	disk := clusterTopMgr.MaxFreeChunksDisk("z1")
	require.Equal(t, topoDisk3.DiskID, disk.DiskID)
	disk = clusterTopMgr.MaxFreeChunksDisk("z3")
	require.Nil(t, disk)

	ctr := gomock.NewController(t)
	clusterMgrCli := NewMockClusterMgrAPI(ctr)
	clusterMgrCli.EXPECT().ListClusterDisks(any).AnyTimes().Return([]*client.DiskInfoSimple{testDisk1}, nil)
	clusterMgrCli.EXPECT().ListBrokenDisks(any).AnyTimes().Return([]*client.DiskInfoSimple{testDisk2}, nil)
	clusterMgrCli.EXPECT().ListRepairingDisks(any).AnyTimes().Return([]*client.DiskInfoSimple{testDisk2}, nil)
	clusterMgrCli.EXPECT().ListVolume(any, any, any).Times(3).Return(nil, defaultMarker, errMock)
	clusterMgrCli.EXPECT().GetVolumeInfo(any, any).Return(nil, errMock)
	clusterMgrCli.EXPECT().GetVolumeInfo(any, any).DoAndReturn(
		func(_ context.Context, vid proto.Vid) (*client.VolumeInfoSimple, error) {
			return &client.VolumeInfoSimple{Vid: vid}, nil
		},
	)
	conf := &clusterTopologyConfig{
		ClusterID:            1,
		UpdateInterval:       time.Microsecond,
		VolumeUpdateInterval: time.Microsecond,
		Leader:               true,
	}
	mgr := NewClusterTopologyMgr(clusterMgrCli, conf)
	defer mgr.Close()

	topology := mgr.(*ClusterTopologyMgr)
	topology.loadNormalDisks()
	topology.loadBrokenDisks()

	require.True(t, mgr.IsBrokenDisk(testDisk2.DiskID))
	require.False(t, mgr.IsBrokenDisk(testDisk1.DiskID))

	topology.loadBrokenDisks()
	require.True(t, mgr.IsBrokenDisk(testDisk2.DiskID))
	require.False(t, mgr.IsBrokenDisk(testDisk1.DiskID))

	// volume updates: load and update hit the erroring mocks, get succeeds
	err := mgr.LoadVolumes()
	require.ErrorIs(t, err, errMock)
	_, err = mgr.UpdateVolume(proto.Vid(1))
	require.ErrorIs(t, err, errMock)
	_, err = mgr.GetVolume(proto.Vid(1))
	require.NoError(t, err)
}

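// TestVolumeCache exercises the volume cache: a paginated initial load, cache
// misses and hits on GetVolume, the ErrFrequentlyUpdate guard, and error
// propagation when the cluster manager calls fail.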
func TestVolumeCache(t *testing.T) {
	cmClient := NewMockClusterMgrAPI(gomock.NewController(t))
	cmClient.EXPECT().ListVolume(any, any, any).Times(2).DoAndReturn(
		func(_ context.Context, marker proto.Vid, _ int) ([]*client.VolumeInfoSimple, proto.Vid, error) {
			if marker == defaultMarker {
				return []*client.VolumeInfoSimple{{Vid: 4}}, proto.Vid(10), nil
			}
			return []*client.VolumeInfoSimple{{Vid: 9}}, defaultMarker, nil
		},
	)
	cmClient.EXPECT().GetVolumeInfo(any, any).DoAndReturn(
		func(_ context.Context, vid proto.Vid) (*client.VolumeInfoSimple, error) {
			return &client.VolumeInfoSimple{Vid: vid}, nil
		},
	)

	volCache := NewVolumeCache(cmClient, 10*time.Second)
	err := volCache.LoadVolumes()
	require.NoError(t, err)

	// not cached yet: the first get fetches from the cluster manager
	_, err = volCache.GetVolume(1)
	require.NoError(t, err)
	// the second get is served from the cache
	_, err = volCache.GetVolume(1)
	require.NoError(t, err)

	// another update within the update interval returns ErrFrequentlyUpdate
	_, err = volCache.UpdateVolume(1)
	require.ErrorIs(t, err, ErrFrequentlyUpdate)

	// list and get both fail
	cmClient.EXPECT().ListVolume(any, any, any).AnyTimes().Return(nil, proto.Vid(0), errMock)
	cmClient.EXPECT().GetVolumeInfo(any, any).Return(&client.VolumeInfoSimple{}, errMock)
	volCache = NewVolumeCache(cmClient, -1)
	_, err = volCache.GetVolume(1)
	require.ErrorIs(t, err, errMock)
	err = volCache.LoadVolumes()
	require.ErrorIs(t, err, errMock)
}

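// TestDoubleCheckedRun covers the run-then-verify retry loop: failures before,
// during, and after the task, plus the cases where the volume changes and the
// task is retried until it matches or the retry limit is exhausted.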
func TestDoubleCheckedRun(t *testing.T) {
	ctr := gomock.NewController(t)
	ctx := context.Background()
	volume := &client.VolumeInfoSimple{Vid: proto.Vid(1)}
	{
		// getting the volume fails, so the task is never run
		c := NewMockClusterTopology(ctr)
		c.EXPECT().GetVolume(any).Return(nil, errMock)
		err := DoubleCheckedRun(ctx, c, 1, func(*client.VolumeInfoSimple) (*client.VolumeInfoSimple, error) { return volume, nil })
		require.ErrorIs(t, err, errMock)
	}
	{
		// the task fails, so there is no post-run check
		c := NewMockClusterTopology(ctr)
		c.EXPECT().GetVolume(any).Return(&client.VolumeInfoSimple{}, nil)
		err := DoubleCheckedRun(ctx, c, 1, func(*client.VolumeInfoSimple) (*client.VolumeInfoSimple, error) { return volume, errMock })
		require.ErrorIs(t, err, errMock)
	}
	{
		// the task succeeds, but fetching the volume for the post-run check fails
		c := NewMockClusterTopology(ctr)
		c.EXPECT().GetVolume(any).Return(&client.VolumeInfoSimple{}, nil)
		c.EXPECT().GetVolume(any).Return(nil, errMock)
		err := DoubleCheckedRun(ctx, c, 1, func(*client.VolumeInfoSimple) (*client.VolumeInfoSimple, error) { return volume, nil })
		require.ErrorIs(t, err, errMock)
	}
	{
		// the volume changed after the task finished, so the task runs again
		c := NewMockClusterTopology(ctr)
		c.EXPECT().GetVolume(any).Return(&client.VolumeInfoSimple{}, nil)
		c.EXPECT().GetVolume(any).Return(&client.VolumeInfoSimple{}, nil)
		c.EXPECT().GetVolume(any).Return(volume, nil)
		err := DoubleCheckedRun(ctx, c, 1, func(*client.VolumeInfoSimple) (*client.VolumeInfoSimple, error) { return volume, nil })
		require.NoError(t, err)
	}
	{
		// the post-run volume matches the task result, so no retry is needed
		c := NewMockClusterTopology(ctr)
		c.EXPECT().GetVolume(any).Return(&client.VolumeInfoSimple{}, nil)
		c.EXPECT().GetVolume(any).Return(volume, nil)
		err := DoubleCheckedRun(ctx, c, 1, func(*client.VolumeInfoSimple) (*client.VolumeInfoSimple, error) { return volume, nil })
		require.NoError(t, err)
	}
	{
		// the volume changes once mid-run; the retried task matches and succeeds
		c := NewMockClusterTopology(ctr)
		retry := 0
		c.EXPECT().GetVolume(any).Return(&client.VolumeInfoSimple{Vid: 1, VunitLocations: []proto.VunitLocation{{Vuid: 1}}}, nil)
		c.EXPECT().GetVolume(any).Return(&client.VolumeInfoSimple{Vid: 1, VunitLocations: []proto.VunitLocation{{Vuid: 2}}}, nil)
		c.EXPECT().GetVolume(any).Return(&client.VolumeInfoSimple{Vid: 1, VunitLocations: []proto.VunitLocation{{Vuid: 2}}}, nil)
		err := DoubleCheckedRun(context.Background(), c, 1, func(*client.VolumeInfoSimple) (*client.VolumeInfoSimple, error) {
			retry++
			if retry == 2 {
				return &client.VolumeInfoSimple{Vid: 1, VunitLocations: []proto.VunitLocation{{Vuid: 2}}}, nil
			}
			return volume, nil
		})
		require.NoError(t, err)
	}
	{
		// the volume changes on every check; retries are exhausted and the run fails
		c := NewMockClusterTopology(ctr)
		c.EXPECT().GetVolume(any).Return(&client.VolumeInfoSimple{Vid: 1, VunitLocations: []proto.VunitLocation{{Vuid: 1}}}, nil)
		c.EXPECT().GetVolume(any).Return(&client.VolumeInfoSimple{Vid: 1, VunitLocations: []proto.VunitLocation{{Vuid: 2}}}, nil)
		c.EXPECT().GetVolume(any).Return(&client.VolumeInfoSimple{Vid: 1, VunitLocations: []proto.VunitLocation{{Vuid: 3}}}, nil)
		c.EXPECT().GetVolume(any).Return(&client.VolumeInfoSimple{Vid: 1, VunitLocations: []proto.VunitLocation{{Vuid: 4}}}, nil)
		err := DoubleCheckedRun(context.Background(), c, 1, func(*client.VolumeInfoSimple) (*client.VolumeInfoSimple, error) { return volume, nil })
		require.ErrorIs(t, err, errVolumeMissmatch)
	}
}

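// BenchmarkVolumeCache measures GetVolume lookups against caches preloaded
// with 2^5, 2^16, and 2^20 volumes.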
func BenchmarkVolumeCache(b *testing.B) {
	for _, cs := range []struct {
		Name  string
		Items int
	}{
		{"tiny", 1 << 5},
		{"norm", 1 << 16},
		{"huge", 1 << 20},
	} {
		items := cs.Items
		b.Run(cs.Name, func(b *testing.B) {
			vols := make([]*client.VolumeInfoSimple, items)
			for idx := range vols {
				vols[idx] = &client.VolumeInfoSimple{Vid: proto.Vid(idx)}
			}
			cmCli := NewMockClusterMgrAPI(gomock.NewController(b))
			cmCli.EXPECT().ListVolume(any, any, any).Return(vols, defaultMarker, nil)

			cacher := NewVolumeCache(cmCli, -1)
			require.NoError(b, cacher.LoadVolumes())

			b.ResetTimer()
			for ii := 0; ii < b.N; ii++ {
				cacher.GetVolume(proto.Vid(ii % items))
			}
		})
	}
}