Dragonfly2

Форк
0
935 строк · 30.4 Кб
1
/*
2
 *     Copyright 2020 The Dragonfly Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *      http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package resource
18

19
import (
20
	"errors"
21
	"fmt"
22
	"net"
23
	"net/http"
24
	"net/http/httptest"
25
	"net/url"
26
	"strconv"
27
	"testing"
28
	"time"
29

30
	"github.com/go-http-utils/headers"
31
	"github.com/stretchr/testify/assert"
32
	"go.uber.org/mock/gomock"
33

34
	commonv2 "d7y.io/api/v2/pkg/apis/common/v2"
35
	managerv2 "d7y.io/api/v2/pkg/apis/manager/v2"
36
	schedulerv1 "d7y.io/api/v2/pkg/apis/scheduler/v1"
37
	v1mocks "d7y.io/api/v2/pkg/apis/scheduler/v1/mocks"
38
	schedulerv2 "d7y.io/api/v2/pkg/apis/scheduler/v2"
39
	v2mocks "d7y.io/api/v2/pkg/apis/scheduler/v2/mocks"
40

41
	"d7y.io/dragonfly/v2/pkg/idgen"
42
	nethttp "d7y.io/dragonfly/v2/pkg/net/http"
43
	configmocks "d7y.io/dragonfly/v2/scheduler/config/mocks"
44
)
45

46
var (
47
	mockPeerID     = idgen.PeerIDV1("127.0.0.1")
48
	mockSeedPeerID = idgen.SeedPeerIDV1("127.0.0.1")
49
)
50

51
func TestPeer_NewPeer(t *testing.T) {
52
	ctl := gomock.NewController(t)
53
	defer ctl.Finish()
54
	stream := v2mocks.NewMockScheduler_AnnouncePeerServer(ctl)
55

56
	tests := []struct {
57
		name    string
58
		id      string
59
		options []PeerOption
60
		expect  func(t *testing.T, peer *Peer, mockTask *Task, mockHost *Host)
61
	}{
62
		{
63
			name:    "new peer",
64
			id:      mockPeerID,
65
			options: []PeerOption{},
66
			expect: func(t *testing.T, peer *Peer, mockTask *Task, mockHost *Host) {
67
				assert := assert.New(t)
68
				assert.Equal(peer.ID, mockPeerID)
69
				assert.Nil(peer.Range)
70
				assert.Equal(peer.Priority, commonv2.Priority_LEVEL0)
71
				assert.Empty(peer.Pieces)
72
				assert.Empty(peer.FinishedPieces)
73
				assert.Equal(len(peer.PieceCosts()), 0)
74
				assert.Empty(peer.ReportPieceResultStream)
75
				assert.Empty(peer.AnnouncePeerStream)
76
				assert.Equal(peer.FSM.Current(), PeerStatePending)
77
				assert.EqualValues(peer.Task, mockTask)
78
				assert.EqualValues(peer.Host, mockHost)
79
				assert.Equal(peer.BlockParents.Len(), uint(0))
80
				assert.Equal(peer.NeedBackToSource.Load(), false)
81
				assert.NotEqual(peer.PieceUpdatedAt.Load(), 0)
82
				assert.NotEqual(peer.CreatedAt.Load(), 0)
83
				assert.NotEqual(peer.UpdatedAt.Load(), 0)
84
				assert.NotNil(peer.Log)
85
			},
86
		},
87
		{
88
			name:    "new peer with priority",
89
			id:      mockPeerID,
90
			options: []PeerOption{WithPriority(commonv2.Priority_LEVEL4)},
91
			expect: func(t *testing.T, peer *Peer, mockTask *Task, mockHost *Host) {
92
				assert := assert.New(t)
93
				assert.Equal(peer.ID, mockPeerID)
94
				assert.Nil(peer.Range)
95
				assert.Equal(peer.Priority, commonv2.Priority_LEVEL4)
96
				assert.Empty(peer.Pieces)
97
				assert.Empty(peer.FinishedPieces)
98
				assert.Equal(len(peer.PieceCosts()), 0)
99
				assert.Empty(peer.ReportPieceResultStream)
100
				assert.Empty(peer.AnnouncePeerStream)
101
				assert.Equal(peer.FSM.Current(), PeerStatePending)
102
				assert.EqualValues(peer.Task, mockTask)
103
				assert.EqualValues(peer.Host, mockHost)
104
				assert.Equal(peer.BlockParents.Len(), uint(0))
105
				assert.Equal(peer.NeedBackToSource.Load(), false)
106
				assert.NotEqual(peer.PieceUpdatedAt.Load(), 0)
107
				assert.NotEqual(peer.CreatedAt.Load(), 0)
108
				assert.NotEqual(peer.UpdatedAt.Load(), 0)
109
				assert.NotNil(peer.Log)
110
			},
111
		},
112
		{
113
			name: "new peer with range",
114
			id:   mockPeerID,
115
			options: []PeerOption{WithRange(nethttp.Range{
116
				Start:  1,
117
				Length: 10,
118
			})},
119
			expect: func(t *testing.T, peer *Peer, mockTask *Task, mockHost *Host) {
120
				assert := assert.New(t)
121
				assert.Equal(peer.ID, mockPeerID)
122
				assert.EqualValues(peer.Range, &nethttp.Range{Start: 1, Length: 10})
123
				assert.Equal(peer.Priority, commonv2.Priority_LEVEL0)
124
				assert.Empty(peer.Pieces)
125
				assert.Empty(peer.FinishedPieces)
126
				assert.Equal(len(peer.PieceCosts()), 0)
127
				assert.Empty(peer.ReportPieceResultStream)
128
				assert.Empty(peer.AnnouncePeerStream)
129
				assert.Equal(peer.FSM.Current(), PeerStatePending)
130
				assert.EqualValues(peer.Task, mockTask)
131
				assert.EqualValues(peer.Host, mockHost)
132
				assert.Equal(peer.BlockParents.Len(), uint(0))
133
				assert.Equal(peer.NeedBackToSource.Load(), false)
134
				assert.NotEqual(peer.PieceUpdatedAt.Load(), 0)
135
				assert.NotEqual(peer.CreatedAt.Load(), 0)
136
				assert.NotEqual(peer.UpdatedAt.Load(), 0)
137
				assert.NotNil(peer.Log)
138
			},
139
		},
140
		{
141
			name:    "new peer with AnnouncePeerStream",
142
			id:      mockPeerID,
143
			options: []PeerOption{WithAnnouncePeerStream(stream)},
144
			expect: func(t *testing.T, peer *Peer, mockTask *Task, mockHost *Host) {
145
				assert := assert.New(t)
146
				assert.Equal(peer.ID, mockPeerID)
147
				assert.Nil(peer.Range)
148
				assert.Equal(peer.Priority, commonv2.Priority_LEVEL0)
149
				assert.Empty(peer.Pieces)
150
				assert.Empty(peer.FinishedPieces)
151
				assert.Equal(len(peer.PieceCosts()), 0)
152
				assert.Empty(peer.ReportPieceResultStream)
153
				assert.NotEmpty(peer.AnnouncePeerStream)
154
				assert.Equal(peer.FSM.Current(), PeerStatePending)
155
				assert.EqualValues(peer.Task, mockTask)
156
				assert.EqualValues(peer.Host, mockHost)
157
				assert.Equal(peer.BlockParents.Len(), uint(0))
158
				assert.Equal(peer.NeedBackToSource.Load(), false)
159
				assert.NotEqual(peer.PieceUpdatedAt.Load(), 0)
160
				assert.NotEqual(peer.CreatedAt.Load(), 0)
161
				assert.NotEqual(peer.UpdatedAt.Load(), 0)
162
				assert.NotNil(peer.Log)
163
			},
164
		},
165
	}
166

167
	for _, tc := range tests {
168
		t.Run(tc.name, func(t *testing.T) {
169
			mockHost := NewHost(
170
				mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
171
				mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
172
			mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest))
173
			tc.expect(t, NewPeer(tc.id, mockResourceConfig, mockTask, mockHost, tc.options...), mockTask, mockHost)
174
		})
175
	}
176
}
177

178
func TestPeer_AppendPieceCost(t *testing.T) {
179
	tests := []struct {
180
		name   string
181
		expect func(t *testing.T, peer *Peer)
182
	}{
183
		{
184
			name: "append piece cost",
185
			expect: func(t *testing.T, peer *Peer) {
186
				assert := assert.New(t)
187
				peer.AppendPieceCost(time.Duration(1))
188
				costs := peer.PieceCosts()
189
				assert.Equal(costs[0], time.Duration(1))
190
			},
191
		},
192
		{
193
			name: "piece costs slice is empty",
194
			expect: func(t *testing.T, peer *Peer) {
195
				assert := assert.New(t)
196
				costs := peer.PieceCosts()
197
				assert.Equal(len(costs), 0)
198
			},
199
		},
200
	}
201

202
	for _, tc := range tests {
203
		t.Run(tc.name, func(t *testing.T) {
204
			mockHost := NewHost(
205
				mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
206
				mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
207
			mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest))
208
			peer := NewPeer(mockPeerID, mockResourceConfig, mockTask, mockHost)
209

210
			tc.expect(t, peer)
211
		})
212
	}
213
}
214

215
func TestPeer_PieceCosts(t *testing.T) {
216
	tests := []struct {
217
		name   string
218
		expect func(t *testing.T, peer *Peer)
219
	}{
220
		{
221
			name: "piece costs slice is not empty",
222
			expect: func(t *testing.T, peer *Peer) {
223
				assert := assert.New(t)
224
				peer.AppendPieceCost(time.Duration(1))
225
				costs := peer.PieceCosts()
226
				assert.Equal(costs[0], time.Duration(1))
227
			},
228
		},
229
		{
230
			name: "piece costs slice is empty",
231
			expect: func(t *testing.T, peer *Peer) {
232
				assert := assert.New(t)
233
				costs := peer.PieceCosts()
234
				assert.Equal(len(costs), 0)
235
			},
236
		},
237
	}
238

239
	for _, tc := range tests {
240
		t.Run(tc.name, func(t *testing.T) {
241
			mockHost := NewHost(
242
				mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
243
				mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
244
			mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest))
245
			peer := NewPeer(mockPeerID, mockResourceConfig, mockTask, mockHost)
246

247
			tc.expect(t, peer)
248
		})
249
	}
250
}
251

252
func TestPeer_LoadReportPieceResultStream(t *testing.T) {
253
	tests := []struct {
254
		name   string
255
		expect func(t *testing.T, peer *Peer, stream schedulerv1.Scheduler_ReportPieceResultServer)
256
	}{
257
		{
258
			name: "load stream",
259
			expect: func(t *testing.T, peer *Peer, stream schedulerv1.Scheduler_ReportPieceResultServer) {
260
				assert := assert.New(t)
261
				peer.StoreReportPieceResultStream(stream)
262
				newStream, loaded := peer.LoadReportPieceResultStream()
263
				assert.Equal(loaded, true)
264
				assert.EqualValues(newStream, stream)
265
			},
266
		},
267
		{
268
			name: "stream does not exist",
269
			expect: func(t *testing.T, peer *Peer, stream schedulerv1.Scheduler_ReportPieceResultServer) {
270
				assert := assert.New(t)
271
				_, loaded := peer.LoadReportPieceResultStream()
272
				assert.Equal(loaded, false)
273
			},
274
		},
275
	}
276

277
	for _, tc := range tests {
278
		t.Run(tc.name, func(t *testing.T) {
279
			ctl := gomock.NewController(t)
280
			defer ctl.Finish()
281
			stream := v1mocks.NewMockScheduler_ReportPieceResultServer(ctl)
282

283
			mockHost := NewHost(
284
				mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
285
				mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
286
			mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest))
287
			peer := NewPeer(mockPeerID, mockResourceConfig, mockTask, mockHost)
288
			tc.expect(t, peer, stream)
289
		})
290
	}
291
}
292

293
func TestPeer_StoreReportPieceResultStream(t *testing.T) {
294
	tests := []struct {
295
		name   string
296
		expect func(t *testing.T, peer *Peer, stream schedulerv1.Scheduler_ReportPieceResultServer)
297
	}{
298
		{
299
			name: "store stream",
300
			expect: func(t *testing.T, peer *Peer, stream schedulerv1.Scheduler_ReportPieceResultServer) {
301
				assert := assert.New(t)
302
				peer.StoreReportPieceResultStream(stream)
303
				newStream, loaded := peer.LoadReportPieceResultStream()
304
				assert.Equal(loaded, true)
305
				assert.EqualValues(newStream, stream)
306
			},
307
		},
308
	}
309

310
	for _, tc := range tests {
311
		t.Run(tc.name, func(t *testing.T) {
312
			ctl := gomock.NewController(t)
313
			defer ctl.Finish()
314
			stream := v1mocks.NewMockScheduler_ReportPieceResultServer(ctl)
315

316
			mockHost := NewHost(
317
				mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
318
				mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
319
			mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest))
320
			peer := NewPeer(mockPeerID, mockResourceConfig, mockTask, mockHost)
321
			tc.expect(t, peer, stream)
322
		})
323
	}
324
}
325

326
func TestPeer_DeleteReportPieceResultStream(t *testing.T) {
327
	tests := []struct {
328
		name   string
329
		expect func(t *testing.T, peer *Peer, stream schedulerv1.Scheduler_ReportPieceResultServer)
330
	}{
331
		{
332
			name: "delete stream",
333
			expect: func(t *testing.T, peer *Peer, stream schedulerv1.Scheduler_ReportPieceResultServer) {
334
				assert := assert.New(t)
335
				peer.StoreReportPieceResultStream(stream)
336
				peer.DeleteReportPieceResultStream()
337
				_, loaded := peer.LoadReportPieceResultStream()
338
				assert.Equal(loaded, false)
339
			},
340
		},
341
	}
342

343
	for _, tc := range tests {
344
		t.Run(tc.name, func(t *testing.T) {
345
			ctl := gomock.NewController(t)
346
			defer ctl.Finish()
347
			stream := v1mocks.NewMockScheduler_ReportPieceResultServer(ctl)
348

349
			mockHost := NewHost(
350
				mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
351
				mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
352
			mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest))
353
			peer := NewPeer(mockPeerID, mockResourceConfig, mockTask, mockHost)
354
			tc.expect(t, peer, stream)
355
		})
356
	}
357
}
358

359
func TestPeer_LoadAnnouncePeerStream(t *testing.T) {
360
	tests := []struct {
361
		name   string
362
		expect func(t *testing.T, peer *Peer, stream schedulerv2.Scheduler_AnnouncePeerServer)
363
	}{
364
		{
365
			name: "load stream",
366
			expect: func(t *testing.T, peer *Peer, stream schedulerv2.Scheduler_AnnouncePeerServer) {
367
				assert := assert.New(t)
368
				peer.StoreAnnouncePeerStream(stream)
369
				newStream, loaded := peer.LoadAnnouncePeerStream()
370
				assert.Equal(loaded, true)
371
				assert.EqualValues(newStream, stream)
372
			},
373
		},
374
		{
375
			name: "stream does not exist",
376
			expect: func(t *testing.T, peer *Peer, stream schedulerv2.Scheduler_AnnouncePeerServer) {
377
				assert := assert.New(t)
378
				_, loaded := peer.LoadAnnouncePeerStream()
379
				assert.Equal(loaded, false)
380
			},
381
		},
382
	}
383

384
	for _, tc := range tests {
385
		t.Run(tc.name, func(t *testing.T) {
386
			ctl := gomock.NewController(t)
387
			defer ctl.Finish()
388
			stream := v2mocks.NewMockScheduler_AnnouncePeerServer(ctl)
389

390
			mockHost := NewHost(
391
				mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
392
				mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
393
			mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest))
394
			peer := NewPeer(mockPeerID, mockResourceConfig, mockTask, mockHost)
395
			tc.expect(t, peer, stream)
396
		})
397
	}
398
}
399

400
func TestPeer_StoreAnnouncePeerStream(t *testing.T) {
401
	tests := []struct {
402
		name   string
403
		expect func(t *testing.T, peer *Peer, stream schedulerv2.Scheduler_AnnouncePeerServer)
404
	}{
405
		{
406
			name: "store stream",
407
			expect: func(t *testing.T, peer *Peer, stream schedulerv2.Scheduler_AnnouncePeerServer) {
408
				assert := assert.New(t)
409
				peer.StoreAnnouncePeerStream(stream)
410
				newStream, loaded := peer.LoadAnnouncePeerStream()
411
				assert.Equal(loaded, true)
412
				assert.EqualValues(newStream, stream)
413
			},
414
		},
415
	}
416

417
	for _, tc := range tests {
418
		t.Run(tc.name, func(t *testing.T) {
419
			ctl := gomock.NewController(t)
420
			defer ctl.Finish()
421
			stream := v2mocks.NewMockScheduler_AnnouncePeerServer(ctl)
422

423
			mockHost := NewHost(
424
				mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
425
				mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
426
			mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest))
427
			peer := NewPeer(mockPeerID, mockResourceConfig, mockTask, mockHost)
428
			tc.expect(t, peer, stream)
429
		})
430
	}
431
}
432

433
func TestPeer_DeleteAnnouncePeerStream(t *testing.T) {
434
	tests := []struct {
435
		name   string
436
		expect func(t *testing.T, peer *Peer, stream schedulerv2.Scheduler_AnnouncePeerServer)
437
	}{
438
		{
439
			name: "delete stream",
440
			expect: func(t *testing.T, peer *Peer, stream schedulerv2.Scheduler_AnnouncePeerServer) {
441
				assert := assert.New(t)
442
				peer.StoreAnnouncePeerStream(stream)
443
				peer.DeleteAnnouncePeerStream()
444
				_, loaded := peer.LoadAnnouncePeerStream()
445
				assert.Equal(loaded, false)
446
			},
447
		},
448
	}
449

450
	for _, tc := range tests {
451
		t.Run(tc.name, func(t *testing.T) {
452
			ctl := gomock.NewController(t)
453
			defer ctl.Finish()
454
			stream := v2mocks.NewMockScheduler_AnnouncePeerServer(ctl)
455

456
			mockHost := NewHost(
457
				mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
458
				mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
459
			mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest))
460
			peer := NewPeer(mockPeerID, mockResourceConfig, mockTask, mockHost)
461
			tc.expect(t, peer, stream)
462
		})
463
	}
464
}
465

466
func TestPeer_LoadPiece(t *testing.T) {
467
	tests := []struct {
468
		name        string
469
		piece       *Piece
470
		pieceNumber int32
471
		expect      func(t *testing.T, piece *Piece, loaded bool)
472
	}{
473
		{
474
			name:        "load piece",
475
			piece:       mockPiece,
476
			pieceNumber: mockPiece.Number,
477
			expect: func(t *testing.T, piece *Piece, loaded bool) {
478
				assert := assert.New(t)
479
				assert.Equal(loaded, true)
480
				assert.Equal(piece.Number, mockPiece.Number)
481
				assert.Equal(piece.ParentID, mockPiece.ParentID)
482
				assert.Equal(piece.Offset, mockPiece.Offset)
483
				assert.Equal(piece.Length, mockPiece.Length)
484
				assert.EqualValues(piece.Digest, mockPiece.Digest)
485
				assert.Equal(piece.TrafficType, mockPiece.TrafficType)
486
				assert.Equal(piece.Cost, mockPiece.Cost)
487
				assert.Equal(piece.CreatedAt, mockPiece.CreatedAt)
488
			},
489
		},
490
		{
491
			name:        "piece does not exist",
492
			piece:       mockPiece,
493
			pieceNumber: 2,
494
			expect: func(t *testing.T, piece *Piece, loaded bool) {
495
				assert := assert.New(t)
496
				assert.Equal(loaded, false)
497
			},
498
		},
499
		{
500
			name:        "load key is zero",
501
			piece:       mockPiece,
502
			pieceNumber: 0,
503
			expect: func(t *testing.T, piece *Piece, loaded bool) {
504
				assert := assert.New(t)
505
				assert.Equal(loaded, false)
506
			},
507
		},
508
	}
509

510
	for _, tc := range tests {
511
		t.Run(tc.name, func(t *testing.T) {
512
			mockHost := NewHost(
513
				mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
514
				mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
515
			mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest))
516
			peer := NewPeer(mockPeerID, mockResourceConfig, mockTask, mockHost)
517

518
			peer.StorePiece(tc.piece)
519
			piece, loaded := peer.LoadPiece(tc.pieceNumber)
520
			tc.expect(t, piece, loaded)
521
		})
522
	}
523
}
524

525
func TestPeer_StorePiece(t *testing.T) {
526
	tests := []struct {
527
		name        string
528
		piece       *Piece
529
		pieceNumber int32
530
		expect      func(t *testing.T, piece *Piece, loaded bool)
531
	}{
532
		{
533
			name:        "store piece",
534
			piece:       mockPiece,
535
			pieceNumber: mockPiece.Number,
536
			expect: func(t *testing.T, piece *Piece, loaded bool) {
537
				assert := assert.New(t)
538
				assert.Equal(loaded, true)
539
				assert.Equal(piece.Number, mockPiece.Number)
540
				assert.Equal(piece.ParentID, mockPiece.ParentID)
541
				assert.Equal(piece.Offset, mockPiece.Offset)
542
				assert.Equal(piece.Length, mockPiece.Length)
543
				assert.EqualValues(piece.Digest, mockPiece.Digest)
544
				assert.Equal(piece.TrafficType, mockPiece.TrafficType)
545
				assert.Equal(piece.Cost, mockPiece.Cost)
546
				assert.Equal(piece.CreatedAt, mockPiece.CreatedAt)
547
			},
548
		},
549
	}
550

551
	for _, tc := range tests {
552
		t.Run(tc.name, func(t *testing.T) {
553
			mockHost := NewHost(
554
				mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
555
				mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
556
			mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest))
557
			peer := NewPeer(mockPeerID, mockResourceConfig, mockTask, mockHost)
558

559
			peer.StorePiece(tc.piece)
560
			piece, loaded := peer.LoadPiece(tc.pieceNumber)
561
			tc.expect(t, piece, loaded)
562
		})
563
	}
564
}
565

566
func TestPeer_DeletePiece(t *testing.T) {
567
	tests := []struct {
568
		name        string
569
		piece       *Piece
570
		pieceNumber int32
571
		expect      func(t *testing.T, peer *Peer)
572
	}{
573
		{
574
			name:        "delete piece",
575
			piece:       mockPiece,
576
			pieceNumber: mockPiece.Number,
577
			expect: func(t *testing.T, peer *Peer) {
578
				assert := assert.New(t)
579
				_, loaded := peer.LoadPiece(mockPiece.Number)
580
				assert.Equal(loaded, false)
581
			},
582
		},
583
		{
584
			name:        "delete key does not exist",
585
			piece:       mockPiece,
586
			pieceNumber: 0,
587
			expect: func(t *testing.T, peer *Peer) {
588
				assert := assert.New(t)
589
				piece, loaded := peer.LoadPiece(mockPiece.Number)
590
				assert.Equal(loaded, true)
591
				assert.Equal(piece.Number, mockPiece.Number)
592
			},
593
		},
594
	}
595

596
	for _, tc := range tests {
597
		t.Run(tc.name, func(t *testing.T) {
598
			mockHost := NewHost(
599
				mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
600
				mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
601
			mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest))
602
			peer := NewPeer(mockPeerID, mockResourceConfig, mockTask, mockHost)
603

604
			peer.StorePiece(tc.piece)
605
			peer.DeletePiece(tc.pieceNumber)
606
			tc.expect(t, peer)
607
		})
608
	}
609
}
610

611
func TestPeer_Parents(t *testing.T) {
612
	tests := []struct {
613
		name   string
614
		expect func(t *testing.T, peer *Peer, seedPeer *Peer, stream schedulerv1.Scheduler_ReportPieceResultServer)
615
	}{
616
		{
617
			name: "peer has no parents",
618
			expect: func(t *testing.T, peer *Peer, seedPeer *Peer, stream schedulerv1.Scheduler_ReportPieceResultServer) {
619
				assert := assert.New(t)
620
				peer.Task.StorePeer(peer)
621
				assert.Equal(len(peer.Parents()), 0)
622
			},
623
		},
624
		{
625
			name: "peer has parents",
626
			expect: func(t *testing.T, peer *Peer, seedPeer *Peer, stream schedulerv1.Scheduler_ReportPieceResultServer) {
627
				assert := assert.New(t)
628
				peer.Task.StorePeer(peer)
629
				peer.Task.StorePeer(seedPeer)
630
				if err := peer.Task.AddPeerEdge(seedPeer, peer); err != nil {
631
					t.Fatal(err)
632
				}
633

634
				assert.Equal(len(peer.Parents()), 1)
635
				assert.Equal(peer.Parents()[0].ID, mockSeedPeerID)
636
			},
637
		},
638
	}
639

640
	for _, tc := range tests {
641
		t.Run(tc.name, func(t *testing.T) {
642
			ctl := gomock.NewController(t)
643
			defer ctl.Finish()
644
			stream := v1mocks.NewMockScheduler_ReportPieceResultServer(ctl)
645

646
			mockHost := NewHost(
647
				mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
648
				mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
649
			mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest))
650
			peer := NewPeer(mockPeerID, mockResourceConfig, mockTask, mockHost)
651
			seedPeer := NewPeer(mockSeedPeerID, mockResourceConfig, mockTask, mockHost)
652
			tc.expect(t, peer, seedPeer, stream)
653
		})
654
	}
655
}
656

657
func TestPeer_Children(t *testing.T) {
658
	tests := []struct {
659
		name   string
660
		expect func(t *testing.T, peer *Peer, seedPeer *Peer, stream schedulerv1.Scheduler_ReportPieceResultServer)
661
	}{
662
		{
663
			name: "peer has no children",
664
			expect: func(t *testing.T, peer *Peer, seedPeer *Peer, stream schedulerv1.Scheduler_ReportPieceResultServer) {
665
				assert := assert.New(t)
666
				peer.Task.StorePeer(peer)
667
				assert.Equal(len(peer.Children()), 0)
668
			},
669
		},
670
		{
671
			name: "peer has children",
672
			expect: func(t *testing.T, peer *Peer, seedPeer *Peer, stream schedulerv1.Scheduler_ReportPieceResultServer) {
673
				assert := assert.New(t)
674
				peer.Task.StorePeer(peer)
675
				peer.Task.StorePeer(seedPeer)
676
				if err := peer.Task.AddPeerEdge(peer, seedPeer); err != nil {
677
					t.Fatal(err)
678
				}
679

680
				assert.Equal(len(peer.Children()), 1)
681
				assert.Equal(peer.Children()[0].ID, mockSeedPeerID)
682
			},
683
		},
684
	}
685

686
	for _, tc := range tests {
687
		t.Run(tc.name, func(t *testing.T) {
688
			ctl := gomock.NewController(t)
689
			defer ctl.Finish()
690
			stream := v1mocks.NewMockScheduler_ReportPieceResultServer(ctl)
691

692
			mockHost := NewHost(
693
				mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
694
				mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
695
			mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest))
696
			peer := NewPeer(mockPeerID, mockResourceConfig, mockTask, mockHost)
697
			seedPeer := NewPeer(mockSeedPeerID, mockResourceConfig, mockTask, mockHost)
698
			tc.expect(t, peer, seedPeer, stream)
699
		})
700
	}
701
}
702

703
func TestPeer_DownloadTinyFile(t *testing.T) {
704
	testData := []byte("./0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz" +
705
		"./0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz")
706
	mockServer := func(t *testing.T, peer *Peer) *httptest.Server {
707
		return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
708
			assert := assert.New(t)
709
			assert.NotNil(peer)
710
			assert.Equal(r.URL.Path, fmt.Sprintf("/download/%s/%s", peer.Task.ID[:3], peer.Task.ID))
711
			assert.Equal(r.URL.RawQuery, fmt.Sprintf("peerId=%s", peer.ID))
712

713
			rgs, err := nethttp.ParseRange(r.Header.Get(headers.Range), 128)
714
			assert.Nil(err)
715
			assert.Equal(1, len(rgs))
716
			rg := rgs[0]
717

718
			w.WriteHeader(http.StatusPartialContent)
719
			n, err := w.Write(testData[rg.Start : rg.Start+rg.Length])
720
			assert.Nil(err)
721
			assert.Equal(int64(n), rg.Length)
722
		}))
723
	}
724

725
	tests := []struct {
726
		name       string
727
		mockServer func(t *testing.T, peer *Peer) *httptest.Server
728
		expect     func(t *testing.T, peer *Peer)
729
	}{
730
		{
731
			name:       "download tiny file",
732
			mockServer: mockServer,
733
			expect: func(t *testing.T, peer *Peer) {
734
				assert := assert.New(t)
735
				peer.Task.ContentLength.Store(32)
736
				data, err := peer.DownloadTinyFile()
737
				assert.NoError(err)
738
				assert.Equal(testData[:32], data)
739
			},
740
		},
741
		{
742
			name:       "download tiny file with range",
743
			mockServer: mockServer,
744
			expect: func(t *testing.T, peer *Peer) {
745
				assert := assert.New(t)
746
				peer.Task.ContentLength.Store(32)
747
				peer.Range = &nethttp.Range{
748
					Start:  0,
749
					Length: 10,
750
				}
751
				data, err := peer.DownloadTinyFile()
752
				assert.NoError(err)
753
				assert.Equal(testData[:32], data)
754
			},
755
		},
756
		{
757
			name: "download tiny file failed because of http status code",
758
			mockServer: func(t *testing.T, peer *Peer) *httptest.Server {
759
				return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
760
					w.WriteHeader(http.StatusNotFound)
761
				}))
762
			},
763
			expect: func(t *testing.T, peer *Peer) {
764
				assert := assert.New(t)
765
				peer.Task.ID = "foobar"
766
				_, err := peer.DownloadTinyFile()
767
				assert.EqualError(err, "bad response status 404 Not Found")
768
			},
769
		},
770
	}
771

772
	for _, tc := range tests {
773
		t.Run(tc.name, func(t *testing.T) {
774
			mockHost := NewHost(
775
				mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
776
				mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
777
			mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest))
778
			peer := NewPeer(mockPeerID, mockResourceConfig, mockTask, mockHost)
779

780
			if tc.mockServer == nil {
781
				tc.mockServer = mockServer
782
			}
783

784
			s := tc.mockServer(t, peer)
785
			defer s.Close()
786

787
			url, err := url.Parse(s.URL)
788
			if err != nil {
789
				t.Fatal(err)
790
			}
791

792
			ip, rawPort, err := net.SplitHostPort(url.Host)
793
			if err != nil {
794
				t.Fatal(err)
795
			}
796

797
			port, err := strconv.ParseInt(rawPort, 10, 32)
798
			if err != nil {
799
				t.Fatal(err)
800
			}
801

802
			mockHost.IP = ip
803
			mockHost.DownloadPort = int32(port)
804
			tc.expect(t, peer)
805
		})
806
	}
807
}
808

809
func TestPeer_CalculatePriority(t *testing.T) {
810
	tests := []struct {
811
		name   string
812
		mock   func(peer *Peer, md *configmocks.MockDynconfigInterfaceMockRecorder)
813
		expect func(t *testing.T, priority commonv2.Priority)
814
	}{
815
		{
816
			name: "peer has priority",
817
			mock: func(peer *Peer, md *configmocks.MockDynconfigInterfaceMockRecorder) {
818
				priority := commonv2.Priority_LEVEL4
819
				peer.Priority = priority
820
			},
821
			expect: func(t *testing.T, priority commonv2.Priority) {
822
				assert := assert.New(t)
823
				assert.Equal(priority, commonv2.Priority_LEVEL4)
824
			},
825
		},
826
		{
827
			name: "get applications failed",
828
			mock: func(peer *Peer, md *configmocks.MockDynconfigInterfaceMockRecorder) {
829
				md.GetApplications().Return(nil, errors.New("bas")).Times(1)
830
			},
831
			expect: func(t *testing.T, priority commonv2.Priority) {
832
				assert := assert.New(t)
833
				assert.Equal(priority, commonv2.Priority_LEVEL0)
834
			},
835
		},
836
		{
837
			name: "can not found applications",
838
			mock: func(peer *Peer, md *configmocks.MockDynconfigInterfaceMockRecorder) {
839
				md.GetApplications().Return([]*managerv2.Application{}, nil).Times(1)
840
			},
841
			expect: func(t *testing.T, priority commonv2.Priority) {
842
				assert := assert.New(t)
843
				assert.Equal(priority, commonv2.Priority_LEVEL0)
844
			},
845
		},
846
		{
847
			name: "can not found matching application",
848
			mock: func(peer *Peer, md *configmocks.MockDynconfigInterfaceMockRecorder) {
849
				md.GetApplications().Return([]*managerv2.Application{
850
					{
851
						Name: "baw",
852
					},
853
				}, nil).Times(1)
854
			},
855
			expect: func(t *testing.T, priority commonv2.Priority) {
856
				assert := assert.New(t)
857
				assert.Equal(priority, commonv2.Priority_LEVEL0)
858
			},
859
		},
860
		{
861
			name: "can not found priority",
862
			mock: func(peer *Peer, md *configmocks.MockDynconfigInterfaceMockRecorder) {
863
				peer.Task.Application = "bae"
864
				md.GetApplications().Return([]*managerv2.Application{
865
					{
866
						Name: "bae",
867
					},
868
				}, nil).Times(1)
869
			},
870
			expect: func(t *testing.T, priority commonv2.Priority) {
871
				assert := assert.New(t)
872
				assert.Equal(priority, commonv2.Priority_LEVEL0)
873
			},
874
		},
875
		{
876
			name: "match the priority of application",
877
			mock: func(peer *Peer, md *configmocks.MockDynconfigInterfaceMockRecorder) {
878
				peer.Task.Application = "baz"
879
				md.GetApplications().Return([]*managerv2.Application{
880
					{
881
						Name: "baz",
882
						Priority: &managerv2.ApplicationPriority{
883
							Value: commonv2.Priority_LEVEL1,
884
						},
885
					},
886
				}, nil).Times(1)
887
			},
888
			expect: func(t *testing.T, priority commonv2.Priority) {
889
				assert := assert.New(t)
890
				assert.Equal(priority, commonv2.Priority_LEVEL1)
891
			},
892
		},
893
		{
894
			name: "match the priority of url",
895
			mock: func(peer *Peer, md *configmocks.MockDynconfigInterfaceMockRecorder) {
896
				peer.Task.Application = "bak"
897
				peer.Task.URL = "example.com"
898
				md.GetApplications().Return([]*managerv2.Application{
899
					{
900
						Name: "bak",
901
						Priority: &managerv2.ApplicationPriority{
902
							Value: commonv2.Priority_LEVEL1,
903
							Urls: []*managerv2.URLPriority{
904
								{
905
									Regex: "am",
906
									Value: commonv2.Priority_LEVEL2,
907
								},
908
							},
909
						},
910
					},
911
				}, nil).Times(1)
912
			},
913
			expect: func(t *testing.T, priority commonv2.Priority) {
914
				assert := assert.New(t)
915
				assert.Equal(priority, commonv2.Priority_LEVEL2)
916
			},
917
		},
918
	}
919

920
	for _, tc := range tests {
921
		t.Run(tc.name, func(t *testing.T) {
922
			ctl := gomock.NewController(t)
923
			defer ctl.Finish()
924
			dynconfig := configmocks.NewMockDynconfigInterface(ctl)
925

926
			mockHost := NewHost(
927
				mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
928
				mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
929
			mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest))
930
			peer := NewPeer(mockPeerID, mockResourceConfig, mockTask, mockHost)
931
			tc.mock(peer, dynconfig.EXPECT())
932
			tc.expect(t, peer.CalculatePriority(dynconfig))
933
		})
934
	}
935
}
936

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.