cubefs

Форк
0
/
multipart.go 
462 строки · 10.5 Кб
1
// Copyright 2018 The CubeFS Authors.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12
// implied. See the License for the specific language governing
13
// permissions and limitations under the License.
14

15
package metanode
16

17
import (
18
	"bytes"
19
	"encoding/binary"
20
	"sort"
21
	"sync"
22
	"time"
23

24
	"github.com/cubefs/cubefs/util/btree"
25
	"github.com/cubefs/cubefs/util/log"
26
)
27

28
// Part is the metadata record kept for a single uploaded part of a
// multipart upload.
type Part struct {
	// ID is the part number; Parts is searched and ordered by this value.
	ID uint16
	// UploadTime is the timestamp attached to the part. It is serialized
	// as Unix nanoseconds.
	UploadTime time.Time
	// MD5 is the checksum string stored for the part.
	MD5 string
	// Size is the recorded size of the part (presumably in bytes).
	Size uint64
	// Inode is the inode number associated with the part.
	Inode uint64
}
36

37
// Equal reports whether m and o describe the same part, i.e. their ID,
// Inode, Size and MD5 all match. UploadTime is not part of the comparison.
//
// Nil handling: two nil pointers are considered equal, and a nil pointer is
// never equal to a non-nil one.
func (m *Part) Equal(o *Part) bool {
	if m == nil || o == nil {
		// Avoid dereferencing nil; equal only when both sides are nil.
		return m == o
	}
	return m.ID == o.ID &&
		m.Inode == o.Inode &&
		m.Size == o.Size &&
		m.MD5 == o.MD5
}
43

44
// Bytes serializes the part into its binary form. Fields are written in
// this order:
//
//	ID         uvarint
//	UploadTime varint (Unix nanoseconds)
//	MD5        uvarint length followed by the raw string bytes
//	Size       uvarint
//	Inode      uvarint
//
// A non-nil error is returned only if a write to the in-memory buffer fails.
func (m Part) Bytes() ([]byte, error) {
	var (
		out     bytes.Buffer
		scratch [binary.MaxVarintLen64]byte
	)
	// putUvarint appends v to out in unsigned varint encoding.
	putUvarint := func(v uint64) error {
		n := binary.PutUvarint(scratch[:], v)
		_, err := out.Write(scratch[:n])
		return err
	}

	if err := putUvarint(uint64(m.ID)); err != nil {
		return nil, err
	}
	// The upload time is the only signed field, so it uses the signed varint.
	n := binary.PutVarint(scratch[:], m.UploadTime.UnixNano())
	if _, err := out.Write(scratch[:n]); err != nil {
		return nil, err
	}
	if err := putUvarint(uint64(len(m.MD5))); err != nil {
		return nil, err
	}
	if _, err := out.WriteString(m.MD5); err != nil {
		return nil, err
	}
	if err := putUvarint(m.Size); err != nil {
		return nil, err
	}
	if err := putUvarint(m.Inode); err != nil {
		return nil, err
	}
	return out.Bytes(), nil
}
79

80
func PartFromBytes(raw []byte) *Part {
81
	var offset, n int
82
	// decode ID
83
	var u64ID uint64
84
	u64ID, n = binary.Uvarint(raw)
85
	offset += n
86
	// decode upload time
87
	var uploadTimeI64 int64
88
	uploadTimeI64, n = binary.Varint(raw[offset:])
89
	offset += n
90
	// decode MD5
91
	var md5Len uint64
92
	md5Len, n = binary.Uvarint(raw[offset:])
93
	offset += n
94
	md5Content := string(raw[offset : offset+int(md5Len)])
95
	offset += int(md5Len)
96
	// decode size
97
	var sizeU64 uint64
98
	sizeU64, n = binary.Uvarint(raw[offset:])
99
	offset += n
100
	// decode inode
101
	var inode uint64
102
	inode, n = binary.Uvarint(raw[offset:])
103

104
	muPart := &Part{
105
		ID:         uint16(u64ID),
106
		UploadTime: time.Unix(0, uploadTimeI64),
107
		MD5:        md5Content,
108
		Size:       sizeU64,
109
		Inode:      inode,
110
	}
111
	return muPart
112
}
113

114
// Parts is a list of parts. Its lookup methods binary-search on Part.ID, so
// the slice is expected to be kept in ascending ID order.
type Parts []*Part
115

116
// Len returns the number of parts held in m.
func (m Parts) Len() int {
	return len(m)
}
119

120
// sort reorders m in place by ascending part ID. The sort is stable, so
// elements with the same ID keep their relative order.
func (m Parts) sort() {
	byID := func(a, b int) bool {
		return m[a].ID < m[b].ID
	}
	sort.SliceStable(m, byID)
}
125

126
// Hash reports whether the list already contains a part with the same ID as
// part. Only the ID is compared. The lookup is a binary search, so the list
// must be sorted by ID.
func (m *Parts) Hash(part *Part) (has bool) {
	list := *m
	idx := sort.Search(len(list), func(k int) bool {
		return list[k].ID >= part.ID
	})
	if idx == len(list) {
		return false
	}
	return list[idx].ID == part.ID
}
133

134
func (m *Parts) UpdateOrStore(part *Part) (oldInode uint64, update, conflict bool) {
135
	i := sort.Search(len(*m), func(i int) bool {
136
		return (*m)[i].ID >= part.ID
137
	})
138
	if i >= 0 && i < len(*m) && (*m)[i].ID == part.ID {
139
		oldPart := (*m)[i]
140
		oldInode = oldPart.Inode
141
		if part.Inode == oldInode {
142
			log.LogWarnf("Request already success,the same partinode[%d] must not be overwritten.", oldInode)
143
			return
144
		}
145
		if part.UploadTime.Before(oldPart.UploadTime) {
146
			log.LogWarnf("Request part putTime[%v] is less than old part putTime[%v], partNumber[%v]",
147
				part.UploadTime.UnixNano(), oldPart.UploadTime.UnixNano(), part.ID)
148
			conflict = true
149
			return
150
		}
151
		update = true
152
		(*m)[i] = part
153
		return
154
	}
155
	*m = append(*m, part)
156
	update = false
157
	m.sort()
158
	return
159
}
160

161
// Deprecated
162
func (m *Parts) Insert(part *Part, replace bool) (success bool) {
163
	i := sort.Search(len(*m), func(i int) bool {
164
		return (*m)[i].ID >= part.ID
165
	})
166
	if i < len(*m) && (*m)[i].ID == part.ID {
167
		if replace {
168
			(*m)[i] = part
169
			return true
170
		}
171
		return false
172
	}
173
	*m = append(*m, part)
174
	m.sort()
175
	return true
176
}
177

178
// Remove deletes the part with the given ID from the list, which must be
// sorted by ID. It does nothing when no such part exists.
//
// The remaining elements are shifted down in place and the vacated slot at
// the end of the backing array is set to nil, so the array does not keep a
// stale *Part reachable after removal.
func (m *Parts) Remove(id uint16) {
	parts := *m
	i := sort.Search(len(parts), func(k int) bool {
		return parts[k].ID >= id
	})
	if i == len(parts) || parts[i].ID != id {
		return
	}
	last := len(parts) - 1
	// copy is a no-op when the removed element is the last one.
	copy(parts[i:], parts[i+1:])
	parts[last] = nil
	*m = parts[:last]
}
190

191
// Search looks up the part with the given ID using binary search; the list
// must be sorted by ID. It returns the part and true when found, or nil and
// false otherwise.
func (m Parts) Search(id uint16) (part *Part, found bool) {
	idx := sort.Search(len(m), func(k int) bool {
		return m[k].ID >= id
	})
	if idx == len(m) || m[idx].ID != id {
		return nil, false
	}
	return m[idx], true
}
200

201
// Bytes serializes the list: a uvarint element count, followed by each part
// as a uvarint byte length and the output of Part.Bytes, in slice order.
// The first error from encoding a part or writing to the buffer is returned.
func (m Parts) Bytes() ([]byte, error) {
	var (
		out     bytes.Buffer
		scratch [binary.MaxVarintLen64]byte
	)
	// writeLen appends v as an unsigned varint.
	writeLen := func(v int) error {
		n := binary.PutUvarint(scratch[:], uint64(v))
		_, err := out.Write(scratch[:n])
		return err
	}

	if err := writeLen(len(m)); err != nil {
		return nil, err
	}
	for _, part := range m {
		encoded, err := part.Bytes()
		if err != nil {
			return nil, err
		}
		// Each part is length-prefixed so the decoder can slice it out.
		if err = writeLen(len(encoded)); err != nil {
			return nil, err
		}
		if _, err = out.Write(encoded); err != nil {
			return nil, err
		}
	}
	return out.Bytes(), nil
}
228

229
// PartsFromBytes decodes a list written by Parts.Bytes: a uvarint element
// count followed by that many length-prefixed parts, each decoded with
// PartFromBytes. Elements are returned in encoded order; no sorting is done.
//
// An empty or nil raw yields an empty, non-nil list. Input is not
// validated: a part length that reaches past the end of raw causes a
// slice-bounds panic.
func PartsFromBytes(raw []byte) Parts {
	count, pos := binary.Uvarint(raw)
	parts := make([]*Part, int(count))
	for i := range parts {
		size, n := binary.Uvarint(raw[pos:])
		pos += n
		end := pos + int(size)
		parts[i] = PartFromBytes(raw[pos:end])
		pos = end
	}
	return parts
}
245

246
// MultipartExtend is a set of string key/value attributes attached to a
// multipart session.
type MultipartExtend map[string]string
247

248
// NewMultipartExtend returns an empty, non-nil MultipartExtend that is ready
// to be written to.
func NewMultipartExtend() MultipartExtend {
	return MultipartExtend{}
}
251

252
// Bytes serializes the map: a uvarint entry count, followed by every entry
// as a length-prefixed key and a length-prefixed value (each length is a
// uvarint).
//
// Entries are written in ascending key order. Go randomizes map iteration
// order, so without sorting the same map could serialize to different byte
// sequences on different calls. MultipartExtendFromBytes does not depend on
// entry order, so data written in any order still decodes.
//
// A non-nil error is returned only if a write to the in-memory buffer fails.
func (me MultipartExtend) Bytes() ([]byte, error) {
	buffer := bytes.NewBuffer(nil)
	tmp := make([]byte, binary.MaxVarintLen64)

	// writeUvarint appends v in unsigned varint encoding.
	writeUvarint := func(v uint64) error {
		n := binary.PutUvarint(tmp, v)
		_, err := buffer.Write(tmp[:n])
		return err
	}
	// writeStr appends src as a uvarint length followed by its bytes.
	writeStr := func(src string) error {
		if err := writeUvarint(uint64(len(src))); err != nil {
			return err
		}
		_, err := buffer.WriteString(src)
		return err
	}

	if err := writeUvarint(uint64(len(me))); err != nil {
		return nil, err
	}

	// Fix the output order so encoding is deterministic.
	keys := make([]string, 0, len(me))
	for key := range me {
		keys = append(keys, key)
	}
	sort.Strings(keys)

	for _, key := range keys {
		if err := writeStr(key); err != nil {
			return nil, err
		}
		if err := writeStr(me[key]); err != nil {
			return nil, err
		}
	}
	return buffer.Bytes(), nil
}
281

282
// MultipartExtendFromBytes decodes a map written by MultipartExtend.Bytes:
// a uvarint entry count followed by length-prefixed key/value strings.
//
// It returns nil when the encoded entry count is zero, which includes empty
// or nil input. Input is not validated: a string length that reaches past
// the end of raw causes a slice-bounds panic.
func MultipartExtendFromBytes(raw []byte) MultipartExtend {
	// readStr decodes one length-prefixed string from the start of data and
	// reports how many bytes it consumed.
	readStr := func(data []byte) (string, int) {
		size, n := binary.Uvarint(data)
		end := n + int(size)
		return string(data[n:end]), end
	}

	count, pos := binary.Uvarint(raw)
	if count == 0 {
		return nil
	}

	me := NewMultipartExtend()
	for i := 0; i < int(count); i++ {
		key, used := readStr(raw[pos:])
		pos += used
		val, used := readStr(raw[pos:])
		pos += used
		me[key] = val
	}
	return me
}
307

308
// Multipart defined necessary fields for multipart session management.
309
type Multipart struct {
310
	// session fields
311
	id       string
312
	key      string
313
	initTime time.Time
314
	parts    Parts
315
	extend   MultipartExtend
316

317
	mu sync.RWMutex
318
}
319

320
// Less implements btree.Item ordering: sessions are ordered by key first
// and by id second. It returns false when than is not a *Multipart.
func (m *Multipart) Less(than btree.Item) bool {
	other, ok := than.(*Multipart)
	if !ok {
		return false
	}
	if m.key != other.key {
		return m.key < other.key
	}
	return m.id < other.id
}
324

325
func (m *Multipart) Copy() btree.Item {
326
	return &Multipart{
327
		id:       m.id,
328
		key:      m.key,
329
		initTime: m.initTime,
330
		parts:    append(Parts{}, m.parts...),
331
		extend:   m.extend,
332
	}
333
}
334

335
// ID returns the identifier of the multipart session.
func (m *Multipart) ID() string {
	return m.id
}
338

339
// UpdateOrStorePart records part in the session while holding the write
// lock. The parts list is created on first use. The results are those of
// Parts.UpdateOrStore, passed through unchanged.
func (m *Multipart) UpdateOrStorePart(part *Part) (oldInode uint64, updated, conflict bool) {
	m.mu.Lock()
	defer m.mu.Unlock()

	if m.parts == nil {
		// Decoding nil yields an empty, non-nil list.
		m.parts = PartsFromBytes(nil)
	}
	return m.parts.UpdateOrStore(part)
}
348

349
// Deprecated
350
func (m *Multipart) InsertPart(part *Part, replace bool) (success bool) {
351
	m.mu.Lock()
352
	defer m.mu.Unlock()
353
	if m.parts == nil {
354
		m.parts = PartsFromBytes(nil)
355
	}
356
	success = m.parts.Insert(part, replace)
357
	return
358
}
359

360
// Parts returns a snapshot of the session's parts taken under the read
// lock. The returned slice is a new slice, so the caller may reorder or
// truncate it freely; the *Part elements themselves are shared with the
// session.
func (m *Multipart) Parts() []*Part {
	m.mu.RLock()
	defer m.mu.RUnlock()

	snapshot := append([]*Part{}, m.parts...)
	return snapshot
}
365

366
// Bytes serializes the session. Fields are written in this order:
//
//	id       uvarint length + string bytes
//	key      uvarint length + string bytes
//	initTime varint (Unix nanoseconds)
//	parts    uvarint length + output of Parts.Bytes
//	extend   uvarint length + output of MultipartExtend.Bytes
//
// The first error from a nested encoder or from a buffer write is returned.
//
// NOTE(review): m.mu is not taken here although parts is read — confirm
// callers serialize access.
func (m *Multipart) Bytes() ([]byte, error) {
	var (
		out     bytes.Buffer
		scratch [binary.MaxVarintLen64]byte
	)
	// putUvarint appends v in unsigned varint encoding.
	putUvarint := func(v uint64) error {
		n := binary.PutUvarint(scratch[:], v)
		_, err := out.Write(scratch[:n])
		return err
	}
	// putStr appends s as a uvarint length followed by its bytes.
	putStr := func(s string) error {
		if err := putUvarint(uint64(len(s))); err != nil {
			return err
		}
		_, err := out.WriteString(s)
		return err
	}
	// putBlob appends b as a uvarint length followed by its bytes.
	putBlob := func(b []byte) error {
		if err := putUvarint(uint64(len(b))); err != nil {
			return err
		}
		_, err := out.Write(b)
		return err
	}

	if err := putStr(m.id); err != nil {
		return nil, err
	}
	if err := putStr(m.key); err != nil {
		return nil, err
	}

	// The init time is signed, so it uses the signed varint encoding.
	n := binary.PutVarint(scratch[:], m.initTime.UnixNano())
	if _, err := out.Write(scratch[:n]); err != nil {
		return nil, err
	}

	partsBytes, err := m.parts.Bytes()
	if err != nil {
		return nil, err
	}
	if err = putBlob(partsBytes); err != nil {
		return nil, err
	}

	extendBytes, err := m.extend.Bytes()
	if err != nil {
		return nil, err
	}
	if err = putBlob(extendBytes); err != nil {
		return nil, err
	}
	return out.Bytes(), nil
}
421

422
func MultipartFromBytes(raw []byte) *Multipart {
423
	unmarshalStr := func(data []byte) (string, int) {
424
		var n int
425
		var lengthU64 uint64
426
		lengthU64, n = binary.Uvarint(data)
427
		return string(data[n : n+int(lengthU64)]), n + int(lengthU64)
428
	}
429
	var offset, n int
430
	// decode id
431
	var id string
432
	id, n = unmarshalStr(raw)
433
	offset += n
434
	// decode key
435
	var key string
436
	key, n = unmarshalStr(raw[offset:])
437
	offset += n
438
	// decode init time
439
	var initTimeI64 int64
440
	initTimeI64, n = binary.Varint(raw[offset:])
441
	offset += n
442
	// decode parts
443
	var partsLengthU64 uint64
444
	partsLengthU64, n = binary.Uvarint(raw[offset:])
445
	offset += n
446
	parts := PartsFromBytes(raw[offset : offset+int(partsLengthU64)])
447
	offset += int(partsLengthU64)
448
	// decode multipart extend
449
	var extendLengthU64 uint64
450
	extendLengthU64, n = binary.Uvarint(raw[offset:])
451
	offset += n
452
	me := MultipartExtendFromBytes(raw[offset : offset+int(extendLengthU64)])
453

454
	muSession := &Multipart{
455
		id:       id,
456
		key:      key,
457
		initTime: time.Unix(0, initTimeI64),
458
		parts:    parts,
459
		extend:   me,
460
	}
461
	return muSession
462
}
463

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.