cubefs

Форк
0
/
blobstore_client_test.go 
220 строк · 5.3 Кб
1
// Copyright 2022 The CubeFS Authors.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12
// implied. See the License for the specific language governing
13
// permissions and limitations under the License.
14

15
package blobstore
16

17
import (
18
	"context"
19
	"encoding/json"
20
	"fmt"
21
	"hash/crc32"
22
	"io"
23
	"net/http"
24
	"net/http/httptest"
25
	"strconv"
26
	"testing"
27

28
	"github.com/cubefs/cubefs/blobstore/api/access"
29
	"github.com/cubefs/cubefs/blobstore/common/crc32block"
30
	"github.com/cubefs/cubefs/blobstore/util/bytespool"
31
	cproto "github.com/cubefs/cubefs/proto"
32
	"github.com/stretchr/testify/require"
33
)
34

35
const (
36
	blobSize = 1 << 20
37
)
38

39
var dataCache []byte
40

41
type MockEbsService struct {
42
	service *httptest.Server
43
}
44

45
func NewMockEbsService() *MockEbsService {
46
	dataCache = make([]byte, 1<<25)
47
	mockServer := httptest.NewServer(
48
		http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
49
			if req.URL.Path == "/put" {
50
				putSize := req.URL.Query().Get("size")
51

52
				dataSize, _ := strconv.Atoi(putSize)
53
				size := req.Header.Get("Content-Length")
54
				l, _ := strconv.Atoi(size)
55

56
				w.Header().Set("X-Ack-Crc-Encoded", "1")
57
				w.WriteHeader(http.StatusOK)
58

59
				body := crc32block.NewDecoderReader(req.Body)
60
				dataCache = dataCache[:cap(dataCache)]
61
				dataCache = dataCache[:crc32block.DecodeSizeWithDefualtBlock(int64(l))]
62
				io.ReadFull(body, dataCache)
63

64
				hashesStr := req.URL.Query().Get("hashes")
65
				algsInt, _ := strconv.Atoi(hashesStr)
66
				algs := access.HashAlgorithm(algsInt)
67

68
				hashSumMap := algs.ToHashSumMap()
69
				for alg := range hashSumMap {
70
					hasher := alg.ToHasher()
71
					hasher.Write(dataCache)
72
					hashSumMap[alg] = hasher.Sum(nil)
73
				}
74

75
				loc := access.Location{Size: uint64(dataSize)}
76
				fillCrc(&loc)
77
				resp := access.PutResp{
78
					Location:   loc,
79
					HashSumMap: hashSumMap,
80
				}
81
				b, _ := json.Marshal(resp)
82
				w.Write(b)
83

84
			} else if req.URL.Path == "/get" {
85
				var args access.GetArgs
86
				requestBody(req, &args)
87
				if !verifyCrc(&args.Location) {
88
					w.WriteHeader(http.StatusForbidden)
89
					return
90
				}
91

92
				data := make([]byte, args.ReadSize)
93
				w.Header().Set("Content-Length", strconv.Itoa(len(data)))
94
				w.WriteHeader(http.StatusOK)
95
				w.Write(data)
96

97
			} else if req.URL.Path == "/delete" {
98
				args := access.DeleteArgs{}
99
				requestBody(req, &args)
100
				if !args.IsValid() {
101
					w.WriteHeader(http.StatusBadRequest)
102
					return
103
				}
104
				for _, loc := range args.Locations {
105
					if !verifyCrc(&loc) {
106
						w.WriteHeader(http.StatusBadRequest)
107
						return
108
					}
109
				}
110

111
				b, _ := json.Marshal(access.DeleteResp{})
112
				w.Header().Set("Content-Type", "application/json")
113
				w.Header().Set("Content-Length", strconv.Itoa(len(b)))
114
				w.WriteHeader(http.StatusOK)
115
				w.Write(b)
116

117
			} else {
118
				w.WriteHeader(http.StatusOK)
119
			}
120
		}))
121

122
	return &MockEbsService{
123
		service: mockServer,
124
	}
125
}
126

127
func requestBody(req *http.Request, val interface{}) {
128
	l := req.Header.Get("Content-Length")
129
	size, _ := strconv.Atoi(l)
130
	data := make([]byte, size)
131
	io.ReadFull(req.Body, data)
132
	json.Unmarshal(data, val)
133
}
134

135
func calcCrc(loc *access.Location) (uint32, error) {
136
	crcWriter := crc32.New(crc32.IEEETable)
137

138
	buf := bytespool.Alloc(1024)
139
	defer bytespool.Free(buf)
140

141
	n := loc.Encode2(buf)
142
	if n < 4 {
143
		return 0, fmt.Errorf("no enough bytes(%d) fill into buf", n)
144
	}
145

146
	if _, err := crcWriter.Write(buf[4:n]); err != nil {
147
		return 0, fmt.Errorf("fill crc %s", err.Error())
148
	}
149

150
	return crcWriter.Sum32(), nil
151
}
152

153
func fillCrc(loc *access.Location) error {
154
	crc, err := calcCrc(loc)
155
	if err != nil {
156
		return err
157
	}
158
	loc.Crc = crc
159
	return nil
160
}
161

162
func verifyCrc(loc *access.Location) bool {
163
	crc, err := calcCrc(loc)
164
	if err != nil {
165
		return false
166
	}
167
	return loc.Crc == crc
168
}
169

170
func TestEbsClient_Write_Read(t *testing.T) {
171
	cfg := access.Config{}
172
	mockServer := NewMockEbsService()
173
	cfg.PriorityAddrs = []string{mockServer.service.URL}
174
	cfg.ConnMode = access.QuickConnMode
175
	cfg.MaxSizePutOnce = 1 << 20
176
	defer mockServer.service.Close()
177

178
	blobStoreClient, err := NewEbsClient(cfg)
179
	if err != nil {
180
		panic(err)
181
	}
182
	testCases := []struct {
183
		size int
184
	}{
185
		{1},
186
		{1023},
187
		{1 << 10},
188
		{1 << 20},
189
	}
190
	for _, tc := range testCases {
191
		data := make([]byte, tc.size)
192
		ctx := context.Background()
193
		location, err := blobStoreClient.Write(ctx, "testVol", data, uint32(tc.size))
194
		require.Exactly(t, nil, err)
195

196
		// read prepare
197
		blobs := make([]cproto.Blob, 0)
198
		for _, info := range location.Blobs {
199
			blob := cproto.Blob{
200
				MinBid: uint64(info.MinBid),
201
				Count:  uint64(info.Count),
202
				Vid:    uint64(info.Vid),
203
			}
204
			blobs = append(blobs, blob)
205
		}
206
		oek := cproto.ObjExtentKey{
207
			Cid:      uint64(location.ClusterID),
208
			CodeMode: uint8(location.CodeMode),
209
			Size:     location.Size,
210
			BlobSize: location.BlobSize,
211
			Blobs:    blobs,
212
			BlobsLen: uint32(len(blobs)),
213
			Crc:      location.Crc,
214
		}
215
		buf := make([]byte, oek.Size)
216
		read, err := blobStoreClient.Read(ctx, "", buf, 0, oek.Size, oek)
217
		require.NoError(t, err)
218
		require.Exactly(t, tc.size, read)
219
	}
220
}
221

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.