1
// Copyright 2022 The CubeFS Authors.
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
7
// http://www.apache.org/licenses/LICENSE-2.0
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12
// implied. See the License for the specific language governing
13
// permissions and limitations under the License.
25
bloberr "github.com/cubefs/cubefs/blobstore/common/errors"
26
"github.com/cubefs/cubefs/blobstore/common/proto"
27
"github.com/cubefs/cubefs/blobstore/common/rpc"
28
"github.com/cubefs/cubefs/blobstore/common/trace"
32
// MaxShardSize is the upper bound PutShard compares PutShardArgs.Size against;
// a larger Size results in bloberr.ErrShardSizeTooLarge.
MaxShardSize = math.MaxUint32
38
// ShardStatus values. The trailing comments give each value in decimal and binary.
// NOTE(review): the exact lifecycle meaning of each status is defined by the
// server side and is not established here — confirm before relying on it.
ShardStatusDefault ShardStatus = 0x0 // 0 ; 0000
39
ShardStatusNormal ShardStatus = 0x1 // 1 ; 0001
40
ShardStatusMarkDelete ShardStatus = 0x2 // 2 ; 0010
44
// ShardDataInline has only the highest bit of a byte set.
// NOTE(review): presumably a flag bit OR-ed into a shard flag/status byte;
// nothing in this file consumes it — confirm against its users.
ShardDataInline = 0x80 // 1000 0000
47
// PutShardArgs holds the parameters of a PutShard call.
// DiskID, Vuid, Bid, Size and Type are formatted into the request URL;
// Body is handed to http.NewRequest as the request body.
type PutShardArgs struct {
48
DiskID proto.DiskID `json:"diskid"`
49
Vuid proto.Vuid `json:"vuid"`
50
Bid proto.BlobID `json:"bid"`
51
// Size is also assigned to the request's ContentLength and is checked
// against MaxShardSize by PutShard.
Size int64 `json:"size"`
52
Type IOType `json:"iotype,omitempty"`
53
// Body is excluded from JSON encoding by the "-" tag.
Body io.Reader `json:"-"`
56
// PutShardRet carries a 32-bit CRC value under the JSON key "crc".
// NOTE(review): presumably the response payload PutShard decodes into — confirm.
type PutShardRet struct {
57
Crc uint32 `json:"crc"`
60
// PutShard sends args.Body to host with an HTTP POST to
// /shard/put/diskid/{DiskID}/vuid/{Vuid}/bid/{Bid}/size/{Size}?iotype={Type}.
//
// Argument checks performed before the request is built:
//   - args.Size greater than MaxShardSize sets err to bloberr.ErrShardSizeTooLarge,
//   - an invalid args.Type sets err to bloberr.ErrInvalidParam,
//   - an invalid args.DiskID sets err to bloberr.ErrInvalidDiskId.
//
// NOTE(review): only an upper bound on args.Size is checked here; a negative
// Size would be written into the URL and into req.ContentLength — confirm
// whether callers can ever pass one.
func (c *client) PutShard(ctx context.Context, host string, args *PutShardArgs) (crc uint32, err error) {
61
if args.Size > MaxShardSize {
62
err = bloberr.ErrShardSizeTooLarge
65
if !args.Type.IsValid() {
66
err = bloberr.ErrInvalidParam
70
if !IsValidDiskID(args.DiskID) {
71
err = bloberr.ErrInvalidDiskId
76
// The result's Crc starts out as proto.InvalidCrc32.
Crc: proto.InvalidCrc32,
78
urlStr := fmt.Sprintf("%v/shard/put/diskid/%v/vuid/%v/bid/%v/size/%v?iotype=%d",
79
host, args.DiskID, args.Vuid, args.Bid, args.Size, args.Type)
80
req, err := http.NewRequest(http.MethodPost, urlStr, args.Body)
84
// args.Body is a plain io.Reader, so the length is set explicitly from args.Size.
req.ContentLength = args.Size
85
// NOTE(review): rpc.WithCrcEncode presumably makes the client CRC-encode the
// request body — confirm in the rpc package.
err = c.DoWith(ctx, req, ret, rpc.WithCrcEncode())
93
// GetShardArgs holds the parameters of a GetShard call; every field is
// formatted into the request URL (Type as the "iotype" query parameter).
type GetShardArgs struct {
94
DiskID proto.DiskID `json:"diskid"`
95
Vuid proto.Vuid `json:"vuid"`
96
Bid proto.BlobID `json:"bid"`
97
Type IOType `json:"iotype,omitempty"`
100
// GetShard issues an HTTP GET to
// {host}/shard/get/diskid/{DiskID}/vuid/{Vuid}/bid/{Bid}?iotype={Type}.
//
// An invalid args.Type sets err to bloberr.ErrInvalidParam and an invalid
// args.DiskID sets err to bloberr.ErrInvalidDiskId. A non-2xx response is
// turned into an error by rpc.ParseResponseErr and its body is closed here.
//
// On success the returned body is resp.Body itself and is not closed by this
// method, so the caller must close it. shardCrc is the value of the "CRC"
// response header parsed as a base-10 32-bit unsigned integer, when the header
// is present.
func (c *client) GetShard(ctx context.Context, host string, args *GetShardArgs) (
101
body io.ReadCloser, shardCrc uint32, err error,
103
if !args.Type.IsValid() {
104
err = bloberr.ErrInvalidParam
108
if !IsValidDiskID(args.DiskID) {
109
err = bloberr.ErrInvalidDiskId
113
urlStr := fmt.Sprintf("%v/shard/get/diskid/%v/vuid/%v/bid/%v?iotype=%d",
114
host, args.DiskID, args.Vuid, args.Bid, args.Type)
116
resp, err := c.Get(ctx, urlStr)
121
// Any status outside 2xx is an error; the body is released on return.
if resp.StatusCode/100 != 2 {
122
defer resp.Body.Close()
123
err = rpc.ParseResponseErr(resp)
127
if resp.Header.Get("CRC") != "" {
128
// crc and err are new variables scoped to this if block; this err shadows
// the named result, which is why the failure path returns explicit values.
crc, err := strconv.ParseUint(resp.Header.Get("CRC"), 10, 32)
130
// NOTE(review): this path hands back a nil body and no resp.Body.Close()
// is visible for it — confirm the response body is not leaked here.
return nil, proto.InvalidCrc32, err
132
shardCrc = uint32(crc)
135
return resp.Body, shardCrc, nil
138
// RangeGetShardArgs holds the parameters of a RangeGetShard call.
// NOTE(review): RangeGetShard also reads args.DiskID, args.Vuid, args.Bid and
// args.Type, which are not among the fields listed here — presumably they
// come from an embedded GetShardArgs; confirm.
type RangeGetShardArgs struct {
140
// Offset is the first byte requested (start of the HTTP Range header).
Offset int64 `json:"offset"`
141
// Size is the number of bytes requested; the range ends at Offset+Size-1.
Size int64 `json:"size"`
144
// RangeGetShard issues an HTTP GET to the same URL as GetShard,
// {host}/shard/get/diskid/{DiskID}/vuid/{Vuid}/bid/{Bid}?iotype={Type},
// with a Range header of "bytes={Offset}-{Offset+Size-1}".
//
// An invalid args.Type sets err to bloberr.ErrInvalidParam and an invalid
// args.DiskID sets err to bloberr.ErrInvalidDiskId. Failures to build or send
// the request are logged on the span taken from ctx. A non-2xx response is
// turned into an error by rpc.ParseResponseErr and its body is closed here.
//
// On success the returned body is resp.Body itself and is not closed by this
// method, so the caller must close it. shardCrc is the value of the "CRC"
// response header parsed as a base-10 32-bit unsigned integer, when the header
// is present.
//
// NOTE(review): no validation of args.Offset or args.Size is visible; Size == 0
// produces a range whose end precedes its start, and negative values are
// formatted as-is — confirm callers guarantee Offset >= 0 and Size > 0.
func (c *client) RangeGetShard(ctx context.Context, host string, args *RangeGetShardArgs) (
145
body io.ReadCloser, shardCrc uint32, err error,
147
span := trace.SpanFromContextSafe(ctx)
149
if !args.Type.IsValid() {
150
err = bloberr.ErrInvalidParam
154
if !IsValidDiskID(args.DiskID) {
155
err = bloberr.ErrInvalidDiskId
159
urlStr := fmt.Sprintf("%v/shard/get/diskid/%v/vuid/%v/bid/%v?iotype=%d",
160
host, args.DiskID, args.Vuid, args.Bid, args.Type)
162
req, err := http.NewRequest(http.MethodGet, urlStr, nil)
164
span.Errorf("Failed new req. urlStr:%s, err:%v", urlStr, err)
168
// Set the HTTP Range header. The end position of a byte range is inclusive,
// hence to-1 for a span of args.Size bytes starting at args.Offset.
169
from, to := args.Offset, args.Size+args.Offset
170
rangeStr := fmt.Sprintf("bytes=%v-%v", from, to-1)
171
req.Header.Set("Range", rangeStr)
173
resp, err := c.Do(ctx, req)
175
span.Errorf("Failed get body, err:%v", err)
179
// Any status outside 2xx is an error; the body is released on return.
if resp.StatusCode/100 != 2 {
180
defer resp.Body.Close()
181
err = rpc.ParseResponseErr(resp)
185
if resp.Header.Get("CRC") != "" {
186
// crc and err are new variables scoped to this if block; this err shadows
// the named result, which is why the failure path returns explicit values.
crc, err := strconv.ParseUint(resp.Header.Get("CRC"), 10, 32)
188
// NOTE(review): this path hands back a nil body and no resp.Body.Close()
// is visible for it — confirm the response body is not leaked here.
return nil, proto.InvalidCrc32, err
190
shardCrc = uint32(crc)
193
return resp.Body, shardCrc, nil
196
// DeleteShardArgs identifies a shard by disk, volume unit and blob id.
// It is the argument type of both MarkDeleteShard and DeleteShard, which
// format all three fields into the request URL.
type DeleteShardArgs struct {
197
DiskID proto.DiskID `json:"diskid"`
198
Vuid proto.Vuid `json:"vuid"`
199
Bid proto.BlobID `json:"bid"`
202
// MarkDeleteShard posts to
// {host}/shard/markdelete/diskid/{DiskID}/vuid/{Vuid}/bid/{Bid}.
// An invalid args.DiskID sets err to bloberr.ErrInvalidDiskId; otherwise err
// is whatever c.PostWith reports.
func (c *client) MarkDeleteShard(ctx context.Context, host string, args *DeleteShardArgs) (err error) {
203
if !IsValidDiskID(args.DiskID) {
204
err = bloberr.ErrInvalidDiskId
208
urlStr := fmt.Sprintf("%v/shard/markdelete/diskid/%v/vuid/%v/bid/%v", host, args.DiskID, args.Vuid, args.Bid)
209
// NOTE(review): nil and rpc.NoneBody presumably mean "no decoded result" and
// "no request payload" — confirm against the PostWith signature.
err = c.PostWith(ctx, urlStr, nil, rpc.NoneBody)
213
// DeleteShard posts to
// {host}/shard/delete/diskid/{DiskID}/vuid/{Vuid}/bid/{Bid}.
// An invalid args.DiskID sets err to bloberr.ErrInvalidDiskId; otherwise err
// is whatever c.PostWith reports.
func (c *client) DeleteShard(ctx context.Context, host string, args *DeleteShardArgs) (err error) {
214
if !IsValidDiskID(args.DiskID) {
215
err = bloberr.ErrInvalidDiskId
219
urlStr := fmt.Sprintf("%v/shard/delete/diskid/%v/vuid/%v/bid/%v", host, args.DiskID, args.Vuid, args.Bid)
220
// NOTE(review): nil and rpc.NoneBody presumably mean "no decoded result" and
// "no request payload" — confirm against the PostWith signature.
err = c.PostWith(ctx, urlStr, nil, rpc.NoneBody)
224
// StatShardArgs identifies the shard queried by StatShard; all three fields
// are formatted into the request URL.
type StatShardArgs struct {
225
DiskID proto.DiskID `json:"diskid"`
226
Vuid proto.Vuid `json:"vuid"`
227
Bid proto.BlobID `json:"bid"`
230
// StatShard issues an HTTP GET to
// {host}/shard/stat/diskid/{DiskID}/vuid/{Vuid}/bid/{Bid} and passes si to
// c.GetWith as the destination for the response.
// An invalid args.DiskID sets err to bloberr.ErrInvalidDiskId.
func (c *client) StatShard(ctx context.Context, host string, args *StatShardArgs) (si *ShardInfo, err error) {
231
if !IsValidDiskID(args.DiskID) {
232
err = bloberr.ErrInvalidDiskId
236
urlStr := fmt.Sprintf("%v/shard/stat/diskid/%v/vuid/%v/bid/%v",
237
host, args.DiskID, args.Vuid, args.Bid)
239
// NOTE(review): si is a named result and therefore nil unless assigned
// beforehand; it must point at a ShardInfo by this line for the response
// to be stored — confirm the allocation precedes this call.
err = c.GetWith(ctx, urlStr, si)
243
// ListShardsArgs holds the parameters of a ListShards call; every field is
// formatted into a path segment of the request URL.
type ListShardsArgs struct {
244
DiskID proto.DiskID `json:"diskid"`
245
Vuid proto.Vuid `json:"vuid" `
246
// StartBid fills the "startbid" path segment.
// NOTE(review): presumably the blob id to resume listing from (see
// ListShardsRet.Next) — confirm inclusive/exclusive semantics server-side.
StartBid proto.BlobID `json:"startbid"`
247
Status ShardStatus `json:"status" `
248
// Count fills the "count" path segment.
// NOTE(review): presumably the maximum number of entries to return — confirm.
Count int `json:"count" `
251
// ListShardsRet is the response ListShards decodes into; its two fields are
// returned to the caller as ListShards' results.
type ListShardsRet struct {
252
ShardInfos []*ShardInfo `json:"shard_infos"`
253
// NOTE(review): presumably the StartBid to use for the following page — confirm.
Next proto.BlobID `json:"next"`
256
// ListShards issues an HTTP GET to
// {host}/shard/list/diskid/{DiskID}/vuid/{Vuid}/startbid/{StartBid}/status/{Status}/count/{Count}
// and decodes the response into a ListShardsRet.
//
// An invalid args.DiskID sets err to bloberr.ErrInvalidDiskId. On a request
// error it returns (nil, proto.InValidBlobID, err); on success it returns the
// decoded ShardInfos and Next with a nil error.
func (c *client) ListShards(ctx context.Context, host string, args *ListShardsArgs) (sis []*ShardInfo, next proto.BlobID, err error) {
257
if !IsValidDiskID(args.DiskID) {
258
err = bloberr.ErrInvalidDiskId
262
urlStr := fmt.Sprintf("%v/shard/list/diskid/%v/vuid/%v/startbid/%v/status/%v/count/%v",
263
host, args.DiskID, args.Vuid, args.StartBid, args.Status, args.Count)
265
listRet := ListShardsRet{}
266
err = c.GetWith(ctx, urlStr, &listRet)
268
return nil, proto.InValidBlobID, err
271
return listRet.ShardInfos, listRet.Next, nil