// Copyright 2022 The CubeFS Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.

package access

// github.com/cubefs/cubefs/blobstore/access/... module access interfaces
//go:generate mockgen -destination=./controller_mock_test.go -package=access -mock_names ClusterController=MockClusterController,ServiceController=MockServiceController,VolumeGetter=MockVolumeGetter github.com/cubefs/cubefs/blobstore/access/controller ClusterController,ServiceController,VolumeGetter
//go:generate mockgen -destination=./access_mock_test.go -package=access -mock_names StreamHandler=MockStreamHandler,Limiter=MockLimiter github.com/cubefs/cubefs/blobstore/access StreamHandler,Limiter

import (
	"bytes"
	"context"
	"errors"
	"fmt"
	"hash/crc32"
	"io"
	"io/ioutil"
	"math"
	"math/rand"
	"strconv"
	"sync"
	"testing"
	"time"

	"github.com/afex/hystrix-go/hystrix"
	"github.com/golang/mock/gomock"

	"github.com/cubefs/cubefs/blobstore/access/controller"
	"github.com/cubefs/cubefs/blobstore/api/blobnode"
	"github.com/cubefs/cubefs/blobstore/api/clustermgr"
	"github.com/cubefs/cubefs/blobstore/api/proxy"
	"github.com/cubefs/cubefs/blobstore/common/codemode"
	"github.com/cubefs/cubefs/blobstore/common/ec"
	errcode "github.com/cubefs/cubefs/blobstore/common/errors"
	"github.com/cubefs/cubefs/blobstore/common/proto"
	"github.com/cubefs/cubefs/blobstore/common/resourcepool"
	"github.com/cubefs/cubefs/blobstore/common/trace"
	"github.com/cubefs/cubefs/blobstore/testing/mocks"
	_ "github.com/cubefs/cubefs/blobstore/testing/nolog"
)

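// Shared test fixtures: mocked cluster, proxy, and blobnode clients, EC
// encoders, volume and disk metadata, and the in-memory shard store that
// back every stream test in this package.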
var (
	errNotFound     = errors.New("not found")
	errAllocTimeout = errors.New("alloc timeout")

	allocTimeoutSize uint64 = 1 << 40
	punishServiceS          = 1
	minReadShardsX          = 5

	idc        = "test-idc"
	idcOther   = "test-idc-other"
	allID      = []int{1001, 1002, 1003, 1004, 1005, 1006, 1007, 1008, 1009, 1010, 1011, 1012}
	idcID      = []int{1001, 1002, 1003, 1007, 1008, 1009}
	idcOtherID = []int{1004, 1005, 1006, 1010, 1011, 1012}

	clusterID = proto.ClusterID(1)
	volumeID  = proto.Vid(1)
	blobSize  = 1 << 22

	streamer *Handler

	memPool     *resourcepool.MemPool
	encoder     map[codemode.CodeMode]ec.Encoder
	proxyClient proxy.Client

	allCodeModes CodeModePairs

	cmcli             clustermgr.APIAccess
	volumeGetter      controller.VolumeGetter
	serviceController controller.ServiceController
	cc                controller.ClusterController

	clusterInfo *clustermgr.ClusterInfo
	dataVolume  *proxy.VersionVolume
	dataAllocs  []proxy.AllocRet
	dataNodes   map[string]clustermgr.ServiceInfo
	dataDisks   map[proto.DiskID]blobnode.DiskInfo
	dataShards  *shardsData

	vuidController *vuidControl

	putErrors = []errcode.Error{
		errcode.ErrDiskBroken, errcode.ErrReadonlyVUID,
		errcode.ErrChunkNoSpace,
		errcode.ErrNoSuchDisk, errcode.ErrNoSuchVuid,
	}
	getErrors = []errcode.Error{
		errcode.ErrOverload,
		errcode.ErrDiskBroken, errcode.ErrReadonlyVUID,
		errcode.ErrNoSuchDisk, errcode.ErrNoSuchVuid,
	}
)

type shardKey struct {
	Vuid proto.Vuid
	Bid  proto.BlobID
}

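// shardsData is a concurrency-safe, in-memory stand-in for blobnode storage,
// mapping (vuid, bid) to the shard payload most recently written.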
type shardsData struct {
	mutex sync.RWMutex
	data  map[shardKey][]byte
}

func (d *shardsData) clean() {
	d.mutex.Lock()
	for key := range d.data {
		d.data[key] = d.data[key][:0]
	}
	d.mutex.Unlock()
}

func (d *shardsData) get(vuid proto.Vuid, bid proto.BlobID) []byte {
	key := shardKey{Vuid: vuid, Bid: bid}
	d.mutex.RLock()
	data := d.data[key]
	buff := make([]byte, len(data))
	copy(buff, data)
	d.mutex.RUnlock()
	return buff
}

func (d *shardsData) set(vuid proto.Vuid, bid proto.BlobID, b []byte) {
	key := shardKey{Vuid: vuid, Bid: bid}
	d.mutex.Lock()
	old := d.data[key]
	if cap(old) <= len(b) {
		d.data[key] = make([]byte, len(b))
	} else {
		d.data[key] = old[:len(b)]
	}
	copy(d.data[key], b)
	d.mutex.Unlock()
}

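// vuidControl injects faults per vuid: broken vuids fail requests outright,
// while blocked vuids stall inside block() to simulate slow blobnodes.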
type vuidControl struct {
	mutex    sync.Mutex
	broken   map[proto.Vuid]bool
	blocked  map[proto.Vuid]bool
	block    func()
	duration time.Duration

	isBNRealError bool // whether to return real blobnode error codes instead of fabricated errors
}

func (c *vuidControl) Break(id proto.Vuid) {
	c.mutex.Lock()
	c.broken[id] = true
	c.mutex.Unlock()
}

func (c *vuidControl) Unbreak(id proto.Vuid) {
	c.mutex.Lock()
	delete(c.broken, id)
	c.mutex.Unlock()
}

func (c *vuidControl) Isbroken(id proto.Vuid) bool {
	c.mutex.Lock()
	v, ok := c.broken[id]
	c.mutex.Unlock()
	return ok && v
}

func (c *vuidControl) Block(id proto.Vuid) {
	c.mutex.Lock()
	c.blocked[id] = true
	c.mutex.Unlock()
}

func (c *vuidControl) Unblock(id proto.Vuid) {
	c.mutex.Lock()
	delete(c.blocked, id)
	c.mutex.Unlock()
}

func (c *vuidControl) Isblocked(id proto.Vuid) bool {
	c.mutex.Lock()
	v, ok := c.blocked[id]
	c.mutex.Unlock()
	return ok && v
}

func (c *vuidControl) SetBNRealError(b bool) {
	c.mutex.Lock()
	c.isBNRealError = b
	c.mutex.Unlock()
}

func (c *vuidControl) IsBNRealError() bool {
	c.mutex.Lock()
	defer c.mutex.Unlock()
	return c.isBNRealError
}

func randBlobnodeRealError(errors []errcode.Error) error {
	n := rand.Intn(1024) % len(errors)
	return errors[n]
}

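// storageAPIRangeGetShard serves ranged reads from dataShards, honoring the
// broken/blocked fault switches in vuidController.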
var storageAPIRangeGetShard = func(ctx context.Context, host string, args *blobnode.RangeGetShardArgs) (
	body io.ReadCloser, shardCrc uint32, err error) {
	if vuidController.Isbroken(args.Vuid) {
		err = errors.New("get shard fake error")
		if vuidController.IsBNRealError() {
			err = randBlobnodeRealError(getErrors)
		}
		return
	}
	if vuidController.Isblocked(args.Vuid) {
		vuidController.block()
		if rand.Intn(2) == 0 {
			err = errors.New("get shard timeout")
		} else {
			err = errors.New("get shard Timeout")
		}
		return
	}

	buff := dataShards.get(args.Vuid, args.Bid)
	if len(buff) == 0 {
		return nil, 0, errNotFound
	}
	if len(buff) < int(args.Offset+args.Size) {
		err = errors.New("get shard concurrently")
		return
	}

	buff = buff[int(args.Offset):int(args.Offset+args.Size)]
	shardCrc = crc32.ChecksumIEEE(buff)
	body = ioutil.NopCloser(bytes.NewReader(buff))
	return
}

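// storageAPIPutShard reads an uploaded shard into a pooled buffer, stores it
// in dataShards, and returns its CRC, with the same fault injection as reads.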
var storageAPIPutShard = func(ctx context.Context, host string, args *blobnode.PutShardArgs) (
	crc uint32, err error) {
	if vuidController.Isbroken(args.Vuid) {
		err = errors.New("put shard fake error")
		if vuidController.IsBNRealError() {
			err = randBlobnodeRealError(putErrors)
		}
		return
	}
	if vuidController.Isblocked(args.Vuid) {
		vuidController.block()
		err = errors.New("put shard timeout")
		return
	}

	buffer, _ := memPool.Alloc(int(args.Size))
	defer memPool.Put(buffer)

	buffer = buffer[:int(args.Size)]
	_, err = io.ReadFull(args.Body, buffer)
	if err != nil {
		return
	}

	crc = crc32.ChecksumIEEE(buffer)
	dataShards.set(args.Vuid, args.Bid, buffer)
	return
}

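// initMockData builds the mocked cluster: allocation results, one EC6P6
// volume across twelve units, proxy service nodes, disks in two IDCs, and
// gomock-backed clustermgr/proxy clients.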
func initMockData() {
	dataAllocs = make([]proxy.AllocRet, 2)
	dataAllocs[0] = proxy.AllocRet{
		BidStart: 10000,
		BidEnd:   10000,
		Vid:      volumeID,
	}
	dataAllocs[1] = proxy.AllocRet{
		BidStart: 20000,
		BidEnd:   50000,
		Vid:      volumeID,
	}

	dataVolume = &proxy.VersionVolume{VolumeInfo: clustermgr.VolumeInfo{
		VolumeInfoBase: clustermgr.VolumeInfoBase{
			Vid:      volumeID,
			CodeMode: codemode.EC6P6,
		},
		Units: func() (units []clustermgr.Unit) {
			for _, id := range allID {
				units = append(units, clustermgr.Unit{
					Vuid:   proto.Vuid(id),
					DiskID: proto.DiskID(id),
					Host:   strconv.Itoa(id),
				})
			}
			return
		}(),
	}}

	proxyNodes := make([]clustermgr.ServiceNode, 32)
	for idx := range proxyNodes {
		proxyNodes[idx] = clustermgr.ServiceNode{
			ClusterID: 1,
			Name:      serviceProxy,
			Host:      fmt.Sprintf("proxy-%d", idx),
			Idc:       idc,
		}
	}

	dataNodes = make(map[string]clustermgr.ServiceInfo)
	dataNodes[serviceProxy] = clustermgr.ServiceInfo{
		Nodes: proxyNodes,
	}

	dataDisks = make(map[proto.DiskID]blobnode.DiskInfo)
	for _, id := range idcID {
		dataDisks[proto.DiskID(id)] = blobnode.DiskInfo{
			ClusterID: clusterID, Idc: idc, Host: strconv.Itoa(id),
			DiskHeartBeatInfo: blobnode.DiskHeartBeatInfo{DiskID: proto.DiskID(id)},
		}
	}
	for _, id := range idcOtherID {
		dataDisks[proto.DiskID(id)] = blobnode.DiskInfo{
			ClusterID: clusterID, Idc: idcOther, Host: strconv.Itoa(id),
			DiskHeartBeatInfo: blobnode.DiskHeartBeatInfo{DiskID: proto.DiskID(id)},
		}
	}

	dataShards = &shardsData{
		data: make(map[shardKey][]byte, len(allID)),
	}
	dataShards.clean()

	ctr := gomock.NewController(&testing.T{})
	cli := mocks.NewMockClientAPI(ctr)
	cli.EXPECT().GetService(gomock.Any(), gomock.Any()).AnyTimes().DoAndReturn(
		func(ctx context.Context, args clustermgr.GetServiceArgs) (clustermgr.ServiceInfo, error) {
			if val, ok := dataNodes[args.Name]; ok {
				return val, nil
			}
			return clustermgr.ServiceInfo{}, errNotFound
		})
	cmcli = cli

	clusterInfo = &clustermgr.ClusterInfo{
		Region:    "test-region",
		ClusterID: clusterID,
		Nodes:     []string{"node-1", "node-2", "node-3"},
	}

	ctr = gomock.NewController(&testing.T{})
	proxycli := mocks.NewMockProxyClient(ctr)
	proxycli.EXPECT().GetCacheVolume(gomock.Any(), gomock.Any(), gomock.Any()).
		AnyTimes().Return(dataVolume, nil)
	proxycli.EXPECT().GetCacheDisk(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes().DoAndReturn(
		func(_ context.Context, _ string, args *proxy.CacheDiskArgs) (*blobnode.DiskInfo, error) {
			if val, ok := dataDisks[args.DiskID]; ok {
				return &val, nil
			}
			return nil, errNotFound
		})

	serviceController, _ = controller.NewServiceController(
		controller.ServiceConfig{
			ClusterID: clusterID,
			IDC:       idc,
			ReloadSec: 1000,
		}, cmcli, proxycli, nil)
	volumeGetter, _ = controller.NewVolumeGetter(clusterID, serviceController, proxycli, 0)

	ctr = gomock.NewController(&testing.T{})
	c := NewMockClusterController(ctr)
	c.EXPECT().Region().AnyTimes().Return("test-region")
	c.EXPECT().ChooseOne().AnyTimes().Return(clusterInfo, nil)
	c.EXPECT().GetServiceController(gomock.Any()).AnyTimes().Return(serviceController, nil)
	c.EXPECT().GetVolumeGetter(gomock.Any()).AnyTimes().Return(volumeGetter, nil)
	c.EXPECT().ChangeChooseAlg(gomock.Any()).AnyTimes().DoAndReturn(
		func(alg controller.AlgChoose) error {
			if alg < 10 {
				return nil
			}
			return controller.ErrInvalidChooseAlg
		})
	cc = c

	ctr = gomock.NewController(&testing.T{})
	allocCli := mocks.NewMockProxyClient(ctr)
	allocCli.EXPECT().SendDeleteMsg(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes().Return(nil)
	allocCli.EXPECT().SendShardRepairMsg(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes().Return(nil)
	allocCli.EXPECT().VolumeAlloc(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes().DoAndReturn(
		func(ctx context.Context, host string, args *proxy.AllocVolsArgs) ([]proxy.AllocRet, error) {
			if args.Fsize > allocTimeoutSize {
				return nil, errAllocTimeout
			}
			return dataAllocs, nil
		})
	proxyClient = allocCli
}

func initPool() {
	memPool = resourcepool.NewMemPool(getDefaultMempoolSize())
}

func initEncoder() {
	coderEC6P6, _ := ec.NewEncoder(ec.Config{
		CodeMode:     codemode.EC6P6.Tactic(),
		EnableVerify: true,
	})
	coderEC6P10L2, _ := ec.NewEncoder(ec.Config{
		CodeMode:     codemode.EC6P10L2.Tactic(),
		EnableVerify: true,
	})
	coderEC15P12, _ := ec.NewEncoder(ec.Config{
		CodeMode:     codemode.EC15P12.Tactic(),
		EnableVerify: true,
	})
	coderEC16P20L2, _ := ec.NewEncoder(ec.Config{
		CodeMode:     codemode.EC16P20L2.Tactic(),
		EnableVerify: true,
	})
	encoder = map[codemode.CodeMode]ec.Encoder{
		codemode.EC6P6:     coderEC6P6,
		codemode.EC6P10L2:  coderEC6P10L2,
		codemode.EC15P12:   coderEC15P12,
		codemode.EC16P20L2: coderEC16P20L2,
	}
}

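// initEC registers the code mode policies; only EC6P6 is enabled for
// allocation (MaxSize is math.MaxInt64), the other modes are registered
// with MaxSize -1 and left disabled.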
func initEC() {
	allCodeModes = CodeModePairs{
		codemode.EC6P6: CodeModePair{
			Policy: codemode.Policy{
				ModeName: codemode.EC6P6.Name(),
				MaxSize:  math.MaxInt64,
				Enable:   true,
			},
			Tactic: codemode.EC6P6.Tactic(),
		},
		codemode.EC6P10L2: CodeModePair{
			Policy: codemode.Policy{
				ModeName: codemode.EC6P10L2.Name(),
				MaxSize:  -1,
			},
			Tactic: codemode.EC6P10L2.Tactic(),
		},
		codemode.EC15P12: CodeModePair{
			Policy: codemode.Policy{
				ModeName: codemode.EC15P12.Name(),
				MaxSize:  -1,
			},
			Tactic: codemode.EC15P12.Tactic(),
		},
		codemode.EC16P20L2: CodeModePair{
			Policy: codemode.Policy{
				ModeName: codemode.EC16P20L2.Name(),
				MaxSize:  -1,
			},
			Tactic: codemode.EC16P20L2.Tactic(),
		},
	}
}

func initController() {
	vuidController = &vuidControl{
		broken:  make(map[proto.Vuid]bool),
		blocked: make(map[proto.Vuid]bool),
		block: func() {
			time.Sleep(200 * time.Millisecond)
		},
		duration:      200 * time.Millisecond,
		isBNRealError: false,
	}
	// vuid 1005 starts out broken
	vuidController.Break(1005)
}

func newMockStorageAPI() blobnode.StorageAPI {
	ctr := gomock.NewController(&testing.T{})
	api := mocks.NewMockStorageAPI(ctr)
	api.EXPECT().RangeGetShard(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes().
		DoAndReturn(storageAPIRangeGetShard)
	api.EXPECT().PutShard(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes().
		DoAndReturn(storageAPIPutShard)
	return api
}

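// init seeds the RNG, configures the hystrix command with generous limits so
// the circuit breaker stays out of the way, runs all fixture initializers,
// and wires a Handler that talks only to the mocks above.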
func init() {
	rand.Seed(int64(time.Now().Nanosecond()))

	hystrix.ConfigureCommand(rwCommand, hystrix.CommandConfig{
		Timeout:               9000,
		MaxConcurrentRequests: 9000,
		ErrorPercentThreshold: 90,
	})

	initPool()
	initEncoder()
	initEC()
	initMockData()
	initController()

	streamer = &Handler{
		memPool:           memPool,
		encoder:           encoder,
		clusterController: cc,

		blobnodeClient: newMockStorageAPI(),
		proxyClient:    proxyClient,

		allCodeModes:  allCodeModes,
		maxObjectSize: defaultMaxObjectSize,
		StreamConfig: StreamConfig{
			IDC:                    idc,
			MaxBlobSize:            uint32(blobSize), // 4M
			DiskPunishIntervalS:    punishServiceS,
			ServicePunishIntervalS: punishServiceS,
			AllocRetryTimes:        3,
			AllocRetryIntervalMS:   3000,
			MinReadShardsX:         minReadShardsX,
		},
		discardVidChan: make(chan discardVid, 8),
		stopCh:         make(chan struct{}),
	}
	streamer.loopDiscardVids()
}

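// ctxWithName returns a context factory whose contexts carry a trace span
// named after the given function, so each test case is identifiable in traces.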
func ctxWithName(funcName string) func() context.Context {
	return func() context.Context {
		_, ctx := trace.StartSpanFromContextWithTraceID(context.Background(), funcName, funcName)
		return ctx
	}
}

func getBufSizes(size int) ec.BufferSizes {
	sizes, _ := ec.GetBufferSizes(size, codemode.EC6P6.Tactic())
	return sizes
}

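// dataEqual compares payloads by CRC32 checksum rather than byte-by-byte,
// which is cheap and sufficient for test assertions.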
func dataEqual(exp, act []byte) bool {
	return crc32.ChecksumIEEE(exp) == crc32.ChecksumIEEE(act)
}