cubefs

Форк
0
272 строки · 6.6 Кб
1
// Copyright 2022 The CubeFS Authors.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12
// implied. See the License for the specific language governing
13
// permissions and limitations under the License.
14

15
package blobnode
16

17
import (
18
	"context"
19
	"fmt"
20
	"io"
21
	"math"
22
	"net/http"
23
	"strconv"
24

25
	bloberr "github.com/cubefs/cubefs/blobstore/common/errors"
26
	"github.com/cubefs/cubefs/blobstore/common/proto"
27
	"github.com/cubefs/cubefs/blobstore/common/rpc"
28
	"github.com/cubefs/cubefs/blobstore/common/trace"
29
)
30

31
// MaxShardSize is the largest shard size, in bytes, that PutShard accepts:
// the maximum value of a uint32. It is an untyped constant so it can be
// compared directly against the int64 Size field of PutShardArgs.
const MaxShardSize = math.MaxUint32
34

35
// ShardStatus is a one-byte (uint8) status code attached to a shard. Its named
// values are declared in the const block that follows this type.
type ShardStatus uint8
36

37
const (
38
	ShardStatusDefault    ShardStatus = 0x0 // 0 ; 0000
39
	ShardStatusNormal     ShardStatus = 0x1 // 1 ; 0001
40
	ShardStatusMarkDelete ShardStatus = 0x2 // 2 ; 0010
41
)
42

43
// ShardDataInline is the single-bit mask 1000 0000 (0x80, decimal 128).
// NOTE(review): presumably flags a shard whose data is stored inline; the
// consumer of this bit is not visible in this file, so confirm against users.
const ShardDataInline = 1 << 7
46

47
type PutShardArgs struct {
48
	DiskID proto.DiskID `json:"diskid"`
49
	Vuid   proto.Vuid   `json:"vuid"`
50
	Bid    proto.BlobID `json:"bid"`
51
	Size   int64        `json:"size"`
52
	Type   IOType       `json:"iotype,omitempty"`
53
	Body   io.Reader    `json:"-"`
54
}
55

56
// PutShardRet is the structure the response to a shard put request is decoded
// into.
type PutShardRet struct {
	// Crc is the checksum carried in the response's "crc" field.
	Crc uint32 `json:"crc"`
}
59

60
func (c *client) PutShard(ctx context.Context, host string, args *PutShardArgs) (crc uint32, err error) {
61
	if args.Size > MaxShardSize {
62
		err = bloberr.ErrShardSizeTooLarge
63
		return
64
	}
65
	if !args.Type.IsValid() {
66
		err = bloberr.ErrInvalidParam
67
		return
68
	}
69

70
	if !IsValidDiskID(args.DiskID) {
71
		err = bloberr.ErrInvalidDiskId
72
		return
73
	}
74

75
	ret := &PutShardRet{
76
		Crc: proto.InvalidCrc32,
77
	}
78
	urlStr := fmt.Sprintf("%v/shard/put/diskid/%v/vuid/%v/bid/%v/size/%v?iotype=%d",
79
		host, args.DiskID, args.Vuid, args.Bid, args.Size, args.Type)
80
	req, err := http.NewRequest(http.MethodPost, urlStr, args.Body)
81
	if err != nil {
82
		return
83
	}
84
	req.ContentLength = args.Size
85
	err = c.DoWith(ctx, req, ret, rpc.WithCrcEncode())
86
	if err == nil {
87
		crc = ret.Crc
88
	}
89

90
	return
91
}
92

93
type GetShardArgs struct {
94
	DiskID proto.DiskID `json:"diskid"`
95
	Vuid   proto.Vuid   `json:"vuid"`
96
	Bid    proto.BlobID `json:"bid"`
97
	Type   IOType       `json:"iotype,omitempty"`
98
}
99

100
func (c *client) GetShard(ctx context.Context, host string, args *GetShardArgs) (
101
	body io.ReadCloser, shardCrc uint32, err error,
102
) {
103
	if !args.Type.IsValid() {
104
		err = bloberr.ErrInvalidParam
105
		return
106
	}
107

108
	if !IsValidDiskID(args.DiskID) {
109
		err = bloberr.ErrInvalidDiskId
110
		return
111
	}
112

113
	urlStr := fmt.Sprintf("%v/shard/get/diskid/%v/vuid/%v/bid/%v?iotype=%d",
114
		host, args.DiskID, args.Vuid, args.Bid, args.Type)
115

116
	resp, err := c.Get(ctx, urlStr)
117
	if err != nil {
118
		return nil, 0, err
119
	}
120

121
	if resp.StatusCode/100 != 2 {
122
		defer resp.Body.Close()
123
		err = rpc.ParseResponseErr(resp)
124
		return
125
	}
126

127
	if resp.Header.Get("CRC") != "" {
128
		crc, err := strconv.ParseUint(resp.Header.Get("CRC"), 10, 32)
129
		if err != nil {
130
			return nil, proto.InvalidCrc32, err
131
		}
132
		shardCrc = uint32(crc)
133
	}
134

135
	return resp.Body, shardCrc, nil
136
}
137

138
type RangeGetShardArgs struct {
139
	GetShardArgs
140
	Offset int64 `json:"offset"`
141
	Size   int64 `json:"size"`
142
}
143

144
func (c *client) RangeGetShard(ctx context.Context, host string, args *RangeGetShardArgs) (
145
	body io.ReadCloser, shardCrc uint32, err error,
146
) {
147
	span := trace.SpanFromContextSafe(ctx)
148

149
	if !args.Type.IsValid() {
150
		err = bloberr.ErrInvalidParam
151
		return
152
	}
153

154
	if !IsValidDiskID(args.DiskID) {
155
		err = bloberr.ErrInvalidDiskId
156
		return
157
	}
158

159
	urlStr := fmt.Sprintf("%v/shard/get/diskid/%v/vuid/%v/bid/%v?iotype=%d",
160
		host, args.DiskID, args.Vuid, args.Bid, args.Type)
161

162
	req, err := http.NewRequest(http.MethodGet, urlStr, nil)
163
	if err != nil {
164
		span.Errorf("Failed new req. urlStr:%s, err:%v", urlStr, err)
165
		return
166
	}
167

168
	// set http range header
169
	from, to := args.Offset, args.Size+args.Offset
170
	rangeStr := fmt.Sprintf("bytes=%v-%v", from, to-1)
171
	req.Header.Set("Range", rangeStr)
172

173
	resp, err := c.Do(ctx, req)
174
	if err != nil {
175
		span.Errorf("Failed get body, err:%v", err)
176
		return
177
	}
178

179
	if resp.StatusCode/100 != 2 {
180
		defer resp.Body.Close()
181
		err = rpc.ParseResponseErr(resp)
182
		return
183
	}
184

185
	if resp.Header.Get("CRC") != "" {
186
		crc, err := strconv.ParseUint(resp.Header.Get("CRC"), 10, 32)
187
		if err != nil {
188
			return nil, proto.InvalidCrc32, err
189
		}
190
		shardCrc = uint32(crc)
191
	}
192

193
	return resp.Body, shardCrc, nil
194
}
195

196
type DeleteShardArgs struct {
197
	DiskID proto.DiskID `json:"diskid"`
198
	Vuid   proto.Vuid   `json:"vuid"`
199
	Bid    proto.BlobID `json:"bid"`
200
}
201

202
// MarkDeleteShard posts to the "/shard/markdelete" endpoint of the blobnode at
// host for the shard identified by args. The request carries no body.
//
// It returns bloberr.ErrInvalidDiskId without contacting the server when
// args.DiskID is not valid; otherwise it returns the result of the POST.
func (c *client) MarkDeleteShard(ctx context.Context, host string, args *DeleteShardArgs) (err error) {
	if !IsValidDiskID(args.DiskID) {
		return bloberr.ErrInvalidDiskId
	}

	target := fmt.Sprintf("%v/shard/markdelete/diskid/%v/vuid/%v/bid/%v", host, args.DiskID, args.Vuid, args.Bid)
	return c.PostWith(ctx, target, nil, rpc.NoneBody)
}
212

213
// DeleteShard posts to the "/shard/delete" endpoint of the blobnode at host
// for the shard identified by args. The request carries no body.
//
// It returns bloberr.ErrInvalidDiskId without contacting the server when
// args.DiskID is not valid; otherwise it returns the result of the POST.
func (c *client) DeleteShard(ctx context.Context, host string, args *DeleteShardArgs) (err error) {
	if !IsValidDiskID(args.DiskID) {
		return bloberr.ErrInvalidDiskId
	}

	target := fmt.Sprintf("%v/shard/delete/diskid/%v/vuid/%v/bid/%v", host, args.DiskID, args.Vuid, args.Bid)
	return c.PostWith(ctx, target, nil, rpc.NoneBody)
}
223

224
type StatShardArgs struct {
225
	DiskID proto.DiskID `json:"diskid"`
226
	Vuid   proto.Vuid   `json:"vuid"`
227
	Bid    proto.BlobID `json:"bid"`
228
}
229

230
func (c *client) StatShard(ctx context.Context, host string, args *StatShardArgs) (si *ShardInfo, err error) {
231
	if !IsValidDiskID(args.DiskID) {
232
		err = bloberr.ErrInvalidDiskId
233
		return
234
	}
235

236
	urlStr := fmt.Sprintf("%v/shard/stat/diskid/%v/vuid/%v/bid/%v",
237
		host, args.DiskID, args.Vuid, args.Bid)
238
	si = &ShardInfo{}
239
	err = c.GetWith(ctx, urlStr, si)
240
	return
241
}
242

243
type ListShardsArgs struct {
244
	DiskID   proto.DiskID `json:"diskid"`
245
	Vuid     proto.Vuid   `json:"vuid" `
246
	StartBid proto.BlobID `json:"startbid"`
247
	Status   ShardStatus  `json:"status" `
248
	Count    int          `json:"count" `
249
}
250

251
type ListShardsRet struct {
252
	ShardInfos []*ShardInfo `json:"shard_infos"`
253
	Next       proto.BlobID `json:"next"`
254
}
255

256
func (c *client) ListShards(ctx context.Context, host string, args *ListShardsArgs) (sis []*ShardInfo, next proto.BlobID, err error) {
257
	if !IsValidDiskID(args.DiskID) {
258
		err = bloberr.ErrInvalidDiskId
259
		return
260
	}
261

262
	urlStr := fmt.Sprintf("%v/shard/list/diskid/%v/vuid/%v/startbid/%v/status/%v/count/%v",
263
		host, args.DiskID, args.Vuid, args.StartBid, args.Status, args.Count)
264

265
	listRet := ListShardsRet{}
266
	err = c.GetWith(ctx, urlStr, &listRet)
267
	if err != nil {
268
		return nil, proto.InValidBlobID, err
269
	}
270

271
	return listRet.ShardInfos, listRet.Next, nil
272
}
273

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.