1
// Copyright 2018 The CubeFS Authors.
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
7
// http://www.apache.org/licenses/LICENSE-2.0
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12
// implied. See the License for the specific language governing
13
// permissions and limitations under the License.
24
"github.com/cubefs/cubefs/util/btree"
25
"github.com/cubefs/cubefs/util/log"
28
// Part defines the fields necessary for multipart part management.
37
// Equal reports whether m and o describe the same part. The comparison
// starts with the part ID and continues with the remaining conjuncts of the
// boolean expression. o is dereferenced without a nil check, so it must be
// non-nil.
func (m *Part) Equal(o *Part) bool {
38
return m.ID == o.ID &&
44
// Bytes serializes the part into a compact binary form. Fields are written
// in this order: ID (uvarint), UploadTime as Unix nanoseconds (signed
// varint), length of MD5 (uvarint) followed by the MD5 string itself,
// Size (uvarint) and Inode (uvarint). Every write to the buffer is checked
// for an error; on success the accumulated bytes and a nil error are
// returned.
func (m Part) Bytes() ([]byte, error) {
46
buffer := bytes.NewBuffer(nil)
47
// tmp is a scratch buffer large enough for any 64-bit varint; it is reused
// for every integer field below.
tmp := make([]byte, binary.MaxVarintLen64)
50
// Part ID, widened to uint64 for uvarint encoding.
n = binary.PutUvarint(tmp, uint64(m.ID))
51
if _, err = buffer.Write(tmp[:n]); err != nil {
55
// Upload time as nanoseconds since the Unix epoch; signed varint because
// UnixNano returns an int64.
n = binary.PutVarint(tmp, m.UploadTime.UnixNano())
56
if _, err = buffer.Write(tmp[:n]); err != nil {
60
// MD5 is length-prefixed so the decoder knows how many bytes to read.
n = binary.PutUvarint(tmp, uint64(len(m.MD5)))
61
if _, err = buffer.Write(tmp[:n]); err != nil {
64
if _, err = buffer.WriteString(m.MD5); err != nil {
68
// Part size.
n = binary.PutUvarint(tmp, m.Size)
69
if _, err = buffer.Write(tmp[:n]); err != nil {
73
// Inode number associated with the part.
n = binary.PutUvarint(tmp, m.Inode)
74
if _, err = buffer.Write(tmp[:n]); err != nil {
77
return buffer.Bytes(), nil
80
// PartFromBytes decodes a Part from raw. Fields are read in the same order
// Part.Bytes writes them: ID (uvarint), upload time in Unix nanoseconds
// (signed varint), MD5 length (uvarint) followed by the MD5 bytes, size
// (uvarint) and inode (uvarint). The function has no error return.
func PartFromBytes(raw []byte) *Part {
84
// Part ID.
u64ID, n = binary.Uvarint(raw)
87
var uploadTimeI64 int64
88
// Upload time, nanoseconds since the Unix epoch.
uploadTimeI64, n = binary.Varint(raw[offset:])
92
// Length prefix of the MD5 string.
md5Len, n = binary.Uvarint(raw[offset:])
94
// NOTE(review): md5Len comes straight from the input and no bounds check is
// visible before this slice expression; a truncated or corrupt buffer would
// panic here. Confirm callers only pass data produced by Part.Bytes.
md5Content := string(raw[offset : offset+int(md5Len)])
98
// Part size.
sizeU64, n = binary.Uvarint(raw[offset:])
102
// Inode number.
inode, n = binary.Uvarint(raw[offset:])
106
// Rebuild the timestamp from the stored nanosecond count.
UploadTime: time.Unix(0, uploadTimeI64),
116
// Len returns an int describing m — presumably the number of parts in the
// collection; confirm against the function body.
func (m Parts) Len() int {
120
// sort orders the parts in place by ascending ID. sort.SliceStable is used,
// so elements with equal IDs keep their relative order.
func (m Parts) sort() {
121
sort.SliceStable(m, func(i, j int) bool {
122
return m[i].ID < m[j].ID
126
// Hash reports whether the collection contains a part with the same ID as
// part. It binary-searches with sort.Search, which is only correct when the
// collection is ordered by ascending ID.
// NOTE(review): the name reads like a typo for "Has"; it is left unchanged
// because callers depend on it.
func (m *Parts) Hash(part *Part) (has bool) {
127
// Find the first index whose ID is >= the wanted ID.
i := sort.Search(len(*m), func(i int) bool {
128
return (*m)[i].ID >= part.ID
130
// sort.Search returns len(*m) when nothing matches, so the index must be
// range-checked before the IDs are compared.
has = i < len(*m) && (*m)[i].ID == part.ID
134
// UpdateOrStore looks up an existing part with the same ID as part using a
// binary search (the collection must be ordered by ascending ID). When such
// a part exists, its inode is reported through oldInode, a warning is logged
// if the incoming part carries the same inode, and another warning is logged
// if the incoming part's upload time is earlier than the stored one. The
// function also contains a path that appends part to the collection.
// The exact values assigned to update and conflict on each path should be
// read from the branch bodies.
func (m *Parts) UpdateOrStore(part *Part) (oldInode uint64, update, conflict bool) {
135
// Find the first index whose ID is >= part.ID.
i := sort.Search(len(*m), func(i int) bool {
136
return (*m)[i].ID >= part.ID
138
// sort.Search never returns a negative index, so the i >= 0 test is always
// true; the range check and ID comparison are what detect an existing part.
if i >= 0 && i < len(*m) && (*m)[i].ID == part.ID {
140
// oldPart is presumably the stored element (*m)[i] — confirm at its
// declaration.
oldInode = oldPart.Inode
141
// Same inode as the stored part: the request was already applied earlier.
if part.Inode == oldInode {
142
log.LogWarnf("Request already success,the same partinode[%d] must not be overwritten.", oldInode)
145
// The incoming part is older than the one already stored.
if part.UploadTime.Before(oldPart.UploadTime) {
146
log.LogWarnf("Request part putTime[%v] is less than old part putTime[%v], partNumber[%v]",
147
part.UploadTime.UnixNano(), oldPart.UploadTime.UnixNano(), part.ID)
155
// Add the part at the end of the collection.
*m = append(*m, part)
162
// Insert adds part to the collection. It first binary-searches for an
// existing part with the same ID (the collection must be ordered by
// ascending ID) and also contains a path that appends part.
// replace presumably decides whether an existing part with the same ID is
// overwritten, and success whether the part ended up stored — confirm
// against the branch bodies.
func (m *Parts) Insert(part *Part, replace bool) (success bool) {
163
// Find the first index whose ID is >= part.ID.
i := sort.Search(len(*m), func(i int) bool {
164
return (*m)[i].ID >= part.ID
166
// A part with this ID is already present.
if i < len(*m) && (*m)[i].ID == part.ID {
173
// Add the part at the end of the collection.
*m = append(*m, part)
178
// Remove deletes the part with the given id from the collection, if it is
// present. The lookup is a binary search, so the collection must be ordered
// by ascending ID.
func (m *Parts) Remove(id uint16) {
179
// Find the first index whose ID is >= id.
i := sort.Search(len(*m), func(i int) bool {
180
return (*m)[i].ID >= id
182
// Only remove when the search landed on an element with exactly this ID.
if i < len(*m) && (*m)[i].ID == id {
184
// Drop element i by appending the tail onto the head; this reuses the
// existing backing array and shifts the later elements down by one.
*m = append((*m)[:i], (*m)[i+1:]...)
191
// Search looks up the part with the given id using sort.Search, which
// requires the collection to be ordered consistently with the search
// predicate. A hit requires the returned index to be in range and the
// element's ID to equal id.
func (m Parts) Search(id uint16) (part *Part, found bool) {
192
i := sort.Search(len(m), func(i int) bool {
195
// sort.Search returns len(m) when no element satisfies the predicate, so
// the index is range-checked before the ID comparison.
if i < len(m) && m[i].ID == id {
201
// Bytes serializes the collection. The layout is the number of parts
// (uvarint), followed for each part by the length of its encoding (uvarint)
// and the encoding itself as produced by Part.Bytes. Each write is checked
// for an error; on success the accumulated bytes and a nil error are
// returned.
func (m Parts) Bytes() ([]byte, error) {
204
buffer := bytes.NewBuffer(nil)
205
// Scratch buffer reused for every varint written below.
tmp := make([]byte, binary.MaxVarintLen64)
206
// Number of parts.
n = binary.PutUvarint(tmp, uint64(len(m)))
207
if _, err = buffer.Write(tmp[:n]); err != nil {
211
for _, p := range m {
212
// Encode the individual part.
marshaled, err = p.Bytes()
217
// Length prefix so the decoder can delimit this part.
n = binary.PutUvarint(tmp, uint64(len(marshaled)))
218
if _, err = buffer.Write(tmp[:n]); err != nil {
222
if _, err = buffer.Write(marshaled); err != nil {
226
return buffer.Bytes(), nil
229
// PartsFromBytes decodes a collection of parts from raw. It reads the part
// count (uvarint) and then, for each part, a length prefix (uvarint)
// followed by that many bytes, which are decoded with PartFromBytes.
// For a nil or empty raw, binary.Uvarint yields a count of 0, so the loop
// body never runs. The function has no error return.
func PartsFromBytes(raw []byte) Parts {
231
var numPartsU64 uint64
232
// Number of encoded parts.
numPartsU64, n = binary.Uvarint(raw)
234
// NOTE(review): the slice is sized directly from the decoded count with no
// visible validation against len(raw); a corrupt count could trigger a very
// large allocation. Confirm inputs are trusted.
muParts := make([]*Part, int(numPartsU64))
235
for i := 0; i < int(numPartsU64); i++ {
236
var partLengthU64 uint64
237
// Length prefix of the next encoded part.
partLengthU64, n = binary.Uvarint(raw[offset:])
239
// NOTE(review): no bounds check is visible before this slice expression; a
// truncated buffer would panic here.
part := PartFromBytes(raw[offset : offset+int(partLengthU64)])
241
// Skip past the part that was just decoded.
offset += int(partLengthU64)
246
// MultipartExtend is a set of string key/value pairs; a Multipart carries
// one in its extend field.
type MultipartExtend map[string]string
248
// NewMultipartExtend returns an empty, initialized MultipartExtend that is
// safe to write to.
func NewMultipartExtend() MultipartExtend {
249
return make(map[string]string)
252
// Bytes serializes the map. The layout is the number of entries (uvarint)
// followed, for each entry, by the key and then the value, each written as a
// length prefix (uvarint) plus the string bytes. Each write is checked for
// an error; on success the accumulated bytes and a nil error are returned.
//
// Go does not specify map iteration order, so two calls on the same map may
// emit the entries in a different order (the set of entries is the same).
func (me MultipartExtend) Bytes() ([]byte, error) {
255
buffer := bytes.NewBuffer(nil)
256
// Scratch buffer reused for every varint written below.
tmp := make([]byte, binary.MaxVarintLen64)
257
// Number of key/value pairs.
n = binary.PutUvarint(tmp, uint64(len(me)))
258
if _, err = buffer.Write(tmp[:n]); err != nil {
261
// marshalStr appends src to the buffer as a length-prefixed string.
marshalStr := func(src string) error {
262
n = binary.PutUvarint(tmp, uint64(len(src)))
263
if _, err = buffer.Write(tmp[:n]); err != nil {
266
if _, err = buffer.WriteString(src); err != nil {
271
// Key first, then value, for every entry.
for key, val := range me {
272
if err = marshalStr(key); err != nil {
275
if err = marshalStr(val); err != nil {
279
return buffer.Bytes(), nil
282
// MultipartExtendFromBytes decodes a MultipartExtend from raw. It reads the
// entry count (uvarint) and then, for each entry, a length-prefixed key
// followed by a length-prefixed value. The function has no error return.
func MultipartExtendFromBytes(raw []byte) MultipartExtend {
285
me := NewMultipartExtend()
286
// unmarshalStr decodes one length-prefixed string from the start of data and
// returns it together with the total number of bytes consumed (prefix plus
// payload).
unmarshalStr := func(data []byte) (string, int) {
289
lengthU64, n = binary.Uvarint(data)
290
// NOTE(review): the decoded length is used as a slice bound without a
// check against len(data); truncated input would panic here.
return string(data[n : n+int(lengthU64)]), n + int(lengthU64)
292
// Number of key/value pairs.
el, n = binary.Uvarint(raw)
297
for i := 0; i < int(el); i++ {
299
// Key, then value, each decoded from the current offset.
key, n = unmarshalStr(raw[offset:])
301
val, n = unmarshalStr(raw[offset:])
308
// Multipart defines the fields necessary for multipart session management.
309
type Multipart struct {
315
// extend holds the string key/value pairs attached to this session.
extend MultipartExtend
320
// Less implements the btree.Item ordering. Sessions are ordered by key and,
// for equal keys, by id. It returns false when than is not a *Multipart.
func (m *Multipart) Less(than btree.Item) bool {
321
tm, is := than.(*Multipart)
322
// "is" short-circuits the comparison when the type assertion failed, so the
// nil tm is never dereferenced.
return is && ((m.key < tm.key) || ((m.key == tm.key) && (m.id < tm.id)))
325
// Copy returns a copy of the session as a btree.Item. initTime is copied by
// value and the parts slice is copied into a fresh slice, so appending to or
// removing from one session's slice does not affect the other. The *Part
// elements themselves are shared between the original and the copy.
func (m *Multipart) Copy() btree.Item {
329
initTime: m.initTime,
330
// New backing array, same *Part pointers.
parts: append(Parts{}, m.parts...),
335
// ID returns a string identifying the session — presumably the id field;
// confirm against the function body.
func (m *Multipart) ID() string {
339
// UpdateOrStorePart records part in this session by delegating to
// Parts.UpdateOrStore and returns that call's results unchanged.
func (m *Multipart) UpdateOrStorePart(part *Part) (oldInode uint64, updated, conflict bool) {
343
// Decoding a nil buffer yields an empty, non-nil Parts value. Presumably this
// runs only when m.parts has not been initialized yet — confirm against the
// guarding condition.
m.parts = PartsFromBytes(nil)
345
oldInode, updated, conflict = m.parts.UpdateOrStore(part)
350
// InsertPart adds part to this session by delegating to Parts.Insert,
// passing replace through, and returns that call's result.
func (m *Multipart) InsertPart(part *Part, replace bool) (success bool) {
354
// Decoding a nil buffer yields an empty, non-nil Parts value. Presumably this
// runs only when m.parts has not been initialized yet — confirm against the
// guarding condition.
m.parts = PartsFromBytes(nil)
356
success = m.parts.Insert(part, replace)
360
// Parts returns the session's parts as a newly allocated slice. Callers may
// reorder or truncate the returned slice without affecting the session, but
// the *Part elements are shared with it.
func (m *Multipart) Parts() []*Part {
363
return append([]*Part{}, m.parts...)
366
// Bytes serializes the session. Fields are written in this order: id and key
// as length-prefixed strings, initTime as Unix nanoseconds (signed varint),
// the encoded parts as a length-prefixed byte block, and the encoded extend
// map as a length-prefixed byte block. Each step is checked for an error; on
// success the accumulated bytes and a nil error are returned.
func (m *Multipart) Bytes() ([]byte, error) {
368
buffer := bytes.NewBuffer(nil)
370
// Scratch buffer reused for every varint written below.
tmp := make([]byte, binary.MaxVarintLen64)
372
// marshalStr appends src to the buffer as a length-prefixed string.
marshalStr := func(src string) error {
373
n = binary.PutUvarint(tmp, uint64(len(src)))
374
if _, err = buffer.Write(tmp[:n]); err != nil {
377
if _, err = buffer.WriteString(src); err != nil {
383
// Session id.
if err = marshalStr(m.id); err != nil {
387
// Session key.
if err = marshalStr(m.key); err != nil {
391
// Init time as nanoseconds since the Unix epoch; signed varint because
// UnixNano returns an int64.
n = binary.PutVarint(tmp, m.initTime.UnixNano())
392
if _, err = buffer.Write(tmp[:n]); err != nil {
396
var marshaledParts []byte
397
// Encode the parts collection, then write it behind its length prefix.
if marshaledParts, err = m.parts.Bytes(); err != nil {
400
n = binary.PutUvarint(tmp, uint64(len(marshaledParts)))
401
if _, err = buffer.Write(tmp[:n]); err != nil {
404
if _, err = buffer.Write(marshaledParts); err != nil {
408
var extendBytes []byte
409
// Encode the extend map, then write it behind its length prefix.
if extendBytes, err = m.extend.Bytes(); err != nil {
412
n = binary.PutUvarint(tmp, uint64(len(extendBytes)))
413
if _, err = buffer.Write(tmp[:n]); err != nil {
416
if _, err = buffer.Write(extendBytes); err != nil {
419
return buffer.Bytes(), nil
422
func MultipartFromBytes(raw []byte) *Multipart {
423
unmarshalStr := func(data []byte) (string, int) {
426
lengthU64, n = binary.Uvarint(data)
427
return string(data[n : n+int(lengthU64)]), n + int(lengthU64)
432
id, n = unmarshalStr(raw)
436
key, n = unmarshalStr(raw[offset:])
439
var initTimeI64 int64
440
initTimeI64, n = binary.Varint(raw[offset:])
443
var partsLengthU64 uint64
444
partsLengthU64, n = binary.Uvarint(raw[offset:])
446
parts := PartsFromBytes(raw[offset : offset+int(partsLengthU64)])
447
offset += int(partsLengthU64)
448
// decode multipart extend
449
var extendLengthU64 uint64
450
extendLengthU64, n = binary.Uvarint(raw[offset:])
452
me := MultipartExtendFromBytes(raw[offset : offset+int(extendLengthU64)])
454
muSession := &Multipart{
457
initTime: time.Unix(0, initTimeI64),