1
// Copyright 2022 The CubeFS Authors.
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
7
// http://www.apache.org/licenses/LICENSE-2.0
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12
// implied. See the License for the specific language governing
13
// permissions and limitations under the License.
17
// github.com/cubefs/cubefs/blobstore/access/... module access interfaces
18
//go:generate mockgen -destination=./controller_mock_test.go -package=access -mock_names ClusterController=MockClusterController,ServiceController=MockServiceController,VolumeGetter=MockVolumeGetter github.com/cubefs/cubefs/blobstore/access/controller ClusterController,ServiceController,VolumeGetter
19
//go:generate mockgen -destination=./access_mock_test.go -package=access -mock_names StreamHandler=MockStreamHandler,Limiter=MockLimiter github.com/cubefs/cubefs/blobstore/access StreamHandler,Limiter
36
"github.com/afex/hystrix-go/hystrix"
37
"github.com/golang/mock/gomock"
39
"github.com/cubefs/cubefs/blobstore/access/controller"
40
"github.com/cubefs/cubefs/blobstore/api/blobnode"
41
"github.com/cubefs/cubefs/blobstore/api/clustermgr"
42
"github.com/cubefs/cubefs/blobstore/api/proxy"
43
"github.com/cubefs/cubefs/blobstore/common/codemode"
44
"github.com/cubefs/cubefs/blobstore/common/ec"
45
errcode "github.com/cubefs/cubefs/blobstore/common/errors"
46
"github.com/cubefs/cubefs/blobstore/common/proto"
47
"github.com/cubefs/cubefs/blobstore/common/resourcepool"
48
"github.com/cubefs/cubefs/blobstore/common/trace"
49
"github.com/cubefs/cubefs/blobstore/testing/mocks"
50
_ "github.com/cubefs/cubefs/blobstore/testing/nolog"
54
errNotFound = errors.New("not found")
55
errAllocTimeout = errors.New("alloc timeout")
57
allocTimeoutSize uint64 = 1 << 40
62
idcOther = "test-idc-other"
63
allID = []int{1001, 1002, 1003, 1004, 1005, 1006, 1007, 1008, 1009, 1010, 1011, 1012}
64
idcID = []int{1001, 1002, 1003, 1007, 1008, 1009}
65
idcOtherID = []int{1004, 1005, 1006, 1010, 1011, 1012}
67
clusterID = proto.ClusterID(1)
68
volumeID = proto.Vid(1)
73
memPool *resourcepool.MemPool
74
encoder map[codemode.CodeMode]ec.Encoder
75
proxyClient proxy.Client
77
allCodeModes CodeModePairs
79
cmcli clustermgr.APIAccess
80
volumeGetter controller.VolumeGetter
81
serviceController controller.ServiceController
82
cc controller.ClusterController
84
clusterInfo *clustermgr.ClusterInfo
85
dataVolume *proxy.VersionVolume
86
dataAllocs []proxy.AllocRet
87
dataNodes map[string]clustermgr.ServiceInfo
88
dataDisks map[proto.DiskID]blobnode.DiskInfo
89
dataShards *shardsData
91
vuidController *vuidControl
93
putErrors = []errcode.Error{
94
errcode.ErrDiskBroken, errcode.ErrReadonlyVUID,
95
errcode.ErrChunkNoSpace,
96
errcode.ErrNoSuchDisk, errcode.ErrNoSuchVuid,
98
getErrors = []errcode.Error{
100
errcode.ErrDiskBroken, errcode.ErrReadonlyVUID,
101
errcode.ErrNoSuchDisk, errcode.ErrNoSuchVuid,
105
type shardKey struct {
110
type shardsData struct {
112
data map[shardKey][]byte
115
func (d *shardsData) clean() {
117
for key := range d.data {
118
d.data[key] = d.data[key][:0]
123
func (d *shardsData) get(vuid proto.Vuid, bid proto.BlobID) []byte {
124
key := shardKey{Vuid: vuid, Bid: bid}
127
buff := make([]byte, len(data))
133
func (d *shardsData) set(vuid proto.Vuid, bid proto.BlobID, b []byte) {
134
key := shardKey{Vuid: vuid, Bid: bid}
137
if cap(old) <= len(b) {
138
d.data[key] = make([]byte, len(b))
140
d.data[key] = old[:len(b)]
146
type vuidControl struct {
148
broken map[proto.Vuid]bool
149
blocked map[proto.Vuid]bool
151
duration time.Duration
153
isBNRealError bool // is return blobnode real error
156
func (c *vuidControl) Break(id proto.Vuid) {
162
func (c *vuidControl) Unbreak(id proto.Vuid) {
168
func (c *vuidControl) Isbroken(id proto.Vuid) bool {
170
v, ok := c.broken[id]
175
func (c *vuidControl) Block(id proto.Vuid) {
181
func (c *vuidControl) Unblock(id proto.Vuid) {
183
delete(c.blocked, id)
187
func (c *vuidControl) Isblocked(id proto.Vuid) bool {
189
v, ok := c.blocked[id]
194
func (c *vuidControl) SetBNRealError(b bool) {
200
func (c *vuidControl) IsBNRealError() bool {
202
defer c.mutex.Unlock()
203
return c.isBNRealError
206
func randBlobnodeRealError(errors []errcode.Error) error {
207
n := rand.Intn(1024) % len(errors)
211
var storageAPIRangeGetShard = func(ctx context.Context, host string, args *blobnode.RangeGetShardArgs) (
212
body io.ReadCloser, shardCrc uint32, err error) {
213
if vuidController.Isbroken(args.Vuid) {
214
err = errors.New("get shard fake error")
215
if vuidController.IsBNRealError() {
216
err = randBlobnodeRealError(getErrors)
220
if vuidController.Isblocked(args.Vuid) {
221
vuidController.block()
222
if rand.Intn(2) == 0 {
223
err = errors.New("get shard timeout")
225
err = errors.New("get shard Timeout")
230
buff := dataShards.get(args.Vuid, args.Bid)
232
return nil, 0, errNotFound
234
if len(buff) < int(args.Offset+args.Size) {
235
err = errors.New("get shard concurrently")
239
buff = buff[int(args.Offset):int(args.Offset+args.Size)]
240
shardCrc = crc32.ChecksumIEEE(buff)
241
body = ioutil.NopCloser(bytes.NewReader(buff))
245
var storageAPIPutShard = func(ctx context.Context, host string, args *blobnode.PutShardArgs) (
246
crc uint32, err error) {
247
if vuidController.Isbroken(args.Vuid) {
248
err = errors.New("put shard fake error")
249
if vuidController.IsBNRealError() {
250
err = randBlobnodeRealError(putErrors)
254
if vuidController.Isblocked(args.Vuid) {
255
vuidController.block()
256
err = errors.New("put shard timeout")
260
buffer, _ := memPool.Alloc(int(args.Size))
261
defer memPool.Put(buffer)
263
buffer = buffer[:int(args.Size)]
264
_, err = io.ReadFull(args.Body, buffer)
269
crc = crc32.ChecksumIEEE(buffer)
270
dataShards.set(args.Vuid, args.Bid, buffer)
275
dataAllocs = make([]proxy.AllocRet, 2)
276
dataAllocs[0] = proxy.AllocRet{
281
dataAllocs[1] = proxy.AllocRet{
287
dataVolume = &proxy.VersionVolume{VolumeInfo: clustermgr.VolumeInfo{
288
VolumeInfoBase: clustermgr.VolumeInfoBase{
290
CodeMode: codemode.EC6P6,
292
Units: func() (units []clustermgr.Unit) {
293
for _, id := range allID {
294
units = append(units, clustermgr.Unit{
295
Vuid: proto.Vuid(id),
296
DiskID: proto.DiskID(id),
297
Host: strconv.Itoa(id),
304
proxyNodes := make([]clustermgr.ServiceNode, 32)
305
for idx := range proxyNodes {
306
proxyNodes[idx] = clustermgr.ServiceNode{
309
Host: fmt.Sprintf("proxy-%d", idx),
314
dataNodes = make(map[string]clustermgr.ServiceInfo)
315
dataNodes[serviceProxy] = clustermgr.ServiceInfo{
319
dataDisks = make(map[proto.DiskID]blobnode.DiskInfo)
320
for _, id := range idcID {
321
dataDisks[proto.DiskID(id)] = blobnode.DiskInfo{
322
ClusterID: clusterID, Idc: idc, Host: strconv.Itoa(id),
323
DiskHeartBeatInfo: blobnode.DiskHeartBeatInfo{DiskID: proto.DiskID(id)},
326
for _, id := range idcOtherID {
327
dataDisks[proto.DiskID(id)] = blobnode.DiskInfo{
328
ClusterID: clusterID, Idc: idcOther, Host: strconv.Itoa(id),
329
DiskHeartBeatInfo: blobnode.DiskHeartBeatInfo{DiskID: proto.DiskID(id)},
333
dataShards = &shardsData{
334
data: make(map[shardKey][]byte, len(allID)),
338
ctr := gomock.NewController(&testing.T{})
339
cli := mocks.NewMockClientAPI(ctr)
340
cli.EXPECT().GetService(gomock.Any(), gomock.Any()).AnyTimes().DoAndReturn(
341
func(ctx context.Context, args clustermgr.GetServiceArgs) (clustermgr.ServiceInfo, error) {
342
if val, ok := dataNodes[args.Name]; ok {
345
return clustermgr.ServiceInfo{}, errNotFound
349
clusterInfo = &clustermgr.ClusterInfo{
350
Region: "test-region",
351
ClusterID: clusterID,
352
Nodes: []string{"node-1", "node-2", "node-3"},
355
ctr = gomock.NewController(&testing.T{})
356
proxycli := mocks.NewMockProxyClient(ctr)
357
proxycli.EXPECT().GetCacheVolume(gomock.Any(), gomock.Any(), gomock.Any()).
358
AnyTimes().Return(dataVolume, nil)
359
proxycli.EXPECT().GetCacheDisk(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes().DoAndReturn(
360
func(_ context.Context, _ string, args *proxy.CacheDiskArgs) (*blobnode.DiskInfo, error) {
361
if val, ok := dataDisks[args.DiskID]; ok {
364
return nil, errNotFound
367
serviceController, _ = controller.NewServiceController(
368
controller.ServiceConfig{
369
ClusterID: clusterID,
372
}, cmcli, proxycli, nil)
373
volumeGetter, _ = controller.NewVolumeGetter(clusterID, serviceController, proxycli, 0)
375
ctr = gomock.NewController(&testing.T{})
376
c := NewMockClusterController(ctr)
377
c.EXPECT().Region().AnyTimes().Return("test-region")
378
c.EXPECT().ChooseOne().AnyTimes().Return(clusterInfo, nil)
379
c.EXPECT().GetServiceController(gomock.Any()).AnyTimes().Return(serviceController, nil)
380
c.EXPECT().GetVolumeGetter(gomock.Any()).AnyTimes().Return(volumeGetter, nil)
381
c.EXPECT().ChangeChooseAlg(gomock.Any()).AnyTimes().DoAndReturn(
382
func(alg controller.AlgChoose) error {
386
return controller.ErrInvalidChooseAlg
390
ctr = gomock.NewController(&testing.T{})
391
allocCli := mocks.NewMockProxyClient(ctr)
392
allocCli.EXPECT().SendDeleteMsg(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes().Return(nil)
393
allocCli.EXPECT().SendShardRepairMsg(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes().Return(nil)
394
allocCli.EXPECT().VolumeAlloc(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes().DoAndReturn(
395
func(ctx context.Context, host string, args *proxy.AllocVolsArgs) ([]proxy.AllocRet, error) {
396
if args.Fsize > allocTimeoutSize {
397
return nil, errAllocTimeout
399
return dataAllocs, nil
401
proxyClient = allocCli
405
memPool = resourcepool.NewMemPool(getDefaultMempoolSize())
409
coderEC6P6, _ := ec.NewEncoder(ec.Config{
410
CodeMode: codemode.EC6P6.Tactic(),
413
coderEC6P10L2, _ := ec.NewEncoder(ec.Config{
414
CodeMode: codemode.EC6P10L2.Tactic(),
417
coderEC15P12, _ := ec.NewEncoder(ec.Config{
418
CodeMode: codemode.EC15P12.Tactic(),
421
coderEC16P20L2, _ := ec.NewEncoder(ec.Config{
422
CodeMode: codemode.EC16P20L2.Tactic(),
425
encoder = map[codemode.CodeMode]ec.Encoder{
426
codemode.EC6P6: coderEC6P6,
427
codemode.EC6P10L2: coderEC6P10L2,
428
codemode.EC15P12: coderEC15P12,
429
codemode.EC16P20L2: coderEC16P20L2,
434
allCodeModes = CodeModePairs{
435
codemode.EC6P6: CodeModePair{
436
Policy: codemode.Policy{
437
ModeName: codemode.EC6P6.Name(),
438
MaxSize: math.MaxInt64,
441
Tactic: codemode.EC6P6.Tactic(),
443
codemode.EC6P10L2: CodeModePair{
444
Policy: codemode.Policy{
445
ModeName: codemode.EC6P10L2.Name(),
448
Tactic: codemode.EC6P10L2.Tactic(),
450
codemode.EC15P12: CodeModePair{
451
Policy: codemode.Policy{
452
ModeName: codemode.EC15P12.Name(),
455
Tactic: codemode.EC15P12.Tactic(),
457
codemode.EC16P20L2: CodeModePair{
458
Policy: codemode.Policy{
459
ModeName: codemode.EC16P20L2.Name(),
462
Tactic: codemode.EC16P20L2.Tactic(),
467
func initController() {
468
vuidController = &vuidControl{
469
broken: make(map[proto.Vuid]bool),
470
blocked: make(map[proto.Vuid]bool),
472
time.Sleep(200 * time.Millisecond)
474
duration: 200 * time.Millisecond,
475
isBNRealError: false,
477
// initialized broken 1005
478
vuidController.Break(1005)
481
func newMockStorageAPI() blobnode.StorageAPI {
482
ctr := gomock.NewController(&testing.T{})
483
api := mocks.NewMockStorageAPI(ctr)
484
api.EXPECT().RangeGetShard(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes().
485
DoAndReturn(storageAPIRangeGetShard)
486
api.EXPECT().PutShard(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes().
487
DoAndReturn(storageAPIPutShard)
492
rand.Seed(int64(time.Now().Nanosecond()))
494
hystrix.ConfigureCommand(rwCommand, hystrix.CommandConfig{
496
MaxConcurrentRequests: 9000,
497
ErrorPercentThreshold: 90,
509
clusterController: cc,
511
blobnodeClient: newMockStorageAPI(),
512
proxyClient: proxyClient,
514
allCodeModes: allCodeModes,
515
maxObjectSize: defaultMaxObjectSize,
516
StreamConfig: StreamConfig{
518
MaxBlobSize: uint32(blobSize), // 4M
519
DiskPunishIntervalS: punishServiceS,
520
ServicePunishIntervalS: punishServiceS,
522
AllocRetryIntervalMS: 3000,
523
MinReadShardsX: minReadShardsX,
525
discardVidChan: make(chan discardVid, 8),
526
stopCh: make(chan struct{}),
528
streamer.loopDiscardVids()
531
func ctxWithName(funcName string) func() context.Context {
532
return func() context.Context {
533
_, ctx := trace.StartSpanFromContextWithTraceID(context.Background(), funcName, funcName)
538
func getBufSizes(size int) ec.BufferSizes {
539
sizes, _ := ec.GetBufferSizes(size, codemode.EC6P6.Tactic())
543
func dataEqual(exp, act []byte) bool {
544
return crc32.ChecksumIEEE(exp) == crc32.ChecksumIEEE(act)