1
// Copyright 2022 The CubeFS Authors.
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
7
// http://www.apache.org/licenses/LICENSE-2.0
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12
// implied. See the License for the specific language governing
13
// permissions and limitations under the License.
28
"github.com/cubefs/cubefs/blobstore/api/access"
29
"github.com/cubefs/cubefs/blobstore/common/crc32block"
30
"github.com/cubefs/cubefs/blobstore/util/bytespool"
31
cproto "github.com/cubefs/cubefs/proto"
32
"github.com/stretchr/testify/require"
41
type MockEbsService struct {
42
service *httptest.Server
45
func NewMockEbsService() *MockEbsService {
46
dataCache = make([]byte, 1<<25)
47
mockServer := httptest.NewServer(
48
http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
49
if req.URL.Path == "/put" {
50
putSize := req.URL.Query().Get("size")
52
dataSize, _ := strconv.Atoi(putSize)
53
size := req.Header.Get("Content-Length")
54
l, _ := strconv.Atoi(size)
56
w.Header().Set("X-Ack-Crc-Encoded", "1")
57
w.WriteHeader(http.StatusOK)
59
body := crc32block.NewDecoderReader(req.Body)
60
dataCache = dataCache[:cap(dataCache)]
61
dataCache = dataCache[:crc32block.DecodeSizeWithDefualtBlock(int64(l))]
62
io.ReadFull(body, dataCache)
64
hashesStr := req.URL.Query().Get("hashes")
65
algsInt, _ := strconv.Atoi(hashesStr)
66
algs := access.HashAlgorithm(algsInt)
68
hashSumMap := algs.ToHashSumMap()
69
for alg := range hashSumMap {
70
hasher := alg.ToHasher()
71
hasher.Write(dataCache)
72
hashSumMap[alg] = hasher.Sum(nil)
75
loc := access.Location{Size: uint64(dataSize)}
77
resp := access.PutResp{
79
HashSumMap: hashSumMap,
81
b, _ := json.Marshal(resp)
84
} else if req.URL.Path == "/get" {
85
var args access.GetArgs
86
requestBody(req, &args)
87
if !verifyCrc(&args.Location) {
88
w.WriteHeader(http.StatusForbidden)
92
data := make([]byte, args.ReadSize)
93
w.Header().Set("Content-Length", strconv.Itoa(len(data)))
94
w.WriteHeader(http.StatusOK)
97
} else if req.URL.Path == "/delete" {
98
args := access.DeleteArgs{}
99
requestBody(req, &args)
101
w.WriteHeader(http.StatusBadRequest)
104
for _, loc := range args.Locations {
105
if !verifyCrc(&loc) {
106
w.WriteHeader(http.StatusBadRequest)
111
b, _ := json.Marshal(access.DeleteResp{})
112
w.Header().Set("Content-Type", "application/json")
113
w.Header().Set("Content-Length", strconv.Itoa(len(b)))
114
w.WriteHeader(http.StatusOK)
118
w.WriteHeader(http.StatusOK)
122
return &MockEbsService{
127
func requestBody(req *http.Request, val interface{}) {
128
l := req.Header.Get("Content-Length")
129
size, _ := strconv.Atoi(l)
130
data := make([]byte, size)
131
io.ReadFull(req.Body, data)
132
json.Unmarshal(data, val)
135
func calcCrc(loc *access.Location) (uint32, error) {
136
crcWriter := crc32.New(crc32.IEEETable)
138
buf := bytespool.Alloc(1024)
139
defer bytespool.Free(buf)
141
n := loc.Encode2(buf)
143
return 0, fmt.Errorf("no enough bytes(%d) fill into buf", n)
146
if _, err := crcWriter.Write(buf[4:n]); err != nil {
147
return 0, fmt.Errorf("fill crc %s", err.Error())
150
return crcWriter.Sum32(), nil
153
func fillCrc(loc *access.Location) error {
154
crc, err := calcCrc(loc)
162
func verifyCrc(loc *access.Location) bool {
163
crc, err := calcCrc(loc)
167
return loc.Crc == crc
170
func TestEbsClient_Write_Read(t *testing.T) {
171
cfg := access.Config{}
172
mockServer := NewMockEbsService()
173
cfg.PriorityAddrs = []string{mockServer.service.URL}
174
cfg.ConnMode = access.QuickConnMode
175
cfg.MaxSizePutOnce = 1 << 20
176
defer mockServer.service.Close()
178
blobStoreClient, err := NewEbsClient(cfg)
182
testCases := []struct {
190
for _, tc := range testCases {
191
data := make([]byte, tc.size)
192
ctx := context.Background()
193
location, err := blobStoreClient.Write(ctx, "testVol", data, uint32(tc.size))
194
require.Exactly(t, nil, err)
197
blobs := make([]cproto.Blob, 0)
198
for _, info := range location.Blobs {
200
MinBid: uint64(info.MinBid),
201
Count: uint64(info.Count),
202
Vid: uint64(info.Vid),
204
blobs = append(blobs, blob)
206
oek := cproto.ObjExtentKey{
207
Cid: uint64(location.ClusterID),
208
CodeMode: uint8(location.CodeMode),
210
BlobSize: location.BlobSize,
212
BlobsLen: uint32(len(blobs)),
215
buf := make([]byte, oek.Size)
216
read, err := blobStoreClient.Read(ctx, "", buf, 0, oek.Size, oek)
217
require.NoError(t, err)
218
require.Exactly(t, tc.size, read)