30
"github.com/golang/mock/gomock"
31
"github.com/stretchr/testify/require"
33
"github.com/cubefs/cubefs/blobstore/api/access"
34
"github.com/cubefs/cubefs/blobstore/common/codemode"
35
errcode "github.com/cubefs/cubefs/blobstore/common/errors"
36
"github.com/cubefs/cubefs/blobstore/common/proto"
37
"github.com/cubefs/cubefs/blobstore/common/rpc"
38
"github.com/cubefs/cubefs/blobstore/common/uptoken"
42
// Shared fixtures for the tests in this file.
// NOTE(review): the enclosing declaration keyword and several lines of the
// location literal are not visible in this excerpt.
ctx = context.Background()
43
// _blobSize is 1 << 20 (1048576); the Alloc test uses it as a size boundary.
_blobSize uint32 = 1 << 20
44
// location is the template that tests and mocks duplicate via location.Copy().
location = &access.Location{
49
Blobs: []access.SliceInfo{{
56
// testServer is assigned by runMockService.
testServer *httptest.Server
60
// runMockService serves the handler built from s on an httptest server and
// stores that server in the package-level testServer.
// NOTE(review): the return statement is not visible in this excerpt; callers
// use the returned string as the URL prefix ("%s/alloc", "%s/put", ...).
func runMockService(s *Service) string {
62
testServer = httptest.NewServer(NewHandler(s))
67
// newService builds a *Service whose stream handler is a gomock mock with
// canned behaviour for Alloc, PutAt, Put, Get and Delete. Every expectation is
// registered with AnyTimes().
// NOTE(review): the conditions guarding most of the error returns below, and
// the remaining Service fields, are not visible in this excerpt.
func newService() *Service {
68
// The controller is created with a fresh &testing.T{} rather than a running
// test's t. NOTE(review): confirm this is intended.
ctr := gomock.NewController(&testing.T{})
69
s := NewMockStreamHandler(ctr)
71
// Alloc: either fails with "fake alloc location" or returns a copy of the
// template location with Size set to the requested size.
s.EXPECT().Alloc(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes().DoAndReturn(
72
func(ctx context.Context, size uint64, blobSize uint32,
73
assignClusterID proto.ClusterID, codeMode codemode.CodeMode) (*access.Location, error) {
75
return nil, errors.New("fake alloc location")
77
loc := location.Copy()
78
loc.Size = uint64(size)
83
// PutAt: can return errcode.ErrAccessLimited (guarding condition not visible).
s.EXPECT().PutAt(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(),
84
gomock.Any()).AnyTimes().DoAndReturn(
85
func(ctx context.Context, rc io.Reader,
86
clusterID proto.ClusterID, vid proto.Vid, bid proto.BlobID, size int64,
87
hasherMap access.HasherMap) error {
89
return errcode.ErrAccessLimited
94
// Put: either fails with "fake put nil body" or returns a copy of the
// template location with Size set to the uploaded size.
s.EXPECT().Put(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes().DoAndReturn(
95
func(ctx context.Context, rc io.Reader, size int64, hasherMap access.HasherMap) (*access.Location, error) {
97
return nil, errors.New("fake put nil body")
99
loc := location.Copy()
100
loc.Size = uint64(size)
105
// Get: either fails with "fake get nil body" or returns a transfer function
// that always succeeds.
s.EXPECT().Get(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes().DoAndReturn(
106
func(ctx context.Context, w io.Writer, location access.Location, readSize, offset uint64) (func() error, error) {
108
return nil, errors.New("fake get nil body")
110
return func() error { return nil }, nil
112
// Delete: fails for any location with ClusterID >= 10, and for ClusterID == 1
// when Crc > 0 and Size < 1024.
s.EXPECT().Delete(gomock.Any(), gomock.Any()).AnyTimes().DoAndReturn(
113
func(ctx context.Context, location *access.Location) error {
114
if location.ClusterID >= 10 {
115
return errors.New("fake delete error with cluster")
116
} else if location.ClusterID == 1 && location.Crc > 0 && location.Size < 1024 {
117
return errors.New("fake delete error")
124
// The service's limiter is built from a LimitConfig carrying a NameRps map
// (entries not visible in this excerpt).
limiter: NewLimiter(LimitConfig{
125
NameRps: map[string]int{
134
// newClient returns an rpc.Client constructed from a zero-value rpc.Config.
func newClient() rpc.Client {
135
return rpc.NewClient(&rpc.Config{})
138
// TestAccessServiceNew starts the mock service built by newService.
// NOTE(review): any further statements of this test are not visible here.
func TestAccessServiceNew(t *testing.T) {
139
runMockService(newService())
142
// TestAccessServiceAlloc posts AllocArgs to the /alloc endpoint and checks the
// error code or the returned location size for several request sizes.
// NOTE(review): the declaration of cli, the AllocArgs fields, the per-case
// scopes and the mutations between the first cases are not visible here.
func TestAccessServiceAlloc(t *testing.T) {
143
host := runMockService(newService())
146
url := func() string {
147
return fmt.Sprintf("%s/alloc", host)
149
args := access.AllocArgs{
156
// Case: request rejected with 400.
resp := &access.AllocResp{}
157
err := cli.PostWith(ctx, url(), resp, args)
158
assertErrorCode(t, 400, err)
162
// Case: request fails with 500.
resp := &access.AllocResp{}
163
err := cli.PostWith(ctx, url(), resp, args)
164
assertErrorCode(t, 500, err)
168
// Case: success, the returned location reports a size of 1024.
resp := &access.AllocResp{}
169
err := cli.PostWith(ctx, url(), resp, args)
170
require.NoError(t, err)
171
require.Equal(t, uint64(1024), resp.Location.Size)
174
// Case: size exactly equal to _blobSize.
args.Size = uint64(_blobSize)
175
resp := &access.AllocResp{}
176
err := cli.PostWith(ctx, url(), resp, args)
177
require.NoError(t, err)
178
require.Equal(t, uint64(_blobSize), resp.Location.Size)
181
// Case: size one larger than _blobSize.
args.Size = uint64(_blobSize) + 1
182
resp := &access.AllocResp{}
183
err := cli.PostWith(ctx, url(), resp, args)
184
require.NoError(t, err)
185
require.Equal(t, uint64(_blobSize)+1, resp.Location.Size)
189
// TestAccessServicePutAt sends bodies to the /putat endpoint with both PUT and
// POST and checks the outcome for different token query values.
// NOTE(review): the declaration of cli, the PutArgs fields, the remaining
// Sprintf arguments and the changes to args between cases are not visible.
func TestAccessServicePutAt(t *testing.T) {
190
host := runMockService(newService())
193
// url builds a /putat request URL with fixed clusterid, volumeid, blobid and
// hashes values; only size and token vary.
url := func(size int64, token string) string {
194
return fmt.Sprintf("%s/putat?clusterid=1&volumeid=1111&blobid=111&size=%d&hashes=14&token=%s",
198
for _, method := range []string{http.MethodPut, http.MethodPost} {
199
args := access.PutArgs{
203
// Case: empty token, expects 400.
buf := make([]byte, args.Size)
204
req, _ := http.NewRequest(method, url(args.Size, ""), bytes.NewReader(buf))
205
resp := &access.PutAtResp{}
206
err := cli.DoWith(ctx, req, resp, rpc.WithCrcEncode())
207
assertErrorCode(t, 400, err)
211
// Case: empty token again, expects 400 (what differs from the previous case
// is not visible in this excerpt).
buf := make([]byte, args.Size)
212
req, _ := http.NewRequest(method, url(args.Size, ""), bytes.NewReader(buf))
213
resp := &access.PutAtResp{}
214
err := cli.DoWith(ctx, req, resp, rpc.WithCrcEncode())
215
assertErrorCode(t, 400, err)
219
// Case: expects 552. NOTE(review): presumably the status mapped from the
// errcode.ErrAccessLimited returned by the mocked PutAt — confirm.
buf := make([]byte, args.Size)
220
resp := &access.PutAtResp{}
221
req, _ := http.NewRequest(method, url(args.Size, "c1fdcecaacbfafd86f0b00"), bytes.NewReader(buf))
222
err := cli.DoWith(ctx, req, resp, rpc.WithCrcEncode())
223
assertErrorCode(t, 552, err)
227
// Case: this token is accepted and the upload succeeds.
buf := make([]byte, args.Size)
228
req, _ := http.NewRequest(method, url(args.Size, "8238436d05ecf2366f0b00"), bytes.NewReader(buf))
229
resp := &access.PutAtResp{}
230
err := cli.DoWith(ctx, req, resp, rpc.WithCrcEncode())
231
require.NoError(t, err)
233
// Same request with the token's first character changed: expects 400.
req, _ = http.NewRequest(method, url(args.Size, "1238436d05ecf2366f0b00"), bytes.NewReader(buf))
234
err = cli.DoWith(ctx, req, resp, rpc.WithCrcEncode())
235
assertErrorCode(t, 400, err)
240
// TestAccessServicePut sends bodies to the /put endpoint with both PUT and
// POST and checks error codes and the returned location size.
// NOTE(review): the declaration of cli and the PutArgs fields are not visible
// in this excerpt.
func TestAccessServicePut(t *testing.T) {
241
host := runMockService(newService())
244
// url builds a /put request URL from the size and hash-algorithm values.
url := func(size int64, hashes access.HashAlgorithm) string {
245
return fmt.Sprintf("%s/put?size=%d&hashes=%d", host, size, hashes)
248
for _, method := range []string{http.MethodPut, http.MethodPost} {
249
args := access.PutArgs{
255
// Case: the size query value is the literal text "size", expects 400.
req, _ := http.NewRequest(method, fmt.Sprintf("%s/put?size=size", host), args.Body)
256
resp := &access.PutResp{}
257
err := cli.DoWith(ctx, req, resp, rpc.WithCrcEncode())
258
assertErrorCode(t, 400, err)
261
// Case: args as initially declared, expects 400.
req, _ := http.NewRequest(method, url(args.Size, args.Hashes), args.Body)
262
resp := &access.PutResp{}
263
err := cli.DoWith(ctx, req, resp, rpc.WithCrcEncode())
264
assertErrorCode(t, 400, err)
267
// Case: 1023-byte body, expects 500.
args.Body = bytes.NewReader(make([]byte, 1023))
268
req, _ := http.NewRequest(method, url(1023, args.Hashes), args.Body)
269
resp := &access.PutResp{}
270
err := cli.DoWith(ctx, req, resp, rpc.WithCrcEncode())
271
assertErrorCode(t, 500, err)
274
// Case: 1024-byte body succeeds and the location reports size 1024.
args.Body = bytes.NewReader(make([]byte, 1024))
275
req, _ := http.NewRequest(method, url(1024, args.Hashes), args.Body)
276
resp := &access.PutResp{}
277
err := cli.DoWith(ctx, req, resp, rpc.WithCrcEncode())
278
require.NoError(t, err)
279
require.Equal(t, uint64(1024), resp.Location.Size)
284
// TestAccessServiceGet posts GetArgs to the /get endpoint and checks the HTTP
// status for several location sizes.
// NOTE(review): the declaration of cli, the other GetArgs fields and any
// changes to args beyond those shown are not visible; fillCrc is defined
// elsewhere.
func TestAccessServiceGet(t *testing.T) {
285
host := runMockService(newService())
288
url := func() string {
289
return fmt.Sprintf("%s/get", host)
291
args := access.GetArgs{
292
Location: location.Copy(),
298
// Case: args as initially declared (fillCrc not yet called), status 400.
resp, err := cli.Post(ctx, url(), args)
299
require.NoError(t, err)
301
require.Equal(t, 400, resp.StatusCode, resp.Status)
304
// Case: size 1023 with the crc filled in, status 500.
args.Location.Size = 1023
306
fillCrc(&args.Location)
307
resp, err := cli.Post(ctx, url(), args)
308
require.NoError(t, err)
310
require.Equal(t, 500, resp.StatusCode, resp.Status)
313
// Case: size 1024, status 200.
args.Location.Size = 1024
315
fillCrc(&args.Location)
316
resp, err := cli.Post(ctx, url(), args)
317
require.NoError(t, err)
319
require.Equal(t, 200, resp.StatusCode, resp.Status)
322
// Case: size 10240, status 206. NOTE(review): presumably a partial read;
// the ReadSize/Offset set on the hidden lines would confirm.
args.Location.Size = 10240
325
fillCrc(&args.Location)
326
resp, err := cli.Post(ctx, url(), args)
327
require.NoError(t, err)
329
require.Equal(t, 206, resp.StatusCode, resp.Status)
333
// TestAccessServiceDelete posts DeleteArgs to the /delete endpoint and checks
// the status code and the reported failed locations.
// NOTE(review): the declaration of cli, the error handling inside
// deleteRequest, the per-case scopes and the loop bodies are not visible in
// this excerpt; fillCrc is defined elsewhere.
func TestAccessServiceDelete(t *testing.T) {
334
host := runMockService(newService())
337
url := fmt.Sprintf("%s/delete", host)
338
// deleteRequest posts args, reads Content-Length bytes of the body and
// decodes them as JSON into an access.DeleteResp.
deleteRequest := func(args interface{}) (code int, ret access.DeleteResp, err error) {
339
resp, err := cli.Post(ctx, url, args)
343
defer resp.Body.Close()
345
code = resp.StatusCode
347
// A missing or malformed Content-Length yields size 0 (Atoi error ignored).
size, _ := strconv.Atoi(resp.Header.Get("Content-Length"))
348
buf := make([]byte, size)
349
_, err = io.ReadFull(resp.Body, buf)
353
if err = json.Unmarshal(buf, &ret); err != nil {
358
// Builds an rpc error from the HTTP status (guarding condition not visible).
err = rpc.NewError(code, "Code", fmt.Errorf("httpcode: %d", code))
364
args := access.DeleteArgs{
365
Locations: []access.Location{location.Copy()},
368
// Case: empty DeleteArgs, expects 400.
code, _, err := deleteRequest(access.DeleteArgs{})
369
require.Error(t, err)
370
require.Equal(t, 400, code)
373
// Case: one location before fillCrc is called, expects 400.
code, _, err := deleteRequest(args)
374
require.Error(t, err)
375
require.Equal(t, 400, code)
378
// Case: crc filled in; the response is 226 and the location is returned
// in FailedLocations.
fillCrc(&args.Locations[0])
379
code, resp, err := deleteRequest(args)
380
require.NoError(t, err)
381
require.Equal(t, 226, code)
382
require.Equal(t, args.Locations[0], resp.FailedLocations[0])
385
// Case: the location is modified through loc (changes not visible) and
// the delete then returns 200.
loc := &args.Locations[0]
388
code, _, err := deleteRequest(args)
389
require.NoError(t, err)
390
require.Equal(t, 200, code)
393
// Case: exactly access.MaxDeleteLocations locations, all deleted.
loc := location.Copy()
396
locs := make([]access.Location, access.MaxDeleteLocations)
397
for idx := range locs {
400
code, resp, err := deleteRequest(access.DeleteArgs{Locations: locs})
401
require.NoError(t, err)
402
require.Equal(t, 200, code)
403
require.Equal(t, 0, len(resp.FailedLocations))
406
// Case: one more than access.MaxDeleteLocations, expects 400.
loc := location.Copy()
409
locs := make([]access.Location, access.MaxDeleteLocations+1)
410
for idx := range locs {
413
code, _, err := deleteRequest(access.DeleteArgs{Locations: locs})
414
require.Error(t, err)
415
require.Equal(t, 400, code)
418
// Case: ClusterID 11 hits the mocked Delete's "ClusterID >= 10" failure,
// so the location is reported back as failed with status 226.
loc := location.Copy()
420
loc.ClusterID = proto.ClusterID(11)
422
code, resp, err := deleteRequest(access.DeleteArgs{Locations: []access.Location{loc}})
423
require.NoError(t, err)
424
require.Equal(t, 226, code)
425
require.Equal(t, 1, len(resp.FailedLocations))
426
require.Equal(t, proto.ClusterID(11), resp.FailedLocations[0].ClusterID)
429
// Case: cluster ids cycle through 0..10, so only idx%11 == 10 reaches the
// mock's ">= 10" failure. NOTE(review): 93 failures matches that count if
// access.MaxDeleteLocations is 1024 — confirm.
locs := make([]access.Location, access.MaxDeleteLocations)
430
for idx := range locs {
431
loc := location.Copy()
433
loc.ClusterID = proto.ClusterID(idx % 11)
437
code, resp, err := deleteRequest(access.DeleteArgs{Locations: locs})
438
require.NoError(t, err)
439
require.Equal(t, 226, code)
440
require.Equal(t, 93, len(resp.FailedLocations))
444
// TestAccessServiceDeleteBlob issues DELETE requests against the /deleteblob
// endpoint and checks the outcome for different size/token combinations.
// NOTE(review): the declaration of cli, the PutArgs fields, the remaining
// Sprintf arguments and the changes to args between cases are not visible.
func TestAccessServiceDeleteBlob(t *testing.T) {
445
host := runMockService(newService())
448
// url builds a /deleteblob URL with fixed clusterid=1, volumeid and blobid.
url := func(size int64, token string) string {
449
return fmt.Sprintf("%s/deleteblob?clusterid=1&volumeid=1111&blobid=111&size=%d&token=%s",
453
method := http.MethodDelete
454
args := access.PutArgs{
458
// Case: token "xxx", expects 400.
req, _ := http.NewRequest(method, url(args.Size, "xxx"), nil)
459
err := cli.DoWith(ctx, req, nil)
460
assertErrorCode(t, 400, err)
463
// Case: empty token, expects 400.
req, _ := http.NewRequest(method, url(args.Size, ""), nil)
464
err := cli.DoWith(ctx, req, nil)
465
assertErrorCode(t, 400, err)
469
// Case: empty token again, expects 400 (what differs from the previous case
// is not visible in this excerpt).
req, _ := http.NewRequest(method, url(args.Size, ""), nil)
470
err := cli.DoWith(ctx, req, nil)
471
assertErrorCode(t, 400, err)
475
// Case: token "xxx" again, expects 400.
req, _ := http.NewRequest(method, url(args.Size, "xxx"), nil)
476
err := cli.DoWith(ctx, req, nil)
477
assertErrorCode(t, 400, err)
481
// Case: this token is accepted and the delete succeeds.
req, _ := http.NewRequest(method, url(args.Size, "c1fdcecaacbfafd86f0b00"), nil)
482
err := cli.DoWith(ctx, req, nil)
483
require.NoError(t, err)
487
// Case: clusterid=11 reaches the mocked Delete's "ClusterID >= 10" failure;
// the request is expected to fail with 500.
url := func() string {
488
return fmt.Sprintf("%s/deleteblob?clusterid=11&volumeid=1111&blobid=111&size=%d&token=%s",
489
host, 1024, "f034db4503d5dc3f6f0100")
491
req, _ := http.NewRequest(method, url(), nil)
492
err := cli.DoWith(ctx, req, nil)
493
assertErrorCode(t, 500, err)
497
// TestAccessServiceSign posts SignArgs to the /sign endpoint and checks that
// empty or unsigned arguments are rejected while crc-filled ones succeed.
// NOTE(review): the declaration of cli and the per-case scopes are not
// visible in this excerpt; fillCrc is defined elsewhere.
func TestAccessServiceSign(t *testing.T) {
498
host := runMockService(newService())
501
url := func() string {
502
return fmt.Sprintf("%s/sign", host)
504
args := access.SignArgs{
505
Locations: []access.Location{location.Copy()},
506
Location: location.Copy(),
509
// Case: empty SignArgs, expects 400.
resp := &access.SignResp{}
510
err := cli.PostWith(ctx, url(), resp, access.SignArgs{})
511
assertErrorCode(t, 400, err)
514
// Case: locations before fillCrc is called, expects 400.
resp := &access.SignResp{}
515
err := cli.PostWith(ctx, url(), resp, args)
516
assertErrorCode(t, 400, err)
519
// Case: crc filled in on the first location, signing succeeds.
fillCrc(&args.Locations[0])
520
resp := &access.SignResp{}
521
err := cli.PostWith(ctx, url(), resp, args)
522
require.NoError(t, err)
526
// assertErrorCode requires err to be non-nil and requires the status code
// that rpc.DetectStatusCode derives from it to equal code. The error text is
// passed to require.Equal as the failure message.
func assertErrorCode(t *testing.T, code int, err error) {
527
require.Error(t, err)
528
codeActual := rpc.DetectStatusCode(err)
529
require.Equal(t, code, codeActual, err.Error())
532
// TestAccessServiceTokens feeds several locations to genTokens and verifies,
// via the checker closure, how many tokens are produced and which
// (cluster, vid, bid, size) combinations each decoded token accepts.
// NOTE(review): the branch conditions inside checker, the Size/BlobSize
// fields of every location literal and the per-case scopes are not visible
// in this excerpt; genTokens and tokenSecretKeys are defined elsewhere.
func TestAccessServiceTokens(t *testing.T) {
533
skey := tokenSecretKeys[0][:]
534
checker := func(loc *access.Location, tokens []string) {
536
// Branch: no tokens are expected.
require.Equal(t, 0, len(tokens))
540
// hasMultiBlobs: the location size is at least one full blob.
// lastSize: the remainder of Size after whole blobs of BlobSize.
hasMultiBlobs := loc.Size >= uint64(loc.BlobSize)
541
lastSize := uint32(loc.Size % uint64(loc.BlobSize))
543
// Branch: exactly one token. It must reject every bid in
// [MinBid-100, MinBid+100) at the full BlobSize and accept MinBid at lastSize.
// NOTE(review): blob is bound on a line not visible here.
require.Equal(t, 1, len(tokens))
545
token := uptoken.DecodeToken(tokens[0])
547
for bid := blob.MinBid - 100; bid < blob.MinBid+100; bid++ {
548
require.False(t, token.IsValid(loc.ClusterID, blob.Vid, bid, loc.BlobSize, skey))
550
require.True(t, token.IsValid(loc.ClusterID, blob.Vid, blob.MinBid, lastSize, skey))
555
// Branch: one token per slice. Each token rejects the 100 bids just below
// and just above its slice's range and accepts every bid inside it.
require.Equal(t, len(loc.Blobs), len(tokens))
556
for idx, blob := range loc.Blobs {
557
token := uptoken.DecodeToken(tokens[idx])
558
for ii := uint32(0); ii < 100; ii++ {
559
bid := blob.MinBid - proto.BlobID(ii) - 1
560
require.False(t, token.IsValid(loc.ClusterID, blob.Vid, bid, loc.BlobSize, skey))
561
bid = blob.MinBid + proto.BlobID(blob.Count+ii)
562
require.False(t, token.IsValid(loc.ClusterID, blob.Vid, bid, loc.BlobSize, skey))
564
for ii := uint32(0); ii < blob.Count; ii++ {
565
bid := blob.MinBid + proto.BlobID(ii)
566
require.True(t, token.IsValid(loc.ClusterID, blob.Vid, bid, loc.BlobSize, skey))
572
// Branch: one token per slice plus one extra token for the last blob.
require.Equal(t, len(loc.Blobs)+1, len(tokens))
573
// Every slice but the last: its token accepts all of that slice's bids.
for ii := 0; ii < len(loc.Blobs)-1; ii++ {
574
token := uptoken.DecodeToken(tokens[ii])
575
blob := loc.Blobs[ii]
576
for ii := uint32(0); ii < blob.Count; ii++ {
577
bid := blob.MinBid + proto.BlobID(ii)
578
require.True(t, token.IsValid(loc.ClusterID, blob.Vid, bid, loc.BlobSize, skey))
582
// Last slice: its token covers all bids except the final one at full size.
token := uptoken.DecodeToken(tokens[len(loc.Blobs)-1])
583
blob := loc.Blobs[len(loc.Blobs)-1]
584
for ii := uint32(0); ii < 100; ii++ {
585
bid := blob.MinBid - proto.BlobID(ii) - 1
586
require.False(t, token.IsValid(loc.ClusterID, blob.Vid, bid, loc.BlobSize, skey))
587
bid = blob.MinBid + proto.BlobID(blob.Count+ii) - 1
588
require.False(t, token.IsValid(loc.ClusterID, blob.Vid, bid, loc.BlobSize, skey))
590
for ii := uint32(0); ii < blob.Count-1; ii++ {
591
bid := blob.MinBid + proto.BlobID(ii)
592
require.True(t, token.IsValid(loc.ClusterID, blob.Vid, bid, loc.BlobSize, skey))
595
// The extra token accepts the final bid of the last slice at lastSize.
token = uptoken.DecodeToken(tokens[len(loc.Blobs)])
596
lastbid := blob.MinBid + proto.BlobID(blob.Count) - 1
597
require.True(t, token.IsValid(loc.ClusterID, blob.Vid, lastbid, lastSize, skey))
601
// Location with no slices.
loc := &access.Location{
604
Blobs: []access.SliceInfo{},
606
checker(loc, genTokens(loc))
609
// Single slice holding one blob.
loc := &access.Location{
612
Blobs: []access.SliceInfo{
613
{MinBid: 100, Vid: 1000, Count: 1},
616
checker(loc, genTokens(loc))
619
// Single slice holding one blob (the differing fields are not visible).
loc := &access.Location{
622
Blobs: []access.SliceInfo{
623
{MinBid: 100, Vid: 1000, Count: 1},
626
checker(loc, genTokens(loc))
629
// Single slice holding two blobs.
loc := &access.Location{
632
Blobs: []access.SliceInfo{
633
{MinBid: 100, Vid: 1000, Count: 2},
636
checker(loc, genTokens(loc))
639
// Single slice holding two blobs (the differing fields are not visible).
loc := &access.Location{
642
Blobs: []access.SliceInfo{
643
{MinBid: 100, Vid: 1000, Count: 2},
646
checker(loc, genTokens(loc))
649
// Two slices holding 4 and 6 blobs.
loc := &access.Location{
652
Blobs: []access.SliceInfo{
653
{MinBid: 100, Vid: 1000, Count: 4},
654
{MinBid: 200, Vid: 1000, Count: 6},
657
checker(loc, genTokens(loc))
660
// Two slices holding one blob each.
loc := &access.Location{
663
Blobs: []access.SliceInfo{
664
{MinBid: 100, Vid: 1000, Count: 1},
665
{MinBid: 200, Vid: 1000, Count: 1},
668
checker(loc, genTokens(loc))
671
// Two slices holding 5 and 6 blobs.
loc := &access.Location{
674
Blobs: []access.SliceInfo{
675
{MinBid: 100, Vid: 1000, Count: 5},
676
{MinBid: 200, Vid: 1000, Count: 6},
679
checker(loc, genTokens(loc))
683
// TestAccessServiceLimited issues 100 /alloc requests of size 1024 and expects
// each one either to be rejected with errcode.CodeAccessLimited or to return
// a location of size 1024.
// NOTE(review): the declaration of cli, the goroutine/WaitGroup plumbing and
// the condition choosing between the two assertions are not visible in this
// excerpt, and the function appears to continue past its last visible line.
func TestAccessServiceLimited(t *testing.T) {
684
host := runMockService(newService())
687
url := func() string {
688
return fmt.Sprintf("%s/alloc", host)
690
args := access.AllocArgs{Size: 1024}
691
var wg sync.WaitGroup
693
for i := 0; i < 100; i++ {
696
resp := &access.AllocResp{}
697
err := cli.PostWith(ctx, url(), resp, args)
699
// Outcome when the limiter rejects the request.
assertErrorCode(t, errcode.CodeAccessLimited, err)
701
// Outcome when the request is admitted.
require.Equal(t, uint64(1024), resp.Location.Size)