Dragonfly2
329 lines · 10.2 KB
/*
 * Copyright 2020 The Dragonfly Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17package peer
18
19import (
20"bytes"
21"context"
22"fmt"
23"io"
24"log"
25"math"
26"net"
27"os"
28"sync"
29"testing"
30"time"
31
32"github.com/phayes/freeport"
33testifyassert "github.com/stretchr/testify/assert"
34"github.com/stretchr/testify/require"
35"go.uber.org/atomic"
36"go.uber.org/mock/gomock"
37"google.golang.org/grpc"
38"google.golang.org/grpc/credentials/insecure"
39"google.golang.org/grpc/health"
40
41commonv1 "d7y.io/api/v2/pkg/apis/common/v1"
42dfdaemonv1 "d7y.io/api/v2/pkg/apis/dfdaemon/v1"
43dfdaemonv1mocks "d7y.io/api/v2/pkg/apis/dfdaemon/v1/mocks"
44schedulerv1 "d7y.io/api/v2/pkg/apis/scheduler/v1"
45schedulerv1mocks "d7y.io/api/v2/pkg/apis/scheduler/v1/mocks"
46
47"d7y.io/dragonfly/v2/client/config"
48"d7y.io/dragonfly/v2/client/daemon/storage"
49"d7y.io/dragonfly/v2/client/daemon/test"
50"d7y.io/dragonfly/v2/client/util"
51"d7y.io/dragonfly/v2/internal/dferrors"
52"d7y.io/dragonfly/v2/pkg/dfnet"
53"d7y.io/dragonfly/v2/pkg/digest"
54"d7y.io/dragonfly/v2/pkg/rpc"
55daemonserver "d7y.io/dragonfly/v2/pkg/rpc/dfdaemon/server"
56schedulerclient "d7y.io/dragonfly/v2/pkg/rpc/scheduler/client"
57clientmocks "d7y.io/dragonfly/v2/pkg/rpc/scheduler/client/mocks"
58"d7y.io/dragonfly/v2/pkg/source"
59"d7y.io/dragonfly/v2/pkg/source/clients/httpprotocol"
60sourcemocks "d7y.io/dragonfly/v2/pkg/source/mocks"
61)
62
63func setupBackSourcePartialComponents(ctrl *gomock.Controller, testBytes []byte, opt componentsOption) (
64schedulerclient.V1, storage.Manager) {
65port := int32(freeport.GetPort())
66// 1. set up a mock daemon server for uploading pieces info
67var daemon = dfdaemonv1mocks.NewMockDaemonServer(ctrl)
68
69var piecesMd5 []string
70pieceCount := int32(math.Ceil(float64(opt.contentLength) / float64(opt.pieceSize)))
71for i := int32(0); i < pieceCount; i++ {
72if int64(i+1)*int64(opt.pieceSize) > opt.contentLength {
73piecesMd5 = append(piecesMd5, digest.MD5FromBytes(testBytes[int(i)*int(opt.pieceSize):]))
74} else {
75piecesMd5 = append(piecesMd5, digest.MD5FromBytes(testBytes[int(i)*int(opt.pieceSize):int(i+1)*int(opt.pieceSize)]))
76}
77}
78daemon.EXPECT().GetPieceTasks(gomock.Any(), gomock.Any()).AnyTimes().DoAndReturn(func(ctx context.Context, request *commonv1.PieceTaskRequest) (*commonv1.PiecePacket, error) {
79var tasks []*commonv1.PieceInfo
80// only return first piece
81if request.StartNum == 0 {
82tasks = append(tasks,
83&commonv1.PieceInfo{
84PieceNum: int32(request.StartNum),
85RangeStart: uint64(0),
86RangeSize: opt.pieceSize,
87PieceMd5: digest.MD5FromBytes(testBytes[0:opt.pieceSize]),
88PieceOffset: 0,
89PieceStyle: 0,
90})
91}
92return &commonv1.PiecePacket{
93PieceMd5Sign: digest.SHA256FromStrings(piecesMd5...),
94TaskId: request.TaskId,
95DstPid: "peer-x",
96PieceInfos: tasks,
97ContentLength: opt.contentLength,
98TotalPiece: pieceCount,
99}, nil
100})
101daemon.EXPECT().SyncPieceTasks(gomock.Any()).AnyTimes().DoAndReturn(func(s dfdaemonv1.Daemon_SyncPieceTasksServer) error {
102request, err := s.Recv()
103if err != nil {
104return err
105}
106var tasks []*commonv1.PieceInfo
107// only return first piece
108if request.StartNum == 0 {
109tasks = append(tasks,
110&commonv1.PieceInfo{
111PieceNum: int32(request.StartNum),
112RangeStart: uint64(0),
113RangeSize: opt.pieceSize,
114PieceMd5: digest.MD5FromBytes(testBytes[0:opt.pieceSize]),
115PieceOffset: 0,
116PieceStyle: 0,
117})
118}
119pp := &commonv1.PiecePacket{
120PieceMd5Sign: digest.SHA256FromStrings(piecesMd5...),
121TaskId: request.TaskId,
122DstPid: "peer-x",
123PieceInfos: tasks,
124ContentLength: opt.contentLength,
125TotalPiece: pieceCount,
126}
127if err = s.Send(pp); err != nil {
128return err
129}
130for {
131_, err = s.Recv()
132if err == io.EOF {
133break
134}
135if err != nil {
136return err
137}
138}
139return nil
140})
141ln, _ := rpc.Listen(dfnet.NetAddr{
142Type: "tcp",
143Addr: fmt.Sprintf("0.0.0.0:%d", port),
144})
145go func(daemon *dfdaemonv1mocks.MockDaemonServer, ln net.Listener) {
146hs := health.NewServer()
147if err := daemonserver.New(daemon, hs).Serve(ln); err != nil {
148log.Fatal(err)
149}
150}(daemon, ln)
151time.Sleep(100 * time.Millisecond)
152
153// 2. setup a scheduler
154pps := schedulerv1mocks.NewMockScheduler_ReportPieceResultClient(ctrl)
155var (
156wg = sync.WaitGroup{}
157backSourceSent = atomic.Bool{}
158)
159wg.Add(1)
160
161pps.EXPECT().Send(gomock.Any()).AnyTimes().DoAndReturn(
162func(pr *schedulerv1.PieceResult) error {
163if pr.PieceInfo.PieceNum == 0 && pr.Success {
164if !backSourceSent.Load() {
165wg.Done()
166backSourceSent.Store(true)
167}
168}
169return nil
170})
171var (
172delayCount int
173schedPeerPacket bool
174)
175pps.EXPECT().Recv().AnyTimes().DoAndReturn(
176func() (*schedulerv1.PeerPacket, error) {
177if len(opt.peerPacketDelay) > delayCount {
178if delay := opt.peerPacketDelay[delayCount]; delay > 0 {
179time.Sleep(delay)
180}
181delayCount++
182}
183if schedPeerPacket {
184// send back source after piece 0 is done
185wg.Wait()
186return nil, dferrors.New(commonv1.Code_SchedNeedBackSource, "")
187}
188schedPeerPacket = true
189return &schedulerv1.PeerPacket{
190Code: commonv1.Code_Success,
191TaskId: opt.taskID,
192SrcPid: "127.0.0.1",
193MainPeer: &schedulerv1.PeerPacket_DestPeer{
194Ip: "127.0.0.1",
195RpcPort: port,
196PeerId: "peer-x",
197},
198CandidatePeers: nil,
199}, nil
200})
201pps.EXPECT().CloseSend().AnyTimes()
202sched := clientmocks.NewMockV1(ctrl)
203sched.EXPECT().RegisterPeerTask(gomock.Any(), gomock.Any()).AnyTimes().DoAndReturn(
204func(ctx context.Context, ptr *schedulerv1.PeerTaskRequest, opts ...grpc.CallOption) (*schedulerv1.RegisterResult, error) {
205return &schedulerv1.RegisterResult{
206TaskId: opt.taskID,
207SizeScope: commonv1.SizeScope_NORMAL,
208DirectPiece: nil,
209}, nil
210})
211sched.EXPECT().ReportPieceResult(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes().DoAndReturn(
212func(ctx context.Context, ptr *schedulerv1.PeerTaskRequest, opts ...grpc.CallOption) (schedulerv1.Scheduler_ReportPieceResultClient, error) {
213return pps, nil
214})
215sched.EXPECT().ReportPeerResult(gomock.Any(), gomock.Any()).AnyTimes().DoAndReturn(
216func(ctx context.Context, pr *schedulerv1.PeerResult, opts ...grpc.CallOption) error {
217return nil
218})
219tempDir, _ := os.MkdirTemp("", "d7y-test-*")
220storageManager, _ := storage.NewStorageManager(
221config.SimpleLocalTaskStoreStrategy,
222&config.StorageOption{
223DataPath: tempDir,
224TaskExpireTime: util.Duration{
225Duration: -1 * time.Second,
226},
227}, func(request storage.CommonTaskRequest) {}, os.FileMode(0700))
228return sched, storageManager
229}
230
231// TestStreamPeerTask_BackSource_Partial_WithContentLength tests that get piece from other peers first, then scheduler says back source
232func TestStreamPeerTask_BackSource_Partial_WithContentLength(t *testing.T) {
233assert := testifyassert.New(t)
234ctrl := gomock.NewController(t)
235
236testBytes, err := os.ReadFile(test.File)
237assert.Nil(err, "load test file")
238
239var (
240pieceParallelCount = int32(4)
241pieceSize = 1024
242
243mockContentLength = len(testBytes)
244//mockPieceCount = int(math.Ceil(float64(mockContentLength) / float64(pieceSize)))
245
246peerID = "peer-back-source-partial-0"
247taskID = "task-back-source-partial-0"
248
249url = "http://localhost/test/data"
250)
251schedulerClient, storageManager := setupBackSourcePartialComponents(
252ctrl, testBytes,
253componentsOption{
254taskID: taskID,
255contentLength: int64(mockContentLength),
256pieceSize: uint32(pieceSize),
257pieceParallelCount: pieceParallelCount,
258content: testBytes,
259})
260defer storageManager.CleanUp()
261
262downloader := NewMockPieceDownloader(ctrl)
263downloader.EXPECT().DownloadPiece(gomock.Any(), gomock.Any()).AnyTimes().DoAndReturn(
264func(ctx context.Context, task *DownloadPieceRequest) (io.Reader, io.Closer, error) {
265rc := io.NopCloser(
266bytes.NewBuffer(
267testBytes[task.piece.RangeStart : task.piece.RangeStart+uint64(task.piece.RangeSize)],
268))
269return rc, rc, nil
270})
271
272sourceClient := sourcemocks.NewMockResourceClient(ctrl)
273source.UnRegister("http")
274require.Nil(t, source.Register("http", sourceClient, httpprotocol.Adapter))
275defer source.UnRegister("http")
276sourceClient.EXPECT().Download(gomock.Any()).DoAndReturn(
277func(request *source.Request) (*source.Response, error) {
278response := source.NewResponse(io.NopCloser(bytes.NewBuffer(testBytes)))
279response.ContentLength = int64(len(testBytes))
280return response, nil
281})
282
283pm := &pieceManager{
284calculateDigest: true,
285pieceDownloader: downloader,
286computePieceSize: func(contentLength int64) uint32 {
287return uint32(pieceSize)
288},
289}
290ptm := &peerTaskManager{
291conductorLock: &sync.Mutex{},
292runningPeerTasks: sync.Map{},
293trafficShaper: NewTrafficShaper("plain", 0, nil),
294TaskManagerOption: TaskManagerOption{
295SchedulerClient: schedulerClient,
296TaskOption: TaskOption{
297CalculateDigest: true,
298PeerHost: &schedulerv1.PeerHost{
299Ip: "127.0.0.1",
300},
301PieceManager: pm,
302StorageManager: storageManager,
303SchedulerOption: config.SchedulerOption{
304ScheduleTimeout: util.Duration{Duration: 10 * time.Minute},
305},
306GRPCDialTimeout: time.Second,
307GRPCCredentials: insecure.NewCredentials(),
308},
309},
310}
311req := &schedulerv1.PeerTaskRequest{
312Url: url,
313UrlMeta: &commonv1.UrlMeta{
314Tag: "d7y-test",
315},
316PeerId: peerID,
317PeerHost: &schedulerv1.PeerHost{},
318}
319ctx := context.Background()
320pt, err := ptm.newStreamTask(ctx, taskID, req, nil)
321assert.Nil(err, "new stream peer task")
322
323rc, _, err := pt.Start(ctx)
324assert.Nil(err, "start stream peer task")
325
326outputBytes, err := io.ReadAll(rc)
327assert.Nil(err, "load read data")
328assert.Equal(testBytes, outputBytes, "output and desired output must match")
329}
330