Dragonfly2
935 строк · 30.4 Кб
1/*
2* Copyright 2020 The Dragonfly Authors
3*
4* Licensed under the Apache License, Version 2.0 (the "License");
5* you may not use this file except in compliance with the License.
6* You may obtain a copy of the License at
7*
8* http://www.apache.org/licenses/LICENSE-2.0
9*
10* Unless required by applicable law or agreed to in writing, software
11* distributed under the License is distributed on an "AS IS" BASIS,
12* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13* See the License for the specific language governing permissions and
14* limitations under the License.
15*/
16
17package resource
18
19import (
20"errors"
21"fmt"
22"net"
23"net/http"
24"net/http/httptest"
25"net/url"
26"strconv"
27"testing"
28"time"
29
30"github.com/go-http-utils/headers"
31"github.com/stretchr/testify/assert"
32"go.uber.org/mock/gomock"
33
34commonv2 "d7y.io/api/v2/pkg/apis/common/v2"
35managerv2 "d7y.io/api/v2/pkg/apis/manager/v2"
36schedulerv1 "d7y.io/api/v2/pkg/apis/scheduler/v1"
37v1mocks "d7y.io/api/v2/pkg/apis/scheduler/v1/mocks"
38schedulerv2 "d7y.io/api/v2/pkg/apis/scheduler/v2"
39v2mocks "d7y.io/api/v2/pkg/apis/scheduler/v2/mocks"
40
41"d7y.io/dragonfly/v2/pkg/idgen"
42nethttp "d7y.io/dragonfly/v2/pkg/net/http"
43configmocks "d7y.io/dragonfly/v2/scheduler/config/mocks"
44)
45
46var (
47mockPeerID = idgen.PeerIDV1("127.0.0.1")
48mockSeedPeerID = idgen.SeedPeerIDV1("127.0.0.1")
49)
50
51func TestPeer_NewPeer(t *testing.T) {
52ctl := gomock.NewController(t)
53defer ctl.Finish()
54stream := v2mocks.NewMockScheduler_AnnouncePeerServer(ctl)
55
56tests := []struct {
57name string
58id string
59options []PeerOption
60expect func(t *testing.T, peer *Peer, mockTask *Task, mockHost *Host)
61}{
62{
63name: "new peer",
64id: mockPeerID,
65options: []PeerOption{},
66expect: func(t *testing.T, peer *Peer, mockTask *Task, mockHost *Host) {
67assert := assert.New(t)
68assert.Equal(peer.ID, mockPeerID)
69assert.Nil(peer.Range)
70assert.Equal(peer.Priority, commonv2.Priority_LEVEL0)
71assert.Empty(peer.Pieces)
72assert.Empty(peer.FinishedPieces)
73assert.Equal(len(peer.PieceCosts()), 0)
74assert.Empty(peer.ReportPieceResultStream)
75assert.Empty(peer.AnnouncePeerStream)
76assert.Equal(peer.FSM.Current(), PeerStatePending)
77assert.EqualValues(peer.Task, mockTask)
78assert.EqualValues(peer.Host, mockHost)
79assert.Equal(peer.BlockParents.Len(), uint(0))
80assert.Equal(peer.NeedBackToSource.Load(), false)
81assert.NotEqual(peer.PieceUpdatedAt.Load(), 0)
82assert.NotEqual(peer.CreatedAt.Load(), 0)
83assert.NotEqual(peer.UpdatedAt.Load(), 0)
84assert.NotNil(peer.Log)
85},
86},
87{
88name: "new peer with priority",
89id: mockPeerID,
90options: []PeerOption{WithPriority(commonv2.Priority_LEVEL4)},
91expect: func(t *testing.T, peer *Peer, mockTask *Task, mockHost *Host) {
92assert := assert.New(t)
93assert.Equal(peer.ID, mockPeerID)
94assert.Nil(peer.Range)
95assert.Equal(peer.Priority, commonv2.Priority_LEVEL4)
96assert.Empty(peer.Pieces)
97assert.Empty(peer.FinishedPieces)
98assert.Equal(len(peer.PieceCosts()), 0)
99assert.Empty(peer.ReportPieceResultStream)
100assert.Empty(peer.AnnouncePeerStream)
101assert.Equal(peer.FSM.Current(), PeerStatePending)
102assert.EqualValues(peer.Task, mockTask)
103assert.EqualValues(peer.Host, mockHost)
104assert.Equal(peer.BlockParents.Len(), uint(0))
105assert.Equal(peer.NeedBackToSource.Load(), false)
106assert.NotEqual(peer.PieceUpdatedAt.Load(), 0)
107assert.NotEqual(peer.CreatedAt.Load(), 0)
108assert.NotEqual(peer.UpdatedAt.Load(), 0)
109assert.NotNil(peer.Log)
110},
111},
112{
113name: "new peer with range",
114id: mockPeerID,
115options: []PeerOption{WithRange(nethttp.Range{
116Start: 1,
117Length: 10,
118})},
119expect: func(t *testing.T, peer *Peer, mockTask *Task, mockHost *Host) {
120assert := assert.New(t)
121assert.Equal(peer.ID, mockPeerID)
122assert.EqualValues(peer.Range, &nethttp.Range{Start: 1, Length: 10})
123assert.Equal(peer.Priority, commonv2.Priority_LEVEL0)
124assert.Empty(peer.Pieces)
125assert.Empty(peer.FinishedPieces)
126assert.Equal(len(peer.PieceCosts()), 0)
127assert.Empty(peer.ReportPieceResultStream)
128assert.Empty(peer.AnnouncePeerStream)
129assert.Equal(peer.FSM.Current(), PeerStatePending)
130assert.EqualValues(peer.Task, mockTask)
131assert.EqualValues(peer.Host, mockHost)
132assert.Equal(peer.BlockParents.Len(), uint(0))
133assert.Equal(peer.NeedBackToSource.Load(), false)
134assert.NotEqual(peer.PieceUpdatedAt.Load(), 0)
135assert.NotEqual(peer.CreatedAt.Load(), 0)
136assert.NotEqual(peer.UpdatedAt.Load(), 0)
137assert.NotNil(peer.Log)
138},
139},
140{
141name: "new peer with AnnouncePeerStream",
142id: mockPeerID,
143options: []PeerOption{WithAnnouncePeerStream(stream)},
144expect: func(t *testing.T, peer *Peer, mockTask *Task, mockHost *Host) {
145assert := assert.New(t)
146assert.Equal(peer.ID, mockPeerID)
147assert.Nil(peer.Range)
148assert.Equal(peer.Priority, commonv2.Priority_LEVEL0)
149assert.Empty(peer.Pieces)
150assert.Empty(peer.FinishedPieces)
151assert.Equal(len(peer.PieceCosts()), 0)
152assert.Empty(peer.ReportPieceResultStream)
153assert.NotEmpty(peer.AnnouncePeerStream)
154assert.Equal(peer.FSM.Current(), PeerStatePending)
155assert.EqualValues(peer.Task, mockTask)
156assert.EqualValues(peer.Host, mockHost)
157assert.Equal(peer.BlockParents.Len(), uint(0))
158assert.Equal(peer.NeedBackToSource.Load(), false)
159assert.NotEqual(peer.PieceUpdatedAt.Load(), 0)
160assert.NotEqual(peer.CreatedAt.Load(), 0)
161assert.NotEqual(peer.UpdatedAt.Load(), 0)
162assert.NotNil(peer.Log)
163},
164},
165}
166
167for _, tc := range tests {
168t.Run(tc.name, func(t *testing.T) {
169mockHost := NewHost(
170mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
171mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
172mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest))
173tc.expect(t, NewPeer(tc.id, mockResourceConfig, mockTask, mockHost, tc.options...), mockTask, mockHost)
174})
175}
176}
177
178func TestPeer_AppendPieceCost(t *testing.T) {
179tests := []struct {
180name string
181expect func(t *testing.T, peer *Peer)
182}{
183{
184name: "append piece cost",
185expect: func(t *testing.T, peer *Peer) {
186assert := assert.New(t)
187peer.AppendPieceCost(time.Duration(1))
188costs := peer.PieceCosts()
189assert.Equal(costs[0], time.Duration(1))
190},
191},
192{
193name: "piece costs slice is empty",
194expect: func(t *testing.T, peer *Peer) {
195assert := assert.New(t)
196costs := peer.PieceCosts()
197assert.Equal(len(costs), 0)
198},
199},
200}
201
202for _, tc := range tests {
203t.Run(tc.name, func(t *testing.T) {
204mockHost := NewHost(
205mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
206mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
207mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest))
208peer := NewPeer(mockPeerID, mockResourceConfig, mockTask, mockHost)
209
210tc.expect(t, peer)
211})
212}
213}
214
215func TestPeer_PieceCosts(t *testing.T) {
216tests := []struct {
217name string
218expect func(t *testing.T, peer *Peer)
219}{
220{
221name: "piece costs slice is not empty",
222expect: func(t *testing.T, peer *Peer) {
223assert := assert.New(t)
224peer.AppendPieceCost(time.Duration(1))
225costs := peer.PieceCosts()
226assert.Equal(costs[0], time.Duration(1))
227},
228},
229{
230name: "piece costs slice is empty",
231expect: func(t *testing.T, peer *Peer) {
232assert := assert.New(t)
233costs := peer.PieceCosts()
234assert.Equal(len(costs), 0)
235},
236},
237}
238
239for _, tc := range tests {
240t.Run(tc.name, func(t *testing.T) {
241mockHost := NewHost(
242mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
243mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
244mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest))
245peer := NewPeer(mockPeerID, mockResourceConfig, mockTask, mockHost)
246
247tc.expect(t, peer)
248})
249}
250}
251
252func TestPeer_LoadReportPieceResultStream(t *testing.T) {
253tests := []struct {
254name string
255expect func(t *testing.T, peer *Peer, stream schedulerv1.Scheduler_ReportPieceResultServer)
256}{
257{
258name: "load stream",
259expect: func(t *testing.T, peer *Peer, stream schedulerv1.Scheduler_ReportPieceResultServer) {
260assert := assert.New(t)
261peer.StoreReportPieceResultStream(stream)
262newStream, loaded := peer.LoadReportPieceResultStream()
263assert.Equal(loaded, true)
264assert.EqualValues(newStream, stream)
265},
266},
267{
268name: "stream does not exist",
269expect: func(t *testing.T, peer *Peer, stream schedulerv1.Scheduler_ReportPieceResultServer) {
270assert := assert.New(t)
271_, loaded := peer.LoadReportPieceResultStream()
272assert.Equal(loaded, false)
273},
274},
275}
276
277for _, tc := range tests {
278t.Run(tc.name, func(t *testing.T) {
279ctl := gomock.NewController(t)
280defer ctl.Finish()
281stream := v1mocks.NewMockScheduler_ReportPieceResultServer(ctl)
282
283mockHost := NewHost(
284mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
285mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
286mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest))
287peer := NewPeer(mockPeerID, mockResourceConfig, mockTask, mockHost)
288tc.expect(t, peer, stream)
289})
290}
291}
292
293func TestPeer_StoreReportPieceResultStream(t *testing.T) {
294tests := []struct {
295name string
296expect func(t *testing.T, peer *Peer, stream schedulerv1.Scheduler_ReportPieceResultServer)
297}{
298{
299name: "store stream",
300expect: func(t *testing.T, peer *Peer, stream schedulerv1.Scheduler_ReportPieceResultServer) {
301assert := assert.New(t)
302peer.StoreReportPieceResultStream(stream)
303newStream, loaded := peer.LoadReportPieceResultStream()
304assert.Equal(loaded, true)
305assert.EqualValues(newStream, stream)
306},
307},
308}
309
310for _, tc := range tests {
311t.Run(tc.name, func(t *testing.T) {
312ctl := gomock.NewController(t)
313defer ctl.Finish()
314stream := v1mocks.NewMockScheduler_ReportPieceResultServer(ctl)
315
316mockHost := NewHost(
317mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
318mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
319mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest))
320peer := NewPeer(mockPeerID, mockResourceConfig, mockTask, mockHost)
321tc.expect(t, peer, stream)
322})
323}
324}
325
326func TestPeer_DeleteReportPieceResultStream(t *testing.T) {
327tests := []struct {
328name string
329expect func(t *testing.T, peer *Peer, stream schedulerv1.Scheduler_ReportPieceResultServer)
330}{
331{
332name: "delete stream",
333expect: func(t *testing.T, peer *Peer, stream schedulerv1.Scheduler_ReportPieceResultServer) {
334assert := assert.New(t)
335peer.StoreReportPieceResultStream(stream)
336peer.DeleteReportPieceResultStream()
337_, loaded := peer.LoadReportPieceResultStream()
338assert.Equal(loaded, false)
339},
340},
341}
342
343for _, tc := range tests {
344t.Run(tc.name, func(t *testing.T) {
345ctl := gomock.NewController(t)
346defer ctl.Finish()
347stream := v1mocks.NewMockScheduler_ReportPieceResultServer(ctl)
348
349mockHost := NewHost(
350mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
351mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
352mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest))
353peer := NewPeer(mockPeerID, mockResourceConfig, mockTask, mockHost)
354tc.expect(t, peer, stream)
355})
356}
357}
358
359func TestPeer_LoadAnnouncePeerStream(t *testing.T) {
360tests := []struct {
361name string
362expect func(t *testing.T, peer *Peer, stream schedulerv2.Scheduler_AnnouncePeerServer)
363}{
364{
365name: "load stream",
366expect: func(t *testing.T, peer *Peer, stream schedulerv2.Scheduler_AnnouncePeerServer) {
367assert := assert.New(t)
368peer.StoreAnnouncePeerStream(stream)
369newStream, loaded := peer.LoadAnnouncePeerStream()
370assert.Equal(loaded, true)
371assert.EqualValues(newStream, stream)
372},
373},
374{
375name: "stream does not exist",
376expect: func(t *testing.T, peer *Peer, stream schedulerv2.Scheduler_AnnouncePeerServer) {
377assert := assert.New(t)
378_, loaded := peer.LoadAnnouncePeerStream()
379assert.Equal(loaded, false)
380},
381},
382}
383
384for _, tc := range tests {
385t.Run(tc.name, func(t *testing.T) {
386ctl := gomock.NewController(t)
387defer ctl.Finish()
388stream := v2mocks.NewMockScheduler_AnnouncePeerServer(ctl)
389
390mockHost := NewHost(
391mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
392mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
393mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest))
394peer := NewPeer(mockPeerID, mockResourceConfig, mockTask, mockHost)
395tc.expect(t, peer, stream)
396})
397}
398}
399
400func TestPeer_StoreAnnouncePeerStream(t *testing.T) {
401tests := []struct {
402name string
403expect func(t *testing.T, peer *Peer, stream schedulerv2.Scheduler_AnnouncePeerServer)
404}{
405{
406name: "store stream",
407expect: func(t *testing.T, peer *Peer, stream schedulerv2.Scheduler_AnnouncePeerServer) {
408assert := assert.New(t)
409peer.StoreAnnouncePeerStream(stream)
410newStream, loaded := peer.LoadAnnouncePeerStream()
411assert.Equal(loaded, true)
412assert.EqualValues(newStream, stream)
413},
414},
415}
416
417for _, tc := range tests {
418t.Run(tc.name, func(t *testing.T) {
419ctl := gomock.NewController(t)
420defer ctl.Finish()
421stream := v2mocks.NewMockScheduler_AnnouncePeerServer(ctl)
422
423mockHost := NewHost(
424mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
425mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
426mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest))
427peer := NewPeer(mockPeerID, mockResourceConfig, mockTask, mockHost)
428tc.expect(t, peer, stream)
429})
430}
431}
432
433func TestPeer_DeleteAnnouncePeerStream(t *testing.T) {
434tests := []struct {
435name string
436expect func(t *testing.T, peer *Peer, stream schedulerv2.Scheduler_AnnouncePeerServer)
437}{
438{
439name: "delete stream",
440expect: func(t *testing.T, peer *Peer, stream schedulerv2.Scheduler_AnnouncePeerServer) {
441assert := assert.New(t)
442peer.StoreAnnouncePeerStream(stream)
443peer.DeleteAnnouncePeerStream()
444_, loaded := peer.LoadAnnouncePeerStream()
445assert.Equal(loaded, false)
446},
447},
448}
449
450for _, tc := range tests {
451t.Run(tc.name, func(t *testing.T) {
452ctl := gomock.NewController(t)
453defer ctl.Finish()
454stream := v2mocks.NewMockScheduler_AnnouncePeerServer(ctl)
455
456mockHost := NewHost(
457mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
458mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
459mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest))
460peer := NewPeer(mockPeerID, mockResourceConfig, mockTask, mockHost)
461tc.expect(t, peer, stream)
462})
463}
464}
465
466func TestPeer_LoadPiece(t *testing.T) {
467tests := []struct {
468name string
469piece *Piece
470pieceNumber int32
471expect func(t *testing.T, piece *Piece, loaded bool)
472}{
473{
474name: "load piece",
475piece: mockPiece,
476pieceNumber: mockPiece.Number,
477expect: func(t *testing.T, piece *Piece, loaded bool) {
478assert := assert.New(t)
479assert.Equal(loaded, true)
480assert.Equal(piece.Number, mockPiece.Number)
481assert.Equal(piece.ParentID, mockPiece.ParentID)
482assert.Equal(piece.Offset, mockPiece.Offset)
483assert.Equal(piece.Length, mockPiece.Length)
484assert.EqualValues(piece.Digest, mockPiece.Digest)
485assert.Equal(piece.TrafficType, mockPiece.TrafficType)
486assert.Equal(piece.Cost, mockPiece.Cost)
487assert.Equal(piece.CreatedAt, mockPiece.CreatedAt)
488},
489},
490{
491name: "piece does not exist",
492piece: mockPiece,
493pieceNumber: 2,
494expect: func(t *testing.T, piece *Piece, loaded bool) {
495assert := assert.New(t)
496assert.Equal(loaded, false)
497},
498},
499{
500name: "load key is zero",
501piece: mockPiece,
502pieceNumber: 0,
503expect: func(t *testing.T, piece *Piece, loaded bool) {
504assert := assert.New(t)
505assert.Equal(loaded, false)
506},
507},
508}
509
510for _, tc := range tests {
511t.Run(tc.name, func(t *testing.T) {
512mockHost := NewHost(
513mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
514mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
515mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest))
516peer := NewPeer(mockPeerID, mockResourceConfig, mockTask, mockHost)
517
518peer.StorePiece(tc.piece)
519piece, loaded := peer.LoadPiece(tc.pieceNumber)
520tc.expect(t, piece, loaded)
521})
522}
523}
524
525func TestPeer_StorePiece(t *testing.T) {
526tests := []struct {
527name string
528piece *Piece
529pieceNumber int32
530expect func(t *testing.T, piece *Piece, loaded bool)
531}{
532{
533name: "store piece",
534piece: mockPiece,
535pieceNumber: mockPiece.Number,
536expect: func(t *testing.T, piece *Piece, loaded bool) {
537assert := assert.New(t)
538assert.Equal(loaded, true)
539assert.Equal(piece.Number, mockPiece.Number)
540assert.Equal(piece.ParentID, mockPiece.ParentID)
541assert.Equal(piece.Offset, mockPiece.Offset)
542assert.Equal(piece.Length, mockPiece.Length)
543assert.EqualValues(piece.Digest, mockPiece.Digest)
544assert.Equal(piece.TrafficType, mockPiece.TrafficType)
545assert.Equal(piece.Cost, mockPiece.Cost)
546assert.Equal(piece.CreatedAt, mockPiece.CreatedAt)
547},
548},
549}
550
551for _, tc := range tests {
552t.Run(tc.name, func(t *testing.T) {
553mockHost := NewHost(
554mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
555mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
556mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest))
557peer := NewPeer(mockPeerID, mockResourceConfig, mockTask, mockHost)
558
559peer.StorePiece(tc.piece)
560piece, loaded := peer.LoadPiece(tc.pieceNumber)
561tc.expect(t, piece, loaded)
562})
563}
564}
565
566func TestPeer_DeletePiece(t *testing.T) {
567tests := []struct {
568name string
569piece *Piece
570pieceNumber int32
571expect func(t *testing.T, peer *Peer)
572}{
573{
574name: "delete piece",
575piece: mockPiece,
576pieceNumber: mockPiece.Number,
577expect: func(t *testing.T, peer *Peer) {
578assert := assert.New(t)
579_, loaded := peer.LoadPiece(mockPiece.Number)
580assert.Equal(loaded, false)
581},
582},
583{
584name: "delete key does not exist",
585piece: mockPiece,
586pieceNumber: 0,
587expect: func(t *testing.T, peer *Peer) {
588assert := assert.New(t)
589piece, loaded := peer.LoadPiece(mockPiece.Number)
590assert.Equal(loaded, true)
591assert.Equal(piece.Number, mockPiece.Number)
592},
593},
594}
595
596for _, tc := range tests {
597t.Run(tc.name, func(t *testing.T) {
598mockHost := NewHost(
599mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
600mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
601mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest))
602peer := NewPeer(mockPeerID, mockResourceConfig, mockTask, mockHost)
603
604peer.StorePiece(tc.piece)
605peer.DeletePiece(tc.pieceNumber)
606tc.expect(t, peer)
607})
608}
609}
610
611func TestPeer_Parents(t *testing.T) {
612tests := []struct {
613name string
614expect func(t *testing.T, peer *Peer, seedPeer *Peer, stream schedulerv1.Scheduler_ReportPieceResultServer)
615}{
616{
617name: "peer has no parents",
618expect: func(t *testing.T, peer *Peer, seedPeer *Peer, stream schedulerv1.Scheduler_ReportPieceResultServer) {
619assert := assert.New(t)
620peer.Task.StorePeer(peer)
621assert.Equal(len(peer.Parents()), 0)
622},
623},
624{
625name: "peer has parents",
626expect: func(t *testing.T, peer *Peer, seedPeer *Peer, stream schedulerv1.Scheduler_ReportPieceResultServer) {
627assert := assert.New(t)
628peer.Task.StorePeer(peer)
629peer.Task.StorePeer(seedPeer)
630if err := peer.Task.AddPeerEdge(seedPeer, peer); err != nil {
631t.Fatal(err)
632}
633
634assert.Equal(len(peer.Parents()), 1)
635assert.Equal(peer.Parents()[0].ID, mockSeedPeerID)
636},
637},
638}
639
640for _, tc := range tests {
641t.Run(tc.name, func(t *testing.T) {
642ctl := gomock.NewController(t)
643defer ctl.Finish()
644stream := v1mocks.NewMockScheduler_ReportPieceResultServer(ctl)
645
646mockHost := NewHost(
647mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
648mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
649mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest))
650peer := NewPeer(mockPeerID, mockResourceConfig, mockTask, mockHost)
651seedPeer := NewPeer(mockSeedPeerID, mockResourceConfig, mockTask, mockHost)
652tc.expect(t, peer, seedPeer, stream)
653})
654}
655}
656
657func TestPeer_Children(t *testing.T) {
658tests := []struct {
659name string
660expect func(t *testing.T, peer *Peer, seedPeer *Peer, stream schedulerv1.Scheduler_ReportPieceResultServer)
661}{
662{
663name: "peer has no children",
664expect: func(t *testing.T, peer *Peer, seedPeer *Peer, stream schedulerv1.Scheduler_ReportPieceResultServer) {
665assert := assert.New(t)
666peer.Task.StorePeer(peer)
667assert.Equal(len(peer.Children()), 0)
668},
669},
670{
671name: "peer has children",
672expect: func(t *testing.T, peer *Peer, seedPeer *Peer, stream schedulerv1.Scheduler_ReportPieceResultServer) {
673assert := assert.New(t)
674peer.Task.StorePeer(peer)
675peer.Task.StorePeer(seedPeer)
676if err := peer.Task.AddPeerEdge(peer, seedPeer); err != nil {
677t.Fatal(err)
678}
679
680assert.Equal(len(peer.Children()), 1)
681assert.Equal(peer.Children()[0].ID, mockSeedPeerID)
682},
683},
684}
685
686for _, tc := range tests {
687t.Run(tc.name, func(t *testing.T) {
688ctl := gomock.NewController(t)
689defer ctl.Finish()
690stream := v1mocks.NewMockScheduler_ReportPieceResultServer(ctl)
691
692mockHost := NewHost(
693mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
694mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
695mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest))
696peer := NewPeer(mockPeerID, mockResourceConfig, mockTask, mockHost)
697seedPeer := NewPeer(mockSeedPeerID, mockResourceConfig, mockTask, mockHost)
698tc.expect(t, peer, seedPeer, stream)
699})
700}
701}
702
703func TestPeer_DownloadTinyFile(t *testing.T) {
704testData := []byte("./0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz" +
705"./0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz")
706mockServer := func(t *testing.T, peer *Peer) *httptest.Server {
707return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
708assert := assert.New(t)
709assert.NotNil(peer)
710assert.Equal(r.URL.Path, fmt.Sprintf("/download/%s/%s", peer.Task.ID[:3], peer.Task.ID))
711assert.Equal(r.URL.RawQuery, fmt.Sprintf("peerId=%s", peer.ID))
712
713rgs, err := nethttp.ParseRange(r.Header.Get(headers.Range), 128)
714assert.Nil(err)
715assert.Equal(1, len(rgs))
716rg := rgs[0]
717
718w.WriteHeader(http.StatusPartialContent)
719n, err := w.Write(testData[rg.Start : rg.Start+rg.Length])
720assert.Nil(err)
721assert.Equal(int64(n), rg.Length)
722}))
723}
724
725tests := []struct {
726name string
727mockServer func(t *testing.T, peer *Peer) *httptest.Server
728expect func(t *testing.T, peer *Peer)
729}{
730{
731name: "download tiny file",
732mockServer: mockServer,
733expect: func(t *testing.T, peer *Peer) {
734assert := assert.New(t)
735peer.Task.ContentLength.Store(32)
736data, err := peer.DownloadTinyFile()
737assert.NoError(err)
738assert.Equal(testData[:32], data)
739},
740},
741{
742name: "download tiny file with range",
743mockServer: mockServer,
744expect: func(t *testing.T, peer *Peer) {
745assert := assert.New(t)
746peer.Task.ContentLength.Store(32)
747peer.Range = &nethttp.Range{
748Start: 0,
749Length: 10,
750}
751data, err := peer.DownloadTinyFile()
752assert.NoError(err)
753assert.Equal(testData[:32], data)
754},
755},
756{
757name: "download tiny file failed because of http status code",
758mockServer: func(t *testing.T, peer *Peer) *httptest.Server {
759return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
760w.WriteHeader(http.StatusNotFound)
761}))
762},
763expect: func(t *testing.T, peer *Peer) {
764assert := assert.New(t)
765peer.Task.ID = "foobar"
766_, err := peer.DownloadTinyFile()
767assert.EqualError(err, "bad response status 404 Not Found")
768},
769},
770}
771
772for _, tc := range tests {
773t.Run(tc.name, func(t *testing.T) {
774mockHost := NewHost(
775mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
776mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
777mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest))
778peer := NewPeer(mockPeerID, mockResourceConfig, mockTask, mockHost)
779
780if tc.mockServer == nil {
781tc.mockServer = mockServer
782}
783
784s := tc.mockServer(t, peer)
785defer s.Close()
786
787url, err := url.Parse(s.URL)
788if err != nil {
789t.Fatal(err)
790}
791
792ip, rawPort, err := net.SplitHostPort(url.Host)
793if err != nil {
794t.Fatal(err)
795}
796
797port, err := strconv.ParseInt(rawPort, 10, 32)
798if err != nil {
799t.Fatal(err)
800}
801
802mockHost.IP = ip
803mockHost.DownloadPort = int32(port)
804tc.expect(t, peer)
805})
806}
807}
808
809func TestPeer_CalculatePriority(t *testing.T) {
810tests := []struct {
811name string
812mock func(peer *Peer, md *configmocks.MockDynconfigInterfaceMockRecorder)
813expect func(t *testing.T, priority commonv2.Priority)
814}{
815{
816name: "peer has priority",
817mock: func(peer *Peer, md *configmocks.MockDynconfigInterfaceMockRecorder) {
818priority := commonv2.Priority_LEVEL4
819peer.Priority = priority
820},
821expect: func(t *testing.T, priority commonv2.Priority) {
822assert := assert.New(t)
823assert.Equal(priority, commonv2.Priority_LEVEL4)
824},
825},
826{
827name: "get applications failed",
828mock: func(peer *Peer, md *configmocks.MockDynconfigInterfaceMockRecorder) {
829md.GetApplications().Return(nil, errors.New("bas")).Times(1)
830},
831expect: func(t *testing.T, priority commonv2.Priority) {
832assert := assert.New(t)
833assert.Equal(priority, commonv2.Priority_LEVEL0)
834},
835},
836{
837name: "can not found applications",
838mock: func(peer *Peer, md *configmocks.MockDynconfigInterfaceMockRecorder) {
839md.GetApplications().Return([]*managerv2.Application{}, nil).Times(1)
840},
841expect: func(t *testing.T, priority commonv2.Priority) {
842assert := assert.New(t)
843assert.Equal(priority, commonv2.Priority_LEVEL0)
844},
845},
846{
847name: "can not found matching application",
848mock: func(peer *Peer, md *configmocks.MockDynconfigInterfaceMockRecorder) {
849md.GetApplications().Return([]*managerv2.Application{
850{
851Name: "baw",
852},
853}, nil).Times(1)
854},
855expect: func(t *testing.T, priority commonv2.Priority) {
856assert := assert.New(t)
857assert.Equal(priority, commonv2.Priority_LEVEL0)
858},
859},
860{
861name: "can not found priority",
862mock: func(peer *Peer, md *configmocks.MockDynconfigInterfaceMockRecorder) {
863peer.Task.Application = "bae"
864md.GetApplications().Return([]*managerv2.Application{
865{
866Name: "bae",
867},
868}, nil).Times(1)
869},
870expect: func(t *testing.T, priority commonv2.Priority) {
871assert := assert.New(t)
872assert.Equal(priority, commonv2.Priority_LEVEL0)
873},
874},
875{
876name: "match the priority of application",
877mock: func(peer *Peer, md *configmocks.MockDynconfigInterfaceMockRecorder) {
878peer.Task.Application = "baz"
879md.GetApplications().Return([]*managerv2.Application{
880{
881Name: "baz",
882Priority: &managerv2.ApplicationPriority{
883Value: commonv2.Priority_LEVEL1,
884},
885},
886}, nil).Times(1)
887},
888expect: func(t *testing.T, priority commonv2.Priority) {
889assert := assert.New(t)
890assert.Equal(priority, commonv2.Priority_LEVEL1)
891},
892},
893{
894name: "match the priority of url",
895mock: func(peer *Peer, md *configmocks.MockDynconfigInterfaceMockRecorder) {
896peer.Task.Application = "bak"
897peer.Task.URL = "example.com"
898md.GetApplications().Return([]*managerv2.Application{
899{
900Name: "bak",
901Priority: &managerv2.ApplicationPriority{
902Value: commonv2.Priority_LEVEL1,
903Urls: []*managerv2.URLPriority{
904{
905Regex: "am",
906Value: commonv2.Priority_LEVEL2,
907},
908},
909},
910},
911}, nil).Times(1)
912},
913expect: func(t *testing.T, priority commonv2.Priority) {
914assert := assert.New(t)
915assert.Equal(priority, commonv2.Priority_LEVEL2)
916},
917},
918}
919
920for _, tc := range tests {
921t.Run(tc.name, func(t *testing.T) {
922ctl := gomock.NewController(t)
923defer ctl.Finish()
924dynconfig := configmocks.NewMockDynconfigInterface(ctl)
925
926mockHost := NewHost(
927mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
928mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
929mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest))
930peer := NewPeer(mockPeerID, mockResourceConfig, mockTask, mockHost)
931tc.mock(peer, dynconfig.EXPECT())
932tc.expect(t, peer.CalculatePriority(dynconfig))
933})
934}
935}
936