Dragonfly2
584 строки · 14.2 Кб
1/*
2* Copyright 2020 The Dragonfly Authors
3*
4* Licensed under the Apache License, Version 2.0 (the "License");
5* you may not use this file except in compliance with the License.
6* You may obtain a copy of the License at
7*
8* http://www.apache.org/licenses/LICENSE-2.0
9*
10* Unless required by applicable law or agreed to in writing, software
11* distributed under the License is distributed on an "AS IS" BASIS,
12* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13* See the License for the specific language governing permissions and
14* limitations under the License.
15*/
16
17package storage
18
19import (
20"bytes"
21"context"
22"crypto/md5"
23"encoding/hex"
24"io"
25"math"
26"math/rand"
27"os"
28"path"
29"testing"
30"time"
31
32testifyassert "github.com/stretchr/testify/assert"
33"go.uber.org/atomic"
34
35commonv1 "d7y.io/api/v2/pkg/apis/common/v1"
36
37"d7y.io/dragonfly/v2/client/config"
38"d7y.io/dragonfly/v2/client/daemon/test"
39clientutil "d7y.io/dragonfly/v2/client/util"
40logger "d7y.io/dragonfly/v2/internal/dflog"
41"d7y.io/dragonfly/v2/internal/util"
42"d7y.io/dragonfly/v2/pkg/digest"
43"d7y.io/dragonfly/v2/pkg/net/http"
44_ "d7y.io/dragonfly/v2/pkg/rpc/dfdaemon/server"
45)
46
47func TestLocalTaskStore_PutAndGetPiece(t *testing.T) {
48assert := testifyassert.New(t)
49testBytes, err := os.ReadFile(test.File)
50assert.Nil(err, "load test file")
51md5Test, _ := calcFileMd5(test.File, nil)
52
53dst := path.Join(test.DataDir, taskData+".copy")
54defer os.Remove(dst)
55
56testCases := []struct {
57name string
58strategy config.StoreStrategy
59create func(s *storageManager, taskID, peerID string) (TaskStorageDriver, error)
60}{
61{
62name: "normal",
63strategy: config.SimpleLocalTaskStoreStrategy,
64create: func(s *storageManager, taskID, peerID string) (TaskStorageDriver, error) {
65return s.CreateTask(
66&RegisterTaskRequest{
67PeerTaskMetadata: PeerTaskMetadata{
68PeerID: peerID,
69TaskID: taskID,
70},
71DesiredLocation: dst,
72ContentLength: int64(len(testBytes)),
73})
74},
75},
76{
77name: "normal",
78strategy: config.AdvanceLocalTaskStoreStrategy,
79create: func(s *storageManager, taskID, peerID string) (TaskStorageDriver, error) {
80return s.CreateTask(
81&RegisterTaskRequest{
82PeerTaskMetadata: PeerTaskMetadata{
83PeerID: peerID,
84TaskID: taskID,
85},
86DesiredLocation: dst,
87ContentLength: int64(len(testBytes)),
88})
89},
90},
91{
92name: "subtask",
93strategy: config.AdvanceLocalTaskStoreStrategy,
94create: func(s *storageManager, taskID, peerID string) (TaskStorageDriver, error) {
95var (
96parentPeerID = peerID + "-parent"
97parentTaskID = taskID + "-parent"
98)
99
100_, err := s.CreateTask(
101&RegisterTaskRequest{
102PeerTaskMetadata: PeerTaskMetadata{
103PeerID: parentPeerID,
104TaskID: parentTaskID,
105},
106DesiredLocation: dst,
107ContentLength: int64(len(testBytes)),
108})
109assert.Nil(err)
110
111return s.RegisterSubTask(
112context.Background(),
113&RegisterSubTaskRequest{
114Parent: PeerTaskMetadata{
115PeerID: parentPeerID,
116TaskID: parentTaskID,
117},
118SubTask: PeerTaskMetadata{
119PeerID: peerID,
120TaskID: taskID,
121},
122Range: &http.Range{
123Start: 100,
124Length: int64(len(testBytes)),
125},
126})
127},
128},
129}
130
131for _, tc := range testCases {
132t.Run(tc.name+"-"+string(tc.strategy), func(t *testing.T) {
133var (
134taskID = "task-d4bb1c273a9889fea14abd4651994fe8"
135peerID = "peer-d4bb1c273a9889fea14abd4651994fe8"
136pieceSize = 512
137)
138sm, err := NewStorageManager(config.SimpleLocalTaskStoreStrategy,
139&config.StorageOption{
140DataPath: test.DataDir,
141TaskExpireTime: clientutil.Duration{
142Duration: time.Minute,
143},
144}, func(request CommonTaskRequest) {
145}, defaultDirectoryMode)
146assert.Nil(err)
147
148_, err = tc.create(sm.(*storageManager), taskID, peerID)
149assert.Nil(err, "create task storage")
150
151ts, ok := sm.(*storageManager).LoadTask(PeerTaskMetadata{
152PeerID: peerID,
153TaskID: taskID,
154})
155assert.True(ok, "load created task")
156
157var pieces []struct {
158index int
159start int
160end int // not contain in data
161}
162var piecesMd5 []string
163for i := 0; i*pieceSize < len(testBytes); i++ {
164start := i * pieceSize
165end := start + pieceSize
166if end > len(testBytes) {
167end = len(testBytes)
168}
169pieces = append(pieces, struct {
170index int
171start int
172end int
173}{
174index: i,
175start: start,
176end: end,
177})
178piecesMd5 = append(piecesMd5, calcPieceMd5(testBytes[start:end]))
179}
180rand.Seed(time.Now().UnixNano())
181rand.Shuffle(len(pieces), func(i, j int) { pieces[i], pieces[j] = pieces[j], pieces[i] })
182
183// random put all pieces
184for _, p := range pieces {
185_, err = ts.WritePiece(context.Background(), &WritePieceRequest{
186PeerTaskMetadata: PeerTaskMetadata{
187TaskID: taskID,
188},
189PieceMetadata: PieceMetadata{
190Num: int32(p.index),
191Md5: piecesMd5[p.index],
192Offset: uint64(p.start),
193Range: http.Range{
194Start: int64(p.start),
195Length: int64(p.end - p.start),
196},
197Style: commonv1.PieceStyle_PLAIN,
198},
199Reader: bytes.NewBuffer(testBytes[p.start:p.end]),
200})
201assert.Nil(err, "put piece")
202}
203
204if lts, ok := ts.(*localTaskStore); ok {
205md5TaskData, _ := calcFileMd5(path.Join(lts.dataDir, taskData), nil)
206assert.Equal(md5Test, md5TaskData, "md5 must match")
207} else if lsts, ok := ts.(*localSubTaskStore); ok {
208md5TaskData, _ := calcFileMd5(path.Join(lsts.parent.dataDir, taskData), lsts.Range)
209assert.Equal(md5Test, md5TaskData, "md5 must match")
210}
211
212// shuffle again for get all pieces
213rand.Shuffle(len(pieces), func(i, j int) { pieces[i], pieces[j] = pieces[j], pieces[i] })
214for _, p := range pieces {
215rd, cl, err := ts.ReadPiece(context.Background(), &ReadPieceRequest{
216PeerTaskMetadata: PeerTaskMetadata{
217TaskID: taskID,
218},
219PieceMetadata: PieceMetadata{
220Num: int32(p.index),
221Md5: piecesMd5[p.index],
222Offset: uint64(p.start),
223Range: http.Range{
224Start: int64(p.start),
225Length: int64(p.end - p.start),
226},
227Style: commonv1.PieceStyle_PLAIN,
228},
229})
230assert.Nil(err, "get piece reader should be ok")
231data, err := io.ReadAll(rd)
232cl.Close()
233assert.Nil(err, "read piece should be ok")
234assert.Equal(p.end-p.start, len(data), "piece length should match")
235assert.Equal(testBytes[p.start:p.end], data, "piece data should match")
236}
237
238rd, err := ts.ReadAllPieces(context.Background(), &ReadAllPiecesRequest{
239PeerTaskMetadata: PeerTaskMetadata{
240TaskID: taskID,
241},
242Range: nil,
243})
244assert.Nil(err, "get all pieces reader should be ok")
245data, err := io.ReadAll(rd)
246assert.Nil(err, "read all pieces should be ok")
247rd.Close()
248assert.Equal(testBytes, data, "all pieces data should match")
249
250if lts, ok := ts.(*localTaskStore); ok {
251lts.genMetadata(0, &WritePieceRequest{
252NeedGenMetadata: func(n int64) (total int32, length int64, gen bool) {
253return int32(len(pieces)), int64(len(testBytes)), true
254},
255})
256assert.Equal(digest.SHA256FromStrings(piecesMd5...), lts.PieceMd5Sign)
257
258// clean up test data
259lts.lastAccess.Store(time.Now().Add(-1 * time.Hour).UnixNano())
260ok = lts.CanReclaim()
261assert.True(ok, "task should gc")
262err = lts.Reclaim()
263assert.Nil(err, "task gc")
264} else if lsts, ok := ts.(*localSubTaskStore); ok {
265lsts.genMetadata(0, &WritePieceRequest{
266NeedGenMetadata: func(n int64) (total int32, length int64, gen bool) {
267return int32(len(pieces)), int64(len(testBytes)), true
268},
269})
270assert.Equal(digest.SHA256FromStrings(piecesMd5...), lsts.PieceMd5Sign)
271
272// keep original offset
273err = lsts.Store(context.Background(),
274&StoreRequest{
275CommonTaskRequest: CommonTaskRequest{
276Destination: dst,
277},
278MetadataOnly: false,
279StoreDataOnly: false,
280TotalPieces: 0,
281OriginalOffset: true,
282})
283assert.Nil(err)
284md5Store, err := calcFileMd5(dst, lsts.Range)
285assert.Nil(err)
286assert.Equal(md5Test, md5Store)
287
288// just ranged data
289err = lsts.Store(context.Background(),
290&StoreRequest{
291CommonTaskRequest: CommonTaskRequest{
292Destination: dst,
293},
294MetadataOnly: false,
295StoreDataOnly: false,
296TotalPieces: 0,
297OriginalOffset: false,
298})
299assert.Nil(err)
300md5Store, err = calcFileMd5(dst, nil)
301assert.Nil(err)
302assert.Equal(md5Test, md5Store)
303
304// clean up test data
305lsts.parent.lastAccess.Store(time.Now().Add(-1 * time.Hour).UnixNano())
306lsts.parent.Done = true
307
308ok = lsts.CanReclaim()
309assert.True(ok, "sub task should gc")
310err = lsts.Reclaim()
311assert.Nil(err, "sub task gc")
312
313ok = lsts.parent.CanReclaim()
314assert.True(ok, "parent task should gc")
315err = lsts.parent.Reclaim()
316assert.Nil(err, "parent task gc")
317}
318})
319}
320}
321
322func TestLocalTaskStore_StoreTaskData_Simple(t *testing.T) {
323assert := testifyassert.New(t)
324src := path.Join(test.DataDir, taskData)
325dst := path.Join(test.DataDir, taskData+".copy")
326meta := path.Join(test.DataDir, taskData+".meta")
327// prepare test data
328testData := []byte("test data")
329err := os.WriteFile(src, testData, defaultFileMode)
330assert.Nil(err, "prepare test data")
331defer os.Remove(src)
332defer os.Remove(dst)
333defer os.Remove(meta)
334
335data, err := os.OpenFile(src, os.O_RDWR, defaultFileMode)
336assert.Nil(err, "open test data")
337defer data.Close()
338
339matadata, err := os.OpenFile(meta, os.O_RDWR|os.O_CREATE, defaultFileMode)
340assert.Nil(err, "open test meta data")
341matadata.Close()
342ts := localTaskStore{
343SugaredLoggerOnWith: logger.With("test", "localTaskStore"),
344persistentMetadata: persistentMetadata{
345TaskID: "test",
346DataFilePath: src,
347},
348dataDir: test.DataDir,
349metadataFilePath: meta,
350}
351ts.lastAccess.Store(time.Now().UnixNano())
352err = ts.Store(context.Background(), &StoreRequest{
353CommonTaskRequest: CommonTaskRequest{
354TaskID: ts.TaskID,
355Destination: dst,
356},
357})
358assert.Nil(err, "store test data")
359bs, err := os.ReadFile(dst)
360assert.Nil(err, "read output test data")
361assert.Equal(testData, bs, "data must match")
362}
363
364func calcFileMd5(filePath string, rg *http.Range) (string, error) {
365var md5String string
366file, err := os.Open(filePath)
367if err != nil {
368return md5String, err
369}
370defer file.Close()
371
372var rd io.Reader = file
373if rg != nil {
374rd = io.LimitReader(file, rg.Length)
375_, err = file.Seek(rg.Start, io.SeekStart)
376if err != nil {
377return "", err
378}
379}
380
381hash := md5.New()
382if _, err := io.Copy(hash, rd); err != nil {
383return md5String, err
384}
385hashInBytes := hash.Sum(nil)[:16]
386md5String = hex.EncodeToString(hashInBytes)
387return md5String, nil
388}
389
// calcPieceMd5 returns the hex-encoded md5 digest of data.
func calcPieceMd5(data []byte) string {
	sum := md5.Sum(data)
	return hex.EncodeToString(sum[:])
}
395
396func Test_computePiecePosition(t *testing.T) {
397var testCases = []struct {
398name string
399total int64
400rg *http.Range
401start int32
402end int32
403piece uint32
404}{
405{
406name: "0",
407total: 500,
408rg: &http.Range{
409Start: 0,
410Length: 10,
411},
412start: 0,
413end: 0,
414piece: 100,
415},
416{
417name: "1",
418total: 500,
419rg: &http.Range{
420Start: 30,
421Length: 60,
422},
423start: 0,
424end: 0,
425piece: 100,
426},
427{
428name: "2",
429total: 500,
430rg: &http.Range{
431Start: 30,
432Length: 130,
433},
434start: 0,
435end: 1,
436piece: 100,
437},
438{
439name: "3",
440total: 500,
441rg: &http.Range{
442Start: 350,
443Length: 100,
444},
445start: 3,
446end: 4,
447piece: 100,
448},
449{
450name: "4",
451total: 500,
452rg: &http.Range{
453Start: 400,
454Length: 100,
455},
456start: 4,
457end: 4,
458piece: 100,
459},
460{
461name: "5",
462total: 500,
463rg: &http.Range{
464Start: 0,
465Length: 500,
466},
467start: 0,
468end: 4,
469piece: 100,
470},
471}
472
473assert := testifyassert.New(t)
474for _, tc := range testCases {
475t.Run(tc.name, func(t *testing.T) {
476start, end := computePiecePosition(tc.total, tc.rg, func(length int64) uint32 {
477return tc.piece
478})
479assert.Equal(tc.start, start)
480assert.Equal(tc.end, end)
481})
482}
483}
484
485func TestLocalTaskStore_partialCompleted(t *testing.T) {
486var testCases = []struct {
487name string
488ContentLength int64
489ReadyPieceCount int32
490Range http.Range
491Found bool
492}{
493{
494name: "range bytes=x-y partial completed",
495ContentLength: 1024,
496ReadyPieceCount: 1,
497Range: http.Range{
498Start: 1,
499Length: 1023,
500},
501Found: true,
502},
503{
504name: "range bytes=x-y no partial completed",
505ContentLength: util.DefaultPieceSize * 10,
506ReadyPieceCount: 1,
507Range: http.Range{
508Start: 1,
509Length: util.DefaultPieceSize * 2,
510},
511Found: false,
512},
513{
514name: "range bytes=x- no partial completed",
515ContentLength: util.DefaultPieceSizeLimit * 1,
516ReadyPieceCount: 1,
517Range: http.Range{
518Start: 1,
519Length: math.MaxInt - 1,
520},
521Found: false,
522},
523}
524
525for _, tc := range testCases {
526t.Run(tc.name, func(t *testing.T) {
527assert := testifyassert.New(t)
528lts := &localTaskStore{
529persistentMetadata: persistentMetadata{
530ContentLength: tc.ContentLength,
531Pieces: map[int32]PieceMetadata{},
532},
533}
534for i := int32(0); i < tc.ReadyPieceCount; i++ {
535lts.Pieces[i] = PieceMetadata{}
536}
537ok := lts.partialCompleted(&tc.Range)
538assert.Equal(tc.Found, ok)
539})
540}
541}
542
543func TestLocalTaskStore_CanReclaim(t *testing.T) {
544testCases := []struct {
545name string
546lts *localTaskStore
547expect bool
548}{
549{
550name: "normal task",
551lts: &localTaskStore{},
552expect: false,
553},
554{
555name: "invalid task",
556lts: &localTaskStore{
557invalid: *atomic.NewBool(true),
558},
559expect: true,
560},
561{
562name: "never expire task",
563lts: &localTaskStore{
564expireTime: 0,
565},
566expect: false,
567},
568{
569name: "expired task",
570lts: &localTaskStore{
571expireTime: time.Second,
572lastAccess: *atomic.NewInt64(1),
573},
574expect: true,
575},
576}
577
578for _, tc := range testCases {
579t.Run(tc.name, func(t *testing.T) {
580assert := testifyassert.New(t)
581assert.Equal(tc.lts.CanReclaim(), tc.expect)
582})
583}
584}
585