Dragonfly2
958 lines · 29.8 KB
1/*
2* Copyright 2020 The Dragonfly Authors
3*
4* Licensed under the Apache License, Version 2.0 (the "License");
5* you may not use this file except in compliance with the License.
6* You may obtain a copy of the License at
7*
8* http://www.apache.org/licenses/LICENSE-2.0
9*
10* Unless required by applicable law or agreed to in writing, software
11* distributed under the License is distributed on an "AS IS" BASIS,
12* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13* See the License for the specific language governing permissions and
14* limitations under the License.
15*/
16
17package peer
18
19import (
20"bytes"
21"context"
22"fmt"
23"io"
24"math"
25"net/http"
26"net/http/httptest"
27"os"
28"runtime"
29"strings"
30"sync"
31"testing"
32"time"
33
34"github.com/go-http-utils/headers"
35"github.com/phayes/freeport"
36testifyassert "github.com/stretchr/testify/assert"
37testifyrequire "github.com/stretchr/testify/require"
38"go.uber.org/mock/gomock"
39"go.uber.org/zap/zapcore"
40"golang.org/x/time/rate"
41"google.golang.org/grpc"
42"google.golang.org/grpc/codes"
43"google.golang.org/grpc/credentials/insecure"
44"google.golang.org/grpc/health"
45"google.golang.org/grpc/status"
46
47commonv1 "d7y.io/api/v2/pkg/apis/common/v1"
48dfdaemonv1 "d7y.io/api/v2/pkg/apis/dfdaemon/v1"
49dfdaemonv1mocks "d7y.io/api/v2/pkg/apis/dfdaemon/v1/mocks"
50schedulerv1 "d7y.io/api/v2/pkg/apis/scheduler/v1"
51schedulerv1mocks "d7y.io/api/v2/pkg/apis/scheduler/v1/mocks"
52
53"d7y.io/dragonfly/v2/client/config"
54"d7y.io/dragonfly/v2/client/daemon/storage"
55"d7y.io/dragonfly/v2/client/daemon/test"
56"d7y.io/dragonfly/v2/client/util"
57"d7y.io/dragonfly/v2/internal/dferrors"
58logger "d7y.io/dragonfly/v2/internal/dflog"
59"d7y.io/dragonfly/v2/pkg/dfnet"
60"d7y.io/dragonfly/v2/pkg/digest"
61"d7y.io/dragonfly/v2/pkg/idgen"
62nethttp "d7y.io/dragonfly/v2/pkg/net/http"
63"d7y.io/dragonfly/v2/pkg/rpc"
64daemonserver "d7y.io/dragonfly/v2/pkg/rpc/dfdaemon/server"
65schedulerclient "d7y.io/dragonfly/v2/pkg/rpc/scheduler/client"
66schedulerclientmocks "d7y.io/dragonfly/v2/pkg/rpc/scheduler/client/mocks"
67"d7y.io/dragonfly/v2/pkg/source"
68"d7y.io/dragonfly/v2/pkg/source/clients/httpprotocol"
69sourcemocks "d7y.io/dragonfly/v2/pkg/source/mocks"
70)
71
72func TestMain(m *testing.M) {
73logger.SetLevel(zapcore.DebugLevel)
74os.Exit(m.Run())
75}
76
// componentsOption configures the mock scheduler/daemon pair built by
// setupPeerTaskManagerComponents for a single test case.
type componentsOption struct {
	taskID             string                // task ID returned by the mock scheduler
	contentLength      int64                 // total length of content
	pieceSize          uint32                // fixed piece size used to split content
	pieceParallelCount int32                 // parallel piece download count hint
	pieceDownloader    PieceDownloader       // mock piece downloader, nil for back-source cases
	sourceClient       source.ResourceClient // mock origin client, nil for pure p2p cases
	peerPacketDelay    []time.Duration       // per-call Recv() delays on the scheduler stream
	backSource         bool                  // scheduler answers Code_SchedNeedBackSource
	scope              commonv1.SizeScope    // size scope returned by RegisterPeerTask
	content            []byte                // raw task content served piece by piece
	reregister         bool                  // first Recv() forces a Code_SchedReregister round
}
90
// setupPeerTaskManagerComponents builds the collaborators a peerTaskManager
// needs under test: a real gRPC daemon server backed by a mock DaemonServer
// that serves piece metadata computed from opt.content, a mock scheduler
// client whose behavior (scope, back-source, reregister, delays) is driven by
// opt, and a real on-disk storage manager rooted in a temp directory.
func setupPeerTaskManagerComponents(ctrl *gomock.Controller, opt componentsOption) (
	schedulerclient.V1, storage.Manager) {
	port := int32(freeport.GetPort())
	// 1. set up a mock daemon server for uploading pieces info
	var daemon = dfdaemonv1mocks.NewMockDaemonServer(ctrl)

	// 1.1 calculate piece digest and total digest
	r := bytes.NewBuffer(opt.content)
	var pieces = make([]string, int(math.Ceil(float64(len(opt.content))/float64(opt.pieceSize))))
	for i := range pieces {
		pieces[i] = digest.MD5FromReader(io.LimitReader(r, int64(opt.pieceSize)))
	}
	totalDigests := digest.SHA256FromStrings(pieces...)
	// genPiecePacket answers one piece metadata request from opt.content,
	// clamping the final piece to the remaining content length.
	genPiecePacket := func(request *commonv1.PieceTaskRequest) *commonv1.PiecePacket {
		var tasks []*commonv1.PieceInfo
		for i := uint32(0); i < request.Limit; i++ {
			start := opt.pieceSize * (request.StartNum + i)
			if int64(start)+1 > opt.contentLength {
				break
			}
			size := opt.pieceSize
			if int64(start+opt.pieceSize) > opt.contentLength {
				size = uint32(opt.contentLength) - start
			}
			tasks = append(tasks,
				&commonv1.PieceInfo{
					PieceNum:    int32(request.StartNum + i),
					RangeStart:  uint64(start),
					RangeSize:   size,
					PieceMd5:    pieces[request.StartNum+i],
					PieceOffset: 0,
					PieceStyle:  0,
				})
		}
		return &commonv1.PiecePacket{
			TaskId:        request.TaskId,
			DstPid:        "peer-x",
			PieceInfos:    tasks,
			ContentLength: opt.contentLength,
			TotalPiece:    int32(math.Ceil(float64(opt.contentLength) / float64(opt.pieceSize))),
			PieceMd5Sign:  totalDigests,
		}
	}
	// Unary GetPieceTasks stays unimplemented; tests exercise the streaming
	// SyncPieceTasks API below.
	daemon.EXPECT().GetPieceTasks(gomock.Any(), gomock.Any()).AnyTimes().
		DoAndReturn(func(ctx context.Context, request *commonv1.PieceTaskRequest) (*commonv1.PiecePacket, error) {
			return nil, status.Error(codes.Unimplemented, "TODO")
		})
	// Serve piece packets for every request on the bidi stream until EOF.
	daemon.EXPECT().SyncPieceTasks(gomock.Any()).AnyTimes().DoAndReturn(func(s dfdaemonv1.Daemon_SyncPieceTasksServer) error {
		request, err := s.Recv()
		if err != nil {
			return err
		}
		if err = s.Send(genPiecePacket(request)); err != nil {
			return err
		}
		for {
			request, err = s.Recv()
			if err == io.EOF {
				break
			}
			if err != nil {
				return err
			}
			if err = s.Send(genPiecePacket(request)); err != nil {
				return err
			}
		}
		return nil
	})
	// listen error deliberately ignored: test helper, failure surfaces as a
	// panic in the serving goroutine below
	ln, _ := rpc.Listen(dfnet.NetAddr{
		Type: "tcp",
		Addr: fmt.Sprintf("0.0.0.0:%d", port),
	})

	go func() {
		hs := health.NewServer()
		if err := daemonserver.New(daemon, hs).Serve(ln); err != nil {
			panic(err)
		}
	}()

	// give the daemon server a moment to start accepting connections
	time.Sleep(100 * time.Millisecond)

	// 2. setup a scheduler
	pps := schedulerv1mocks.NewMockScheduler_ReportPieceResultClient(ctrl)
	pps.EXPECT().Send(gomock.Any()).AnyTimes().DoAndReturn(
		func(pr *schedulerv1.PieceResult) error {
			return nil
		})
	var (
		delayCount int
		sent       = make(chan struct{}, 1)
	)
	// pre-fill the token so exactly one Recv() proceeds past <-sent
	sent <- struct{}{}
	var reregistered bool
	pps.EXPECT().Recv().AnyTimes().DoAndReturn(
		func() (*schedulerv1.PeerPacket, error) {
			// optionally force a single reregister round before serving peers
			if opt.reregister && !reregistered {
				reregistered = true
				return nil, dferrors.New(commonv1.Code_SchedReregister, "reregister")
			}
			// inject the configured per-call latency to simulate a slow scheduler
			if len(opt.peerPacketDelay) > delayCount {
				if delay := opt.peerPacketDelay[delayCount]; delay > 0 {
					time.Sleep(delay)
				}
				delayCount++
			}
			<-sent
			if opt.backSource {
				return nil, dferrors.Newf(commonv1.Code_SchedNeedBackSource, "fake back source error")
			}
			// point the peer at the mock daemon server started above
			return &schedulerv1.PeerPacket{
				Code:   commonv1.Code_Success,
				TaskId: opt.taskID,
				SrcPid: "127.0.0.1",
				MainPeer: &schedulerv1.PeerPacket_DestPeer{
					Ip:      "127.0.0.1",
					RpcPort: port,
					PeerId:  "peer-x",
				},
				CandidatePeers: nil,
			}, nil
		})
	pps.EXPECT().CloseSend().AnyTimes()

	sched := schedulerclientmocks.NewMockV1(ctrl)
	// RegisterPeerTask answers according to the configured size scope:
	// TINY embeds the full content, SMALL embeds a single-piece descriptor,
	// anything else falls through to NORMAL with no direct piece.
	sched.EXPECT().RegisterPeerTask(gomock.Any(), gomock.Any()).AnyTimes().DoAndReturn(
		func(ctx context.Context, ptr *schedulerv1.PeerTaskRequest, opts ...grpc.CallOption) (*schedulerv1.RegisterResult, error) {
			switch opt.scope {
			case commonv1.SizeScope_TINY:
				return &schedulerv1.RegisterResult{
					TaskId:    opt.taskID,
					SizeScope: commonv1.SizeScope_TINY,
					DirectPiece: &schedulerv1.RegisterResult_PieceContent{
						PieceContent: opt.content,
					},
				}, nil
			case commonv1.SizeScope_SMALL:
				return &schedulerv1.RegisterResult{
					TaskId:    opt.taskID,
					SizeScope: commonv1.SizeScope_SMALL,
					DirectPiece: &schedulerv1.RegisterResult_SinglePiece{
						SinglePiece: &schedulerv1.SinglePiece{
							DstPid:  "fake-pid",
							DstAddr: "fake-addr",
							PieceInfo: &commonv1.PieceInfo{
								PieceNum:    0,
								RangeStart:  0,
								RangeSize:   uint32(opt.contentLength),
								PieceMd5:    pieces[0],
								PieceOffset: 0,
								PieceStyle:  0,
							},
						},
					},
				}, nil
			}
			return &schedulerv1.RegisterResult{
				TaskId:      opt.taskID,
				SizeScope:   commonv1.SizeScope_NORMAL,
				DirectPiece: nil,
			}, nil
		})
	sched.EXPECT().ReportPieceResult(gomock.Any(), gomock.Any()).AnyTimes().DoAndReturn(
		func(ctx context.Context, ptr *schedulerv1.PeerTaskRequest, opts ...grpc.CallOption) (
			schedulerv1.Scheduler_ReportPieceResultClient, error) {
			return pps, nil
		})
	sched.EXPECT().ReportPeerResult(gomock.Any(), gomock.Any()).AnyTimes().DoAndReturn(
		func(ctx context.Context, pr *schedulerv1.PeerResult, opts ...grpc.CallOption) error {
			return nil
		})
	tempDir, _ := os.MkdirTemp("", "d7y-test-*")
	storageManager, _ := storage.NewStorageManager(
		config.SimpleLocalTaskStoreStrategy,
		&config.StorageOption{
			DataPath: tempDir,
			// negative expiry: tasks are expired immediately, exercising reuse paths
			TaskExpireTime: util.Duration{
				Duration: -1 * time.Second,
			},
		}, func(request storage.CommonTaskRequest) {}, os.FileMode(0700))
	return sched, storageManager
}
274
// mockManager bundles the system under test (peerTaskManager) with its mocked
// collaborators and the owning testSpec, so tests can run and clean up as one unit.
type mockManager struct {
	testSpec        *testSpec          // the spec that produced this manager (holds cleanup hooks)
	peerTaskManager *peerTaskManager   // system under test
	schedulerClient schedulerclient.V1 // mock scheduler from setupPeerTaskManagerComponents
	storageManager  storage.Manager    // real temp-dir-backed storage
}
281
282func (m *mockManager) CleanUp() {
283m.storageManager.CleanUp()
284for _, f := range m.testSpec.cleanUp {
285f()
286}
287}
288
// setupMockManager assembles a peerTaskManager wired to the mock scheduler and
// real storage produced by setupPeerTaskManagerComponents, applying the spec's
// schedule timeout (default 10 minutes when the spec leaves it unset).
func setupMockManager(ctrl *gomock.Controller, ts *testSpec, opt componentsOption) *mockManager {
	schedulerClient, storageManager := setupPeerTaskManagerComponents(ctrl, opt)
	scheduleTimeout := util.Duration{Duration: 10 * time.Minute}
	if ts.scheduleTimeout > 0 {
		scheduleTimeout = util.Duration{Duration: ts.scheduleTimeout}
	}
	ptm := &peerTaskManager{
		conductorLock:    &sync.Mutex{},
		runningPeerTasks: sync.Map{},
		// "plain" shaper with zero rate: no traffic shaping in tests
		trafficShaper: NewTrafficShaper("plain", 0, nil),
		TaskManagerOption: TaskManagerOption{
			SchedulerClient: schedulerClient,
			TaskOption: TaskOption{
				CalculateDigest: true,
				PeerHost: &schedulerv1.PeerHost{
					Ip: "127.0.0.1",
				},
				PieceManager: &pieceManager{
					calculateDigest: true,
					pieceDownloader: opt.pieceDownloader,
					// fixed piece size regardless of content length
					computePieceSize: func(contentLength int64) uint32 {
						return opt.pieceSize
					},
				},
				StorageManager: storageManager,
				SchedulerOption: config.SchedulerOption{
					ScheduleTimeout: scheduleTimeout,
				},
				GRPCDialTimeout: time.Second,
				GRPCCredentials: insecure.NewCredentials(),
			},
		},
	}
	return &mockManager{
		testSpec:        ts,
		peerTaskManager: ptm,
		schedulerClient: schedulerClient,
		storageManager:  storageManager,
	}
}
329
// Task type selectors: each test spec is executed once per applicable type.
const (
	taskTypeFile      = iota // StartFileTask writing to an output file
	taskTypeStream           // StartStreamTask returning an io.Reader
	taskTypeConductor        // direct getOrCreatePeerTaskConductor exercise
	taskTypeSeed             // StartSeedTask (requires a back-source client)
)
336
// testSpec describes one table-driven peer task scenario: the content to
// serve, how it is split into pieces, which task types to run, and the mocks
// that stand in for the piece downloader and the origin HTTP client.
type testSpec struct {
	runTaskTypes       []int          // task types to run; nil means the default set
	taskType           int            // set per-iteration while running
	name               string
	taskData           []byte
	httpRange          *nethttp.Range // only used in back source cases
	pieceParallelCount int32
	pieceSize          int
	sizeScope          commonv1.SizeScope
	peerID             string
	url                string
	reregister         bool // force one scheduler reregister round
	// when urlGenerator is not nil, use urlGenerator instead url
	// it's useful for httptest server
	urlGenerator func(ts *testSpec) string

	// mock schedule timeout
	peerPacketDelay []time.Duration
	scheduleTimeout time.Duration
	backSource      bool

	// factories for per-case mocks; nil means "not needed for this case"
	mockPieceDownloader  func(ctrl *gomock.Controller, taskData []byte, pieceSize int) PieceDownloader
	mockHTTPSourceClient func(t *testing.T, ctrl *gomock.Controller, rg *nethttp.Range, taskData []byte, url string) source.ResourceClient

	cleanUp []func() // hooks run by mockManager.CleanUp
}
363
// TestPeerTaskManager_TaskSuite is the table-driven suite covering p2p and
// back-source downloads across all size scopes (tiny/small/normal/empty) and
// all task types (file/stream/conductor/seed). Each spec is run once per
// applicable task type with freshly built mocks.
func TestPeerTaskManager_TaskSuite(t *testing.T) {
	assert := testifyassert.New(t)
	require := testifyrequire.New(t)
	testBytes, err := os.ReadFile(test.File)
	require.Nil(err, "load test file")

	// commonPieceDownloader serves pieces straight out of taskData and asserts
	// that exactly ceil(len/pieceSize) pieces are downloaded.
	commonPieceDownloader := func(ctrl *gomock.Controller, taskData []byte, pieceSize int) PieceDownloader {
		downloader := NewMockPieceDownloader(ctrl)
		downloader.EXPECT().DownloadPiece(gomock.Any(), gomock.Any()).Times(
			int(math.Ceil(float64(len(taskData)) / float64(pieceSize)))).DoAndReturn(
			func(ctx context.Context, task *DownloadPieceRequest) (io.Reader, io.Closer, error) {
				rc := io.NopCloser(
					bytes.NewBuffer(
						taskData[task.piece.RangeStart : task.piece.RangeStart+uint64(task.piece.RangeSize)],
					))
				return rc, rc, nil
			})
		return downloader
	}

	taskTypes := []int{taskTypeConductor, taskTypeFile, taskTypeStream} // seed task need back source client
	taskTypeNames := []string{"conductor", "file", "stream", "seed"}

	testCases := []testSpec{
		{
			name:                 "normal size scope - p2p",
			taskData:             testBytes,
			pieceParallelCount:   4,
			pieceSize:            1024,
			peerID:               "normal-size-peer",
			url:                  "http://localhost/test/data",
			sizeScope:            commonv1.SizeScope_NORMAL,
			mockPieceDownloader:  commonPieceDownloader,
			mockHTTPSourceClient: nil,
		},
		{
			name:                 "normal size scope - p2p - reregister",
			taskData:             testBytes,
			pieceParallelCount:   4,
			pieceSize:            1024,
			peerID:               "normal-size-peer",
			url:                  "http://localhost/test/data",
			sizeScope:            commonv1.SizeScope_NORMAL,
			mockPieceDownloader:  commonPieceDownloader,
			mockHTTPSourceClient: nil,
			reregister:           true,
		},
		{
			name:                 "small size scope - p2p",
			taskData:             testBytes,
			pieceParallelCount:   4,
			pieceSize:            16384,
			peerID:               "small-size-peer",
			url:                  "http://localhost/test/data",
			sizeScope:            commonv1.SizeScope_SMALL,
			mockPieceDownloader:  commonPieceDownloader,
			mockHTTPSourceClient: nil,
		},
		{
			name:                 "tiny size scope - p2p",
			taskData:             testBytes[:64],
			pieceParallelCount:   4,
			pieceSize:            1024,
			peerID:               "tiny-size-peer",
			url:                  "http://localhost/test/data",
			sizeScope:            commonv1.SizeScope_TINY,
			mockPieceDownloader:  nil,
			mockHTTPSourceClient: nil,
		},
		{
			name:                 "empty file - p2p",
			taskData:             []byte{},
			pieceParallelCount:   4,
			pieceSize:            1024,
			peerID:               "empty-file-peer",
			url:                  "http://localhost/test/data",
			sizeScope:            commonv1.SizeScope_NORMAL,
			mockPieceDownloader:  commonPieceDownloader,
			mockHTTPSourceClient: nil,
		},
		{
			name:                "normal size scope - back source - content length",
			runTaskTypes:        []int{taskTypeConductor, taskTypeFile, taskTypeStream, taskTypeSeed},
			taskData:            testBytes,
			pieceParallelCount:  4,
			pieceSize:           1024,
			peerID:              "normal-size-peer-back-source",
			backSource:          true,
			url:                 "http://localhost/test/data",
			sizeScope:           commonv1.SizeScope_NORMAL,
			mockPieceDownloader: nil,
			mockHTTPSourceClient: func(t *testing.T, ctrl *gomock.Controller, rg *nethttp.Range, taskData []byte, url string) source.ResourceClient {
				sourceClient := sourcemocks.NewMockResourceClient(ctrl)
				sourceClient.EXPECT().GetContentLength(source.RequestEq(url)).AnyTimes().DoAndReturn(
					func(request *source.Request) (int64, error) {
						return int64(len(taskData)), nil
					})
				sourceClient.EXPECT().Download(source.RequestEq(url)).AnyTimes().DoAndReturn(
					func(request *source.Request) (*source.Response, error) {
						return source.NewResponse(io.NopCloser(bytes.NewBuffer(taskData))), nil
					})
				return sourceClient
			},
		},
		{
			name:               "normal size scope - range - back source - content length",
			runTaskTypes:       []int{taskTypeConductor, taskTypeFile, taskTypeStream, taskTypeSeed},
			taskData:           testBytes[0:4096],
			pieceParallelCount: 4,
			pieceSize:          1024,
			peerID:             "normal-size-peer-range-back-source",
			backSource:         true,
			url:                "http://localhost/test/data",
			sizeScope:          commonv1.SizeScope_NORMAL,
			httpRange: &nethttp.Range{
				Start:  0,
				Length: 4096,
			},
			mockPieceDownloader: nil,
			// verifies the Range header forwarded to the origin matches the spec's range
			mockHTTPSourceClient: func(t *testing.T, ctrl *gomock.Controller, rg *nethttp.Range, taskData []byte, url string) source.ResourceClient {
				sourceClient := sourcemocks.NewMockResourceClient(ctrl)
				sourceClient.EXPECT().GetContentLength(source.RequestEq(url)).AnyTimes().DoAndReturn(
					func(request *source.Request) (int64, error) {
						assert := testifyassert.New(t)
						if rg != nil {
							rgs, err := nethttp.ParseRange(request.Header.Get(headers.Range), math.MaxInt64)
							assert.Nil(err)
							assert.Equal(1, len(rgs))
							assert.Equal(rg.String(), rgs[0].String())
						}
						return int64(len(taskData)), nil
					})
				sourceClient.EXPECT().Download(source.RequestEq(url)).AnyTimes().DoAndReturn(
					func(request *source.Request) (*source.Response, error) {
						assert := testifyassert.New(t)
						if rg != nil {
							rgs, err := nethttp.ParseRange(request.Header.Get(headers.Range), math.MaxInt64)
							assert.Nil(err)
							assert.Equal(1, len(rgs))
							assert.Equal(rg.String(), rgs[0].String())
						}
						return source.NewResponse(io.NopCloser(bytes.NewBuffer(taskData))), nil
					})
				return sourceClient
			},
		},
		{
			name:                "normal size scope - back source - no content length",
			runTaskTypes:        []int{taskTypeConductor, taskTypeFile, taskTypeStream, taskTypeSeed},
			taskData:            testBytes,
			pieceParallelCount:  4,
			pieceSize:           1024,
			peerID:              "normal-size-peer-back-source-no-length",
			backSource:          true,
			url:                 "http://localhost/test/data",
			sizeScope:           commonv1.SizeScope_NORMAL,
			mockPieceDownloader: nil,
			// origin reports unknown length (-1), download still succeeds
			mockHTTPSourceClient: func(t *testing.T, ctrl *gomock.Controller, rg *nethttp.Range, taskData []byte, url string) source.ResourceClient {
				sourceClient := sourcemocks.NewMockResourceClient(ctrl)
				sourceClient.EXPECT().GetContentLength(source.RequestEq(url)).AnyTimes().DoAndReturn(
					func(request *source.Request) (int64, error) {
						return -1, nil
					})
				sourceClient.EXPECT().Download(source.RequestEq(url)).AnyTimes().DoAndReturn(
					func(request *source.Request) (*source.Response, error) {
						return source.NewResponse(io.NopCloser(bytes.NewBuffer(taskData))), nil
					})
				return sourceClient
			},
		},
		{
			name:                "normal size scope - back source - no content length - aligning",
			runTaskTypes:        []int{taskTypeConductor, taskTypeFile, taskTypeStream, taskTypeSeed},
			taskData:            testBytes[:8192],
			pieceParallelCount:  4,
			pieceSize:           1024,
			peerID:              "normal-size-peer-back-source-aligning-no-length",
			backSource:          true,
			url:                 "http://localhost/test/data",
			sizeScope:           commonv1.SizeScope_NORMAL,
			mockPieceDownloader: nil,
			// unknown length with content exactly aligned to the piece size
			mockHTTPSourceClient: func(t *testing.T, ctrl *gomock.Controller, rg *nethttp.Range, taskData []byte, url string) source.ResourceClient {
				sourceClient := sourcemocks.NewMockResourceClient(ctrl)
				sourceClient.EXPECT().GetContentLength(source.RequestEq(url)).AnyTimes().DoAndReturn(
					func(request *source.Request) (int64, error) {
						return -1, nil
					})
				sourceClient.EXPECT().Download(source.RequestEq(url)).AnyTimes().DoAndReturn(
					func(request *source.Request) (*source.Response, error) {
						return source.NewResponse(io.NopCloser(bytes.NewBuffer(taskData))), nil
					})
				return sourceClient
			},
		},
		{
			name:               "normal size scope - schedule timeout - auto back source",
			taskData:           testBytes,
			pieceParallelCount: 4,
			pieceSize:          1024,
			peerID:             "normal-size-peer-schedule-timeout",
			// scheduler Recv() stalls for 1s while the timeout is 1ns,
			// forcing automatic back-source against a real httptest server
			peerPacketDelay: []time.Duration{time.Second},
			scheduleTimeout: time.Nanosecond,
			urlGenerator: func(ts *testSpec) string {
				server := httptest.NewServer(http.HandlerFunc(
					func(w http.ResponseWriter, r *http.Request) {
						n, err := w.Write(testBytes)
						assert.Nil(err)
						assert.Equal(len(ts.taskData), n)
					}))
				ts.cleanUp = append(ts.cleanUp, func() {
					server.Close()
				})
				return server.URL
			},
			sizeScope:            commonv1.SizeScope_NORMAL,
			mockPieceDownloader:  nil,
			mockHTTPSourceClient: nil,
		},
		{
			name:                "empty file peer - back source - content length",
			runTaskTypes:        []int{taskTypeConductor, taskTypeFile, taskTypeStream, taskTypeSeed},
			taskData:            []byte{},
			pieceParallelCount:  4,
			pieceSize:           1024,
			peerID:              "empty-file-peer-back-source",
			backSource:          true,
			url:                 "http://localhost/test/data",
			sizeScope:           commonv1.SizeScope_NORMAL,
			mockPieceDownloader: nil,
			mockHTTPSourceClient: func(t *testing.T, ctrl *gomock.Controller, rg *nethttp.Range, taskData []byte, url string) source.ResourceClient {
				sourceClient := sourcemocks.NewMockResourceClient(ctrl)
				sourceClient.EXPECT().GetContentLength(source.RequestEq(url)).AnyTimes().DoAndReturn(
					func(request *source.Request) (int64, error) {
						return int64(len(taskData)), nil
					})
				sourceClient.EXPECT().Download(source.RequestEq(url)).AnyTimes().DoAndReturn(
					func(request *source.Request) (*source.Response, error) {
						return source.NewResponse(io.NopCloser(bytes.NewBuffer(taskData))), nil
					})
				return sourceClient
			},
		},
	}

	for _, _tc := range testCases {
		t.Run(_tc.name, func(t *testing.T) {
			var types = _tc.runTaskTypes
			if _tc.runTaskTypes == nil {
				types = taskTypes
			}
			// rebind the suite-level assertions to the subtest's *testing.T
			assert = testifyassert.New(t)
			require = testifyrequire.New(t)
			for _, typ := range types {
				// dup a new test case with the task type
				logger.Infof("-------------------- test %s - type %s, started --------------------",
					_tc.name, taskTypeNames[typ])
				tc := _tc
				tc.taskType = typ
				// anonymous func scopes the defers (ctrl.Finish, source reset,
				// mm.CleanUp) to one task-type iteration
				func() {
					ctrl := gomock.NewController(t)
					defer ctrl.Finish()
					mockContentLength := len(tc.taskData)

					urlMeta := &commonv1.UrlMeta{
						Tag: "d7y-test",
					}

					if tc.httpRange != nil {
						urlMeta.Range = strings.TrimPrefix(tc.httpRange.String(), "bytes=")
					}

					if tc.urlGenerator != nil {
						tc.url = tc.urlGenerator(&tc)
					}
					taskID := idgen.TaskIDV1(tc.url, urlMeta)

					var (
						downloader   PieceDownloader
						sourceClient source.ResourceClient
					)

					if tc.mockPieceDownloader != nil {
						downloader = tc.mockPieceDownloader(ctrl, tc.taskData, tc.pieceSize)
					}

					if tc.mockHTTPSourceClient != nil {
						source.UnRegister("http")
						defer func() {
							// reset source client
							source.UnRegister("http")
							require.Nil(source.Register("http", httpprotocol.NewHTTPSourceClient(), httpprotocol.Adapter))
						}()
						// replace source client
						sourceClient = tc.mockHTTPSourceClient(t, ctrl, tc.httpRange, tc.taskData, tc.url)
						require.Nil(source.Register("http", sourceClient, httpprotocol.Adapter))
					}

					option := componentsOption{
						taskID:             taskID,
						contentLength:      int64(mockContentLength),
						pieceSize:          uint32(tc.pieceSize),
						pieceParallelCount: tc.pieceParallelCount,
						pieceDownloader:    downloader,
						sourceClient:       sourceClient,
						content:            tc.taskData,
						scope:              tc.sizeScope,
						peerPacketDelay:    tc.peerPacketDelay,
						backSource:         tc.backSource,
						reregister:         tc.reregister,
					}
					// keep peer task running in enough time to check "getOrCreatePeerTaskConductor" always return same
					if tc.taskType == taskTypeConductor {
						option.peerPacketDelay = []time.Duration{time.Second}
					}
					mm := setupMockManager(ctrl, &tc, option)
					defer mm.CleanUp()

					tc.run(assert, require, mm, urlMeta)
				}()
				logger.Infof("-------------------- test %s - type %s, finished --------------------", _tc.name, taskTypeNames[typ])
			}
		})
	}
}
688
689func (ts *testSpec) run(assert *testifyassert.Assertions, require *testifyrequire.Assertions, mm *mockManager, urlMeta *commonv1.UrlMeta) {
690switch ts.taskType {
691case taskTypeFile:
692ts.runFileTaskTest(assert, require, mm, urlMeta)
693case taskTypeStream:
694ts.runStreamTaskTest(assert, require, mm, urlMeta)
695case taskTypeConductor:
696ts.runConductorTest(assert, require, mm, urlMeta)
697case taskTypeSeed:
698ts.runSeedTaskTest(assert, require, mm, urlMeta)
699default:
700panic("unknown test type")
701}
702}
703
704func (ts *testSpec) runFileTaskTest(assert *testifyassert.Assertions, require *testifyrequire.Assertions, mm *mockManager, urlMeta *commonv1.UrlMeta) {
705var output = "../test/testdata/test.output"
706defer func() {
707assert.Nil(os.Remove(output))
708}()
709progress, err := mm.peerTaskManager.StartFileTask(
710context.Background(),
711&FileTaskRequest{
712PeerTaskRequest: schedulerv1.PeerTaskRequest{
713Url: ts.url,
714UrlMeta: urlMeta,
715PeerId: ts.peerID,
716PeerHost: &schedulerv1.PeerHost{},
717},
718Output: output,
719})
720require.Nil(err, "start file peer task")
721
722var p *FileTaskProgress
723for p = range progress {
724require.True(p.State.Success)
725if p.PeerTaskDone {
726p.DoneCallback()
727break
728}
729}
730require.NotNil(p)
731require.True(p.PeerTaskDone)
732
733outputBytes, err := os.ReadFile(output)
734require.Nil(err, "load output file")
735require.Equal(ts.taskData, outputBytes, "output and desired output must match")
736}
737
738func (ts *testSpec) runStreamTaskTest(_ *testifyassert.Assertions, require *testifyrequire.Assertions, mm *mockManager, urlMeta *commonv1.UrlMeta) {
739r, _, err := mm.peerTaskManager.StartStreamTask(
740context.Background(),
741&StreamTaskRequest{
742URL: ts.url,
743URLMeta: urlMeta,
744PeerID: ts.peerID,
745})
746require.Nil(err, "start stream peer task")
747
748outputBytes, err := io.ReadAll(r)
749require.Nil(err, "load read data")
750require.Equal(ts.taskData, outputBytes, "output and desired output must match")
751}
752
// runSeedTaskTest starts a seed peer task and waits for a terminal signal:
// success channel or a finished piece info mark success; context done or the
// fail channel end the wait without success. A 5-minute watchdog dumps all
// goroutine stacks to help diagnose hangs (the loop then keeps waiting).
func (ts *testSpec) runSeedTaskTest(_ *testifyassert.Assertions, require *testifyrequire.Assertions, mm *mockManager, urlMeta *commonv1.UrlMeta) {
	r, _, err := mm.peerTaskManager.StartSeedTask(
		context.Background(),
		&SeedTaskRequest{
			PeerTaskRequest: schedulerv1.PeerTaskRequest{
				Url:         ts.url,
				UrlMeta:     urlMeta,
				PeerId:      ts.peerID,
				PeerHost:    &schedulerv1.PeerHost{},
				IsMigrating: false,
			},
			Limit: 0,
			Range: nil,
		})

	require.Nil(err, "start seed peer task")

	var success bool

loop:
	for {
		select {
		case <-r.Context.Done():
			break loop
		case <-r.Success:
			success = true
			break loop
		case <-r.Fail:
			break loop
		case p := <-r.PieceInfoChannel:
			// the final piece carries the Finished flag
			if p.Finished {
				success = true
				break loop
			}
		case <-time.After(5 * time.Minute):
			// watchdog: dump every goroutine stack to diagnose a hung test
			buf := make([]byte, 16384)
			buf = buf[:runtime.Stack(buf, true)]
			fmt.Printf("=== BEGIN goroutine stack dump ===\n%s\n=== END goroutine stack dump ===", buf)
		}
	}

	require.True(success, "seed task should success")
}
796
// runConductorTest exercises getOrCreatePeerTaskConductor deduplication: the
// first call must create a conductor, the next 100 concurrent callers must
// receive the exact same one. It then verifies completion, that the running
// task map drains, and that both stream and file reuse paths serve the
// already-downloaded data.
func (ts *testSpec) runConductorTest(assert *testifyassert.Assertions, require *testifyrequire.Assertions, mm *mockManager, urlMeta *commonv1.UrlMeta) {
	var (
		ptm       = mm.peerTaskManager
		pieceSize = ts.pieceSize
		taskID    = idgen.TaskIDV1(ts.url, urlMeta)
		output    = "../test/testdata/test.output"
	)
	defer func() {
		assert.Nil(os.Remove(output))
	}()

	peerTaskRequest := &schedulerv1.PeerTaskRequest{
		Url:      ts.url,
		UrlMeta:  urlMeta,
		PeerId:   ts.peerID,
		PeerHost: &schedulerv1.PeerHost{},
	}

	// first caller should create the conductor
	ptc, created, err := ptm.getOrCreatePeerTaskConductor(
		context.Background(), taskID, peerTaskRequest, rate.Limit(pieceSize*4), nil, nil, "", false)
	assert.Nil(err, "load first peerTaskConductor")
	assert.True(created, "should create a new peerTaskConductor")

	var ptcCount = 100
	var wg = &sync.WaitGroup{}
	// +1 for the watchdog goroutine below
	wg.Add(ptcCount + 1)

	var result = make([]bool, ptcCount)

	// watchdog: fail the conductor if it neither succeeds nor fails in 5 minutes
	go func(ptc *peerTaskConductor) {
		defer wg.Done()
		select {
		case <-time.After(5 * time.Minute):
			ptc.Fail()
		case <-ptc.successCh:
			return
		case <-ptc.failCh:
			return
		}
	}(ptc)

	// each subscriber drains piece broadcasts until the conductor terminates;
	// result[i] records whether it observed success
	syncFunc := func(i int, ptc *peerTaskConductor) {
		pieceCh := ptc.broker.Subscribe()
		defer wg.Done()
		for {
			select {
			case <-pieceCh:
			case <-ptc.successCh:
				result[i] = true
				return
			case <-ptc.failCh:
				return
			}
		}
	}

	// 100 concurrent callers with distinct peer IDs must all be deduplicated
	// onto the first conductor
	for i := 0; i < ptcCount; i++ {
		request := &schedulerv1.PeerTaskRequest{
			Url:      ts.url,
			UrlMeta:  urlMeta,
			PeerId:   fmt.Sprintf("should-not-use-peer-%d", i),
			PeerHost: &schedulerv1.PeerHost{},
		}
		p, created, err := ptm.getOrCreatePeerTaskConductor(
			context.Background(), taskID, request, rate.Limit(pieceSize*3), nil, nil, "", false)
		assert.Nil(err, fmt.Sprintf("load peerTaskConductor %d", i))
		assert.Equal(ptc.peerID, p.GetPeerID(), fmt.Sprintf("ptc %d should be same with ptc", i))
		assert.False(created, "should not create a new peerTaskConductor")
		go syncFunc(i, p)
	}

	require.Nil(ptc.start(), "peerTaskConductor start should be ok")

	switch ts.sizeScope {
	case commonv1.SizeScope_TINY:
		require.NotNil(ptc.tinyData)
	case commonv1.SizeScope_SMALL:
		require.NotNil(ptc.singlePiece)
	}

	wg.Wait()

	for i, r := range result {
		assert.True(r, fmt.Sprintf("task %d result should be true", i))
	}

	var (
		noRunningTask = true
		success       bool
	)
	select {
	case <-ptc.successCh:
		success = true
	case <-ptc.failCh:
	case <-time.After(5 * time.Minute):
		buf := make([]byte, 16384)
		buf = buf[:runtime.Stack(buf, true)]
		fmt.Printf("=== BEGIN goroutine stack dump ===\n%s\n=== END goroutine stack dump ===", buf)
	}
	assert.True(success, "task should success")

	// the running task map should drain shortly after completion; poll up to 3x
	for i := 0; i < 3; i++ {
		ptm.runningPeerTasks.Range(func(key, value any) bool {
			noRunningTask = false
			return false
		})
		if noRunningTask {
			break
		}
		noRunningTask = true
		time.Sleep(100 * time.Millisecond)
	}
	assert.True(noRunningTask, "no running tasks")

	// test reuse stream task
	rc, _, ok := ptm.tryReuseStreamPeerTask(context.Background(),
		taskID,
		&StreamTaskRequest{
			URL:     ts.url,
			URLMeta: urlMeta,
			PeerID:  ts.peerID,
		})

	assert.True(ok, "reuse stream task")
	assert.NotNil(rc, "reuse stream task")
	if rc == nil {
		return
	}

	defer func() {
		assert.Nil(rc.Close())
	}()

	data, err := io.ReadAll(rc)
	assert.Nil(err, "read all should be ok")
	assert.Equal(ts.taskData, data, "stream output and desired output must match")

	// test reuse file task
	progress, ok := ptm.tryReuseFilePeerTask(
		context.Background(),
		&FileTaskRequest{
			PeerTaskRequest: schedulerv1.PeerTaskRequest{
				Url:      ts.url,
				UrlMeta:  urlMeta,
				PeerId:   ts.peerID,
				PeerHost: &schedulerv1.PeerHost{},
			},
			Output: output,
		})

	assert.True(ok, "reuse file task")
	// reuse should deliver a progress update without blocking
	var p *FileTaskProgress
	select {
	case p = <-progress:
	default:
	}

	assert.NotNil(p, "progress should not be nil")
	outputBytes, err := os.ReadFile(output)
	assert.Nil(err, "load output file should be ok")
	assert.Equal(ts.taskData, outputBytes, "file output and desired output must match")
}
959