23
"github.com/Shopify/sarama"
24
"github.com/golang/mock/gomock"
25
"github.com/stretchr/testify/require"
27
"github.com/cubefs/cubefs/blobstore/api/blobnode"
28
"github.com/cubefs/cubefs/blobstore/api/clustermgr"
29
"github.com/cubefs/cubefs/blobstore/api/proxy"
30
"github.com/cubefs/cubefs/blobstore/common/codemode"
31
errcode "github.com/cubefs/cubefs/blobstore/common/errors"
32
"github.com/cubefs/cubefs/blobstore/common/kafka"
33
"github.com/cubefs/cubefs/blobstore/common/proto"
34
"github.com/cubefs/cubefs/blobstore/common/rpc"
35
"github.com/cubefs/cubefs/blobstore/proxy/allocator"
36
"github.com/cubefs/cubefs/blobstore/proxy/mock"
37
_ "github.com/cubefs/cubefs/blobstore/testing/nolog"
38
"github.com/cubefs/cubefs/blobstore/util/errors"
44
// errCodeMode is the sentinel the mocked volume manager returns for a code
// mode it does not accept.
errCodeMode = errors.New("codeMode not exist")
45
// errBidCount is the sentinel the mocked Alloc returns when BidCount is out
// of the accepted range.
errBidCount = errors.New("count too large")
46
// ctx is the shared context passed to the RPC client calls in these tests.
ctx = context.Background()
48
// proxyServer is the test HTTP server; it is assigned in runMockService.
proxyServer *httptest.Server
52
// runMockService exposes s over HTTP for the tests: it stores an httptest
// server built from NewHandler(s) in the package-level proxyServer and returns
// that server's base URL.
// NOTE(review): some lines of this function are not visible in this view, so
// any guard around the server creation cannot be confirmed from here.
func runMockService(s *Service) string {
54
proxyServer = httptest.NewServer(NewHandler(s))
56
return proxyServer.URL
59
// newMockService builds a Service whose message senders, volume manager and
// cacher are gomock doubles, all registered with AnyTimes() so call counts are
// not asserted.
// NOTE(review): several branch conditions and return statements of the stubs
// are not visible in this view; the comments below describe only what is shown.
func newMockService(t *testing.T) *Service {
60
ctr := gomock.NewController(t)
62
blobDeleteMgr := mock.NewMockBlobDeleteHandler(ctr)
63
blobDeleteMgr.EXPECT().SendDeleteMsg(gomock.Any(), gomock.Any()).AnyTimes().DoAndReturn(
64
func(ctx context.Context, info *proxy.DeleteArgs) error {
65
// A request carrying more than one blob simulates a send failure.
if len(info.Blobs) > 1 {
66
return errors.New("fake send delete message failed")
72
shardRepairMgr := mock.NewMockShardRepairHandler(ctr)
73
shardRepairMgr.EXPECT().SendShardRepairMsg(gomock.Any(), gomock.Any()).AnyTimes().DoAndReturn(
74
func(ctx context.Context, info *proxy.ShardRepairArgs) error {
76
// NOTE(review): the condition selecting this failure is not visible here.
return errors.New("fake send shard repair message failed")
81
volumeMgr := mock.NewMockVolumeMgr(ctr)
82
volumeMgr.EXPECT().Alloc(gomock.Any(), gomock.Any()).AnyTimes().DoAndReturn(
83
func(ctx context.Context, args *proxy.AllocVolsArgs) (allocVols []proxy.AllocRet, err error) {
84
// Only EC6P6 and EC15P12 are accepted by the mocked allocator.
if args.CodeMode != codemode.EC6P6 && args.CodeMode != codemode.EC15P12 {
85
return nil, errCodeMode
87
// BidCount must be within 1..10000 inclusive.
if args.BidCount > 10000 || args.BidCount < 1 {
88
return nil, errBidCount
92
volumeMgr.EXPECT().List(gomock.Any(), gomock.Any()).AnyTimes().DoAndReturn(
93
func(ctx context.Context, codeMode codemode.CodeMode) (vids []proto.Vid, volumes []clustermgr.AllocVolumeInfo, err error) {
94
// Same code-mode restriction as Alloc.
if codeMode != codemode.EC6P6 && codeMode != codemode.EC15P12 {
95
return nil, nil, errCodeMode
99
volumeMgr.EXPECT().Discard(gomock.Any(), gomock.Any()).AnyTimes().DoAndReturn(
100
func(ctx context.Context, args *proxy.DiscardVolsArgs) (err error) {
101
// Same code-mode check as Alloc; the value returned for this branch is not visible here.
if args.CodeMode != codemode.EC6P6 && args.CodeMode != codemode.EC15P12 {
107
cacher := mock.NewMockCacher(ctr)
108
// A is a matcher declared elsewhere in the file — presumably gomock.Any(); confirm.
cacher.EXPECT().GetVolume(A, A).AnyTimes().DoAndReturn(
109
func(_ context.Context, args *proxy.CacheVolumeArgs) (*proxy.VersionVolume, error) {
110
volume := new(proxy.VersionVolume)
112
// The returned volume echoes the requested Vid.
volume.Vid = args.Vid
116
// NOTE(review): the conditions choosing between the two error returns below are not visible here.
return nil, errcode.ErrVolumeNotExist
118
return nil, errors.New("internal error")
120
cacher.EXPECT().GetDisk(A, A).AnyTimes().DoAndReturn(
121
func(_ context.Context, args *proxy.CacheDiskArgs) (*blobnode.DiskInfo, error) {
122
disk := new(blobnode.DiskInfo)
123
// Even disk IDs take the branch that copies the requested ID into the result.
if args.DiskID%2 == 0 {
124
disk.DiskID = args.DiskID
128
// NOTE(review): the conditions choosing between the two error returns below are not visible here.
return nil, errcode.ErrCMDiskNotFound
130
return nil, errors.New("internal error")
132
cacher.EXPECT().Erase(A, A).AnyTimes().DoAndReturn(
133
func(_ context.Context, key string) error {
135
// NOTE(review): the condition guarding this failure is not visible here.
return errors.New("internal error")
142
VolConfig: allocator.VolConfig{
146
// Wire the mocks into the Service under test.
shardRepairMgr: shardRepairMgr,
147
blobDeleteMgr: blobDeleteMgr,
148
volumeMgr: volumeMgr,
153
// newClient returns an rpc.Client built from a zero-value rpc.Config.
func newClient() rpc.Client {
154
return rpc.NewClient(&rpc.Config{})
157
// TestService_New sets up mock Kafka brokers and a mock cluster-manager client,
// then iterates over a table of configurations whose MQ section names three
// topics and points the message sender at the seed broker.
// NOTE(review): the loop body and the remaining case fields are not visible in
// this view, so what is asserted per case cannot be confirmed from here.
func TestService_New(t *testing.T) {
159
seedBroker, leader := newBrokersWith2Responses(t)
160
defer seedBroker.Close()
162
cmcli := mock.ProxyMockClusterMgrCli(t)
164
testCases := []struct {
171
BlobDeleteTopic: "test1",
172
ShardRepairTopic: "test2",
173
ShardRepairPriorityTopic: "test3",
174
MsgSender: kafka.ProducerCfg{
175
// Point the producer at the mock seed broker's address.
BrokerList: []string{seedBroker.Addr()},
182
for _, tc := range testCases {
187
// TestService_MQ posts table-driven requests to the /deletemsg and /repairmsg
// endpoints of the mock service and checks that the status code detected from
// the returned error equals each case's expected code.
// NOTE(review): the expected codes and several case fields are not visible in
// this view.
func TestService_MQ(t *testing.T) {
188
runMockService(newMockService(t))
191
deleteCases := []struct {
192
args proxy.DeleteArgs
196
args: proxy.DeleteArgs{
198
Blobs: []proxy.BlobDelete{{Bid: 0, Vid: 0}},
203
args: proxy.DeleteArgs{
205
Blobs: []proxy.BlobDelete{{Bid: 0, Vid: 0}},
210
args: proxy.DeleteArgs{
212
// Two blobs: the mocked SendDeleteMsg fails when more than one blob is sent.
Blobs: []proxy.BlobDelete{{Bid: 0, Vid: 0}, {Bid: 1, Vid: 1}},
217
for _, tc := range deleteCases {
218
err := cli.PostWith(ctx, proxyServer.URL+"/deletemsg", nil, tc.args)
219
require.Equal(t, tc.code, rpc.DetectStatusCode(err))
222
shardRepairCases := []struct {
223
args proxy.ShardRepairArgs
227
args: proxy.ShardRepairArgs{
237
args: proxy.ShardRepairArgs{
247
args: proxy.ShardRepairArgs{
257
for _, tc := range shardRepairCases {
258
err := cli.PostWith(ctx, proxyServer.URL+"/repairmsg", nil, tc.args)
259
require.Equal(t, tc.code, rpc.DetectStatusCode(err))
263
// TestService_Allocator exercises the /volume/alloc, /volume/discard and
// /volume/list endpoints of the mock service, covering success cases as well
// as the code-mode, bid-count and illegal-argument error paths.
// NOTE(review): some argument fields of the requests are not visible in this
// view.
func TestService_Allocator(t *testing.T) {
264
url := runMockService(newMockService(t))
266
allocURL := url + "/volume/alloc"
268
// Alloc with code mode 2 is expected to succeed.
args := proxy.AllocVolsArgs{
271
CodeMode: codemode.CodeMode(2),
274
err := cli.PostWith(ctx, allocURL, nil, args)
275
require.NoError(t, err)
278
// Alloc with code mode 3 is expected to fail with errCodeMode.
args := proxy.AllocVolsArgs{
281
CodeMode: codemode.CodeMode(3),
284
err := cli.PostWith(ctx, allocURL, nil, args)
285
require.Error(t, err)
286
require.Equal(t, errCodeMode.Error(), err.Error())
289
// Alloc expected to fail with errBidCount (the BidCount used is not visible here).
args := proxy.AllocVolsArgs{
292
CodeMode: codemode.CodeMode(2),
294
err := cli.PostWith(ctx, allocURL, nil, args)
295
require.Error(t, err)
296
require.Equal(t, errBidCount.Error(), err.Error())
299
// Alloc expected to be rejected as illegal arguments.
args := proxy.AllocVolsArgs{
302
CodeMode: codemode.CodeMode(2),
304
err := cli.PostWith(ctx, allocURL, nil, args)
305
require.Error(t, err)
306
require.Equal(t, errcode.ErrIllegalArguments.Error(), err.Error())
308
discardUrl := url + "/volume/discard"
310
// Discard with an empty volume list and code mode 2 is expected to succeed.
args := proxy.DiscardVolsArgs{
311
CodeMode: codemode.CodeMode(2),
312
Discards: []proto.Vid{},
314
err := cli.PostWith(ctx, discardUrl, nil, args)
315
require.NoError(t, err)
319
// List with code_mode=0 is expected to be rejected as illegal arguments.
err := cli.GetWith(ctx, url+"/volume/list?code_mode=0", nil)
320
require.Error(t, err)
321
require.Equal(t, errcode.ErrIllegalArguments.Error(), err.Error())
324
// List with code_mode=3 is expected to fail with errCodeMode.
err := cli.GetWith(ctx, url+"/volume/list?code_mode=3", nil)
325
require.Error(t, err)
326
require.Equal(t, errCodeMode.Error(), err.Error())
329
// List with code_mode=2 is expected to succeed (issued twice).
err := cli.GetWith(ctx, url+"/volume/list?code_mode=2", nil)
330
require.NoError(t, err)
333
err := cli.GetWith(ctx, url+"/volume/list?code_mode=2", nil)
334
require.NoError(t, err)
338
// TestService_CacherVolume queries /cache/volume/<vid> on the mock service:
// vid 1024 must succeed and echo the Vid, while vid 111 must fail, with status
// 500 for flush=0 and the volume-not-exist code for flush=1 and flush=true.
func TestService_CacherVolume(t *testing.T) {
339
url := runMockService(newMockService(t)) + "/cache/volume/"
341
var volume clustermgr.VolumeInfo
343
err := cli.GetWith(ctx, url+"1024", &volume)
344
require.NoError(t, err)
345
require.Equal(t, proto.Vid(1024), volume.Vid)
348
err := cli.GetWith(ctx, url+"111", &volume)
349
require.Error(t, err)
352
// flush=0: the failure is expected to surface as HTTP 500.
err := cli.GetWith(ctx, url+"111?flush=0", &volume)
353
require.Error(t, err)
354
require.Equal(t, 500, rpc.DetectStatusCode(err))
357
// flush=1 and flush=true: both are expected to yield the volume-not-exist code.
err := cli.GetWith(ctx, url+"111?flush=1", &volume)
358
require.Error(t, err)
359
require.Equal(t, errcode.CodeVolumeNotExist, rpc.DetectStatusCode(err))
362
err := cli.GetWith(ctx, url+"111?flush=true", &volume)
363
require.Error(t, err)
364
require.Equal(t, errcode.CodeVolumeNotExist, rpc.DetectStatusCode(err))
368
// TestService_CacherDisk queries /cache/disk/<id> on the mock service: disk
// 1024 must succeed and echo the DiskID, while disk 111 must fail, with status
// 500 for flush=0 and the disk-not-found code for flush=true.
func TestService_CacherDisk(t *testing.T) {
369
url := runMockService(newMockService(t)) + "/cache/disk/"
371
var disk blobnode.DiskInfo
373
err := cli.GetWith(ctx, url+"1024", &disk)
374
require.NoError(t, err)
375
require.Equal(t, proto.DiskID(1024), disk.DiskID)
378
err := cli.GetWith(ctx, url+"111", nil)
379
require.Error(t, err)
382
// flush=0: the failure is expected to surface as HTTP 500.
err := cli.GetWith(ctx, url+"111?flush=0", nil)
383
require.Error(t, err)
384
require.Equal(t, 500, rpc.DetectStatusCode(err))
387
// flush=true: expected to yield the disk-not-found code.
err := cli.GetWith(ctx, url+"111?flush=true", nil)
388
require.Error(t, err)
389
require.Equal(t, errcode.CodeCMDiskNotFound, rpc.DetectStatusCode(err))
393
// TestService_CacherErase issues DELETE requests to /cache/erase/<key> on the
// mock service and checks the raw response status: 200 for "volume-10" and 500
// for "ALL".
// NOTE(review): whether resp.Body is closed is not visible in this view — confirm.
func TestService_CacherErase(t *testing.T) {
394
url := runMockService(newMockService(t)) + "/cache/erase/"
397
resp, err := cli.Delete(ctx, url+"volume-10")
398
require.NoError(t, err)
399
require.Equal(t, 200, resp.StatusCode)
403
// The request itself succeeds; the failure is reported through the status code.
resp, err := cli.Delete(ctx, url+"ALL")
404
require.NoError(t, err)
405
require.Equal(t, 500, resp.StatusCode)
410
// TestConfigFix runs Config.checkAndFix over a table of MQ topic settings and
// checks the returned error against the expected one. In the cases shown,
// ErrIllegalTopic is expected when a topic is left unset or when the delete
// topic equals one of the repair topics, and nil when all three are set and
// distinct.
func TestConfigFix(t *testing.T) {
411
testCases := []struct {
415
{cfg: &Config{}, err: ErrIllegalTopic},
416
{cfg: &Config{MQ: MQConfig{BlobDeleteTopic: "test"}}, err: ErrIllegalTopic},
417
{cfg: &Config{MQ: MQConfig{BlobDeleteTopic: "test", ShardRepairTopic: "test1"}}, err: ErrIllegalTopic},
418
{cfg: &Config{MQ: MQConfig{BlobDeleteTopic: "test", ShardRepairTopic: "test", ShardRepairPriorityTopic: "test3"}}, err: ErrIllegalTopic},
419
{cfg: &Config{MQ: MQConfig{BlobDeleteTopic: "test", ShardRepairTopic: "test1", ShardRepairPriorityTopic: "test"}}, err: ErrIllegalTopic},
420
{cfg: &Config{MQ: MQConfig{BlobDeleteTopic: "test", ShardRepairTopic: "test1", ShardRepairPriorityTopic: "test3"}}, err: nil},
423
for _, tc := range testCases {
424
err := tc.cfg.checkAndFix()
425
require.Equal(t, true, errors.Is(err, tc.err))
426
// Called for every case, valid or not; the results are discarded.
tc.cfg.shardRepairCfg()
427
tc.cfg.blobDeleteCfg()
431
// newBrokersWith2Responses starts two sarama mock brokers (a seed broker with
// ID 1 and a leader with ID 2) on 127.0.0.1 with port 0, queues the same
// metadata response on the seed broker twice, and returns both brokers.
// It also overwrites the package-level kafka.DefaultKafkaVersion.
func newBrokersWith2Responses(t *testing.T) (*sarama.MockBroker, *sarama.MockBroker) {
432
kafka.DefaultKafkaVersion = sarama.V0_9_0_1
434
seedBroker := sarama.NewMockBrokerAddr(t, 1, "127.0.0.1:0")
435
leader := sarama.NewMockBrokerAddr(t, 2, "127.0.0.1:0")
437
// The metadata names the leader as owner of partition 0 of "my_topic".
metadataResponse := new(sarama.MetadataResponse)
438
metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
439
metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, 0)
440
// Queued twice so the seed broker can answer two metadata requests.
seedBroker.Returns(metadataResponse)
441
seedBroker.Returns(metadataResponse)
443
return seedBroker, leader