Dragonfly2

Форк
0
/
local_storage_test.go 
584 строки · 14.2 Кб
1
/*
2
 *     Copyright 2020 The Dragonfly Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *      http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package storage
18

19
import (
20
	"bytes"
21
	"context"
22
	"crypto/md5"
23
	"encoding/hex"
24
	"io"
25
	"math"
26
	"math/rand"
27
	"os"
28
	"path"
29
	"testing"
30
	"time"
31

32
	testifyassert "github.com/stretchr/testify/assert"
33
	"go.uber.org/atomic"
34

35
	commonv1 "d7y.io/api/v2/pkg/apis/common/v1"
36

37
	"d7y.io/dragonfly/v2/client/config"
38
	"d7y.io/dragonfly/v2/client/daemon/test"
39
	clientutil "d7y.io/dragonfly/v2/client/util"
40
	logger "d7y.io/dragonfly/v2/internal/dflog"
41
	"d7y.io/dragonfly/v2/internal/util"
42
	"d7y.io/dragonfly/v2/pkg/digest"
43
	"d7y.io/dragonfly/v2/pkg/net/http"
44
	_ "d7y.io/dragonfly/v2/pkg/rpc/dfdaemon/server"
45
)
46

47
func TestLocalTaskStore_PutAndGetPiece(t *testing.T) {
48
	assert := testifyassert.New(t)
49
	testBytes, err := os.ReadFile(test.File)
50
	assert.Nil(err, "load test file")
51
	md5Test, _ := calcFileMd5(test.File, nil)
52

53
	dst := path.Join(test.DataDir, taskData+".copy")
54
	defer os.Remove(dst)
55

56
	testCases := []struct {
57
		name     string
58
		strategy config.StoreStrategy
59
		create   func(s *storageManager, taskID, peerID string) (TaskStorageDriver, error)
60
	}{
61
		{
62
			name:     "normal",
63
			strategy: config.SimpleLocalTaskStoreStrategy,
64
			create: func(s *storageManager, taskID, peerID string) (TaskStorageDriver, error) {
65
				return s.CreateTask(
66
					&RegisterTaskRequest{
67
						PeerTaskMetadata: PeerTaskMetadata{
68
							PeerID: peerID,
69
							TaskID: taskID,
70
						},
71
						DesiredLocation: dst,
72
						ContentLength:   int64(len(testBytes)),
73
					})
74
			},
75
		},
76
		{
77
			name:     "normal",
78
			strategy: config.AdvanceLocalTaskStoreStrategy,
79
			create: func(s *storageManager, taskID, peerID string) (TaskStorageDriver, error) {
80
				return s.CreateTask(
81
					&RegisterTaskRequest{
82
						PeerTaskMetadata: PeerTaskMetadata{
83
							PeerID: peerID,
84
							TaskID: taskID,
85
						},
86
						DesiredLocation: dst,
87
						ContentLength:   int64(len(testBytes)),
88
					})
89
			},
90
		},
91
		{
92
			name:     "subtask",
93
			strategy: config.AdvanceLocalTaskStoreStrategy,
94
			create: func(s *storageManager, taskID, peerID string) (TaskStorageDriver, error) {
95
				var (
96
					parentPeerID = peerID + "-parent"
97
					parentTaskID = taskID + "-parent"
98
				)
99

100
				_, err := s.CreateTask(
101
					&RegisterTaskRequest{
102
						PeerTaskMetadata: PeerTaskMetadata{
103
							PeerID: parentPeerID,
104
							TaskID: parentTaskID,
105
						},
106
						DesiredLocation: dst,
107
						ContentLength:   int64(len(testBytes)),
108
					})
109
				assert.Nil(err)
110

111
				return s.RegisterSubTask(
112
					context.Background(),
113
					&RegisterSubTaskRequest{
114
						Parent: PeerTaskMetadata{
115
							PeerID: parentPeerID,
116
							TaskID: parentTaskID,
117
						},
118
						SubTask: PeerTaskMetadata{
119
							PeerID: peerID,
120
							TaskID: taskID,
121
						},
122
						Range: &http.Range{
123
							Start:  100,
124
							Length: int64(len(testBytes)),
125
						},
126
					})
127
			},
128
		},
129
	}
130

131
	for _, tc := range testCases {
132
		t.Run(tc.name+"-"+string(tc.strategy), func(t *testing.T) {
133
			var (
134
				taskID    = "task-d4bb1c273a9889fea14abd4651994fe8"
135
				peerID    = "peer-d4bb1c273a9889fea14abd4651994fe8"
136
				pieceSize = 512
137
			)
138
			sm, err := NewStorageManager(config.SimpleLocalTaskStoreStrategy,
139
				&config.StorageOption{
140
					DataPath: test.DataDir,
141
					TaskExpireTime: clientutil.Duration{
142
						Duration: time.Minute,
143
					},
144
				}, func(request CommonTaskRequest) {
145
				}, defaultDirectoryMode)
146
			assert.Nil(err)
147

148
			_, err = tc.create(sm.(*storageManager), taskID, peerID)
149
			assert.Nil(err, "create task storage")
150

151
			ts, ok := sm.(*storageManager).LoadTask(PeerTaskMetadata{
152
				PeerID: peerID,
153
				TaskID: taskID,
154
			})
155
			assert.True(ok, "load created task")
156

157
			var pieces []struct {
158
				index int
159
				start int
160
				end   int // not contain in data
161
			}
162
			var piecesMd5 []string
163
			for i := 0; i*pieceSize < len(testBytes); i++ {
164
				start := i * pieceSize
165
				end := start + pieceSize
166
				if end > len(testBytes) {
167
					end = len(testBytes)
168
				}
169
				pieces = append(pieces, struct {
170
					index int
171
					start int
172
					end   int
173
				}{
174
					index: i,
175
					start: start,
176
					end:   end,
177
				})
178
				piecesMd5 = append(piecesMd5, calcPieceMd5(testBytes[start:end]))
179
			}
180
			rand.Seed(time.Now().UnixNano())
181
			rand.Shuffle(len(pieces), func(i, j int) { pieces[i], pieces[j] = pieces[j], pieces[i] })
182

183
			// random put all pieces
184
			for _, p := range pieces {
185
				_, err = ts.WritePiece(context.Background(), &WritePieceRequest{
186
					PeerTaskMetadata: PeerTaskMetadata{
187
						TaskID: taskID,
188
					},
189
					PieceMetadata: PieceMetadata{
190
						Num:    int32(p.index),
191
						Md5:    piecesMd5[p.index],
192
						Offset: uint64(p.start),
193
						Range: http.Range{
194
							Start:  int64(p.start),
195
							Length: int64(p.end - p.start),
196
						},
197
						Style: commonv1.PieceStyle_PLAIN,
198
					},
199
					Reader: bytes.NewBuffer(testBytes[p.start:p.end]),
200
				})
201
				assert.Nil(err, "put piece")
202
			}
203

204
			if lts, ok := ts.(*localTaskStore); ok {
205
				md5TaskData, _ := calcFileMd5(path.Join(lts.dataDir, taskData), nil)
206
				assert.Equal(md5Test, md5TaskData, "md5 must match")
207
			} else if lsts, ok := ts.(*localSubTaskStore); ok {
208
				md5TaskData, _ := calcFileMd5(path.Join(lsts.parent.dataDir, taskData), lsts.Range)
209
				assert.Equal(md5Test, md5TaskData, "md5 must match")
210
			}
211

212
			// shuffle again for get all pieces
213
			rand.Shuffle(len(pieces), func(i, j int) { pieces[i], pieces[j] = pieces[j], pieces[i] })
214
			for _, p := range pieces {
215
				rd, cl, err := ts.ReadPiece(context.Background(), &ReadPieceRequest{
216
					PeerTaskMetadata: PeerTaskMetadata{
217
						TaskID: taskID,
218
					},
219
					PieceMetadata: PieceMetadata{
220
						Num:    int32(p.index),
221
						Md5:    piecesMd5[p.index],
222
						Offset: uint64(p.start),
223
						Range: http.Range{
224
							Start:  int64(p.start),
225
							Length: int64(p.end - p.start),
226
						},
227
						Style: commonv1.PieceStyle_PLAIN,
228
					},
229
				})
230
				assert.Nil(err, "get piece reader should be ok")
231
				data, err := io.ReadAll(rd)
232
				cl.Close()
233
				assert.Nil(err, "read piece should be ok")
234
				assert.Equal(p.end-p.start, len(data), "piece length should match")
235
				assert.Equal(testBytes[p.start:p.end], data, "piece data should match")
236
			}
237

238
			rd, err := ts.ReadAllPieces(context.Background(), &ReadAllPiecesRequest{
239
				PeerTaskMetadata: PeerTaskMetadata{
240
					TaskID: taskID,
241
				},
242
				Range: nil,
243
			})
244
			assert.Nil(err, "get all pieces reader should be ok")
245
			data, err := io.ReadAll(rd)
246
			assert.Nil(err, "read all pieces should be ok")
247
			rd.Close()
248
			assert.Equal(testBytes, data, "all pieces data should match")
249

250
			if lts, ok := ts.(*localTaskStore); ok {
251
				lts.genMetadata(0, &WritePieceRequest{
252
					NeedGenMetadata: func(n int64) (total int32, length int64, gen bool) {
253
						return int32(len(pieces)), int64(len(testBytes)), true
254
					},
255
				})
256
				assert.Equal(digest.SHA256FromStrings(piecesMd5...), lts.PieceMd5Sign)
257

258
				// clean up test data
259
				lts.lastAccess.Store(time.Now().Add(-1 * time.Hour).UnixNano())
260
				ok = lts.CanReclaim()
261
				assert.True(ok, "task should gc")
262
				err = lts.Reclaim()
263
				assert.Nil(err, "task gc")
264
			} else if lsts, ok := ts.(*localSubTaskStore); ok {
265
				lsts.genMetadata(0, &WritePieceRequest{
266
					NeedGenMetadata: func(n int64) (total int32, length int64, gen bool) {
267
						return int32(len(pieces)), int64(len(testBytes)), true
268
					},
269
				})
270
				assert.Equal(digest.SHA256FromStrings(piecesMd5...), lsts.PieceMd5Sign)
271

272
				// keep original offset
273
				err = lsts.Store(context.Background(),
274
					&StoreRequest{
275
						CommonTaskRequest: CommonTaskRequest{
276
							Destination: dst,
277
						},
278
						MetadataOnly:   false,
279
						StoreDataOnly:  false,
280
						TotalPieces:    0,
281
						OriginalOffset: true,
282
					})
283
				assert.Nil(err)
284
				md5Store, err := calcFileMd5(dst, lsts.Range)
285
				assert.Nil(err)
286
				assert.Equal(md5Test, md5Store)
287

288
				// just ranged data
289
				err = lsts.Store(context.Background(),
290
					&StoreRequest{
291
						CommonTaskRequest: CommonTaskRequest{
292
							Destination: dst,
293
						},
294
						MetadataOnly:   false,
295
						StoreDataOnly:  false,
296
						TotalPieces:    0,
297
						OriginalOffset: false,
298
					})
299
				assert.Nil(err)
300
				md5Store, err = calcFileMd5(dst, nil)
301
				assert.Nil(err)
302
				assert.Equal(md5Test, md5Store)
303

304
				// clean up test data
305
				lsts.parent.lastAccess.Store(time.Now().Add(-1 * time.Hour).UnixNano())
306
				lsts.parent.Done = true
307

308
				ok = lsts.CanReclaim()
309
				assert.True(ok, "sub task should gc")
310
				err = lsts.Reclaim()
311
				assert.Nil(err, "sub task gc")
312

313
				ok = lsts.parent.CanReclaim()
314
				assert.True(ok, "parent task should gc")
315
				err = lsts.parent.Reclaim()
316
				assert.Nil(err, "parent task gc")
317
			}
318
		})
319
	}
320
}
321

322
func TestLocalTaskStore_StoreTaskData_Simple(t *testing.T) {
323
	assert := testifyassert.New(t)
324
	src := path.Join(test.DataDir, taskData)
325
	dst := path.Join(test.DataDir, taskData+".copy")
326
	meta := path.Join(test.DataDir, taskData+".meta")
327
	// prepare test data
328
	testData := []byte("test data")
329
	err := os.WriteFile(src, testData, defaultFileMode)
330
	assert.Nil(err, "prepare test data")
331
	defer os.Remove(src)
332
	defer os.Remove(dst)
333
	defer os.Remove(meta)
334

335
	data, err := os.OpenFile(src, os.O_RDWR, defaultFileMode)
336
	assert.Nil(err, "open test data")
337
	defer data.Close()
338

339
	matadata, err := os.OpenFile(meta, os.O_RDWR|os.O_CREATE, defaultFileMode)
340
	assert.Nil(err, "open test meta data")
341
	matadata.Close()
342
	ts := localTaskStore{
343
		SugaredLoggerOnWith: logger.With("test", "localTaskStore"),
344
		persistentMetadata: persistentMetadata{
345
			TaskID:       "test",
346
			DataFilePath: src,
347
		},
348
		dataDir:          test.DataDir,
349
		metadataFilePath: meta,
350
	}
351
	ts.lastAccess.Store(time.Now().UnixNano())
352
	err = ts.Store(context.Background(), &StoreRequest{
353
		CommonTaskRequest: CommonTaskRequest{
354
			TaskID:      ts.TaskID,
355
			Destination: dst,
356
		},
357
	})
358
	assert.Nil(err, "store test data")
359
	bs, err := os.ReadFile(dst)
360
	assert.Nil(err, "read output test data")
361
	assert.Equal(testData, bs, "data must match")
362
}
363

364
func calcFileMd5(filePath string, rg *http.Range) (string, error) {
365
	var md5String string
366
	file, err := os.Open(filePath)
367
	if err != nil {
368
		return md5String, err
369
	}
370
	defer file.Close()
371

372
	var rd io.Reader = file
373
	if rg != nil {
374
		rd = io.LimitReader(file, rg.Length)
375
		_, err = file.Seek(rg.Start, io.SeekStart)
376
		if err != nil {
377
			return "", err
378
		}
379
	}
380

381
	hash := md5.New()
382
	if _, err := io.Copy(hash, rd); err != nil {
383
		return md5String, err
384
	}
385
	hashInBytes := hash.Sum(nil)[:16]
386
	md5String = hex.EncodeToString(hashInBytes)
387
	return md5String, nil
388
}
389

390
func calcPieceMd5(data []byte) string {
391
	hash := md5.New()
392
	hash.Write(data)
393
	return hex.EncodeToString(hash.Sum(nil)[:16])
394
}
395

396
func Test_computePiecePosition(t *testing.T) {
397
	var testCases = []struct {
398
		name  string
399
		total int64
400
		rg    *http.Range
401
		start int32
402
		end   int32
403
		piece uint32
404
	}{
405
		{
406
			name:  "0",
407
			total: 500,
408
			rg: &http.Range{
409
				Start:  0,
410
				Length: 10,
411
			},
412
			start: 0,
413
			end:   0,
414
			piece: 100,
415
		},
416
		{
417
			name:  "1",
418
			total: 500,
419
			rg: &http.Range{
420
				Start:  30,
421
				Length: 60,
422
			},
423
			start: 0,
424
			end:   0,
425
			piece: 100,
426
		},
427
		{
428
			name:  "2",
429
			total: 500,
430
			rg: &http.Range{
431
				Start:  30,
432
				Length: 130,
433
			},
434
			start: 0,
435
			end:   1,
436
			piece: 100,
437
		},
438
		{
439
			name:  "3",
440
			total: 500,
441
			rg: &http.Range{
442
				Start:  350,
443
				Length: 100,
444
			},
445
			start: 3,
446
			end:   4,
447
			piece: 100,
448
		},
449
		{
450
			name:  "4",
451
			total: 500,
452
			rg: &http.Range{
453
				Start:  400,
454
				Length: 100,
455
			},
456
			start: 4,
457
			end:   4,
458
			piece: 100,
459
		},
460
		{
461
			name:  "5",
462
			total: 500,
463
			rg: &http.Range{
464
				Start:  0,
465
				Length: 500,
466
			},
467
			start: 0,
468
			end:   4,
469
			piece: 100,
470
		},
471
	}
472

473
	assert := testifyassert.New(t)
474
	for _, tc := range testCases {
475
		t.Run(tc.name, func(t *testing.T) {
476
			start, end := computePiecePosition(tc.total, tc.rg, func(length int64) uint32 {
477
				return tc.piece
478
			})
479
			assert.Equal(tc.start, start)
480
			assert.Equal(tc.end, end)
481
		})
482
	}
483
}
484

485
func TestLocalTaskStore_partialCompleted(t *testing.T) {
486
	var testCases = []struct {
487
		name            string
488
		ContentLength   int64
489
		ReadyPieceCount int32
490
		Range           http.Range
491
		Found           bool
492
	}{
493
		{
494
			name:            "range bytes=x-y partial completed",
495
			ContentLength:   1024,
496
			ReadyPieceCount: 1,
497
			Range: http.Range{
498
				Start:  1,
499
				Length: 1023,
500
			},
501
			Found: true,
502
		},
503
		{
504
			name:            "range bytes=x-y no partial completed",
505
			ContentLength:   util.DefaultPieceSize * 10,
506
			ReadyPieceCount: 1,
507
			Range: http.Range{
508
				Start:  1,
509
				Length: util.DefaultPieceSize * 2,
510
			},
511
			Found: false,
512
		},
513
		{
514
			name:            "range bytes=x- no partial completed",
515
			ContentLength:   util.DefaultPieceSizeLimit * 1,
516
			ReadyPieceCount: 1,
517
			Range: http.Range{
518
				Start:  1,
519
				Length: math.MaxInt - 1,
520
			},
521
			Found: false,
522
		},
523
	}
524

525
	for _, tc := range testCases {
526
		t.Run(tc.name, func(t *testing.T) {
527
			assert := testifyassert.New(t)
528
			lts := &localTaskStore{
529
				persistentMetadata: persistentMetadata{
530
					ContentLength: tc.ContentLength,
531
					Pieces:        map[int32]PieceMetadata{},
532
				},
533
			}
534
			for i := int32(0); i < tc.ReadyPieceCount; i++ {
535
				lts.Pieces[i] = PieceMetadata{}
536
			}
537
			ok := lts.partialCompleted(&tc.Range)
538
			assert.Equal(tc.Found, ok)
539
		})
540
	}
541
}
542

543
func TestLocalTaskStore_CanReclaim(t *testing.T) {
544
	testCases := []struct {
545
		name   string
546
		lts    *localTaskStore
547
		expect bool
548
	}{
549
		{
550
			name:   "normal task",
551
			lts:    &localTaskStore{},
552
			expect: false,
553
		},
554
		{
555
			name: "invalid task",
556
			lts: &localTaskStore{
557
				invalid: *atomic.NewBool(true),
558
			},
559
			expect: true,
560
		},
561
		{
562
			name: "never expire task",
563
			lts: &localTaskStore{
564
				expireTime: 0,
565
			},
566
			expect: false,
567
		},
568
		{
569
			name: "expired task",
570
			lts: &localTaskStore{
571
				expireTime: time.Second,
572
				lastAccess: *atomic.NewInt64(1),
573
			},
574
			expect: true,
575
		},
576
	}
577

578
	for _, tc := range testCases {
579
		t.Run(tc.name, func(t *testing.T) {
580
			assert := testifyassert.New(t)
581
			assert.Equal(tc.lts.CanReclaim(), tc.expect)
582
		})
583
	}
584
}
585

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.