Dragonfly2

Форк
0
/
peertask_manager_test.go 
958 строк · 29.8 Кб
1
/*
2
 *     Copyright 2020 The Dragonfly Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *      http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package peer
18

19
import (
20
	"bytes"
21
	"context"
22
	"fmt"
23
	"io"
24
	"math"
25
	"net/http"
26
	"net/http/httptest"
27
	"os"
28
	"runtime"
29
	"strings"
30
	"sync"
31
	"testing"
32
	"time"
33

34
	"github.com/go-http-utils/headers"
35
	"github.com/phayes/freeport"
36
	testifyassert "github.com/stretchr/testify/assert"
37
	testifyrequire "github.com/stretchr/testify/require"
38
	"go.uber.org/mock/gomock"
39
	"go.uber.org/zap/zapcore"
40
	"golang.org/x/time/rate"
41
	"google.golang.org/grpc"
42
	"google.golang.org/grpc/codes"
43
	"google.golang.org/grpc/credentials/insecure"
44
	"google.golang.org/grpc/health"
45
	"google.golang.org/grpc/status"
46

47
	commonv1 "d7y.io/api/v2/pkg/apis/common/v1"
48
	dfdaemonv1 "d7y.io/api/v2/pkg/apis/dfdaemon/v1"
49
	dfdaemonv1mocks "d7y.io/api/v2/pkg/apis/dfdaemon/v1/mocks"
50
	schedulerv1 "d7y.io/api/v2/pkg/apis/scheduler/v1"
51
	schedulerv1mocks "d7y.io/api/v2/pkg/apis/scheduler/v1/mocks"
52

53
	"d7y.io/dragonfly/v2/client/config"
54
	"d7y.io/dragonfly/v2/client/daemon/storage"
55
	"d7y.io/dragonfly/v2/client/daemon/test"
56
	"d7y.io/dragonfly/v2/client/util"
57
	"d7y.io/dragonfly/v2/internal/dferrors"
58
	logger "d7y.io/dragonfly/v2/internal/dflog"
59
	"d7y.io/dragonfly/v2/pkg/dfnet"
60
	"d7y.io/dragonfly/v2/pkg/digest"
61
	"d7y.io/dragonfly/v2/pkg/idgen"
62
	nethttp "d7y.io/dragonfly/v2/pkg/net/http"
63
	"d7y.io/dragonfly/v2/pkg/rpc"
64
	daemonserver "d7y.io/dragonfly/v2/pkg/rpc/dfdaemon/server"
65
	schedulerclient "d7y.io/dragonfly/v2/pkg/rpc/scheduler/client"
66
	schedulerclientmocks "d7y.io/dragonfly/v2/pkg/rpc/scheduler/client/mocks"
67
	"d7y.io/dragonfly/v2/pkg/source"
68
	"d7y.io/dragonfly/v2/pkg/source/clients/httpprotocol"
69
	sourcemocks "d7y.io/dragonfly/v2/pkg/source/mocks"
70
)
71

72
// TestMain raises the package-wide log level to debug before running the
// suite, then propagates the test result as the process exit code.
func TestMain(m *testing.M) {
	logger.SetLevel(zapcore.DebugLevel)
	os.Exit(m.Run())
}
76

77
// componentsOption configures the mocked scheduler/daemon/storage stack
// built by setupPeerTaskManagerComponents for a single test case.
type componentsOption struct {
	taskID             string                // task id the mock scheduler reports back
	contentLength      int64                 // total length of the mocked task content
	pieceSize          uint32                // piece size used to split content into pieces
	pieceParallelCount int32                 // requested parallel piece download count
	pieceDownloader    PieceDownloader       // mocked piece downloader (nil for back-source cases)
	sourceClient       source.ResourceClient // mocked back-source client (nil for pure p2p cases)
	peerPacketDelay    []time.Duration       // per-Recv delays simulating scheduler latency
	backSource         bool                  // scheduler Recv returns Code_SchedNeedBackSource when true
	scope              commonv1.SizeScope    // size scope returned by RegisterPeerTask
	content            []byte                // raw task content served by the mock daemon
	reregister         bool                  // first scheduler Recv returns Code_SchedReregister when true
}
90

91
func setupPeerTaskManagerComponents(ctrl *gomock.Controller, opt componentsOption) (
92
	schedulerclient.V1, storage.Manager) {
93
	port := int32(freeport.GetPort())
94
	// 1. set up a mock daemon server for uploading pieces info
95
	var daemon = dfdaemonv1mocks.NewMockDaemonServer(ctrl)
96

97
	// 1.1 calculate piece digest and total digest
98
	r := bytes.NewBuffer(opt.content)
99
	var pieces = make([]string, int(math.Ceil(float64(len(opt.content))/float64(opt.pieceSize))))
100
	for i := range pieces {
101
		pieces[i] = digest.MD5FromReader(io.LimitReader(r, int64(opt.pieceSize)))
102
	}
103
	totalDigests := digest.SHA256FromStrings(pieces...)
104
	genPiecePacket := func(request *commonv1.PieceTaskRequest) *commonv1.PiecePacket {
105
		var tasks []*commonv1.PieceInfo
106
		for i := uint32(0); i < request.Limit; i++ {
107
			start := opt.pieceSize * (request.StartNum + i)
108
			if int64(start)+1 > opt.contentLength {
109
				break
110
			}
111
			size := opt.pieceSize
112
			if int64(start+opt.pieceSize) > opt.contentLength {
113
				size = uint32(opt.contentLength) - start
114
			}
115
			tasks = append(tasks,
116
				&commonv1.PieceInfo{
117
					PieceNum:    int32(request.StartNum + i),
118
					RangeStart:  uint64(start),
119
					RangeSize:   size,
120
					PieceMd5:    pieces[request.StartNum+i],
121
					PieceOffset: 0,
122
					PieceStyle:  0,
123
				})
124
		}
125
		return &commonv1.PiecePacket{
126
			TaskId:        request.TaskId,
127
			DstPid:        "peer-x",
128
			PieceInfos:    tasks,
129
			ContentLength: opt.contentLength,
130
			TotalPiece:    int32(math.Ceil(float64(opt.contentLength) / float64(opt.pieceSize))),
131
			PieceMd5Sign:  totalDigests,
132
		}
133
	}
134
	daemon.EXPECT().GetPieceTasks(gomock.Any(), gomock.Any()).AnyTimes().
135
		DoAndReturn(func(ctx context.Context, request *commonv1.PieceTaskRequest) (*commonv1.PiecePacket, error) {
136
			return nil, status.Error(codes.Unimplemented, "TODO")
137
		})
138
	daemon.EXPECT().SyncPieceTasks(gomock.Any()).AnyTimes().DoAndReturn(func(s dfdaemonv1.Daemon_SyncPieceTasksServer) error {
139
		request, err := s.Recv()
140
		if err != nil {
141
			return err
142
		}
143
		if err = s.Send(genPiecePacket(request)); err != nil {
144
			return err
145
		}
146
		for {
147
			request, err = s.Recv()
148
			if err == io.EOF {
149
				break
150
			}
151
			if err != nil {
152
				return err
153
			}
154
			if err = s.Send(genPiecePacket(request)); err != nil {
155
				return err
156
			}
157
		}
158
		return nil
159
	})
160
	ln, _ := rpc.Listen(dfnet.NetAddr{
161
		Type: "tcp",
162
		Addr: fmt.Sprintf("0.0.0.0:%d", port),
163
	})
164

165
	go func() {
166
		hs := health.NewServer()
167
		if err := daemonserver.New(daemon, hs).Serve(ln); err != nil {
168
			panic(err)
169
		}
170
	}()
171

172
	time.Sleep(100 * time.Millisecond)
173

174
	// 2. setup a scheduler
175
	pps := schedulerv1mocks.NewMockScheduler_ReportPieceResultClient(ctrl)
176
	pps.EXPECT().Send(gomock.Any()).AnyTimes().DoAndReturn(
177
		func(pr *schedulerv1.PieceResult) error {
178
			return nil
179
		})
180
	var (
181
		delayCount int
182
		sent       = make(chan struct{}, 1)
183
	)
184
	sent <- struct{}{}
185
	var reregistered bool
186
	pps.EXPECT().Recv().AnyTimes().DoAndReturn(
187
		func() (*schedulerv1.PeerPacket, error) {
188
			if opt.reregister && !reregistered {
189
				reregistered = true
190
				return nil, dferrors.New(commonv1.Code_SchedReregister, "reregister")
191
			}
192
			if len(opt.peerPacketDelay) > delayCount {
193
				if delay := opt.peerPacketDelay[delayCount]; delay > 0 {
194
					time.Sleep(delay)
195
				}
196
				delayCount++
197
			}
198
			<-sent
199
			if opt.backSource {
200
				return nil, dferrors.Newf(commonv1.Code_SchedNeedBackSource, "fake back source error")
201
			}
202
			return &schedulerv1.PeerPacket{
203
				Code:   commonv1.Code_Success,
204
				TaskId: opt.taskID,
205
				SrcPid: "127.0.0.1",
206
				MainPeer: &schedulerv1.PeerPacket_DestPeer{
207
					Ip:      "127.0.0.1",
208
					RpcPort: port,
209
					PeerId:  "peer-x",
210
				},
211
				CandidatePeers: nil,
212
			}, nil
213
		})
214
	pps.EXPECT().CloseSend().AnyTimes()
215

216
	sched := schedulerclientmocks.NewMockV1(ctrl)
217
	sched.EXPECT().RegisterPeerTask(gomock.Any(), gomock.Any()).AnyTimes().DoAndReturn(
218
		func(ctx context.Context, ptr *schedulerv1.PeerTaskRequest, opts ...grpc.CallOption) (*schedulerv1.RegisterResult, error) {
219
			switch opt.scope {
220
			case commonv1.SizeScope_TINY:
221
				return &schedulerv1.RegisterResult{
222
					TaskId:    opt.taskID,
223
					SizeScope: commonv1.SizeScope_TINY,
224
					DirectPiece: &schedulerv1.RegisterResult_PieceContent{
225
						PieceContent: opt.content,
226
					},
227
				}, nil
228
			case commonv1.SizeScope_SMALL:
229
				return &schedulerv1.RegisterResult{
230
					TaskId:    opt.taskID,
231
					SizeScope: commonv1.SizeScope_SMALL,
232
					DirectPiece: &schedulerv1.RegisterResult_SinglePiece{
233
						SinglePiece: &schedulerv1.SinglePiece{
234
							DstPid:  "fake-pid",
235
							DstAddr: "fake-addr",
236
							PieceInfo: &commonv1.PieceInfo{
237
								PieceNum:    0,
238
								RangeStart:  0,
239
								RangeSize:   uint32(opt.contentLength),
240
								PieceMd5:    pieces[0],
241
								PieceOffset: 0,
242
								PieceStyle:  0,
243
							},
244
						},
245
					},
246
				}, nil
247
			}
248
			return &schedulerv1.RegisterResult{
249
				TaskId:      opt.taskID,
250
				SizeScope:   commonv1.SizeScope_NORMAL,
251
				DirectPiece: nil,
252
			}, nil
253
		})
254
	sched.EXPECT().ReportPieceResult(gomock.Any(), gomock.Any()).AnyTimes().DoAndReturn(
255
		func(ctx context.Context, ptr *schedulerv1.PeerTaskRequest, opts ...grpc.CallOption) (
256
			schedulerv1.Scheduler_ReportPieceResultClient, error) {
257
			return pps, nil
258
		})
259
	sched.EXPECT().ReportPeerResult(gomock.Any(), gomock.Any()).AnyTimes().DoAndReturn(
260
		func(ctx context.Context, pr *schedulerv1.PeerResult, opts ...grpc.CallOption) error {
261
			return nil
262
		})
263
	tempDir, _ := os.MkdirTemp("", "d7y-test-*")
264
	storageManager, _ := storage.NewStorageManager(
265
		config.SimpleLocalTaskStoreStrategy,
266
		&config.StorageOption{
267
			DataPath: tempDir,
268
			TaskExpireTime: util.Duration{
269
				Duration: -1 * time.Second,
270
			},
271
		}, func(request storage.CommonTaskRequest) {}, os.FileMode(0700))
272
	return sched, storageManager
273
}
274

275
// mockManager bundles a peerTaskManager with the mocked components it was
// built from, so tests can drive the manager and tear everything down.
type mockManager struct {
	testSpec        *testSpec          // spec that produced this manager; owns cleanup hooks
	peerTaskManager *peerTaskManager   // manager under test
	schedulerClient schedulerclient.V1 // mocked scheduler client
	storageManager  storage.Manager    // real storage manager over a temp dir
}
281

282
// CleanUp releases the storage manager's resources and then invokes every
// cleanup hook registered by the test spec (e.g. httptest server shutdown).
func (m *mockManager) CleanUp() {
	m.storageManager.CleanUp()
	hooks := m.testSpec.cleanUp
	for i := range hooks {
		hooks[i]()
	}
}
288

289
func setupMockManager(ctrl *gomock.Controller, ts *testSpec, opt componentsOption) *mockManager {
290
	schedulerClient, storageManager := setupPeerTaskManagerComponents(ctrl, opt)
291
	scheduleTimeout := util.Duration{Duration: 10 * time.Minute}
292
	if ts.scheduleTimeout > 0 {
293
		scheduleTimeout = util.Duration{Duration: ts.scheduleTimeout}
294
	}
295
	ptm := &peerTaskManager{
296
		conductorLock:    &sync.Mutex{},
297
		runningPeerTasks: sync.Map{},
298
		trafficShaper:    NewTrafficShaper("plain", 0, nil),
299
		TaskManagerOption: TaskManagerOption{
300
			SchedulerClient: schedulerClient,
301
			TaskOption: TaskOption{
302
				CalculateDigest: true,
303
				PeerHost: &schedulerv1.PeerHost{
304
					Ip: "127.0.0.1",
305
				},
306
				PieceManager: &pieceManager{
307
					calculateDigest: true,
308
					pieceDownloader: opt.pieceDownloader,
309
					computePieceSize: func(contentLength int64) uint32 {
310
						return opt.pieceSize
311
					},
312
				},
313
				StorageManager: storageManager,
314
				SchedulerOption: config.SchedulerOption{
315
					ScheduleTimeout: scheduleTimeout,
316
				},
317
				GRPCDialTimeout: time.Second,
318
				GRPCCredentials: insecure.NewCredentials(),
319
			},
320
		},
321
	}
322
	return &mockManager{
323
		testSpec:        ts,
324
		peerTaskManager: ptm,
325
		schedulerClient: schedulerClient,
326
		storageManager:  storageManager,
327
	}
328
}
329

330
// Task flavors exercised by the suite; each value indexes taskTypeNames in
// TestPeerTaskManager_TaskSuite.
const (
	taskTypeFile = iota // download into an output file
	taskTypeStream      // stream the task body to a reader
	taskTypeConductor   // drive the shared peerTaskConductor directly
	taskTypeSeed        // seed peer task (needs a back-source client)
)
336

337
// testSpec describes one peer task scenario: the data to serve, how the
// mocked scheduler/downloader behave, and which task flavors to run.
type testSpec struct {
	runTaskTypes       []int  // task types to run; nil selects the default set
	taskType           int    // set per iteration by the suite loop
	name               string // subtest name
	taskData           []byte // expected task content
	httpRange          *nethttp.Range // only used in back source cases
	pieceParallelCount int32
	pieceSize          int
	sizeScope          commonv1.SizeScope
	peerID             string
	url                string
	reregister         bool // first scheduler Recv answers Code_SchedReregister
	// when urlGenerator is not nil, use urlGenerator instead url
	// it's useful for httptest server
	urlGenerator func(ts *testSpec) string

	// mock schedule timeout
	peerPacketDelay []time.Duration
	scheduleTimeout time.Duration
	backSource      bool

	// factories for the mocked piece downloader and back-source client;
	// either may be nil when the scenario does not need them
	mockPieceDownloader  func(ctrl *gomock.Controller, taskData []byte, pieceSize int) PieceDownloader
	mockHTTPSourceClient func(t *testing.T, ctrl *gomock.Controller, rg *nethttp.Range, taskData []byte, url string) source.ResourceClient

	cleanUp []func() // hooks invoked by mockManager.CleanUp (e.g. httptest server close)
}
363

364
// TestPeerTaskManager_TaskSuite runs a table of download scenarios (p2p,
// back-source, ranged requests, tiny/small/normal size scopes, schedule
// timeout, reregister) across multiple task flavors (conductor, file,
// stream, and — where a back-source client exists — seed).
func TestPeerTaskManager_TaskSuite(t *testing.T) {
	assert := testifyassert.New(t)
	require := testifyrequire.New(t)
	testBytes, err := os.ReadFile(test.File)
	require.Nil(err, "load test file")

	// commonPieceDownloader serves pieces straight from taskData and expects
	// exactly ceil(len/pieceSize) DownloadPiece calls.
	commonPieceDownloader := func(ctrl *gomock.Controller, taskData []byte, pieceSize int) PieceDownloader {
		downloader := NewMockPieceDownloader(ctrl)
		downloader.EXPECT().DownloadPiece(gomock.Any(), gomock.Any()).Times(
			int(math.Ceil(float64(len(taskData)) / float64(pieceSize)))).DoAndReturn(
			func(ctx context.Context, task *DownloadPieceRequest) (io.Reader, io.Closer, error) {
				rc := io.NopCloser(
					bytes.NewBuffer(
						taskData[task.piece.RangeStart : task.piece.RangeStart+uint64(task.piece.RangeSize)],
					))
				return rc, rc, nil
			})
		return downloader
	}

	taskTypes := []int{taskTypeConductor, taskTypeFile, taskTypeStream} // seed task need back source client
	taskTypeNames := []string{"conductor", "file", "stream", "seed"}

	testCases := []testSpec{
		{
			name:                 "normal size scope - p2p",
			taskData:             testBytes,
			pieceParallelCount:   4,
			pieceSize:            1024,
			peerID:               "normal-size-peer",
			url:                  "http://localhost/test/data",
			sizeScope:            commonv1.SizeScope_NORMAL,
			mockPieceDownloader:  commonPieceDownloader,
			mockHTTPSourceClient: nil,
		},
		{
			name:                 "normal size scope - p2p - reregister",
			taskData:             testBytes,
			pieceParallelCount:   4,
			pieceSize:            1024,
			peerID:               "normal-size-peer",
			url:                  "http://localhost/test/data",
			sizeScope:            commonv1.SizeScope_NORMAL,
			mockPieceDownloader:  commonPieceDownloader,
			mockHTTPSourceClient: nil,
			reregister:           true,
		},
		{
			name:                 "small size scope - p2p",
			taskData:             testBytes,
			pieceParallelCount:   4,
			pieceSize:            16384,
			peerID:               "small-size-peer",
			url:                  "http://localhost/test/data",
			sizeScope:            commonv1.SizeScope_SMALL,
			mockPieceDownloader:  commonPieceDownloader,
			mockHTTPSourceClient: nil,
		},
		{
			name:                 "tiny size scope - p2p",
			taskData:             testBytes[:64],
			pieceParallelCount:   4,
			pieceSize:            1024,
			peerID:               "tiny-size-peer",
			url:                  "http://localhost/test/data",
			sizeScope:            commonv1.SizeScope_TINY,
			mockPieceDownloader:  nil,
			mockHTTPSourceClient: nil,
		},
		{
			name:                 "empty file - p2p",
			taskData:             []byte{},
			pieceParallelCount:   4,
			pieceSize:            1024,
			peerID:               "empty-file-peer",
			url:                  "http://localhost/test/data",
			sizeScope:            commonv1.SizeScope_NORMAL,
			mockPieceDownloader:  commonPieceDownloader,
			mockHTTPSourceClient: nil,
		},
		{
			name:                "normal size scope - back source - content length",
			runTaskTypes:        []int{taskTypeConductor, taskTypeFile, taskTypeStream, taskTypeSeed},
			taskData:            testBytes,
			pieceParallelCount:  4,
			pieceSize:           1024,
			peerID:              "normal-size-peer-back-source",
			backSource:          true,
			url:                 "http://localhost/test/data",
			sizeScope:           commonv1.SizeScope_NORMAL,
			mockPieceDownloader: nil,
			mockHTTPSourceClient: func(t *testing.T, ctrl *gomock.Controller, rg *nethttp.Range, taskData []byte, url string) source.ResourceClient {
				sourceClient := sourcemocks.NewMockResourceClient(ctrl)
				sourceClient.EXPECT().GetContentLength(source.RequestEq(url)).AnyTimes().DoAndReturn(
					func(request *source.Request) (int64, error) {
						return int64(len(taskData)), nil
					})
				sourceClient.EXPECT().Download(source.RequestEq(url)).AnyTimes().DoAndReturn(
					func(request *source.Request) (*source.Response, error) {
						return source.NewResponse(io.NopCloser(bytes.NewBuffer(taskData))), nil
					})
				return sourceClient
			},
		},
		{
			name:               "normal size scope - range - back source - content length",
			runTaskTypes:       []int{taskTypeConductor, taskTypeFile, taskTypeStream, taskTypeSeed},
			taskData:           testBytes[0:4096],
			pieceParallelCount: 4,
			pieceSize:          1024,
			peerID:             "normal-size-peer-range-back-source",
			backSource:         true,
			url:                "http://localhost/test/data",
			sizeScope:          commonv1.SizeScope_NORMAL,
			httpRange: &nethttp.Range{
				Start:  0,
				Length: 4096,
			},
			mockPieceDownloader: nil,
			// verifies the Range header is forwarded to the source on both
			// the content-length probe and the download itself
			mockHTTPSourceClient: func(t *testing.T, ctrl *gomock.Controller, rg *nethttp.Range, taskData []byte, url string) source.ResourceClient {
				sourceClient := sourcemocks.NewMockResourceClient(ctrl)
				sourceClient.EXPECT().GetContentLength(source.RequestEq(url)).AnyTimes().DoAndReturn(
					func(request *source.Request) (int64, error) {
						assert := testifyassert.New(t)
						if rg != nil {
							rgs, err := nethttp.ParseRange(request.Header.Get(headers.Range), math.MaxInt64)
							assert.Nil(err)
							assert.Equal(1, len(rgs))
							assert.Equal(rg.String(), rgs[0].String())
						}
						return int64(len(taskData)), nil
					})
				sourceClient.EXPECT().Download(source.RequestEq(url)).AnyTimes().DoAndReturn(
					func(request *source.Request) (*source.Response, error) {
						assert := testifyassert.New(t)
						if rg != nil {
							rgs, err := nethttp.ParseRange(request.Header.Get(headers.Range), math.MaxInt64)
							assert.Nil(err)
							assert.Equal(1, len(rgs))
							assert.Equal(rg.String(), rgs[0].String())
						}
						return source.NewResponse(io.NopCloser(bytes.NewBuffer(taskData))), nil
					})
				return sourceClient
			},
		},
		{
			name:                "normal size scope - back source - no content length",
			runTaskTypes:        []int{taskTypeConductor, taskTypeFile, taskTypeStream, taskTypeSeed},
			taskData:            testBytes,
			pieceParallelCount:  4,
			pieceSize:           1024,
			peerID:              "normal-size-peer-back-source-no-length",
			backSource:          true,
			url:                 "http://localhost/test/data",
			sizeScope:           commonv1.SizeScope_NORMAL,
			mockPieceDownloader: nil,
			// -1 content length forces the unknown-length download path
			mockHTTPSourceClient: func(t *testing.T, ctrl *gomock.Controller, rg *nethttp.Range, taskData []byte, url string) source.ResourceClient {
				sourceClient := sourcemocks.NewMockResourceClient(ctrl)
				sourceClient.EXPECT().GetContentLength(source.RequestEq(url)).AnyTimes().DoAndReturn(
					func(request *source.Request) (int64, error) {
						return -1, nil
					})
				sourceClient.EXPECT().Download(source.RequestEq(url)).AnyTimes().DoAndReturn(
					func(request *source.Request) (*source.Response, error) {
						return source.NewResponse(io.NopCloser(bytes.NewBuffer(taskData))), nil
					})
				return sourceClient
			},
		},
		{
			name:                "normal size scope - back source - no content length - aligning",
			runTaskTypes:        []int{taskTypeConductor, taskTypeFile, taskTypeStream, taskTypeSeed},
			taskData:            testBytes[:8192],
			pieceParallelCount:  4,
			pieceSize:           1024,
			peerID:              "normal-size-peer-back-source-aligning-no-length",
			backSource:          true,
			url:                 "http://localhost/test/data",
			sizeScope:           commonv1.SizeScope_NORMAL,
			mockPieceDownloader: nil,
			// content size is an exact multiple of pieceSize (8192 = 8*1024)
			mockHTTPSourceClient: func(t *testing.T, ctrl *gomock.Controller, rg *nethttp.Range, taskData []byte, url string) source.ResourceClient {
				sourceClient := sourcemocks.NewMockResourceClient(ctrl)
				sourceClient.EXPECT().GetContentLength(source.RequestEq(url)).AnyTimes().DoAndReturn(
					func(request *source.Request) (int64, error) {
						return -1, nil
					})
				sourceClient.EXPECT().Download(source.RequestEq(url)).AnyTimes().DoAndReturn(
					func(request *source.Request) (*source.Response, error) {
						return source.NewResponse(io.NopCloser(bytes.NewBuffer(taskData))), nil
					})
				return sourceClient
			},
		},
		{
			name:               "normal size scope - schedule timeout - auto back source",
			taskData:           testBytes,
			pieceParallelCount: 4,
			pieceSize:          1024,
			peerID:             "normal-size-peer-schedule-timeout",
			peerPacketDelay:    []time.Duration{time.Second},
			scheduleTimeout:    time.Nanosecond,
			// a real httptest server is used here as the back source target
			urlGenerator: func(ts *testSpec) string {
				server := httptest.NewServer(http.HandlerFunc(
					func(w http.ResponseWriter, r *http.Request) {
						n, err := w.Write(testBytes)
						assert.Nil(err)
						assert.Equal(len(ts.taskData), n)
					}))
				ts.cleanUp = append(ts.cleanUp, func() {
					server.Close()
				})
				return server.URL
			},
			sizeScope:            commonv1.SizeScope_NORMAL,
			mockPieceDownloader:  nil,
			mockHTTPSourceClient: nil,
		},
		{
			name:                "empty file peer - back source - content length",
			runTaskTypes:        []int{taskTypeConductor, taskTypeFile, taskTypeStream, taskTypeSeed},
			taskData:            []byte{},
			pieceParallelCount:  4,
			pieceSize:           1024,
			peerID:              "empty-file-peer-back-source",
			backSource:          true,
			url:                 "http://localhost/test/data",
			sizeScope:           commonv1.SizeScope_NORMAL,
			mockPieceDownloader: nil,
			mockHTTPSourceClient: func(t *testing.T, ctrl *gomock.Controller, rg *nethttp.Range, taskData []byte, url string) source.ResourceClient {
				sourceClient := sourcemocks.NewMockResourceClient(ctrl)
				sourceClient.EXPECT().GetContentLength(source.RequestEq(url)).AnyTimes().DoAndReturn(
					func(request *source.Request) (int64, error) {
						return int64(len(taskData)), nil
					})
				sourceClient.EXPECT().Download(source.RequestEq(url)).AnyTimes().DoAndReturn(
					func(request *source.Request) (*source.Response, error) {
						return source.NewResponse(io.NopCloser(bytes.NewBuffer(taskData))), nil
					})
				return sourceClient
			},
		},
	}

	for _, _tc := range testCases {
		t.Run(_tc.name, func(t *testing.T) {
			var types = _tc.runTaskTypes
			if _tc.runTaskTypes == nil {
				types = taskTypes
			}
			assert = testifyassert.New(t)
			require = testifyrequire.New(t)
			for _, typ := range types {
				// dup a new test case with the task type
				logger.Infof("-------------------- test %s - type %s, started --------------------",
					_tc.name, taskTypeNames[typ])
				tc := _tc
				tc.taskType = typ
				// run in a closure so the per-iteration defers (controller
				// finish, source reset, manager cleanup) fire per task type
				func() {
					ctrl := gomock.NewController(t)
					defer ctrl.Finish()
					mockContentLength := len(tc.taskData)

					urlMeta := &commonv1.UrlMeta{
						Tag: "d7y-test",
					}

					if tc.httpRange != nil {
						urlMeta.Range = strings.TrimPrefix(tc.httpRange.String(), "bytes=")
					}

					if tc.urlGenerator != nil {
						tc.url = tc.urlGenerator(&tc)
					}
					taskID := idgen.TaskIDV1(tc.url, urlMeta)

					var (
						downloader   PieceDownloader
						sourceClient source.ResourceClient
					)

					if tc.mockPieceDownloader != nil {
						downloader = tc.mockPieceDownloader(ctrl, tc.taskData, tc.pieceSize)
					}

					if tc.mockHTTPSourceClient != nil {
						source.UnRegister("http")
						defer func() {
							// reset source client
							source.UnRegister("http")
							require.Nil(source.Register("http", httpprotocol.NewHTTPSourceClient(), httpprotocol.Adapter))
						}()
						// replace source client
						sourceClient = tc.mockHTTPSourceClient(t, ctrl, tc.httpRange, tc.taskData, tc.url)
						require.Nil(source.Register("http", sourceClient, httpprotocol.Adapter))
					}

					option := componentsOption{
						taskID:             taskID,
						contentLength:      int64(mockContentLength),
						pieceSize:          uint32(tc.pieceSize),
						pieceParallelCount: tc.pieceParallelCount,
						pieceDownloader:    downloader,
						sourceClient:       sourceClient,
						content:            tc.taskData,
						scope:              tc.sizeScope,
						peerPacketDelay:    tc.peerPacketDelay,
						backSource:         tc.backSource,
						reregister:         tc.reregister,
					}
					// keep peer task running in enough time to check "getOrCreatePeerTaskConductor" always return same
					if tc.taskType == taskTypeConductor {
						option.peerPacketDelay = []time.Duration{time.Second}
					}
					mm := setupMockManager(ctrl, &tc, option)
					defer mm.CleanUp()

					tc.run(assert, require, mm, urlMeta)
				}()
				logger.Infof("-------------------- test %s - type %s, finished --------------------", _tc.name, taskTypeNames[typ])
			}
		})
	}
}
688

689
func (ts *testSpec) run(assert *testifyassert.Assertions, require *testifyrequire.Assertions, mm *mockManager, urlMeta *commonv1.UrlMeta) {
690
	switch ts.taskType {
691
	case taskTypeFile:
692
		ts.runFileTaskTest(assert, require, mm, urlMeta)
693
	case taskTypeStream:
694
		ts.runStreamTaskTest(assert, require, mm, urlMeta)
695
	case taskTypeConductor:
696
		ts.runConductorTest(assert, require, mm, urlMeta)
697
	case taskTypeSeed:
698
		ts.runSeedTaskTest(assert, require, mm, urlMeta)
699
	default:
700
		panic("unknown test type")
701
	}
702
}
703

704
func (ts *testSpec) runFileTaskTest(assert *testifyassert.Assertions, require *testifyrequire.Assertions, mm *mockManager, urlMeta *commonv1.UrlMeta) {
705
	var output = "../test/testdata/test.output"
706
	defer func() {
707
		assert.Nil(os.Remove(output))
708
	}()
709
	progress, err := mm.peerTaskManager.StartFileTask(
710
		context.Background(),
711
		&FileTaskRequest{
712
			PeerTaskRequest: schedulerv1.PeerTaskRequest{
713
				Url:      ts.url,
714
				UrlMeta:  urlMeta,
715
				PeerId:   ts.peerID,
716
				PeerHost: &schedulerv1.PeerHost{},
717
			},
718
			Output: output,
719
		})
720
	require.Nil(err, "start file peer task")
721

722
	var p *FileTaskProgress
723
	for p = range progress {
724
		require.True(p.State.Success)
725
		if p.PeerTaskDone {
726
			p.DoneCallback()
727
			break
728
		}
729
	}
730
	require.NotNil(p)
731
	require.True(p.PeerTaskDone)
732

733
	outputBytes, err := os.ReadFile(output)
734
	require.Nil(err, "load output file")
735
	require.Equal(ts.taskData, outputBytes, "output and desired output must match")
736
}
737

738
func (ts *testSpec) runStreamTaskTest(_ *testifyassert.Assertions, require *testifyrequire.Assertions, mm *mockManager, urlMeta *commonv1.UrlMeta) {
739
	r, _, err := mm.peerTaskManager.StartStreamTask(
740
		context.Background(),
741
		&StreamTaskRequest{
742
			URL:     ts.url,
743
			URLMeta: urlMeta,
744
			PeerID:  ts.peerID,
745
		})
746
	require.Nil(err, "start stream peer task")
747

748
	outputBytes, err := io.ReadAll(r)
749
	require.Nil(err, "load read data")
750
	require.Equal(ts.taskData, outputBytes, "output and desired output must match")
751
}
752

753
func (ts *testSpec) runSeedTaskTest(_ *testifyassert.Assertions, require *testifyrequire.Assertions, mm *mockManager, urlMeta *commonv1.UrlMeta) {
754
	r, _, err := mm.peerTaskManager.StartSeedTask(
755
		context.Background(),
756
		&SeedTaskRequest{
757
			PeerTaskRequest: schedulerv1.PeerTaskRequest{
758
				Url:         ts.url,
759
				UrlMeta:     urlMeta,
760
				PeerId:      ts.peerID,
761
				PeerHost:    &schedulerv1.PeerHost{},
762
				IsMigrating: false,
763
			},
764
			Limit: 0,
765
			Range: nil,
766
		})
767

768
	require.Nil(err, "start seed peer task")
769

770
	var success bool
771

772
loop:
773
	for {
774
		select {
775
		case <-r.Context.Done():
776
			break loop
777
		case <-r.Success:
778
			success = true
779
			break loop
780
		case <-r.Fail:
781
			break loop
782
		case p := <-r.PieceInfoChannel:
783
			if p.Finished {
784
				success = true
785
				break loop
786
			}
787
		case <-time.After(5 * time.Minute):
788
			buf := make([]byte, 16384)
789
			buf = buf[:runtime.Stack(buf, true)]
790
			fmt.Printf("=== BEGIN goroutine stack dump ===\n%s\n=== END goroutine stack dump ===", buf)
791
		}
792
	}
793

794
	require.True(success, "seed task should success")
795
}
796

797
// runConductorTest exercises the shared peerTaskConductor path:
//
//  1. creates a conductor for the task and asserts it is new;
//  2. requests the same task 100 more times with different peer IDs and
//     asserts every call returns the SAME conductor (no duplicates);
//  3. starts the conductor, waits for all subscribers to observe success;
//  4. asserts the manager's running-task map drains;
//  5. verifies both the reuse-stream and reuse-file fast paths serve the
//     already-downloaded data.
func (ts *testSpec) runConductorTest(assert *testifyassert.Assertions, require *testifyrequire.Assertions, mm *mockManager, urlMeta *commonv1.UrlMeta) {
	var (
		ptm       = mm.peerTaskManager
		pieceSize = ts.pieceSize
		taskID    = idgen.TaskIDV1(ts.url, urlMeta)
		output    = "../test/testdata/test.output"
	)
	defer func() {
		assert.Nil(os.Remove(output))
	}()

	peerTaskRequest := &schedulerv1.PeerTaskRequest{
		Url:      ts.url,
		UrlMeta:  urlMeta,
		PeerId:   ts.peerID,
		PeerHost: &schedulerv1.PeerHost{},
	}

	ptc, created, err := ptm.getOrCreatePeerTaskConductor(
		context.Background(), taskID, peerTaskRequest, rate.Limit(pieceSize*4), nil, nil, "", false)
	assert.Nil(err, "load first peerTaskConductor")
	assert.True(created, "should create a new peerTaskConductor")

	var ptcCount = 100
	var wg = &sync.WaitGroup{}
	// one waiter per duplicate conductor request, plus the watchdog below
	wg.Add(ptcCount + 1)

	var result = make([]bool, ptcCount)

	// watchdog: force-fail the conductor if it does not finish in time
	go func(ptc *peerTaskConductor) {
		defer wg.Done()
		select {
		case <-time.After(5 * time.Minute):
			ptc.Fail()
		case <-ptc.successCh:
			return
		case <-ptc.failCh:
			return
		}
	}(ptc)

	// syncFunc subscribes to piece broadcasts and records whether the
	// conductor eventually succeeded for subscriber i
	syncFunc := func(i int, ptc *peerTaskConductor) {
		pieceCh := ptc.broker.Subscribe()
		defer wg.Done()
		for {
			select {
			case <-pieceCh:
			case <-ptc.successCh:
				result[i] = true
				return
			case <-ptc.failCh:
				return
			}
		}
	}

	// request the same task under many peer IDs; all must resolve to the
	// conductor created above without creating new ones
	for i := 0; i < ptcCount; i++ {
		request := &schedulerv1.PeerTaskRequest{
			Url:      ts.url,
			UrlMeta:  urlMeta,
			PeerId:   fmt.Sprintf("should-not-use-peer-%d", i),
			PeerHost: &schedulerv1.PeerHost{},
		}
		p, created, err := ptm.getOrCreatePeerTaskConductor(
			context.Background(), taskID, request, rate.Limit(pieceSize*3), nil, nil, "", false)
		assert.Nil(err, fmt.Sprintf("load peerTaskConductor %d", i))
		assert.Equal(ptc.peerID, p.GetPeerID(), fmt.Sprintf("ptc %d should be same with ptc", i))
		assert.False(created, "should not create a new peerTaskConductor")
		go syncFunc(i, p)
	}

	require.Nil(ptc.start(), "peerTaskConductor start should be ok")

	// size-scope-specific data must have been populated by registration
	switch ts.sizeScope {
	case commonv1.SizeScope_TINY:
		require.NotNil(ptc.tinyData)
	case commonv1.SizeScope_SMALL:
		require.NotNil(ptc.singlePiece)
	}

	wg.Wait()

	for i, r := range result {
		assert.True(r, fmt.Sprintf("task %d result should be true", i))
	}

	var (
		noRunningTask = true
		success       bool
	)
	select {
	case <-ptc.successCh:
		success = true
	case <-ptc.failCh:
	case <-time.After(5 * time.Minute):
		// dump goroutines to make a hang debuggable
		buf := make([]byte, 16384)
		buf = buf[:runtime.Stack(buf, true)]
		fmt.Printf("=== BEGIN goroutine stack dump ===\n%s\n=== END goroutine stack dump ===", buf)
	}
	assert.True(success, "task should success")

	// the running-task map is cleared asynchronously; poll a few times
	for i := 0; i < 3; i++ {
		ptm.runningPeerTasks.Range(func(key, value any) bool {
			noRunningTask = false
			return false
		})
		if noRunningTask {
			break
		}
		noRunningTask = true
		time.Sleep(100 * time.Millisecond)
	}
	assert.True(noRunningTask, "no running tasks")

	// test reuse stream task
	rc, _, ok := ptm.tryReuseStreamPeerTask(context.Background(),
		taskID,
		&StreamTaskRequest{
			URL:     ts.url,
			URLMeta: urlMeta,
			PeerID:  ts.peerID,
		})

	assert.True(ok, "reuse stream task")
	assert.NotNil(rc, "reuse stream task")
	if rc == nil {
		return
	}

	defer func() {
		assert.Nil(rc.Close())
	}()

	data, err := io.ReadAll(rc)
	assert.Nil(err, "read all should be ok")
	assert.Equal(ts.taskData, data, "stream output and desired output must match")

	// test reuse file task
	progress, ok := ptm.tryReuseFilePeerTask(
		context.Background(),
		&FileTaskRequest{
			PeerTaskRequest: schedulerv1.PeerTaskRequest{
				Url:      ts.url,
				UrlMeta:  urlMeta,
				PeerId:   ts.peerID,
				PeerHost: &schedulerv1.PeerHost{},
			},
			Output: output,
		})

	assert.True(ok, "reuse file task")
	// reuse must deliver a (terminal) progress value without blocking
	var p *FileTaskProgress
	select {
	case p = <-progress:
	default:
	}

	assert.NotNil(p, "progress should not be nil")
	outputBytes, err := os.ReadFile(output)
	assert.Nil(err, "load output file should be ok")
	assert.Equal(ts.taskData, outputBytes, "file output and desired output must match")
}
959

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.