Dragonfly2
282 строки · 7.9 Кб
/*
 *     Copyright 2023 The Dragonfly Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17package peer18
19import (20"context"21"fmt"22"io"23"os"24"sync"25"testing"26"time"27
28"github.com/go-http-utils/headers"29testifyassert "github.com/stretchr/testify/assert"30"github.com/stretchr/testify/require"31"go.uber.org/mock/gomock"32"google.golang.org/grpc"33"google.golang.org/grpc/credentials/insecure"34
35commonv1 "d7y.io/api/v2/pkg/apis/common/v1"36schedulerv1 "d7y.io/api/v2/pkg/apis/scheduler/v1"37schedulerv1mocks "d7y.io/api/v2/pkg/apis/scheduler/v1/mocks"38
39"d7y.io/dragonfly/v2/client/config"40"d7y.io/dragonfly/v2/client/daemon/storage"41"d7y.io/dragonfly/v2/client/daemon/test"42"d7y.io/dragonfly/v2/client/util"43"d7y.io/dragonfly/v2/internal/dferrors"44"d7y.io/dragonfly/v2/pkg/net/http"45schedulerclient "d7y.io/dragonfly/v2/pkg/rpc/scheduler/client"46clientmocks "d7y.io/dragonfly/v2/pkg/rpc/scheduler/client/mocks"47"d7y.io/dragonfly/v2/pkg/source"48"d7y.io/dragonfly/v2/pkg/source/clients/httpprotocol"49sourcemocks "d7y.io/dragonfly/v2/pkg/source/mocks"50)
51
52func setupResumeStreamTaskComponents(ctrl *gomock.Controller, opt componentsOption) (53schedulerclient.V1, storage.Manager) {54// set up a scheduler to say back source only55pps := schedulerv1mocks.NewMockScheduler_ReportPieceResultClient(ctrl)56pps.EXPECT().Send(gomock.Any()).AnyTimes().DoAndReturn(57func(pr *schedulerv1.PieceResult) error {58return nil59})60pps.EXPECT().Recv().AnyTimes().DoAndReturn(61func() (*schedulerv1.PeerPacket, error) {62return nil, dferrors.New(commonv1.Code_SchedNeedBackSource, "")63})64pps.EXPECT().CloseSend().AnyTimes()65
66sched := clientmocks.NewMockV1(ctrl)67sched.EXPECT().RegisterPeerTask(gomock.Any(), gomock.Any()).AnyTimes().DoAndReturn(68func(ctx context.Context, ptr *schedulerv1.PeerTaskRequest, opts ...grpc.CallOption) (*schedulerv1.RegisterResult, error) {69return &schedulerv1.RegisterResult{70TaskId: opt.taskID,71SizeScope: commonv1.SizeScope_NORMAL,72DirectPiece: nil,73}, nil74})75sched.EXPECT().ReportPieceResult(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes().DoAndReturn(76func(ctx context.Context, ptr *schedulerv1.PeerTaskRequest, opts ...grpc.CallOption) (schedulerv1.Scheduler_ReportPieceResultClient, error) {77return pps, nil78})79sched.EXPECT().ReportPeerResult(gomock.Any(), gomock.Any()).AnyTimes().DoAndReturn(80func(ctx context.Context, pr *schedulerv1.PeerResult, opts ...grpc.CallOption) error {81return nil82})83
84// set up storage manager85tempDir, _ := os.MkdirTemp("", "d7y-test-*")86storageManager, _ := storage.NewStorageManager(87config.SimpleLocalTaskStoreStrategy,88&config.StorageOption{89DataPath: tempDir,90TaskExpireTime: util.Duration{91Duration: -1 * time.Second,92},93}, func(request storage.CommonTaskRequest) {},94os.FileMode(0700))95return sched, storageManager96}
97
// intervalSleepReader is a test double that serves data in bounded chunks and
// pauses after every chunk. It implements Read and Close, so it can be used
// wherever a read-closer is expected.
type intervalSleepReader struct {
	// offset is the index in data of the next byte to hand out.
	offset int
	// size caps the number of bytes returned by a single Read. A non-positive
	// size means "no cap": each Read returns as much as fits in the buffer.
	size int
	// data is the full content to serve.
	data []byte
	// interval is how long Read sleeps after copying a chunk.
	interval time.Duration
}

// Read copies at most size bytes (and at most len(p)) of the remaining data
// into p, sleeps for interval, and advances the offset by the number of bytes
// copied. It returns io.EOF together with the final chunk, and (0, io.EOF) on
// every call after the data is exhausted.
func (i *intervalSleepReader) Read(p []byte) (n int, err error) {
	if i.offset >= len(i.data) {
		return 0, io.EOF
	}

	chunk := i.data[i.offset:]
	// Only apply the cap when it is positive. A zero size would otherwise
	// produce an endless stream of (0, nil) results, and a negative one an
	// invalid slice expression.
	if i.size > 0 && i.size < len(chunk) {
		chunk = chunk[:i.size]
	}

	n = copy(p, chunk)
	time.Sleep(i.interval)

	i.offset += n
	if i.offset >= len(i.data) {
		return n, io.EOF
	}
	return n, nil
}

// Close is a no-op; the reader holds no resources.
func (i *intervalSleepReader) Close() error {
	return nil
}
127
128func TestStreamPeerTask_Resume(t *testing.T) {129assert := testifyassert.New(t)130ctrl := gomock.NewController(t)131
132testBytes, err := os.ReadFile(test.File)133assert.Nil(err, "load test file")134
135var (136pieceParallelCount = int32(4)137pieceSize = 1024138
139pieceDownloadInterval = time.Millisecond * 100140
141mockContentLength = len(testBytes)142//mockPieceCount = int(math.Ceil(float64(mockContentLength) / float64(pieceSize)))143
144peerID = "peer-resume-0"145taskID = "task-resume-0"146
147url = "http://localhost/test/data"148)149
150schedulerClient, storageManager := setupResumeStreamTaskComponents(151ctrl,152componentsOption{153taskID: taskID,154contentLength: int64(mockContentLength),155pieceSize: uint32(pieceSize),156pieceParallelCount: pieceParallelCount,157content: testBytes,158})159defer storageManager.CleanUp()160
161sourceClient := sourcemocks.NewMockResourceClient(ctrl)162source.UnRegister("http")163require.Nil(t, source.Register("http", sourceClient, httpprotocol.Adapter))164defer source.UnRegister("http")165sourceClient.EXPECT().Download(gomock.Any()).DoAndReturn(166func(request *source.Request) (*source.Response, error) {167response := source.NewResponse(168&intervalSleepReader{169size: pieceSize,170data: testBytes,171interval: pieceDownloadInterval,172})173response.ContentLength = int64(len(testBytes))174return response, nil175})176
177pm := &pieceManager{178calculateDigest: true,179pieceDownloader: nil,180computePieceSize: func(contentLength int64) uint32 {181return uint32(pieceSize)182},183}184ptm := &peerTaskManager{185conductorLock: &sync.Mutex{},186runningPeerTasks: sync.Map{},187trafficShaper: NewTrafficShaper("plain", 0, nil),188TaskManagerOption: TaskManagerOption{189SchedulerClient: schedulerClient,190TaskOption: TaskOption{191CalculateDigest: true,192PeerHost: &schedulerv1.PeerHost{193Ip: "127.0.0.1",194},195PieceManager: pm,196StorageManager: storageManager,197SchedulerOption: config.SchedulerOption{198ScheduleTimeout: util.Duration{Duration: 10 * time.Minute},199},200GRPCDialTimeout: time.Second,201GRPCCredentials: insecure.NewCredentials(),202},203},204}205req := &schedulerv1.PeerTaskRequest{206Url: url,207UrlMeta: &commonv1.UrlMeta{208Tag: "d7y-test",209},210PeerId: peerID,211PeerHost: &schedulerv1.PeerHost{},212}213ctx := context.Background()214wg := &sync.WaitGroup{}215
216// set up parent task217wg.Add(1)218
219pt, err := ptm.newStreamTask(ctx, taskID, req, nil)220assert.Nil(err, "new parent stream peer task")221
222rc, _, err := pt.Start(ctx)223assert.Nil(err, "start parent stream peer task")224
225ptc := pt.peerTaskConductor226
227go func() {228outputBytes, err := io.ReadAll(rc)229assert.Nil(err, "load read data")230assert.Equal(testBytes, outputBytes, "output and desired output must match")231wg.Done()232}()233
234ranges := []*http.Range{235{236Start: 0,237Length: int64(mockContentLength),238},239{240Start: 10,241Length: int64(mockContentLength) - 10,242},243{244Start: 100,245Length: int64(mockContentLength) - 100,246},247{248Start: 1000,249Length: int64(mockContentLength) - 1000,250},251{252Start: 1024,253Length: int64(mockContentLength) - 1024,254},255}256
257wg.Add(len(ranges))258for _, rg := range ranges {259go func(rg *http.Range) {260pt := ptm.newResumeStreamTask(ctx, ptc, rg)261assert.NotNil(pt, "new stream peer task")262
263pt.computePieceSize = func(length int64) uint32 {264return uint32(pieceSize)265}266
267rc, attr, err := pt.Start(ctx)268assert.Nil(err, "start stream peer task")269
270assert.Equal(attr[headers.ContentLength], fmt.Sprintf("%d", rg.Length), "content length should match")271assert.Equal(attr[headers.ContentRange], fmt.Sprintf("bytes %d-%d/%d", rg.Start, mockContentLength-1, mockContentLength), "content length should match")272
273outputBytes, err := io.ReadAll(rc)274assert.Nil(err, "load read data")275assert.Equal(len(testBytes[rg.Start:rg.Start+rg.Length]), len(outputBytes), "output and desired output length must match")276assert.Equal(string(testBytes[rg.Start:rg.Start+rg.Length]), string(outputBytes), "output and desired output must match")277wg.Done()278}(rg)279}280
281wg.Wait()282}
283