Dragonfly2

Форк
0
/
peertask_stream_backsource_partial_test.go 
329 строк · 10.2 Кб
1
/*
2
 *     Copyright 2020 The Dragonfly Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *      http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package peer
18

19
import (
20
	"bytes"
21
	"context"
22
	"fmt"
23
	"io"
24
	"log"
25
	"math"
26
	"net"
27
	"os"
28
	"sync"
29
	"testing"
30
	"time"
31

32
	"github.com/phayes/freeport"
33
	testifyassert "github.com/stretchr/testify/assert"
34
	"github.com/stretchr/testify/require"
35
	"go.uber.org/atomic"
36
	"go.uber.org/mock/gomock"
37
	"google.golang.org/grpc"
38
	"google.golang.org/grpc/credentials/insecure"
39
	"google.golang.org/grpc/health"
40

41
	commonv1 "d7y.io/api/v2/pkg/apis/common/v1"
42
	dfdaemonv1 "d7y.io/api/v2/pkg/apis/dfdaemon/v1"
43
	dfdaemonv1mocks "d7y.io/api/v2/pkg/apis/dfdaemon/v1/mocks"
44
	schedulerv1 "d7y.io/api/v2/pkg/apis/scheduler/v1"
45
	schedulerv1mocks "d7y.io/api/v2/pkg/apis/scheduler/v1/mocks"
46

47
	"d7y.io/dragonfly/v2/client/config"
48
	"d7y.io/dragonfly/v2/client/daemon/storage"
49
	"d7y.io/dragonfly/v2/client/daemon/test"
50
	"d7y.io/dragonfly/v2/client/util"
51
	"d7y.io/dragonfly/v2/internal/dferrors"
52
	"d7y.io/dragonfly/v2/pkg/dfnet"
53
	"d7y.io/dragonfly/v2/pkg/digest"
54
	"d7y.io/dragonfly/v2/pkg/rpc"
55
	daemonserver "d7y.io/dragonfly/v2/pkg/rpc/dfdaemon/server"
56
	schedulerclient "d7y.io/dragonfly/v2/pkg/rpc/scheduler/client"
57
	clientmocks "d7y.io/dragonfly/v2/pkg/rpc/scheduler/client/mocks"
58
	"d7y.io/dragonfly/v2/pkg/source"
59
	"d7y.io/dragonfly/v2/pkg/source/clients/httpprotocol"
60
	sourcemocks "d7y.io/dragonfly/v2/pkg/source/mocks"
61
)
62

63
func setupBackSourcePartialComponents(ctrl *gomock.Controller, testBytes []byte, opt componentsOption) (
64
	schedulerclient.V1, storage.Manager) {
65
	port := int32(freeport.GetPort())
66
	// 1. set up a mock daemon server for uploading pieces info
67
	var daemon = dfdaemonv1mocks.NewMockDaemonServer(ctrl)
68

69
	var piecesMd5 []string
70
	pieceCount := int32(math.Ceil(float64(opt.contentLength) / float64(opt.pieceSize)))
71
	for i := int32(0); i < pieceCount; i++ {
72
		if int64(i+1)*int64(opt.pieceSize) > opt.contentLength {
73
			piecesMd5 = append(piecesMd5, digest.MD5FromBytes(testBytes[int(i)*int(opt.pieceSize):]))
74
		} else {
75
			piecesMd5 = append(piecesMd5, digest.MD5FromBytes(testBytes[int(i)*int(opt.pieceSize):int(i+1)*int(opt.pieceSize)]))
76
		}
77
	}
78
	daemon.EXPECT().GetPieceTasks(gomock.Any(), gomock.Any()).AnyTimes().DoAndReturn(func(ctx context.Context, request *commonv1.PieceTaskRequest) (*commonv1.PiecePacket, error) {
79
		var tasks []*commonv1.PieceInfo
80
		// only return first piece
81
		if request.StartNum == 0 {
82
			tasks = append(tasks,
83
				&commonv1.PieceInfo{
84
					PieceNum:    int32(request.StartNum),
85
					RangeStart:  uint64(0),
86
					RangeSize:   opt.pieceSize,
87
					PieceMd5:    digest.MD5FromBytes(testBytes[0:opt.pieceSize]),
88
					PieceOffset: 0,
89
					PieceStyle:  0,
90
				})
91
		}
92
		return &commonv1.PiecePacket{
93
			PieceMd5Sign:  digest.SHA256FromStrings(piecesMd5...),
94
			TaskId:        request.TaskId,
95
			DstPid:        "peer-x",
96
			PieceInfos:    tasks,
97
			ContentLength: opt.contentLength,
98
			TotalPiece:    pieceCount,
99
		}, nil
100
	})
101
	daemon.EXPECT().SyncPieceTasks(gomock.Any()).AnyTimes().DoAndReturn(func(s dfdaemonv1.Daemon_SyncPieceTasksServer) error {
102
		request, err := s.Recv()
103
		if err != nil {
104
			return err
105
		}
106
		var tasks []*commonv1.PieceInfo
107
		// only return first piece
108
		if request.StartNum == 0 {
109
			tasks = append(tasks,
110
				&commonv1.PieceInfo{
111
					PieceNum:    int32(request.StartNum),
112
					RangeStart:  uint64(0),
113
					RangeSize:   opt.pieceSize,
114
					PieceMd5:    digest.MD5FromBytes(testBytes[0:opt.pieceSize]),
115
					PieceOffset: 0,
116
					PieceStyle:  0,
117
				})
118
		}
119
		pp := &commonv1.PiecePacket{
120
			PieceMd5Sign:  digest.SHA256FromStrings(piecesMd5...),
121
			TaskId:        request.TaskId,
122
			DstPid:        "peer-x",
123
			PieceInfos:    tasks,
124
			ContentLength: opt.contentLength,
125
			TotalPiece:    pieceCount,
126
		}
127
		if err = s.Send(pp); err != nil {
128
			return err
129
		}
130
		for {
131
			_, err = s.Recv()
132
			if err == io.EOF {
133
				break
134
			}
135
			if err != nil {
136
				return err
137
			}
138
		}
139
		return nil
140
	})
141
	ln, _ := rpc.Listen(dfnet.NetAddr{
142
		Type: "tcp",
143
		Addr: fmt.Sprintf("0.0.0.0:%d", port),
144
	})
145
	go func(daemon *dfdaemonv1mocks.MockDaemonServer, ln net.Listener) {
146
		hs := health.NewServer()
147
		if err := daemonserver.New(daemon, hs).Serve(ln); err != nil {
148
			log.Fatal(err)
149
		}
150
	}(daemon, ln)
151
	time.Sleep(100 * time.Millisecond)
152

153
	// 2. setup a scheduler
154
	pps := schedulerv1mocks.NewMockScheduler_ReportPieceResultClient(ctrl)
155
	var (
156
		wg             = sync.WaitGroup{}
157
		backSourceSent = atomic.Bool{}
158
	)
159
	wg.Add(1)
160

161
	pps.EXPECT().Send(gomock.Any()).AnyTimes().DoAndReturn(
162
		func(pr *schedulerv1.PieceResult) error {
163
			if pr.PieceInfo.PieceNum == 0 && pr.Success {
164
				if !backSourceSent.Load() {
165
					wg.Done()
166
					backSourceSent.Store(true)
167
				}
168
			}
169
			return nil
170
		})
171
	var (
172
		delayCount      int
173
		schedPeerPacket bool
174
	)
175
	pps.EXPECT().Recv().AnyTimes().DoAndReturn(
176
		func() (*schedulerv1.PeerPacket, error) {
177
			if len(opt.peerPacketDelay) > delayCount {
178
				if delay := opt.peerPacketDelay[delayCount]; delay > 0 {
179
					time.Sleep(delay)
180
				}
181
				delayCount++
182
			}
183
			if schedPeerPacket {
184
				// send back source after piece 0 is done
185
				wg.Wait()
186
				return nil, dferrors.New(commonv1.Code_SchedNeedBackSource, "")
187
			}
188
			schedPeerPacket = true
189
			return &schedulerv1.PeerPacket{
190
				Code:   commonv1.Code_Success,
191
				TaskId: opt.taskID,
192
				SrcPid: "127.0.0.1",
193
				MainPeer: &schedulerv1.PeerPacket_DestPeer{
194
					Ip:      "127.0.0.1",
195
					RpcPort: port,
196
					PeerId:  "peer-x",
197
				},
198
				CandidatePeers: nil,
199
			}, nil
200
		})
201
	pps.EXPECT().CloseSend().AnyTimes()
202
	sched := clientmocks.NewMockV1(ctrl)
203
	sched.EXPECT().RegisterPeerTask(gomock.Any(), gomock.Any()).AnyTimes().DoAndReturn(
204
		func(ctx context.Context, ptr *schedulerv1.PeerTaskRequest, opts ...grpc.CallOption) (*schedulerv1.RegisterResult, error) {
205
			return &schedulerv1.RegisterResult{
206
				TaskId:      opt.taskID,
207
				SizeScope:   commonv1.SizeScope_NORMAL,
208
				DirectPiece: nil,
209
			}, nil
210
		})
211
	sched.EXPECT().ReportPieceResult(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes().DoAndReturn(
212
		func(ctx context.Context, ptr *schedulerv1.PeerTaskRequest, opts ...grpc.CallOption) (schedulerv1.Scheduler_ReportPieceResultClient, error) {
213
			return pps, nil
214
		})
215
	sched.EXPECT().ReportPeerResult(gomock.Any(), gomock.Any()).AnyTimes().DoAndReturn(
216
		func(ctx context.Context, pr *schedulerv1.PeerResult, opts ...grpc.CallOption) error {
217
			return nil
218
		})
219
	tempDir, _ := os.MkdirTemp("", "d7y-test-*")
220
	storageManager, _ := storage.NewStorageManager(
221
		config.SimpleLocalTaskStoreStrategy,
222
		&config.StorageOption{
223
			DataPath: tempDir,
224
			TaskExpireTime: util.Duration{
225
				Duration: -1 * time.Second,
226
			},
227
		}, func(request storage.CommonTaskRequest) {}, os.FileMode(0700))
228
	return sched, storageManager
229
}
230

231
// TestStreamPeerTask_BackSource_Partial_WithContentLength tests that get piece from other peers first, then scheduler says back source
232
func TestStreamPeerTask_BackSource_Partial_WithContentLength(t *testing.T) {
233
	assert := testifyassert.New(t)
234
	ctrl := gomock.NewController(t)
235

236
	testBytes, err := os.ReadFile(test.File)
237
	assert.Nil(err, "load test file")
238

239
	var (
240
		pieceParallelCount = int32(4)
241
		pieceSize          = 1024
242

243
		mockContentLength = len(testBytes)
244
		//mockPieceCount    = int(math.Ceil(float64(mockContentLength) / float64(pieceSize)))
245

246
		peerID = "peer-back-source-partial-0"
247
		taskID = "task-back-source-partial-0"
248

249
		url = "http://localhost/test/data"
250
	)
251
	schedulerClient, storageManager := setupBackSourcePartialComponents(
252
		ctrl, testBytes,
253
		componentsOption{
254
			taskID:             taskID,
255
			contentLength:      int64(mockContentLength),
256
			pieceSize:          uint32(pieceSize),
257
			pieceParallelCount: pieceParallelCount,
258
			content:            testBytes,
259
		})
260
	defer storageManager.CleanUp()
261

262
	downloader := NewMockPieceDownloader(ctrl)
263
	downloader.EXPECT().DownloadPiece(gomock.Any(), gomock.Any()).AnyTimes().DoAndReturn(
264
		func(ctx context.Context, task *DownloadPieceRequest) (io.Reader, io.Closer, error) {
265
			rc := io.NopCloser(
266
				bytes.NewBuffer(
267
					testBytes[task.piece.RangeStart : task.piece.RangeStart+uint64(task.piece.RangeSize)],
268
				))
269
			return rc, rc, nil
270
		})
271

272
	sourceClient := sourcemocks.NewMockResourceClient(ctrl)
273
	source.UnRegister("http")
274
	require.Nil(t, source.Register("http", sourceClient, httpprotocol.Adapter))
275
	defer source.UnRegister("http")
276
	sourceClient.EXPECT().Download(gomock.Any()).DoAndReturn(
277
		func(request *source.Request) (*source.Response, error) {
278
			response := source.NewResponse(io.NopCloser(bytes.NewBuffer(testBytes)))
279
			response.ContentLength = int64(len(testBytes))
280
			return response, nil
281
		})
282

283
	pm := &pieceManager{
284
		calculateDigest: true,
285
		pieceDownloader: downloader,
286
		computePieceSize: func(contentLength int64) uint32 {
287
			return uint32(pieceSize)
288
		},
289
	}
290
	ptm := &peerTaskManager{
291
		conductorLock:    &sync.Mutex{},
292
		runningPeerTasks: sync.Map{},
293
		trafficShaper:    NewTrafficShaper("plain", 0, nil),
294
		TaskManagerOption: TaskManagerOption{
295
			SchedulerClient: schedulerClient,
296
			TaskOption: TaskOption{
297
				CalculateDigest: true,
298
				PeerHost: &schedulerv1.PeerHost{
299
					Ip: "127.0.0.1",
300
				},
301
				PieceManager:   pm,
302
				StorageManager: storageManager,
303
				SchedulerOption: config.SchedulerOption{
304
					ScheduleTimeout: util.Duration{Duration: 10 * time.Minute},
305
				},
306
				GRPCDialTimeout: time.Second,
307
				GRPCCredentials: insecure.NewCredentials(),
308
			},
309
		},
310
	}
311
	req := &schedulerv1.PeerTaskRequest{
312
		Url: url,
313
		UrlMeta: &commonv1.UrlMeta{
314
			Tag: "d7y-test",
315
		},
316
		PeerId:   peerID,
317
		PeerHost: &schedulerv1.PeerHost{},
318
	}
319
	ctx := context.Background()
320
	pt, err := ptm.newStreamTask(ctx, taskID, req, nil)
321
	assert.Nil(err, "new stream peer task")
322

323
	rc, _, err := pt.Start(ctx)
324
	assert.Nil(err, "start stream peer task")
325

326
	outputBytes, err := io.ReadAll(rc)
327
	assert.Nil(err, "load read data")
328
	assert.Equal(testBytes, outputBytes, "output and desired output must match")
329
}
330

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.