Dragonfly2

Форк
0
/
service_v2_test.go 
3400 строк · 151.7 Кб
1
/*
2
 *     Copyright 2023 The Dragonfly Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *      http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package service
18

19
import (
20
	"context"
21
	"errors"
22
	"io"
23
	"net"
24
	"net/http"
25
	"net/http/httptest"
26
	"net/url"
27
	"reflect"
28
	"strconv"
29
	"sync"
30
	"testing"
31

32
	"github.com/stretchr/testify/assert"
33
	"go.uber.org/mock/gomock"
34
	"google.golang.org/grpc/codes"
35
	"google.golang.org/grpc/status"
36
	"google.golang.org/protobuf/types/known/durationpb"
37
	"google.golang.org/protobuf/types/known/timestamppb"
38

39
	commonv2 "d7y.io/api/v2/pkg/apis/common/v2"
40
	dfdaemonv2 "d7y.io/api/v2/pkg/apis/dfdaemon/v2"
41
	managerv2 "d7y.io/api/v2/pkg/apis/manager/v2"
42
	schedulerv2 "d7y.io/api/v2/pkg/apis/scheduler/v2"
43
	schedulerv2mocks "d7y.io/api/v2/pkg/apis/scheduler/v2/mocks"
44

45
	managertypes "d7y.io/dragonfly/v2/manager/types"
46
	nethttp "d7y.io/dragonfly/v2/pkg/net/http"
47
	pkgtypes "d7y.io/dragonfly/v2/pkg/types"
48
	"d7y.io/dragonfly/v2/scheduler/config"
49
	configmocks "d7y.io/dragonfly/v2/scheduler/config/mocks"
50
	"d7y.io/dragonfly/v2/scheduler/networktopology"
51
	networktopologymocks "d7y.io/dragonfly/v2/scheduler/networktopology/mocks"
52
	"d7y.io/dragonfly/v2/scheduler/resource"
53
	"d7y.io/dragonfly/v2/scheduler/scheduling/mocks"
54
	schedulingmocks "d7y.io/dragonfly/v2/scheduler/scheduling/mocks"
55
	storagemocks "d7y.io/dragonfly/v2/scheduler/storage/mocks"
56
)
57

58
func TestService_NewV2(t *testing.T) {
59
	tests := []struct {
60
		name   string
61
		expect func(t *testing.T, s any)
62
	}{
63
		{
64
			name: "new service",
65
			expect: func(t *testing.T, s any) {
66
				assert := assert.New(t)
67
				assert.Equal(reflect.TypeOf(s).Elem().Name(), "V2")
68
			},
69
		},
70
	}
71

72
	for _, tc := range tests {
73
		t.Run(tc.name, func(t *testing.T) {
74
			ctl := gomock.NewController(t)
75
			defer ctl.Finish()
76
			scheduling := schedulingmocks.NewMockScheduling(ctl)
77
			resource := resource.NewMockResource(ctl)
78
			dynconfig := configmocks.NewMockDynconfigInterface(ctl)
79
			storage := storagemocks.NewMockStorage(ctl)
80
			networkTopology := networktopologymocks.NewMockNetworkTopology(ctl)
81

82
			tc.expect(t, NewV2(&config.Config{Scheduler: mockSchedulerConfig}, resource, scheduling, dynconfig, storage, networkTopology))
83
		})
84
	}
85
}
86

87
func TestServiceV2_StatPeer(t *testing.T) {
88
	tests := []struct {
89
		name   string
90
		mock   func(peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder)
91
		expect func(t *testing.T, peer *resource.Peer, resp *commonv2.Peer, err error)
92
	}{
93
		{
94
			name: "peer not found",
95
			mock: func(peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder) {
96
				gomock.InOrder(
97
					mr.PeerManager().Return(peerManager).Times(1),
98
					mp.Load(gomock.Any()).Return(nil, false).Times(1),
99
				)
100
			},
101
			expect: func(t *testing.T, peer *resource.Peer, resp *commonv2.Peer, err error) {
102
				assert := assert.New(t)
103
				assert.ErrorIs(err, status.Errorf(codes.NotFound, "peer %s not found", mockPeerID))
104
			},
105
		},
106
		{
107
			name: "peer has been loaded",
108
			mock: func(peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder) {
109
				peer.StorePiece(&mockPiece)
110
				peer.Task.StorePiece(&mockPiece)
111
				gomock.InOrder(
112
					mr.PeerManager().Return(peerManager).Times(1),
113
					mp.Load(gomock.Any()).Return(peer, true).Times(1),
114
				)
115
			},
116
			expect: func(t *testing.T, peer *resource.Peer, resp *commonv2.Peer, err error) {
117
				dgst := peer.Task.Digest.String()
118

119
				assert := assert.New(t)
120
				assert.EqualValues(resp, &commonv2.Peer{
121
					Id: peer.ID,
122
					Range: &commonv2.Range{
123
						Start:  uint64(peer.Range.Start),
124
						Length: uint64(peer.Range.Length),
125
					},
126
					Priority: peer.Priority,
127
					Pieces: []*commonv2.Piece{
128
						{
129
							Number:      uint32(mockPiece.Number),
130
							ParentId:    &mockPiece.ParentID,
131
							Offset:      mockPiece.Offset,
132
							Length:      mockPiece.Length,
133
							Digest:      mockPiece.Digest.String(),
134
							TrafficType: &mockPiece.TrafficType,
135
							Cost:        durationpb.New(mockPiece.Cost),
136
							CreatedAt:   timestamppb.New(mockPiece.CreatedAt),
137
						},
138
					},
139
					Cost:  durationpb.New(peer.Cost.Load()),
140
					State: peer.FSM.Current(),
141
					Task: &commonv2.Task{
142
						Id:                  peer.Task.ID,
143
						Type:                peer.Task.Type,
144
						Url:                 peer.Task.URL,
145
						Digest:              &dgst,
146
						Tag:                 &peer.Task.Tag,
147
						Application:         &peer.Task.Application,
148
						FilteredQueryParams: peer.Task.FilteredQueryParams,
149
						RequestHeader:       peer.Task.Header,
150
						PieceLength:         uint32(peer.Task.PieceLength),
151
						ContentLength:       uint64(peer.Task.ContentLength.Load()),
152
						PieceCount:          uint32(peer.Task.TotalPieceCount.Load()),
153
						SizeScope:           peer.Task.SizeScope(),
154
						Pieces: []*commonv2.Piece{
155
							{
156
								Number:      uint32(mockPiece.Number),
157
								ParentId:    &mockPiece.ParentID,
158
								Offset:      mockPiece.Offset,
159
								Length:      mockPiece.Length,
160
								Digest:      mockPiece.Digest.String(),
161
								TrafficType: &mockPiece.TrafficType,
162
								Cost:        durationpb.New(mockPiece.Cost),
163
								CreatedAt:   timestamppb.New(mockPiece.CreatedAt),
164
							},
165
						},
166
						State:     peer.Task.FSM.Current(),
167
						PeerCount: uint32(peer.Task.PeerCount()),
168
						CreatedAt: timestamppb.New(peer.Task.CreatedAt.Load()),
169
						UpdatedAt: timestamppb.New(peer.Task.UpdatedAt.Load()),
170
					},
171
					Host: &commonv2.Host{
172
						Id:              peer.Host.ID,
173
						Type:            uint32(peer.Host.Type),
174
						Hostname:        peer.Host.Hostname,
175
						Ip:              peer.Host.IP,
176
						Port:            peer.Host.Port,
177
						DownloadPort:    peer.Host.DownloadPort,
178
						Os:              peer.Host.OS,
179
						Platform:        peer.Host.Platform,
180
						PlatformFamily:  peer.Host.PlatformFamily,
181
						PlatformVersion: peer.Host.PlatformVersion,
182
						KernelVersion:   peer.Host.KernelVersion,
183
						Cpu: &commonv2.CPU{
184
							LogicalCount:   peer.Host.CPU.LogicalCount,
185
							PhysicalCount:  peer.Host.CPU.PhysicalCount,
186
							Percent:        peer.Host.CPU.Percent,
187
							ProcessPercent: peer.Host.CPU.ProcessPercent,
188
							Times: &commonv2.CPUTimes{
189
								User:      peer.Host.CPU.Times.User,
190
								System:    peer.Host.CPU.Times.System,
191
								Idle:      peer.Host.CPU.Times.Idle,
192
								Nice:      peer.Host.CPU.Times.Nice,
193
								Iowait:    peer.Host.CPU.Times.Iowait,
194
								Irq:       peer.Host.CPU.Times.Irq,
195
								Softirq:   peer.Host.CPU.Times.Softirq,
196
								Steal:     peer.Host.CPU.Times.Steal,
197
								Guest:     peer.Host.CPU.Times.Guest,
198
								GuestNice: peer.Host.CPU.Times.GuestNice,
199
							},
200
						},
201
						Memory: &commonv2.Memory{
202
							Total:              peer.Host.Memory.Total,
203
							Available:          peer.Host.Memory.Available,
204
							Used:               peer.Host.Memory.Used,
205
							UsedPercent:        peer.Host.Memory.UsedPercent,
206
							ProcessUsedPercent: peer.Host.Memory.ProcessUsedPercent,
207
							Free:               peer.Host.Memory.Free,
208
						},
209
						Network: &commonv2.Network{
210
							TcpConnectionCount:       peer.Host.Network.TCPConnectionCount,
211
							UploadTcpConnectionCount: peer.Host.Network.UploadTCPConnectionCount,
212
							Location:                 &peer.Host.Network.Location,
213
							Idc:                      &peer.Host.Network.IDC,
214
						},
215
						Disk: &commonv2.Disk{
216
							Total:             peer.Host.Disk.Total,
217
							Free:              peer.Host.Disk.Free,
218
							Used:              peer.Host.Disk.Used,
219
							UsedPercent:       peer.Host.Disk.UsedPercent,
220
							InodesTotal:       peer.Host.Disk.InodesTotal,
221
							InodesUsed:        peer.Host.Disk.InodesUsed,
222
							InodesFree:        peer.Host.Disk.InodesFree,
223
							InodesUsedPercent: peer.Host.Disk.InodesUsedPercent,
224
						},
225
						Build: &commonv2.Build{
226
							GitVersion: peer.Host.Build.GitVersion,
227
							GitCommit:  &peer.Host.Build.GitCommit,
228
							GoVersion:  &peer.Host.Build.GoVersion,
229
							Platform:   &peer.Host.Build.Platform,
230
						},
231
					},
232
					NeedBackToSource: peer.NeedBackToSource.Load(),
233
					CreatedAt:        timestamppb.New(peer.CreatedAt.Load()),
234
					UpdatedAt:        timestamppb.New(peer.UpdatedAt.Load()),
235
				})
236
			},
237
		},
238
	}
239

240
	for _, tc := range tests {
241
		t.Run(tc.name, func(t *testing.T) {
242
			ctl := gomock.NewController(t)
243
			defer ctl.Finish()
244
			scheduling := schedulingmocks.NewMockScheduling(ctl)
245
			res := resource.NewMockResource(ctl)
246
			dynconfig := configmocks.NewMockDynconfigInterface(ctl)
247
			storage := storagemocks.NewMockStorage(ctl)
248
			networkTopology := networktopologymocks.NewMockNetworkTopology(ctl)
249
			peerManager := resource.NewMockPeerManager(ctl)
250
			mockHost := resource.NewHost(
251
				mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
252
				mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
253
			mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength))
254
			peer := resource.NewPeer(mockSeedPeerID, mockResourceConfig, mockTask, mockHost, resource.WithRange(mockPeerRange))
255
			svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig, storage, networkTopology)
256

257
			tc.mock(peer, peerManager, res.EXPECT(), peerManager.EXPECT())
258
			resp, err := svc.StatPeer(context.Background(), &schedulerv2.StatPeerRequest{TaskId: mockTaskID, PeerId: mockPeerID})
259
			tc.expect(t, peer, resp, err)
260
		})
261
	}
262
}
263

264
func TestServiceV2_LeavePeer(t *testing.T) {
265
	tests := []struct {
266
		name   string
267
		mock   func(peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder)
268
		expect func(t *testing.T, err error)
269
	}{
270
		{
271
			name: "peer not found",
272
			mock: func(peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder) {
273
				gomock.InOrder(
274
					mr.PeerManager().Return(peerManager).Times(1),
275
					mp.Load(gomock.Any()).Return(nil, false).Times(1),
276
				)
277
			},
278
			expect: func(t *testing.T, err error) {
279
				assert := assert.New(t)
280
				assert.ErrorIs(err, status.Errorf(codes.NotFound, "peer %s not found", mockPeerID))
281
			},
282
		},
283
		{
284
			name: "peer fsm event failed",
285
			mock: func(peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder) {
286
				peer.FSM.SetState(resource.PeerStateLeave)
287
				gomock.InOrder(
288
					mr.PeerManager().Return(peerManager).Times(1),
289
					mp.Load(gomock.Any()).Return(peer, true).Times(1),
290
				)
291
			},
292
			expect: func(t *testing.T, err error) {
293
				assert := assert.New(t)
294
				assert.ErrorIs(err, status.Error(codes.FailedPrecondition, "peer fsm event failed: event Leave inappropriate in current state Leave"))
295
			},
296
		},
297
		{
298
			name: "peer leaves succeeded",
299
			mock: func(peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder) {
300
				gomock.InOrder(
301
					mr.PeerManager().Return(peerManager).Times(1),
302
					mp.Load(gomock.Any()).Return(peer, true).Times(1),
303
				)
304
			},
305
			expect: func(t *testing.T, err error) {
306
				assert := assert.New(t)
307
				assert.NoError(err)
308
			},
309
		},
310
	}
311

312
	for _, tc := range tests {
313
		t.Run(tc.name, func(t *testing.T) {
314
			ctl := gomock.NewController(t)
315
			defer ctl.Finish()
316
			scheduling := schedulingmocks.NewMockScheduling(ctl)
317
			res := resource.NewMockResource(ctl)
318
			dynconfig := configmocks.NewMockDynconfigInterface(ctl)
319
			storage := storagemocks.NewMockStorage(ctl)
320
			networkTopology := networktopologymocks.NewMockNetworkTopology(ctl)
321
			peerManager := resource.NewMockPeerManager(ctl)
322
			mockHost := resource.NewHost(
323
				mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
324
				mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
325
			mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength))
326
			peer := resource.NewPeer(mockSeedPeerID, mockResourceConfig, mockTask, mockHost, resource.WithRange(mockPeerRange))
327
			svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig, storage, networkTopology)
328

329
			tc.mock(peer, peerManager, res.EXPECT(), peerManager.EXPECT())
330
			tc.expect(t, svc.LeavePeer(context.Background(), &schedulerv2.LeavePeerRequest{TaskId: mockTaskID, PeerId: mockPeerID}))
331
		})
332
	}
333
}
334

335
func TestServiceV2_StatTask(t *testing.T) {
336
	tests := []struct {
337
		name   string
338
		mock   func(task *resource.Task, taskManager resource.TaskManager, mr *resource.MockResourceMockRecorder, mt *resource.MockTaskManagerMockRecorder)
339
		expect func(t *testing.T, task *resource.Task, resp *commonv2.Task, err error)
340
	}{
341
		{
342
			name: "task not found",
343
			mock: func(task *resource.Task, taskManager resource.TaskManager, mr *resource.MockResourceMockRecorder, mt *resource.MockTaskManagerMockRecorder) {
344
				gomock.InOrder(
345
					mr.TaskManager().Return(taskManager).Times(1),
346
					mt.Load(gomock.Any()).Return(nil, false).Times(1),
347
				)
348
			},
349
			expect: func(t *testing.T, task *resource.Task, resp *commonv2.Task, err error) {
350
				assert := assert.New(t)
351
				assert.ErrorIs(err, status.Errorf(codes.NotFound, "task %s not found", mockTaskID))
352
			},
353
		},
354
		{
355
			name: "task has been loaded",
356
			mock: func(task *resource.Task, taskManager resource.TaskManager, mr *resource.MockResourceMockRecorder, mt *resource.MockTaskManagerMockRecorder) {
357
				task.StorePiece(&mockPiece)
358
				gomock.InOrder(
359
					mr.TaskManager().Return(taskManager).Times(1),
360
					mt.Load(gomock.Any()).Return(task, true).Times(1),
361
				)
362
			},
363
			expect: func(t *testing.T, task *resource.Task, resp *commonv2.Task, err error) {
364
				dgst := task.Digest.String()
365

366
				assert := assert.New(t)
367
				assert.EqualValues(resp, &commonv2.Task{
368
					Id:                  task.ID,
369
					Type:                task.Type,
370
					Url:                 task.URL,
371
					Digest:              &dgst,
372
					Tag:                 &task.Tag,
373
					Application:         &task.Application,
374
					FilteredQueryParams: task.FilteredQueryParams,
375
					RequestHeader:       task.Header,
376
					PieceLength:         uint32(task.PieceLength),
377
					ContentLength:       uint64(task.ContentLength.Load()),
378
					PieceCount:          uint32(task.TotalPieceCount.Load()),
379
					SizeScope:           task.SizeScope(),
380
					Pieces: []*commonv2.Piece{
381
						{
382
							Number:      uint32(mockPiece.Number),
383
							ParentId:    &mockPiece.ParentID,
384
							Offset:      mockPiece.Offset,
385
							Length:      mockPiece.Length,
386
							Digest:      mockPiece.Digest.String(),
387
							TrafficType: &mockPiece.TrafficType,
388
							Cost:        durationpb.New(mockPiece.Cost),
389
							CreatedAt:   timestamppb.New(mockPiece.CreatedAt),
390
						},
391
					},
392
					State:     task.FSM.Current(),
393
					PeerCount: uint32(task.PeerCount()),
394
					CreatedAt: timestamppb.New(task.CreatedAt.Load()),
395
					UpdatedAt: timestamppb.New(task.UpdatedAt.Load()),
396
				})
397
			},
398
		},
399
	}
400

401
	for _, tc := range tests {
402
		t.Run(tc.name, func(t *testing.T) {
403
			ctl := gomock.NewController(t)
404
			defer ctl.Finish()
405
			scheduling := schedulingmocks.NewMockScheduling(ctl)
406
			res := resource.NewMockResource(ctl)
407
			dynconfig := configmocks.NewMockDynconfigInterface(ctl)
408
			storage := storagemocks.NewMockStorage(ctl)
409
			networkTopology := networktopologymocks.NewMockNetworkTopology(ctl)
410
			taskManager := resource.NewMockTaskManager(ctl)
411
			task := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength))
412
			svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig, storage, networkTopology)
413

414
			tc.mock(task, taskManager, res.EXPECT(), taskManager.EXPECT())
415
			resp, err := svc.StatTask(context.Background(), &schedulerv2.StatTaskRequest{Id: mockTaskID})
416
			tc.expect(t, task, resp, err)
417
		})
418
	}
419
}
420

421
func TestServiceV2_AnnounceHost(t *testing.T) {
422
	tests := []struct {
423
		name string
424
		req  *schedulerv2.AnnounceHostRequest
425
		run  func(t *testing.T, svc *V2, req *schedulerv2.AnnounceHostRequest, host *resource.Host, hostManager resource.HostManager, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder)
426
	}{
427
		{
428
			name: "host not found",
429
			req: &schedulerv2.AnnounceHostRequest{
430
				Host: &commonv2.Host{
431
					Id:              mockHostID,
432
					Type:            uint32(pkgtypes.HostTypeNormal),
433
					Hostname:        "hostname",
434
					Ip:              "127.0.0.1",
435
					Port:            8003,
436
					DownloadPort:    8001,
437
					Os:              "darwin",
438
					Platform:        "darwin",
439
					PlatformFamily:  "Standalone Workstation",
440
					PlatformVersion: "11.1",
441
					KernelVersion:   "20.2.0",
442
					Cpu: &commonv2.CPU{
443
						LogicalCount:   mockCPU.LogicalCount,
444
						PhysicalCount:  mockCPU.PhysicalCount,
445
						Percent:        mockCPU.Percent,
446
						ProcessPercent: mockCPU.ProcessPercent,
447
						Times: &commonv2.CPUTimes{
448
							User:      mockCPU.Times.User,
449
							System:    mockCPU.Times.System,
450
							Idle:      mockCPU.Times.Idle,
451
							Nice:      mockCPU.Times.Nice,
452
							Iowait:    mockCPU.Times.Iowait,
453
							Irq:       mockCPU.Times.Irq,
454
							Softirq:   mockCPU.Times.Softirq,
455
							Steal:     mockCPU.Times.Steal,
456
							Guest:     mockCPU.Times.Guest,
457
							GuestNice: mockCPU.Times.GuestNice,
458
						},
459
					},
460
					Memory: &commonv2.Memory{
461
						Total:              mockMemory.Total,
462
						Available:          mockMemory.Available,
463
						Used:               mockMemory.Used,
464
						UsedPercent:        mockMemory.UsedPercent,
465
						ProcessUsedPercent: mockMemory.ProcessUsedPercent,
466
						Free:               mockMemory.Free,
467
					},
468
					Network: &commonv2.Network{
469
						TcpConnectionCount:       mockNetwork.TCPConnectionCount,
470
						UploadTcpConnectionCount: mockNetwork.UploadTCPConnectionCount,
471
						Location:                 &mockNetwork.Location,
472
						Idc:                      &mockNetwork.IDC,
473
					},
474
					Disk: &commonv2.Disk{
475
						Total:             mockDisk.Total,
476
						Free:              mockDisk.Free,
477
						Used:              mockDisk.Used,
478
						UsedPercent:       mockDisk.UsedPercent,
479
						InodesTotal:       mockDisk.InodesTotal,
480
						InodesUsed:        mockDisk.InodesUsed,
481
						InodesFree:        mockDisk.InodesFree,
482
						InodesUsedPercent: mockDisk.InodesUsedPercent,
483
					},
484
					Build: &commonv2.Build{
485
						GitVersion: mockBuild.GitVersion,
486
						GitCommit:  &mockBuild.GitCommit,
487
						GoVersion:  &mockBuild.GoVersion,
488
						Platform:   &mockBuild.Platform,
489
					},
490
				},
491
			},
492
			run: func(t *testing.T, svc *V2, req *schedulerv2.AnnounceHostRequest, host *resource.Host, hostManager resource.HostManager, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
493
				gomock.InOrder(
494
					md.GetSchedulerClusterClientConfig().Return(managertypes.SchedulerClusterClientConfig{LoadLimit: 10}, nil).Times(1),
495
					mr.HostManager().Return(hostManager).Times(1),
496
					mh.Load(gomock.Any()).Return(nil, false).Times(1),
497
					mr.HostManager().Return(hostManager).Times(1),
498
					mh.Store(gomock.Any()).Do(func(host *resource.Host) {
499
						assert := assert.New(t)
500
						assert.Equal(host.ID, req.Host.Id)
501
						assert.Equal(host.Type, pkgtypes.HostType(req.Host.Type))
502
						assert.Equal(host.Hostname, req.Host.Hostname)
503
						assert.Equal(host.IP, req.Host.Ip)
504
						assert.Equal(host.Port, req.Host.Port)
505
						assert.Equal(host.DownloadPort, req.Host.DownloadPort)
506
						assert.Equal(host.OS, req.Host.Os)
507
						assert.Equal(host.Platform, req.Host.Platform)
508
						assert.Equal(host.PlatformVersion, req.Host.PlatformVersion)
509
						assert.Equal(host.KernelVersion, req.Host.KernelVersion)
510
						assert.EqualValues(host.CPU, mockCPU)
511
						assert.EqualValues(host.Memory, mockMemory)
512
						assert.EqualValues(host.Network, mockNetwork)
513
						assert.EqualValues(host.Disk, mockDisk)
514
						assert.EqualValues(host.Build, mockBuild)
515
						assert.Equal(host.ConcurrentUploadLimit.Load(), int32(10))
516
						assert.Equal(host.ConcurrentUploadCount.Load(), int32(0))
517
						assert.Equal(host.UploadCount.Load(), int64(0))
518
						assert.Equal(host.UploadFailedCount.Load(), int64(0))
519
						assert.NotNil(host.Peers)
520
						assert.Equal(host.PeerCount.Load(), int32(0))
521
						assert.NotEqual(host.CreatedAt.Load().Nanosecond(), 0)
522
						assert.NotEqual(host.UpdatedAt.Load().Nanosecond(), 0)
523
						assert.NotNil(host.Log)
524
					}).Return().Times(1),
525
				)
526

527
				assert := assert.New(t)
528
				assert.NoError(svc.AnnounceHost(context.Background(), req))
529
			},
530
		},
531
		{
532
			name: "host not found and dynconfig returns error",
533
			req: &schedulerv2.AnnounceHostRequest{
534
				Host: &commonv2.Host{
535
					Id:              mockHostID,
536
					Type:            uint32(pkgtypes.HostTypeNormal),
537
					Hostname:        "hostname",
538
					Ip:              "127.0.0.1",
539
					Port:            8003,
540
					DownloadPort:    8001,
541
					Os:              "darwin",
542
					Platform:        "darwin",
543
					PlatformFamily:  "Standalone Workstation",
544
					PlatformVersion: "11.1",
545
					KernelVersion:   "20.2.0",
546
					Cpu: &commonv2.CPU{
547
						LogicalCount:   mockCPU.LogicalCount,
548
						PhysicalCount:  mockCPU.PhysicalCount,
549
						Percent:        mockCPU.Percent,
550
						ProcessPercent: mockCPU.ProcessPercent,
551
						Times: &commonv2.CPUTimes{
552
							User:      mockCPU.Times.User,
553
							System:    mockCPU.Times.System,
554
							Idle:      mockCPU.Times.Idle,
555
							Nice:      mockCPU.Times.Nice,
556
							Iowait:    mockCPU.Times.Iowait,
557
							Irq:       mockCPU.Times.Irq,
558
							Softirq:   mockCPU.Times.Softirq,
559
							Steal:     mockCPU.Times.Steal,
560
							Guest:     mockCPU.Times.Guest,
561
							GuestNice: mockCPU.Times.GuestNice,
562
						},
563
					},
564
					Memory: &commonv2.Memory{
565
						Total:              mockMemory.Total,
566
						Available:          mockMemory.Available,
567
						Used:               mockMemory.Used,
568
						UsedPercent:        mockMemory.UsedPercent,
569
						ProcessUsedPercent: mockMemory.ProcessUsedPercent,
570
						Free:               mockMemory.Free,
571
					},
572
					Network: &commonv2.Network{
573
						TcpConnectionCount:       mockNetwork.TCPConnectionCount,
574
						UploadTcpConnectionCount: mockNetwork.UploadTCPConnectionCount,
575
						Location:                 &mockNetwork.Location,
576
						Idc:                      &mockNetwork.IDC,
577
					},
578
					Disk: &commonv2.Disk{
579
						Total:             mockDisk.Total,
580
						Free:              mockDisk.Free,
581
						Used:              mockDisk.Used,
582
						UsedPercent:       mockDisk.UsedPercent,
583
						InodesTotal:       mockDisk.InodesTotal,
584
						InodesUsed:        mockDisk.InodesUsed,
585
						InodesFree:        mockDisk.InodesFree,
586
						InodesUsedPercent: mockDisk.InodesUsedPercent,
587
					},
588
					Build: &commonv2.Build{
589
						GitVersion: mockBuild.GitVersion,
590
						GitCommit:  &mockBuild.GitCommit,
591
						GoVersion:  &mockBuild.GoVersion,
592
						Platform:   &mockBuild.Platform,
593
					},
594
				},
595
			},
596
			run: func(t *testing.T, svc *V2, req *schedulerv2.AnnounceHostRequest, host *resource.Host, hostManager resource.HostManager, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
597
				gomock.InOrder(
598
					md.GetSchedulerClusterClientConfig().Return(managertypes.SchedulerClusterClientConfig{}, errors.New("foo")).Times(1),
599
					mr.HostManager().Return(hostManager).Times(1),
600
					mh.Load(gomock.Any()).Return(nil, false).Times(1),
601
					mr.HostManager().Return(hostManager).Times(1),
602
					mh.Store(gomock.Any()).Do(func(host *resource.Host) {
603
						assert := assert.New(t)
604
						assert.Equal(host.ID, req.Host.Id)
605
						assert.Equal(host.Type, pkgtypes.HostType(req.Host.Type))
606
						assert.Equal(host.Hostname, req.Host.Hostname)
607
						assert.Equal(host.IP, req.Host.Ip)
608
						assert.Equal(host.Port, req.Host.Port)
609
						assert.Equal(host.DownloadPort, req.Host.DownloadPort)
610
						assert.Equal(host.OS, req.Host.Os)
611
						assert.Equal(host.Platform, req.Host.Platform)
612
						assert.Equal(host.PlatformVersion, req.Host.PlatformVersion)
613
						assert.Equal(host.KernelVersion, req.Host.KernelVersion)
614
						assert.EqualValues(host.CPU, mockCPU)
615
						assert.EqualValues(host.Memory, mockMemory)
616
						assert.EqualValues(host.Network, mockNetwork)
617
						assert.EqualValues(host.Disk, mockDisk)
618
						assert.EqualValues(host.Build, mockBuild)
619
						assert.Equal(host.ConcurrentUploadLimit.Load(), int32(200))
620
						assert.Equal(host.ConcurrentUploadCount.Load(), int32(0))
621
						assert.Equal(host.UploadCount.Load(), int64(0))
622
						assert.Equal(host.UploadFailedCount.Load(), int64(0))
623
						assert.NotNil(host.Peers)
624
						assert.Equal(host.PeerCount.Load(), int32(0))
625
						assert.NotEqual(host.CreatedAt.Load().Nanosecond(), 0)
626
						assert.NotEqual(host.UpdatedAt.Load().Nanosecond(), 0)
627
						assert.NotNil(host.Log)
628
					}).Return().Times(1),
629
				)
630

631
				assert := assert.New(t)
632
				assert.NoError(svc.AnnounceHost(context.Background(), req))
633
			},
634
		},
635
		{
636
			name: "host already exists",
637
			req: &schedulerv2.AnnounceHostRequest{
638
				Host: &commonv2.Host{
639
					Id:              mockHostID,
640
					Type:            uint32(pkgtypes.HostTypeNormal),
641
					Hostname:        "foo",
642
					Ip:              "127.0.0.1",
643
					Port:            8003,
644
					DownloadPort:    8001,
645
					Os:              "darwin",
646
					Platform:        "darwin",
647
					PlatformFamily:  "Standalone Workstation",
648
					PlatformVersion: "11.1",
649
					KernelVersion:   "20.2.0",
650
					Cpu: &commonv2.CPU{
651
						LogicalCount:   mockCPU.LogicalCount,
652
						PhysicalCount:  mockCPU.PhysicalCount,
653
						Percent:        mockCPU.Percent,
654
						ProcessPercent: mockCPU.ProcessPercent,
655
						Times: &commonv2.CPUTimes{
656
							User:      mockCPU.Times.User,
657
							System:    mockCPU.Times.System,
658
							Idle:      mockCPU.Times.Idle,
659
							Nice:      mockCPU.Times.Nice,
660
							Iowait:    mockCPU.Times.Iowait,
661
							Irq:       mockCPU.Times.Irq,
662
							Softirq:   mockCPU.Times.Softirq,
663
							Steal:     mockCPU.Times.Steal,
664
							Guest:     mockCPU.Times.Guest,
665
							GuestNice: mockCPU.Times.GuestNice,
666
						},
667
					},
668
					Memory: &commonv2.Memory{
669
						Total:              mockMemory.Total,
670
						Available:          mockMemory.Available,
671
						Used:               mockMemory.Used,
672
						UsedPercent:        mockMemory.UsedPercent,
673
						ProcessUsedPercent: mockMemory.ProcessUsedPercent,
674
						Free:               mockMemory.Free,
675
					},
676
					Network: &commonv2.Network{
677
						TcpConnectionCount:       mockNetwork.TCPConnectionCount,
678
						UploadTcpConnectionCount: mockNetwork.UploadTCPConnectionCount,
679
						Location:                 &mockNetwork.Location,
680
						Idc:                      &mockNetwork.IDC,
681
					},
682
					Disk: &commonv2.Disk{
683
						Total:             mockDisk.Total,
684
						Free:              mockDisk.Free,
685
						Used:              mockDisk.Used,
686
						UsedPercent:       mockDisk.UsedPercent,
687
						InodesTotal:       mockDisk.InodesTotal,
688
						InodesUsed:        mockDisk.InodesUsed,
689
						InodesFree:        mockDisk.InodesFree,
690
						InodesUsedPercent: mockDisk.InodesUsedPercent,
691
					},
692
					Build: &commonv2.Build{
693
						GitVersion: mockBuild.GitVersion,
694
						GitCommit:  &mockBuild.GitCommit,
695
						GoVersion:  &mockBuild.GoVersion,
696
						Platform:   &mockBuild.Platform,
697
					},
698
				},
699
			},
700
			run: func(t *testing.T, svc *V2, req *schedulerv2.AnnounceHostRequest, host *resource.Host, hostManager resource.HostManager, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
701
				gomock.InOrder(
702
					md.GetSchedulerClusterClientConfig().Return(managertypes.SchedulerClusterClientConfig{LoadLimit: 10}, nil).Times(1),
703
					mr.HostManager().Return(hostManager).Times(1),
704
					mh.Load(gomock.Any()).Return(host, true).Times(1),
705
				)
706

707
				assert := assert.New(t)
708
				assert.NoError(svc.AnnounceHost(context.Background(), req))
709
				assert.Equal(host.ID, req.Host.Id)
710
				assert.Equal(host.Type, pkgtypes.HostType(req.Host.Type))
711
				assert.Equal(host.Hostname, req.Host.Hostname)
712
				assert.Equal(host.IP, req.Host.Ip)
713
				assert.Equal(host.Port, req.Host.Port)
714
				assert.Equal(host.DownloadPort, req.Host.DownloadPort)
715
				assert.Equal(host.OS, req.Host.Os)
716
				assert.Equal(host.Platform, req.Host.Platform)
717
				assert.Equal(host.PlatformVersion, req.Host.PlatformVersion)
718
				assert.Equal(host.KernelVersion, req.Host.KernelVersion)
719
				assert.EqualValues(host.CPU, mockCPU)
720
				assert.EqualValues(host.Memory, mockMemory)
721
				assert.EqualValues(host.Network, mockNetwork)
722
				assert.EqualValues(host.Disk, mockDisk)
723
				assert.EqualValues(host.Build, mockBuild)
724
				assert.Equal(host.ConcurrentUploadLimit.Load(), int32(10))
725
				assert.Equal(host.ConcurrentUploadCount.Load(), int32(0))
726
				assert.Equal(host.UploadCount.Load(), int64(0))
727
				assert.Equal(host.UploadFailedCount.Load(), int64(0))
728
				assert.NotNil(host.Peers)
729
				assert.Equal(host.PeerCount.Load(), int32(0))
730
				assert.NotEqual(host.CreatedAt.Load().Nanosecond(), 0)
731
				assert.NotEqual(host.UpdatedAt.Load().Nanosecond(), 0)
732
				assert.NotNil(host.Log)
733
			},
734
		},
735
		{
736
			name: "host already exists and dynconfig returns error",
737
			req: &schedulerv2.AnnounceHostRequest{
738
				Host: &commonv2.Host{
739
					Id:              mockHostID,
740
					Type:            uint32(pkgtypes.HostTypeNormal),
741
					Hostname:        "foo",
742
					Ip:              "127.0.0.1",
743
					Port:            8003,
744
					DownloadPort:    8001,
745
					Os:              "darwin",
746
					Platform:        "darwin",
747
					PlatformFamily:  "Standalone Workstation",
748
					PlatformVersion: "11.1",
749
					KernelVersion:   "20.2.0",
750
					Cpu: &commonv2.CPU{
751
						LogicalCount:   mockCPU.LogicalCount,
752
						PhysicalCount:  mockCPU.PhysicalCount,
753
						Percent:        mockCPU.Percent,
754
						ProcessPercent: mockCPU.ProcessPercent,
755
						Times: &commonv2.CPUTimes{
756
							User:      mockCPU.Times.User,
757
							System:    mockCPU.Times.System,
758
							Idle:      mockCPU.Times.Idle,
759
							Nice:      mockCPU.Times.Nice,
760
							Iowait:    mockCPU.Times.Iowait,
761
							Irq:       mockCPU.Times.Irq,
762
							Softirq:   mockCPU.Times.Softirq,
763
							Steal:     mockCPU.Times.Steal,
764
							Guest:     mockCPU.Times.Guest,
765
							GuestNice: mockCPU.Times.GuestNice,
766
						},
767
					},
768
					Memory: &commonv2.Memory{
769
						Total:              mockMemory.Total,
770
						Available:          mockMemory.Available,
771
						Used:               mockMemory.Used,
772
						UsedPercent:        mockMemory.UsedPercent,
773
						ProcessUsedPercent: mockMemory.ProcessUsedPercent,
774
						Free:               mockMemory.Free,
775
					},
776
					Network: &commonv2.Network{
777
						TcpConnectionCount:       mockNetwork.TCPConnectionCount,
778
						UploadTcpConnectionCount: mockNetwork.UploadTCPConnectionCount,
779
						Location:                 &mockNetwork.Location,
780
						Idc:                      &mockNetwork.IDC,
781
					},
782
					Disk: &commonv2.Disk{
783
						Total:             mockDisk.Total,
784
						Free:              mockDisk.Free,
785
						Used:              mockDisk.Used,
786
						UsedPercent:       mockDisk.UsedPercent,
787
						InodesTotal:       mockDisk.InodesTotal,
788
						InodesUsed:        mockDisk.InodesUsed,
789
						InodesFree:        mockDisk.InodesFree,
790
						InodesUsedPercent: mockDisk.InodesUsedPercent,
791
					},
792
					Build: &commonv2.Build{
793
						GitVersion: mockBuild.GitVersion,
794
						GitCommit:  &mockBuild.GitCommit,
795
						GoVersion:  &mockBuild.GoVersion,
796
						Platform:   &mockBuild.Platform,
797
					},
798
				},
799
			},
800
			run: func(t *testing.T, svc *V2, req *schedulerv2.AnnounceHostRequest, host *resource.Host, hostManager resource.HostManager, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
801
				gomock.InOrder(
802
					md.GetSchedulerClusterClientConfig().Return(managertypes.SchedulerClusterClientConfig{}, errors.New("foo")).Times(1),
803
					mr.HostManager().Return(hostManager).Times(1),
804
					mh.Load(gomock.Any()).Return(host, true).Times(1),
805
				)
806

807
				assert := assert.New(t)
808
				assert.NoError(svc.AnnounceHost(context.Background(), req))
809
				assert.Equal(host.ID, req.Host.Id)
810
				assert.Equal(host.Type, pkgtypes.HostType(req.Host.Type))
811
				assert.Equal(host.Hostname, req.Host.Hostname)
812
				assert.Equal(host.IP, req.Host.Ip)
813
				assert.Equal(host.Port, req.Host.Port)
814
				assert.Equal(host.DownloadPort, req.Host.DownloadPort)
815
				assert.Equal(host.OS, req.Host.Os)
816
				assert.Equal(host.Platform, req.Host.Platform)
817
				assert.Equal(host.PlatformVersion, req.Host.PlatformVersion)
818
				assert.Equal(host.KernelVersion, req.Host.KernelVersion)
819
				assert.EqualValues(host.CPU, mockCPU)
820
				assert.EqualValues(host.Memory, mockMemory)
821
				assert.EqualValues(host.Network, mockNetwork)
822
				assert.EqualValues(host.Disk, mockDisk)
823
				assert.EqualValues(host.Build, mockBuild)
824
				assert.Equal(host.ConcurrentUploadLimit.Load(), int32(200))
825
				assert.Equal(host.ConcurrentUploadCount.Load(), int32(0))
826
				assert.Equal(host.UploadCount.Load(), int64(0))
827
				assert.Equal(host.UploadFailedCount.Load(), int64(0))
828
				assert.NotNil(host.Peers)
829
				assert.Equal(host.PeerCount.Load(), int32(0))
830
				assert.NotEqual(host.CreatedAt.Load().Nanosecond(), 0)
831
				assert.NotEqual(host.UpdatedAt.Load().Nanosecond(), 0)
832
				assert.NotNil(host.Log)
833
			},
834
		},
835
	}
836

837
	for _, tc := range tests {
838
		t.Run(tc.name, func(t *testing.T) {
839
			ctl := gomock.NewController(t)
840
			defer ctl.Finish()
841
			scheduling := schedulingmocks.NewMockScheduling(ctl)
842
			res := resource.NewMockResource(ctl)
843
			dynconfig := configmocks.NewMockDynconfigInterface(ctl)
844
			storage := storagemocks.NewMockStorage(ctl)
845
			networkTopology := networktopologymocks.NewMockNetworkTopology(ctl)
846
			hostManager := resource.NewMockHostManager(ctl)
847
			host := resource.NewHost(
848
				mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
849
				mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
850
			svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig, storage, networkTopology)
851

852
			tc.run(t, svc, tc.req, host, hostManager, res.EXPECT(), hostManager.EXPECT(), dynconfig.EXPECT())
853
		})
854
	}
855
}
856

857
func TestServiceV2_LeaveHost(t *testing.T) {
858
	tests := []struct {
859
		name   string
860
		mock   func(host *resource.Host, mockPeer *resource.Peer, hostManager resource.HostManager, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, mnt *networktopologymocks.MockNetworkTopologyMockRecorder)
861
		expect func(t *testing.T, peer *resource.Peer, err error)
862
	}{
863
		{
864
			name: "host not found",
865
			mock: func(host *resource.Host, mockPeer *resource.Peer, hostManager resource.HostManager, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, mnt *networktopologymocks.MockNetworkTopologyMockRecorder) {
866
				gomock.InOrder(
867
					mr.HostManager().Return(hostManager).Times(1),
868
					mh.Load(gomock.Any()).Return(nil, false).Times(1),
869
				)
870
			},
871
			expect: func(t *testing.T, peer *resource.Peer, err error) {
872
				assert := assert.New(t)
873
				assert.Error(err)
874
			},
875
		},
876
		{
877
			name: "host has not peers",
878
			mock: func(host *resource.Host, mockPeer *resource.Peer, hostManager resource.HostManager, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, mnt *networktopologymocks.MockNetworkTopologyMockRecorder) {
879
				gomock.InOrder(
880
					mr.HostManager().Return(hostManager).Times(1),
881
					mh.Load(gomock.Any()).Return(host, true).Times(1),
882
					mnt.DeleteHost(host.ID).Return(nil).Times(1),
883
				)
884
			},
885
			expect: func(t *testing.T, peer *resource.Peer, err error) {
886
				assert := assert.New(t)
887
				assert.NoError(err)
888
			},
889
		},
890
		{
891
			name: "peer leaves succeeded",
892
			mock: func(host *resource.Host, mockPeer *resource.Peer, hostManager resource.HostManager, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, mnt *networktopologymocks.MockNetworkTopologyMockRecorder) {
893
				host.Peers.Store(mockPeer.ID, mockPeer)
894
				mockPeer.FSM.SetState(resource.PeerStatePending)
895
				gomock.InOrder(
896
					mr.HostManager().Return(hostManager).Times(1),
897
					mh.Load(gomock.Any()).Return(host, true).Times(1),
898
					mnt.DeleteHost(host.ID).Return(nil).Times(1),
899
				)
900
			},
901
			expect: func(t *testing.T, peer *resource.Peer, err error) {
902
				assert := assert.New(t)
903
				assert.NoError(err)
904
				assert.Equal(peer.FSM.Current(), resource.PeerStateLeave)
905
			},
906
		},
907
	}
908

909
	for _, tc := range tests {
910
		t.Run(tc.name, func(t *testing.T) {
911
			ctl := gomock.NewController(t)
912
			defer ctl.Finish()
913
			scheduling := schedulingmocks.NewMockScheduling(ctl)
914
			res := resource.NewMockResource(ctl)
915
			dynconfig := configmocks.NewMockDynconfigInterface(ctl)
916
			storage := storagemocks.NewMockStorage(ctl)
917
			networkTopology := networktopologymocks.NewMockNetworkTopology(ctl)
918
			hostManager := resource.NewMockHostManager(ctl)
919
			host := resource.NewHost(
920
				mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
921
				mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
922
			mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength))
923
			mockPeer := resource.NewPeer(mockSeedPeerID, mockResourceConfig, mockTask, host)
924
			svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig, storage, networkTopology)
925

926
			tc.mock(host, mockPeer, hostManager, res.EXPECT(), hostManager.EXPECT(), networkTopology.EXPECT())
927
			tc.expect(t, mockPeer, svc.LeaveHost(context.Background(), &schedulerv2.LeaveHostRequest{Id: mockHostID}))
928
		})
929
	}
930
}
931

932
func TestServiceV2_SyncProbes(t *testing.T) {
933
	tests := []struct {
934
		name string
935
		mock func(svc *V2, mr *resource.MockResourceMockRecorder, probes *networktopologymocks.MockProbes, mp *networktopologymocks.MockProbesMockRecorder,
936
			mn *networktopologymocks.MockNetworkTopologyMockRecorder, hostManager resource.HostManager, mh *resource.MockHostManagerMockRecorder,
937
			ms *schedulerv2mocks.MockScheduler_SyncProbesServerMockRecorder)
938
		expect func(t *testing.T, err error)
939
	}{
940
		{
941
			name: "network topology is not enabled",
942
			mock: func(svc *V2, mr *resource.MockResourceMockRecorder, probes *networktopologymocks.MockProbes, mp *networktopologymocks.MockProbesMockRecorder,
943
				mn *networktopologymocks.MockNetworkTopologyMockRecorder, hostManager resource.HostManager, mh *resource.MockHostManagerMockRecorder,
944
				ms *schedulerv2mocks.MockScheduler_SyncProbesServerMockRecorder) {
945
				svc.networkTopology = nil
946
			},
947
			expect: func(t *testing.T, err error) {
948
				assert := assert.New(t)
949
				assert.EqualError(err, "rpc error: code = Unimplemented desc = network topology is not enabled")
950
			},
951
		},
952
		{
953
			name: "synchronize probes when receive ProbeStartedRequest",
954
			mock: func(svc *V2, mr *resource.MockResourceMockRecorder, probes *networktopologymocks.MockProbes, mp *networktopologymocks.MockProbesMockRecorder,
955
				mn *networktopologymocks.MockNetworkTopologyMockRecorder, hostManager resource.HostManager, mh *resource.MockHostManagerMockRecorder,
956
				ms *schedulerv2mocks.MockScheduler_SyncProbesServerMockRecorder) {
957
				gomock.InOrder(
958
					ms.Recv().Return(&schedulerv2.SyncProbesRequest{
959
						Host: &commonv2.Host{
960
							Id:              mockSeedHostID,
961
							Type:            uint32(pkgtypes.HostTypeSuperSeed),
962
							Hostname:        "bar",
963
							Ip:              "127.0.0.1",
964
							Port:            8003,
965
							DownloadPort:    8001,
966
							Os:              "darwin",
967
							Platform:        "darwin",
968
							PlatformFamily:  "Standalone Workstation",
969
							PlatformVersion: "11.1",
970
							KernelVersion:   "20.2.0",
971
							Cpu:             mockV2Probe.Host.Cpu,
972
							Memory:          mockV2Probe.Host.Memory,
973
							Network:         mockV2Probe.Host.Network,
974
							Disk:            mockV2Probe.Host.Disk,
975
							Build:           mockV2Probe.Host.Build,
976
						},
977
						Request: &schedulerv2.SyncProbesRequest_ProbeStartedRequest{
978
							ProbeStartedRequest: &schedulerv2.ProbeStartedRequest{},
979
						},
980
					}, nil).Times(1),
981
					mn.FindProbedHosts(gomock.Eq(mockRawSeedHost.ID)).Return([]*resource.Host{&mockRawHost}, nil).Times(1),
982
					ms.Send(gomock.Eq(&schedulerv2.SyncProbesResponse{
983
						Hosts: []*commonv2.Host{
984
							{
985
								Id:              mockRawHost.ID,
986
								Type:            uint32(mockRawHost.Type),
987
								Hostname:        mockRawHost.Hostname,
988
								Ip:              mockRawHost.IP,
989
								Port:            mockRawHost.Port,
990
								DownloadPort:    mockRawHost.DownloadPort,
991
								Os:              mockRawHost.OS,
992
								Platform:        mockRawHost.Platform,
993
								PlatformFamily:  mockRawHost.PlatformFamily,
994
								PlatformVersion: mockRawHost.PlatformVersion,
995
								KernelVersion:   mockRawHost.KernelVersion,
996
								Cpu:             mockV2Probe.Host.Cpu,
997
								Memory:          mockV2Probe.Host.Memory,
998
								Network:         mockV2Probe.Host.Network,
999
								Disk:            mockV2Probe.Host.Disk,
1000
								Build:           mockV2Probe.Host.Build,
1001
							},
1002
						},
1003
					})).Return(nil).Times(1),
1004
					ms.Recv().Return(nil, io.EOF).Times(1),
1005
				)
1006
			},
1007
			expect: func(t *testing.T, err error) {
1008
				assert := assert.New(t)
1009
				assert.NoError(err)
1010
			},
1011
		},
1012
		{
1013
			name: "synchronize probes when receive ProbeFinishedRequest",
1014
			mock: func(svc *V2, mr *resource.MockResourceMockRecorder, probes *networktopologymocks.MockProbes, mp *networktopologymocks.MockProbesMockRecorder,
1015
				mn *networktopologymocks.MockNetworkTopologyMockRecorder, hostManager resource.HostManager, mh *resource.MockHostManagerMockRecorder,
1016
				ms *schedulerv2mocks.MockScheduler_SyncProbesServerMockRecorder) {
1017
				gomock.InOrder(
1018
					ms.Recv().Return(&schedulerv2.SyncProbesRequest{
1019
						Host: &commonv2.Host{
1020
							Id:              mockSeedHostID,
1021
							Type:            uint32(pkgtypes.HostTypeSuperSeed),
1022
							Hostname:        "bar",
1023
							Ip:              "127.0.0.1",
1024
							Port:            8003,
1025
							DownloadPort:    8001,
1026
							Os:              "darwin",
1027
							Platform:        "darwin",
1028
							PlatformFamily:  "Standalone Workstation",
1029
							PlatformVersion: "11.1",
1030
							KernelVersion:   "20.2.0",
1031
							Cpu:             mockV2Probe.Host.Cpu,
1032
							Memory:          mockV2Probe.Host.Memory,
1033
							Network:         mockV2Probe.Host.Network,
1034
							Disk:            mockV2Probe.Host.Disk,
1035
							Build:           mockV2Probe.Host.Build,
1036
						},
1037
						Request: &schedulerv2.SyncProbesRequest_ProbeFinishedRequest{
1038
							ProbeFinishedRequest: &schedulerv2.ProbeFinishedRequest{
1039
								Probes: []*schedulerv2.Probe{mockV2Probe},
1040
							},
1041
						},
1042
					}, nil).Times(1),
1043
					mr.HostManager().Return(hostManager).Times(1),
1044
					mh.Load(gomock.Eq(mockRawHost.ID)).Return(&mockRawHost, true),
1045
					mn.Store(gomock.Eq(mockRawSeedHost.ID), gomock.Eq(mockRawHost.ID)).Return(nil).Times(1),
1046
					mn.Probes(gomock.Eq(mockRawSeedHost.ID), gomock.Eq(mockRawHost.ID)).Return(probes).Times(1),
1047
					mp.Enqueue(gomock.Eq(&networktopology.Probe{
1048
						Host:      &mockRawHost,
1049
						RTT:       mockV2Probe.Rtt.AsDuration(),
1050
						CreatedAt: mockV2Probe.CreatedAt.AsTime(),
1051
					})).Return(nil).Times(1),
1052
					ms.Recv().Return(nil, io.EOF).Times(1),
1053
				)
1054
			},
1055
			expect: func(t *testing.T, err error) {
1056
				assert := assert.New(t)
1057
				assert.NoError(err)
1058
			},
1059
		},
1060
		{
1061
			name: "synchronize probes when receive ProbeFailedRequest",
1062
			mock: func(svc *V2, mr *resource.MockResourceMockRecorder, probes *networktopologymocks.MockProbes, mp *networktopologymocks.MockProbesMockRecorder,
1063
				mn *networktopologymocks.MockNetworkTopologyMockRecorder, hostManager resource.HostManager, mh *resource.MockHostManagerMockRecorder,
1064
				ms *schedulerv2mocks.MockScheduler_SyncProbesServerMockRecorder) {
1065
				gomock.InOrder(
1066
					ms.Recv().Return(&schedulerv2.SyncProbesRequest{
1067
						Host: &commonv2.Host{
1068
							Id:              mockSeedHostID,
1069
							Type:            uint32(pkgtypes.HostTypeSuperSeed),
1070
							Hostname:        "bar",
1071
							Ip:              "127.0.0.1",
1072
							Port:            8003,
1073
							DownloadPort:    8001,
1074
							Os:              "darwin",
1075
							Platform:        "darwin",
1076
							PlatformFamily:  "Standalone Workstation",
1077
							PlatformVersion: "11.1",
1078
							KernelVersion:   "20.2.0",
1079
							Cpu:             mockV2Probe.Host.Cpu,
1080
							Memory:          mockV2Probe.Host.Memory,
1081
							Network:         mockV2Probe.Host.Network,
1082
							Disk:            mockV2Probe.Host.Disk,
1083
							Build:           mockV2Probe.Host.Build,
1084
						},
1085
						Request: &schedulerv2.SyncProbesRequest_ProbeFailedRequest{
1086
							ProbeFailedRequest: &schedulerv2.ProbeFailedRequest{
1087
								Probes: []*schedulerv2.FailedProbe{
1088
									{
1089
										Host: &commonv2.Host{
1090
											Id:              mockRawHost.ID,
1091
											Type:            uint32(mockRawHost.Type),
1092
											Hostname:        mockRawHost.Hostname,
1093
											Ip:              mockRawHost.IP,
1094
											Port:            mockRawHost.Port,
1095
											DownloadPort:    mockRawHost.DownloadPort,
1096
											Os:              mockRawHost.OS,
1097
											Platform:        mockRawHost.Platform,
1098
											PlatformFamily:  mockRawHost.PlatformFamily,
1099
											PlatformVersion: mockRawHost.PlatformVersion,
1100
											KernelVersion:   mockRawHost.KernelVersion,
1101
											Cpu:             mockV2Probe.Host.Cpu,
1102
											Memory:          mockV2Probe.Host.Memory,
1103
											Network:         mockV2Probe.Host.Network,
1104
											Disk:            mockV2Probe.Host.Disk,
1105
											Build:           mockV2Probe.Host.Build,
1106
										},
1107
									},
1108
								},
1109
							},
1110
						},
1111
					}, nil).Times(1),
1112
					ms.Recv().Return(nil, io.EOF).Times(1),
1113
				)
1114
			},
1115
			expect: func(t *testing.T, err error) {
1116
				assert := assert.New(t)
1117
				assert.NoError(err)
1118
			},
1119
		},
1120
		{
1121
			name: "synchronize probes when receive fail type request",
1122
			mock: func(svc *V2, mr *resource.MockResourceMockRecorder, probes *networktopologymocks.MockProbes, mp *networktopologymocks.MockProbesMockRecorder,
1123
				mn *networktopologymocks.MockNetworkTopologyMockRecorder, hostManager resource.HostManager, mh *resource.MockHostManagerMockRecorder,
1124
				ms *schedulerv2mocks.MockScheduler_SyncProbesServerMockRecorder) {
1125
				ms.Recv().Return(&schedulerv2.SyncProbesRequest{
1126
					Host: &commonv2.Host{
1127
						Id:              mockSeedHostID,
1128
						Type:            uint32(pkgtypes.HostTypeSuperSeed),
1129
						Hostname:        "bar",
1130
						Ip:              "127.0.0.1",
1131
						Port:            8003,
1132
						DownloadPort:    8001,
1133
						Os:              "darwin",
1134
						Platform:        "darwin",
1135
						PlatformFamily:  "Standalone Workstation",
1136
						PlatformVersion: "11.1",
1137
						KernelVersion:   "20.2.0",
1138
						Cpu:             mockV2Probe.Host.Cpu,
1139
						Memory:          mockV2Probe.Host.Memory,
1140
						Network:         mockV2Probe.Host.Network,
1141
						Disk:            mockV2Probe.Host.Disk,
1142
						Build:           mockV2Probe.Host.Build,
1143
					},
1144
					Request: nil,
1145
				}, nil).Times(1)
1146
			},
1147
			expect: func(t *testing.T, err error) {
1148
				assert := assert.New(t)
1149
				assert.EqualError(err, "rpc error: code = FailedPrecondition desc = receive unknow request: <nil>")
1150
			},
1151
		},
1152
		{
1153
			name: "receive error",
1154
			mock: func(svc *V2, mr *resource.MockResourceMockRecorder, probes *networktopologymocks.MockProbes, mp *networktopologymocks.MockProbesMockRecorder,
1155
				mn *networktopologymocks.MockNetworkTopologyMockRecorder, hostManager resource.HostManager, mh *resource.MockHostManagerMockRecorder,
1156
				ms *schedulerv2mocks.MockScheduler_SyncProbesServerMockRecorder) {
1157
				ms.Recv().Return(nil, errors.New("receive error")).Times(1)
1158
			},
1159
			expect: func(t *testing.T, err error) {
1160
				assert := assert.New(t)
1161
				assert.EqualError(err, "receive error")
1162
			},
1163
		},
1164
		{
1165
			name: "receive end of file",
1166
			mock: func(svc *V2, mr *resource.MockResourceMockRecorder, probes *networktopologymocks.MockProbes, mp *networktopologymocks.MockProbesMockRecorder,
1167
				mn *networktopologymocks.MockNetworkTopologyMockRecorder, hostManager resource.HostManager, mh *resource.MockHostManagerMockRecorder,
1168
				ms *schedulerv2mocks.MockScheduler_SyncProbesServerMockRecorder) {
1169
				ms.Recv().Return(nil, io.EOF).Times(1)
1170
			},
1171
			expect: func(t *testing.T, err error) {
1172
				assert := assert.New(t)
1173
				assert.NoError(err)
1174
			},
1175
		},
1176
		{
1177
			name: "find probed host ids error",
1178
			mock: func(svc *V2, mr *resource.MockResourceMockRecorder, probes *networktopologymocks.MockProbes, mp *networktopologymocks.MockProbesMockRecorder,
1179
				mn *networktopologymocks.MockNetworkTopologyMockRecorder, hostManager resource.HostManager, mh *resource.MockHostManagerMockRecorder,
1180
				ms *schedulerv2mocks.MockScheduler_SyncProbesServerMockRecorder) {
1181
				gomock.InOrder(
1182
					ms.Recv().Return(&schedulerv2.SyncProbesRequest{
1183
						Host: &commonv2.Host{
1184
							Id:              mockSeedHostID,
1185
							Type:            uint32(pkgtypes.HostTypeSuperSeed),
1186
							Hostname:        "bar",
1187
							Ip:              "127.0.0.1",
1188
							Port:            8003,
1189
							DownloadPort:    8001,
1190
							Os:              "darwin",
1191
							Platform:        "darwin",
1192
							PlatformFamily:  "Standalone Workstation",
1193
							PlatformVersion: "11.1",
1194
							KernelVersion:   "20.2.0",
1195
							Cpu:             mockV2Probe.Host.Cpu,
1196
							Memory:          mockV2Probe.Host.Memory,
1197
							Network:         mockV2Probe.Host.Network,
1198
							Disk:            mockV2Probe.Host.Disk,
1199
							Build:           mockV2Probe.Host.Build,
1200
						},
1201
						Request: &schedulerv2.SyncProbesRequest_ProbeStartedRequest{
1202
							ProbeStartedRequest: &schedulerv2.ProbeStartedRequest{},
1203
						},
1204
					}, nil).Times(1),
1205
					mn.FindProbedHosts(gomock.Eq(mockRawSeedHost.ID)).Return(nil, errors.New("find probed host ids error")).Times(1),
1206
				)
1207
			},
1208
			expect: func(t *testing.T, err error) {
1209
				assert := assert.New(t)
1210
				assert.EqualError(err, "rpc error: code = FailedPrecondition desc = find probed host ids error")
1211
			},
1212
		},
1213
		{
1214
			name: "send synchronize probes response error",
1215
			mock: func(svc *V2, mr *resource.MockResourceMockRecorder, probes *networktopologymocks.MockProbes, mp *networktopologymocks.MockProbesMockRecorder,
1216
				mn *networktopologymocks.MockNetworkTopologyMockRecorder, hostManager resource.HostManager, mh *resource.MockHostManagerMockRecorder,
1217
				ms *schedulerv2mocks.MockScheduler_SyncProbesServerMockRecorder) {
1218
				gomock.InOrder(
1219
					ms.Recv().Return(&schedulerv2.SyncProbesRequest{
1220
						Host: &commonv2.Host{
1221
							Id:              mockSeedHostID,
1222
							Type:            uint32(pkgtypes.HostTypeSuperSeed),
1223
							Hostname:        "bar",
1224
							Ip:              "127.0.0.1",
1225
							Port:            8003,
1226
							DownloadPort:    8001,
1227
							Os:              "darwin",
1228
							Platform:        "darwin",
1229
							PlatformFamily:  "Standalone Workstation",
1230
							PlatformVersion: "11.1",
1231
							KernelVersion:   "20.2.0",
1232
							Cpu:             mockV2Probe.Host.Cpu,
1233
							Memory:          mockV2Probe.Host.Memory,
1234
							Network:         mockV2Probe.Host.Network,
1235
							Disk:            mockV2Probe.Host.Disk,
1236
							Build:           mockV2Probe.Host.Build,
1237
						},
1238
						Request: &schedulerv2.SyncProbesRequest_ProbeStartedRequest{
1239
							ProbeStartedRequest: &schedulerv2.ProbeStartedRequest{},
1240
						},
1241
					}, nil).Times(1),
1242
					mn.FindProbedHosts(gomock.Eq(mockRawSeedHost.ID)).Return([]*resource.Host{&mockRawHost}, nil).Times(1),
1243
					ms.Send(gomock.Eq(&schedulerv2.SyncProbesResponse{
1244
						Hosts: []*commonv2.Host{
1245
							{
1246
								Id:              mockRawHost.ID,
1247
								Type:            uint32(mockRawHost.Type),
1248
								Hostname:        mockRawHost.Hostname,
1249
								Ip:              mockRawHost.IP,
1250
								Port:            mockRawHost.Port,
1251
								DownloadPort:    mockRawHost.DownloadPort,
1252
								Os:              mockRawHost.OS,
1253
								Platform:        mockRawHost.Platform,
1254
								PlatformFamily:  mockRawHost.PlatformFamily,
1255
								PlatformVersion: mockRawHost.PlatformVersion,
1256
								KernelVersion:   mockRawHost.KernelVersion,
1257
								Cpu:             mockV2Probe.Host.Cpu,
1258
								Memory:          mockV2Probe.Host.Memory,
1259
								Network:         mockV2Probe.Host.Network,
1260
								Disk:            mockV2Probe.Host.Disk,
1261
								Build:           mockV2Probe.Host.Build,
1262
							},
1263
						},
1264
					})).Return(errors.New("send synchronize probes response error")).Times(1),
1265
				)
1266
			},
1267
			expect: func(t *testing.T, err error) {
1268
				assert := assert.New(t)
1269
				assert.EqualError(err, "send synchronize probes response error")
1270
			},
1271
		},
1272
		{
1273
			name: "load host error when receive ProbeFinishedRequest",
1274
			mock: func(svc *V2, mr *resource.MockResourceMockRecorder, probes *networktopologymocks.MockProbes, mp *networktopologymocks.MockProbesMockRecorder,
1275
				mn *networktopologymocks.MockNetworkTopologyMockRecorder, hostManager resource.HostManager, mh *resource.MockHostManagerMockRecorder,
1276
				ms *schedulerv2mocks.MockScheduler_SyncProbesServerMockRecorder) {
1277
				gomock.InOrder(
1278
					ms.Recv().Return(&schedulerv2.SyncProbesRequest{
1279
						Host: &commonv2.Host{
1280
							Id:              mockSeedHostID,
1281
							Type:            uint32(pkgtypes.HostTypeSuperSeed),
1282
							Hostname:        "bar",
1283
							Ip:              "127.0.0.1",
1284
							Port:            8003,
1285
							DownloadPort:    8001,
1286
							Os:              "darwin",
1287
							Platform:        "darwin",
1288
							PlatformFamily:  "Standalone Workstation",
1289
							PlatformVersion: "11.1",
1290
							KernelVersion:   "20.2.0",
1291
							Cpu:             mockV2Probe.Host.Cpu,
1292
							Memory:          mockV2Probe.Host.Memory,
1293
							Network:         mockV2Probe.Host.Network,
1294
							Disk:            mockV2Probe.Host.Disk,
1295
							Build:           mockV2Probe.Host.Build,
1296
						},
1297
						Request: &schedulerv2.SyncProbesRequest_ProbeFinishedRequest{
1298
							ProbeFinishedRequest: &schedulerv2.ProbeFinishedRequest{
1299
								Probes: []*schedulerv2.Probe{mockV2Probe},
1300
							},
1301
						},
1302
					}, nil).Times(1),
1303
					mr.HostManager().Return(hostManager).Times(1),
1304
					mh.Load(gomock.Eq(mockRawHost.ID)).Return(nil, false),
1305
					ms.Recv().Return(nil, io.EOF).Times(1),
1306
				)
1307
			},
1308
			expect: func(t *testing.T, err error) {
1309
				assert := assert.New(t)
1310
				assert.NoError(err)
1311
			},
1312
		},
1313
		{
1314
			name: "store error when receive ProbeFinishedRequest",
1315
			mock: func(svc *V2, mr *resource.MockResourceMockRecorder, probes *networktopologymocks.MockProbes, mp *networktopologymocks.MockProbesMockRecorder,
1316
				mn *networktopologymocks.MockNetworkTopologyMockRecorder, hostManager resource.HostManager, mh *resource.MockHostManagerMockRecorder,
1317
				ms *schedulerv2mocks.MockScheduler_SyncProbesServerMockRecorder) {
1318
				gomock.InOrder(
1319
					ms.Recv().Return(&schedulerv2.SyncProbesRequest{
1320
						Host: &commonv2.Host{
1321
							Id:              mockSeedHostID,
1322
							Type:            uint32(pkgtypes.HostTypeSuperSeed),
1323
							Hostname:        "bar",
1324
							Ip:              "127.0.0.1",
1325
							Port:            8003,
1326
							DownloadPort:    8001,
1327
							Os:              "darwin",
1328
							Platform:        "darwin",
1329
							PlatformFamily:  "Standalone Workstation",
1330
							PlatformVersion: "11.1",
1331
							KernelVersion:   "20.2.0",
1332
							Cpu:             mockV2Probe.Host.Cpu,
1333
							Memory:          mockV2Probe.Host.Memory,
1334
							Network:         mockV2Probe.Host.Network,
1335
							Disk:            mockV2Probe.Host.Disk,
1336
							Build:           mockV2Probe.Host.Build,
1337
						},
1338
						Request: &schedulerv2.SyncProbesRequest_ProbeFinishedRequest{
1339
							ProbeFinishedRequest: &schedulerv2.ProbeFinishedRequest{
1340
								Probes: []*schedulerv2.Probe{mockV2Probe},
1341
							},
1342
						},
1343
					}, nil).Times(1),
1344
					mr.HostManager().Return(hostManager).Times(1),
1345
					mh.Load(gomock.Eq(mockRawHost.ID)).Return(&mockRawHost, true),
1346
					mn.Store(gomock.Eq(mockRawSeedHost.ID), gomock.Eq(mockRawHost.ID)).Return(errors.New("store error")).Times(1),
1347
					ms.Recv().Return(nil, io.EOF).Times(1),
1348
				)
1349
			},
1350
			expect: func(t *testing.T, err error) {
1351
				assert := assert.New(t)
1352
				assert.NoError(err)
1353
			},
1354
		},
1355
		{
1356
			name: "enqueue probe error when receive ProbeFinishedRequest",
1357
			mock: func(svc *V2, mr *resource.MockResourceMockRecorder, probes *networktopologymocks.MockProbes, mp *networktopologymocks.MockProbesMockRecorder,
1358
				mn *networktopologymocks.MockNetworkTopologyMockRecorder, hostManager resource.HostManager, mh *resource.MockHostManagerMockRecorder,
1359
				ms *schedulerv2mocks.MockScheduler_SyncProbesServerMockRecorder) {
1360
				gomock.InOrder(
1361
					ms.Recv().Return(&schedulerv2.SyncProbesRequest{
1362
						Host: &commonv2.Host{
1363
							Id:              mockSeedHostID,
1364
							Type:            uint32(pkgtypes.HostTypeSuperSeed),
1365
							Hostname:        "bar",
1366
							Ip:              "127.0.0.1",
1367
							Port:            8003,
1368
							DownloadPort:    8001,
1369
							Os:              "darwin",
1370
							Platform:        "darwin",
1371
							PlatformFamily:  "Standalone Workstation",
1372
							PlatformVersion: "11.1",
1373
							KernelVersion:   "20.2.0",
1374
							Cpu:             mockV2Probe.Host.Cpu,
1375
							Memory:          mockV2Probe.Host.Memory,
1376
							Network:         mockV2Probe.Host.Network,
1377
							Disk:            mockV2Probe.Host.Disk,
1378
							Build:           mockV2Probe.Host.Build,
1379
						},
1380
						Request: &schedulerv2.SyncProbesRequest_ProbeFinishedRequest{
1381
							ProbeFinishedRequest: &schedulerv2.ProbeFinishedRequest{
1382
								Probes: []*schedulerv2.Probe{mockV2Probe},
1383
							},
1384
						},
1385
					}, nil).Times(1),
1386
					mr.HostManager().Return(hostManager).Times(1),
1387
					mh.Load(gomock.Eq(mockRawHost.ID)).Return(&mockRawHost, true),
1388
					mn.Store(gomock.Eq(mockRawSeedHost.ID), gomock.Eq(mockRawHost.ID)).Return(nil).Times(1),
1389
					mn.Probes(gomock.Eq(mockRawSeedHost.ID), gomock.Eq(mockRawHost.ID)).Return(probes).Times(1),
1390
					mp.Enqueue(gomock.Any()).Return(errors.New("enqueue probe error")).Times(1),
1391
					ms.Recv().Return(nil, io.EOF).Times(1),
1392
				)
1393
			},
1394
			expect: func(t *testing.T, err error) {
1395
				assert := assert.New(t)
1396
				assert.NoError(err)
1397
			},
1398
		},
1399
	}
1400

1401
	for _, tc := range tests {
1402
		t.Run(tc.name, func(t *testing.T) {
1403
			ctl := gomock.NewController(t)
1404
			defer ctl.Finish()
1405

1406
			scheduling := mocks.NewMockScheduling(ctl)
1407
			res := resource.NewMockResource(ctl)
1408
			dynconfig := configmocks.NewMockDynconfigInterface(ctl)
1409
			storage := storagemocks.NewMockStorage(ctl)
1410
			probes := networktopologymocks.NewMockProbes(ctl)
1411
			networkTopology := networktopologymocks.NewMockNetworkTopology(ctl)
1412
			hostManager := resource.NewMockHostManager(ctl)
1413
			stream := schedulerv2mocks.NewMockScheduler_SyncProbesServer(ctl)
1414
			svc := NewV2(&config.Config{Scheduler: config.SchedulerConfig{NetworkTopology: mockNetworkTopologyConfig}, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig, storage, networkTopology)
1415

1416
			tc.mock(svc, res.EXPECT(), probes, probes.EXPECT(), networkTopology.EXPECT(), hostManager, hostManager.EXPECT(), stream.EXPECT())
1417
			tc.expect(t, svc.SyncProbes(stream))
1418
		})
1419
	}
1420
}
1421

1422
func TestServiceV2_handleRegisterPeerRequest(t *testing.T) {
1423
	dgst := mockTaskDigest.String()
1424

1425
	tests := []struct {
1426
		name string
1427
		req  *schedulerv2.RegisterPeerRequest
1428
		run  func(t *testing.T, svc *V2, req *schedulerv2.RegisterPeerRequest, peer *resource.Peer, seedPeer *resource.Peer, hostManager resource.HostManager, taskManager resource.TaskManager,
1429
			peerManager resource.PeerManager, stream schedulerv2.Scheduler_AnnouncePeerServer, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder,
1430
			mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder, ma *schedulerv2mocks.MockScheduler_AnnouncePeerServerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder)
1431
	}{
1432
		{
1433
			name: "host not found",
1434
			req:  &schedulerv2.RegisterPeerRequest{},
1435
			run: func(t *testing.T, svc *V2, req *schedulerv2.RegisterPeerRequest, peer *resource.Peer, seedPeer *resource.Peer, hostManager resource.HostManager, taskManager resource.TaskManager,
1436
				peerManager resource.PeerManager, stream schedulerv2.Scheduler_AnnouncePeerServer, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder,
1437
				mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder, ma *schedulerv2mocks.MockScheduler_AnnouncePeerServerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder) {
1438
				gomock.InOrder(
1439
					mr.HostManager().Return(hostManager).Times(1),
1440
					mh.Load(gomock.Eq(peer.Host.ID)).Return(nil, false).Times(1),
1441
				)
1442

1443
				assert := assert.New(t)
1444
				assert.ErrorIs(svc.handleRegisterPeerRequest(context.Background(), stream, peer.Host.ID, peer.Task.ID, peer.ID, req),
1445
					status.Errorf(codes.NotFound, "host %s not found", peer.Host.ID))
1446
			},
1447
		},
1448
		{
1449
			name: "can not found available peer and download task failed",
1450
			req: &schedulerv2.RegisterPeerRequest{
1451
				Download: &commonv2.Download{
1452
					Digest: &dgst,
1453
				},
1454
			},
1455
			run: func(t *testing.T, svc *V2, req *schedulerv2.RegisterPeerRequest, peer *resource.Peer, seedPeer *resource.Peer, hostManager resource.HostManager, taskManager resource.TaskManager,
1456
				peerManager resource.PeerManager, stream schedulerv2.Scheduler_AnnouncePeerServer, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder,
1457
				mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder, ma *schedulerv2mocks.MockScheduler_AnnouncePeerServerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder) {
1458
				gomock.InOrder(
1459
					mr.HostManager().Return(hostManager).Times(1),
1460
					mh.Load(gomock.Eq(peer.Host.ID)).Return(peer.Host, true).Times(1),
1461
					mr.TaskManager().Return(taskManager).Times(1),
1462
					mt.Load(gomock.Eq(peer.Task.ID)).Return(peer.Task, true).Times(1),
1463
					mr.PeerManager().Return(peerManager).Times(1),
1464
					mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
1465
				)
1466

1467
				peer.Priority = commonv2.Priority_LEVEL1
1468

1469
				assert := assert.New(t)
1470
				assert.ErrorIs(svc.handleRegisterPeerRequest(context.Background(), stream, peer.Host.ID, peer.Task.ID, peer.ID, req),
1471
					status.Errorf(codes.FailedPrecondition, "%s peer is forbidden", commonv2.Priority_LEVEL1.String()))
1472
			},
1473
		},
1474
		{
1475
			name: "task state is TaskStateFailed and download task failed",
1476
			req: &schedulerv2.RegisterPeerRequest{
1477
				Download: &commonv2.Download{
1478
					Digest: &dgst,
1479
				},
1480
			},
1481
			run: func(t *testing.T, svc *V2, req *schedulerv2.RegisterPeerRequest, peer *resource.Peer, seedPeer *resource.Peer, hostManager resource.HostManager, taskManager resource.TaskManager,
1482
				peerManager resource.PeerManager, stream schedulerv2.Scheduler_AnnouncePeerServer, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder,
1483
				mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder, ma *schedulerv2mocks.MockScheduler_AnnouncePeerServerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder) {
1484
				gomock.InOrder(
1485
					mr.HostManager().Return(hostManager).Times(1),
1486
					mh.Load(gomock.Eq(peer.Host.ID)).Return(peer.Host, true).Times(1),
1487
					mr.TaskManager().Return(taskManager).Times(1),
1488
					mt.Load(gomock.Eq(peer.Task.ID)).Return(peer.Task, true).Times(1),
1489
					mr.PeerManager().Return(peerManager).Times(1),
1490
					mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
1491
				)
1492

1493
				peer.Priority = commonv2.Priority_LEVEL1
1494
				peer.Task.FSM.SetState(resource.TaskStateFailed)
1495
				peer.Task.StorePeer(peer)
1496
				peer.Task.StorePeer(seedPeer)
1497
				seedPeer.FSM.SetState(resource.PeerStateRunning)
1498

1499
				assert := assert.New(t)
1500
				assert.ErrorIs(svc.handleRegisterPeerRequest(context.Background(), stream, peer.Host.ID, peer.Task.ID, peer.ID, req),
1501
					status.Errorf(codes.FailedPrecondition, "%s peer is forbidden", commonv2.Priority_LEVEL1.String()))
1502
			},
1503
		},
1504
		{
1505
			name: "size scope is SizeScope_EMPTY and load AnnouncePeerStream failed",
1506
			req: &schedulerv2.RegisterPeerRequest{
1507
				Download: &commonv2.Download{
1508
					Digest: &dgst,
1509
				},
1510
			},
1511
			run: func(t *testing.T, svc *V2, req *schedulerv2.RegisterPeerRequest, peer *resource.Peer, seedPeer *resource.Peer, hostManager resource.HostManager, taskManager resource.TaskManager,
1512
				peerManager resource.PeerManager, stream schedulerv2.Scheduler_AnnouncePeerServer, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder,
1513
				mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder, ma *schedulerv2mocks.MockScheduler_AnnouncePeerServerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder) {
1514
				gomock.InOrder(
1515
					mr.HostManager().Return(hostManager).Times(1),
1516
					mh.Load(gomock.Eq(peer.Host.ID)).Return(peer.Host, true).Times(1),
1517
					mr.TaskManager().Return(taskManager).Times(1),
1518
					mt.Load(gomock.Eq(peer.Task.ID)).Return(peer.Task, true).Times(1),
1519
					mr.PeerManager().Return(peerManager).Times(1),
1520
					mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
1521
				)
1522

1523
				peer.Task.ContentLength.Store(0)
1524
				peer.Priority = commonv2.Priority_LEVEL6
1525

1526
				assert := assert.New(t)
1527
				assert.ErrorIs(svc.handleRegisterPeerRequest(context.Background(), nil, peer.Host.ID, peer.Task.ID, peer.ID, req),
1528
					status.Error(codes.NotFound, "AnnouncePeerStream not found"))
1529
				assert.Equal(peer.FSM.Current(), resource.PeerStatePending)
1530
				assert.Equal(peer.Task.FSM.Current(), resource.TaskStateRunning)
1531
			},
1532
		},
1533
		{
1534
			name: "size scope is SizeScope_EMPTY and event PeerEventRegisterEmpty failed",
1535
			req: &schedulerv2.RegisterPeerRequest{
1536
				Download: &commonv2.Download{
1537
					Digest: &dgst,
1538
				},
1539
			},
1540
			run: func(t *testing.T, svc *V2, req *schedulerv2.RegisterPeerRequest, peer *resource.Peer, seedPeer *resource.Peer, hostManager resource.HostManager, taskManager resource.TaskManager,
1541
				peerManager resource.PeerManager, stream schedulerv2.Scheduler_AnnouncePeerServer, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder,
1542
				mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder, ma *schedulerv2mocks.MockScheduler_AnnouncePeerServerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder) {
1543
				gomock.InOrder(
1544
					mr.HostManager().Return(hostManager).Times(1),
1545
					mh.Load(gomock.Eq(peer.Host.ID)).Return(peer.Host, true).Times(1),
1546
					mr.TaskManager().Return(taskManager).Times(1),
1547
					mt.Load(gomock.Eq(peer.Task.ID)).Return(peer.Task, true).Times(1),
1548
					mr.PeerManager().Return(peerManager).Times(1),
1549
					mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
1550
				)
1551

1552
				peer.Task.ContentLength.Store(0)
1553
				peer.Priority = commonv2.Priority_LEVEL6
1554
				peer.StoreAnnouncePeerStream(stream)
1555
				peer.FSM.SetState(resource.PeerStateReceivedEmpty)
1556

1557
				assert := assert.New(t)
1558
				assert.ErrorIs(svc.handleRegisterPeerRequest(context.Background(), nil, peer.Host.ID, peer.Task.ID, peer.ID, req),
1559
					status.Errorf(codes.Internal, "event RegisterEmpty inappropriate in current state ReceivedEmpty"))
1560
				assert.Equal(peer.Task.FSM.Current(), resource.TaskStateRunning)
1561
			},
1562
		},
1563
		{
1564
			name: "size scope is SizeScope_EMPTY and send EmptyTaskResponse failed",
1565
			req: &schedulerv2.RegisterPeerRequest{
1566
				Download: &commonv2.Download{
1567
					Digest: &dgst,
1568
				},
1569
			},
1570
			run: func(t *testing.T, svc *V2, req *schedulerv2.RegisterPeerRequest, peer *resource.Peer, seedPeer *resource.Peer, hostManager resource.HostManager, taskManager resource.TaskManager,
1571
				peerManager resource.PeerManager, stream schedulerv2.Scheduler_AnnouncePeerServer, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder,
1572
				mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder, ma *schedulerv2mocks.MockScheduler_AnnouncePeerServerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder) {
1573
				gomock.InOrder(
1574
					mr.HostManager().Return(hostManager).Times(1),
1575
					mh.Load(gomock.Eq(peer.Host.ID)).Return(peer.Host, true).Times(1),
1576
					mr.TaskManager().Return(taskManager).Times(1),
1577
					mt.Load(gomock.Eq(peer.Task.ID)).Return(peer.Task, true).Times(1),
1578
					mr.PeerManager().Return(peerManager).Times(1),
1579
					mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
1580
					ma.Send(gomock.Eq(&schedulerv2.AnnouncePeerResponse{
1581
						Response: &schedulerv2.AnnouncePeerResponse_EmptyTaskResponse{
1582
							EmptyTaskResponse: &schedulerv2.EmptyTaskResponse{},
1583
						},
1584
					})).Return(errors.New("foo")).Times(1),
1585
				)
1586

1587
				peer.Task.ContentLength.Store(0)
1588
				peer.Priority = commonv2.Priority_LEVEL6
1589
				peer.StoreAnnouncePeerStream(stream)
1590

1591
				assert := assert.New(t)
1592
				assert.ErrorIs(svc.handleRegisterPeerRequest(context.Background(), nil, peer.Host.ID, peer.Task.ID, peer.ID, req),
1593
					status.Errorf(codes.Internal, "foo"))
1594
				assert.Equal(peer.FSM.Current(), resource.PeerStateReceivedEmpty)
1595
				assert.Equal(peer.Task.FSM.Current(), resource.TaskStateRunning)
1596
			},
1597
		},
1598
		{
1599
			name: "size scope is SizeScope_NORMAL and need back-to-source",
1600
			req: &schedulerv2.RegisterPeerRequest{
1601
				Download: &commonv2.Download{
1602
					Digest:           &dgst,
1603
					NeedBackToSource: true,
1604
				},
1605
			},
1606
			run: func(t *testing.T, svc *V2, req *schedulerv2.RegisterPeerRequest, peer *resource.Peer, seedPeer *resource.Peer, hostManager resource.HostManager, taskManager resource.TaskManager,
1607
				peerManager resource.PeerManager, stream schedulerv2.Scheduler_AnnouncePeerServer, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder,
1608
				mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder, ma *schedulerv2mocks.MockScheduler_AnnouncePeerServerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder) {
1609
				gomock.InOrder(
1610
					mr.HostManager().Return(hostManager).Times(1),
1611
					mh.Load(gomock.Eq(peer.Host.ID)).Return(peer.Host, true).Times(1),
1612
					mr.TaskManager().Return(taskManager).Times(1),
1613
					mt.Load(gomock.Eq(peer.Task.ID)).Return(peer.Task, true).Times(1),
1614
					mr.PeerManager().Return(peerManager).Times(1),
1615
					mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
1616
					ms.ScheduleCandidateParents(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).Times(1),
1617
				)
1618

1619
				peer.Task.ContentLength.Store(129)
1620
				peer.Task.TotalPieceCount.Store(2)
1621
				peer.Task.StorePeer(peer)
1622
				peer.Task.StorePeer(seedPeer)
1623
				peer.Priority = commonv2.Priority_LEVEL6
1624
				peer.NeedBackToSource.Store(true)
1625
				peer.StoreAnnouncePeerStream(stream)
1626

1627
				assert := assert.New(t)
1628
				assert.NoError(svc.handleRegisterPeerRequest(context.Background(), nil, peer.Host.ID, peer.Task.ID, peer.ID, req))
1629
				assert.Equal(peer.FSM.Current(), resource.PeerStateReceivedNormal)
1630
				assert.Equal(peer.NeedBackToSource.Load(), true)
1631
				assert.Equal(peer.Task.FSM.Current(), resource.TaskStateRunning)
1632
			},
1633
		},
1634
		{
1635
			name: "size scope is SizeScope_NORMAL",
1636
			req: &schedulerv2.RegisterPeerRequest{
1637
				Download: &commonv2.Download{
1638
					Digest: &dgst,
1639
				},
1640
			},
1641
			run: func(t *testing.T, svc *V2, req *schedulerv2.RegisterPeerRequest, peer *resource.Peer, seedPeer *resource.Peer, hostManager resource.HostManager, taskManager resource.TaskManager,
1642
				peerManager resource.PeerManager, stream schedulerv2.Scheduler_AnnouncePeerServer, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder,
1643
				mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder, ma *schedulerv2mocks.MockScheduler_AnnouncePeerServerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder) {
1644
				gomock.InOrder(
1645
					mr.HostManager().Return(hostManager).Times(1),
1646
					mh.Load(gomock.Eq(peer.Host.ID)).Return(peer.Host, true).Times(1),
1647
					mr.TaskManager().Return(taskManager).Times(1),
1648
					mt.Load(gomock.Eq(peer.Task.ID)).Return(peer.Task, true).Times(1),
1649
					mr.PeerManager().Return(peerManager).Times(1),
1650
					mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
1651
					ms.ScheduleCandidateParents(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).Times(1),
1652
				)
1653

1654
				peer.Task.ContentLength.Store(129)
1655
				peer.Task.TotalPieceCount.Store(2)
1656
				peer.Task.StorePeer(peer)
1657
				peer.Task.StorePeer(seedPeer)
1658
				peer.Priority = commonv2.Priority_LEVEL6
1659
				peer.StoreAnnouncePeerStream(stream)
1660

1661
				assert := assert.New(t)
1662
				assert.NoError(svc.handleRegisterPeerRequest(context.Background(), nil, peer.Host.ID, peer.Task.ID, peer.ID, req))
1663
				assert.Equal(peer.FSM.Current(), resource.PeerStateReceivedNormal)
1664
				assert.Equal(peer.Task.FSM.Current(), resource.TaskStateRunning)
1665
			},
1666
		},
1667
		{
1668
			name: "size scope is SizeScope_UNKNOW",
1669
			req: &schedulerv2.RegisterPeerRequest{
1670
				Download: &commonv2.Download{
1671
					Digest: &dgst,
1672
				},
1673
			},
1674
			run: func(t *testing.T, svc *V2, req *schedulerv2.RegisterPeerRequest, peer *resource.Peer, seedPeer *resource.Peer, hostManager resource.HostManager, taskManager resource.TaskManager,
1675
				peerManager resource.PeerManager, stream schedulerv2.Scheduler_AnnouncePeerServer, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder,
1676
				mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder, ma *schedulerv2mocks.MockScheduler_AnnouncePeerServerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder) {
1677
				gomock.InOrder(
1678
					mr.HostManager().Return(hostManager).Times(1),
1679
					mh.Load(gomock.Eq(peer.Host.ID)).Return(peer.Host, true).Times(1),
1680
					mr.TaskManager().Return(taskManager).Times(1),
1681
					mt.Load(gomock.Eq(peer.Task.ID)).Return(peer.Task, true).Times(1),
1682
					mr.PeerManager().Return(peerManager).Times(1),
1683
					mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
1684
					ms.ScheduleCandidateParents(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).Times(1),
1685
				)
1686

1687
				peer.Priority = commonv2.Priority_LEVEL6
1688

1689
				assert := assert.New(t)
1690
				assert.NoError(svc.handleRegisterPeerRequest(context.Background(), nil, peer.Host.ID, peer.Task.ID, peer.ID, req))
1691
				assert.Equal(peer.FSM.Current(), resource.PeerStateReceivedNormal)
1692
				assert.Equal(peer.Task.FSM.Current(), resource.TaskStateRunning)
1693
			},
1694
		},
1695
	}
1696

1697
	for _, tc := range tests {
1698
		t.Run(tc.name, func(t *testing.T) {
1699
			ctl := gomock.NewController(t)
1700
			defer ctl.Finish()
1701
			scheduling := schedulingmocks.NewMockScheduling(ctl)
1702
			res := resource.NewMockResource(ctl)
1703
			dynconfig := configmocks.NewMockDynconfigInterface(ctl)
1704
			storage := storagemocks.NewMockStorage(ctl)
1705
			networkTopology := networktopologymocks.NewMockNetworkTopology(ctl)
1706
			hostManager := resource.NewMockHostManager(ctl)
1707
			peerManager := resource.NewMockPeerManager(ctl)
1708
			taskManager := resource.NewMockTaskManager(ctl)
1709
			stream := schedulerv2mocks.NewMockScheduler_AnnouncePeerServer(ctl)
1710

1711
			mockHost := resource.NewHost(
1712
				mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
1713
				mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
1714
			mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength))
1715
			peer := resource.NewPeer(mockPeerID, mockResourceConfig, mockTask, mockHost)
1716
			seedPeer := resource.NewPeer(mockSeedPeerID, mockResourceConfig, mockTask, mockHost)
1717
			svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage, networkTopology)
1718

1719
			tc.run(t, svc, tc.req, peer, seedPeer, hostManager, taskManager, peerManager, stream, res.EXPECT(), hostManager.EXPECT(), taskManager.EXPECT(), peerManager.EXPECT(), stream.EXPECT(), scheduling.EXPECT())
1720
		})
1721
	}
1722
}
1723

1724
func TestServiceV2_handleDownloadPeerStartedRequest(t *testing.T) {
1725
	tests := []struct {
1726
		name string
1727
		run  func(t *testing.T, svc *V2, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder)
1728
	}{
1729
		{
1730
			name: "peer can not be loaded",
1731
			run: func(t *testing.T, svc *V2, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
1732
				gomock.InOrder(
1733
					mr.PeerManager().Return(peerManager).Times(1),
1734
					mp.Load(gomock.Eq(peer.ID)).Return(nil, false).Times(1),
1735
				)
1736

1737
				assert := assert.New(t)
1738
				assert.ErrorIs(svc.handleDownloadPeerStartedRequest(context.Background(), peer.ID), status.Errorf(codes.NotFound, "peer %s not found", peer.ID))
1739
			},
1740
		},
1741
		{
1742
			name: "peer state is PeerStateRunning",
1743
			run: func(t *testing.T, svc *V2, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
1744
				gomock.InOrder(
1745
					mr.PeerManager().Return(peerManager).Times(1),
1746
					mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
1747
					md.GetApplications().Return([]*managerv2.Application{}, nil).Times(1),
1748
				)
1749

1750
				peer.FSM.SetState(resource.PeerStateRunning)
1751

1752
				assert := assert.New(t)
1753
				assert.NoError(svc.handleDownloadPeerStartedRequest(context.Background(), peer.ID))
1754
			},
1755
		},
1756
		{
1757
			name: "task state is TaskStateRunning",
1758
			run: func(t *testing.T, svc *V2, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
1759
				gomock.InOrder(
1760
					mr.PeerManager().Return(peerManager).Times(1),
1761
					mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
1762
					md.GetApplications().Return([]*managerv2.Application{}, nil).Times(1),
1763
				)
1764

1765
				peer.FSM.SetState(resource.PeerStateReceivedNormal)
1766
				peer.Task.FSM.SetState(resource.TaskStateRunning)
1767

1768
				assert := assert.New(t)
1769
				assert.NoError(svc.handleDownloadPeerStartedRequest(context.Background(), peer.ID))
1770
				assert.NotEqual(peer.UpdatedAt.Load(), 0)
1771
				assert.NotEqual(peer.Task.UpdatedAt.Load(), 0)
1772
			},
1773
		},
1774
		{
1775
			name: "task state is TaskStatePending",
1776
			run: func(t *testing.T, svc *V2, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
1777
				gomock.InOrder(
1778
					mr.PeerManager().Return(peerManager).Times(1),
1779
					mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
1780
					md.GetApplications().Return([]*managerv2.Application{}, nil).Times(1),
1781
				)
1782

1783
				peer.FSM.SetState(resource.PeerStateReceivedNormal)
1784
				peer.Task.FSM.SetState(resource.TaskStatePending)
1785

1786
				assert := assert.New(t)
1787
				assert.NoError(svc.handleDownloadPeerStartedRequest(context.Background(), peer.ID))
1788
				assert.NotEqual(peer.UpdatedAt.Load(), 0)
1789
				assert.NotEqual(peer.Task.UpdatedAt.Load(), 0)
1790
			},
1791
		},
1792
	}
1793

1794
	for _, tc := range tests {
1795
		t.Run(tc.name, func(t *testing.T) {
1796
			ctl := gomock.NewController(t)
1797
			defer ctl.Finish()
1798
			scheduling := schedulingmocks.NewMockScheduling(ctl)
1799
			res := resource.NewMockResource(ctl)
1800
			dynconfig := configmocks.NewMockDynconfigInterface(ctl)
1801
			storage := storagemocks.NewMockStorage(ctl)
1802
			networkTopology := networktopologymocks.NewMockNetworkTopology(ctl)
1803
			peerManager := resource.NewMockPeerManager(ctl)
1804

1805
			mockHost := resource.NewHost(
1806
				mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
1807
				mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
1808
			mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength))
1809
			peer := resource.NewPeer(mockPeerID, mockResourceConfig, mockTask, mockHost)
1810
			svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage, networkTopology)
1811

1812
			tc.run(t, svc, peer, peerManager, res.EXPECT(), peerManager.EXPECT(), dynconfig.EXPECT())
1813
		})
1814
	}
1815
}
1816

1817
func TestServiceV2_handleDownloadPeerBackToSourceStartedRequest(t *testing.T) {
1818
	tests := []struct {
1819
		name string
1820
		run  func(t *testing.T, svc *V2, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder)
1821
	}{
1822
		{
1823
			name: "peer can not be loaded",
1824
			run: func(t *testing.T, svc *V2, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
1825
				gomock.InOrder(
1826
					mr.PeerManager().Return(peerManager).Times(1),
1827
					mp.Load(gomock.Eq(peer.ID)).Return(nil, false).Times(1),
1828
				)
1829

1830
				assert := assert.New(t)
1831
				assert.ErrorIs(svc.handleDownloadPeerBackToSourceStartedRequest(context.Background(), peer.ID), status.Errorf(codes.NotFound, "peer %s not found", peer.ID))
1832
			},
1833
		},
1834
		{
1835
			name: "peer state is PeerStateRunning",
1836
			run: func(t *testing.T, svc *V2, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
1837
				gomock.InOrder(
1838
					mr.PeerManager().Return(peerManager).Times(1),
1839
					mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
1840
					md.GetApplications().Return([]*managerv2.Application{}, nil).Times(1),
1841
				)
1842

1843
				peer.FSM.SetState(resource.PeerStateBackToSource)
1844

1845
				assert := assert.New(t)
1846
				assert.ErrorIs(svc.handleDownloadPeerBackToSourceStartedRequest(context.Background(), peer.ID), status.Error(codes.Internal, "event DownloadBackToSource inappropriate in current state BackToSource"))
1847
			},
1848
		},
1849
		{
1850
			name: "task state is TaskStateRunning",
1851
			run: func(t *testing.T, svc *V2, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
1852
				gomock.InOrder(
1853
					mr.PeerManager().Return(peerManager).Times(1),
1854
					mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
1855
					md.GetApplications().Return([]*managerv2.Application{}, nil).Times(1),
1856
				)
1857

1858
				peer.FSM.SetState(resource.PeerStateReceivedNormal)
1859
				peer.Task.FSM.SetState(resource.TaskStateRunning)
1860

1861
				assert := assert.New(t)
1862
				assert.NoError(svc.handleDownloadPeerBackToSourceStartedRequest(context.Background(), peer.ID))
1863
				assert.NotEqual(peer.UpdatedAt.Load(), 0)
1864
				assert.NotEqual(peer.Task.UpdatedAt.Load(), 0)
1865
			},
1866
		},
1867
		{
1868
			name: "task state is TaskStatePending",
1869
			run: func(t *testing.T, svc *V2, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
1870
				gomock.InOrder(
1871
					mr.PeerManager().Return(peerManager).Times(1),
1872
					mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
1873
					md.GetApplications().Return([]*managerv2.Application{}, nil).Times(1),
1874
				)
1875

1876
				peer.FSM.SetState(resource.PeerStateReceivedNormal)
1877
				peer.Task.FSM.SetState(resource.TaskStatePending)
1878

1879
				assert := assert.New(t)
1880
				assert.NoError(svc.handleDownloadPeerBackToSourceStartedRequest(context.Background(), peer.ID))
1881
				assert.NotEqual(peer.UpdatedAt.Load(), 0)
1882
				assert.NotEqual(peer.Task.UpdatedAt.Load(), 0)
1883
			},
1884
		},
1885
	}
1886

1887
	for _, tc := range tests {
1888
		t.Run(tc.name, func(t *testing.T) {
1889
			ctl := gomock.NewController(t)
1890
			defer ctl.Finish()
1891
			scheduling := schedulingmocks.NewMockScheduling(ctl)
1892
			res := resource.NewMockResource(ctl)
1893
			dynconfig := configmocks.NewMockDynconfigInterface(ctl)
1894
			storage := storagemocks.NewMockStorage(ctl)
1895
			networkTopology := networktopologymocks.NewMockNetworkTopology(ctl)
1896
			peerManager := resource.NewMockPeerManager(ctl)
1897

1898
			mockHost := resource.NewHost(
1899
				mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
1900
				mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
1901
			mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength))
1902
			peer := resource.NewPeer(mockPeerID, mockResourceConfig, mockTask, mockHost)
1903
			svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage, networkTopology)
1904

1905
			tc.run(t, svc, peer, peerManager, res.EXPECT(), peerManager.EXPECT(), dynconfig.EXPECT())
1906
		})
1907
	}
1908
}
1909

1910
func TestServiceV2_handleRescheduleRequest(t *testing.T) {
1911
	tests := []struct {
1912
		name string
1913
		run  func(t *testing.T, svc *V2, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder,
1914
			mp *resource.MockPeerManagerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder)
1915
	}{
1916
		{
1917
			name: "peer can not be loaded",
1918
			run: func(t *testing.T, svc *V2, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder,
1919
				mp *resource.MockPeerManagerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder) {
1920
				gomock.InOrder(
1921
					mr.PeerManager().Return(peerManager).Times(1),
1922
					mp.Load(gomock.Eq(peer.ID)).Return(nil, false).Times(1),
1923
				)
1924

1925
				assert := assert.New(t)
1926
				assert.ErrorIs(svc.handleRescheduleRequest(context.Background(), peer.ID, []*commonv2.Peer{}), status.Errorf(codes.NotFound, "peer %s not found", peer.ID))
1927
			},
1928
		},
1929
		{
1930
			name: "reschedule failed",
1931
			run: func(t *testing.T, svc *V2, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder,
1932
				mp *resource.MockPeerManagerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder) {
1933
				gomock.InOrder(
1934
					mr.PeerManager().Return(peerManager).Times(1),
1935
					mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
1936
					ms.ScheduleCandidateParents(gomock.Any(), gomock.Any(), gomock.Any()).Return(errors.New("foo")).Times(1),
1937
				)
1938

1939
				assert := assert.New(t)
1940
				assert.ErrorIs(svc.handleRescheduleRequest(context.Background(), peer.ID, []*commonv2.Peer{}), status.Error(codes.FailedPrecondition, "foo"))
1941
			},
1942
		},
1943
		{
1944
			name: "reschedule succeeded",
1945
			run: func(t *testing.T, svc *V2, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder,
1946
				mp *resource.MockPeerManagerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder) {
1947
				gomock.InOrder(
1948
					mr.PeerManager().Return(peerManager).Times(1),
1949
					mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
1950
					ms.ScheduleCandidateParents(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).Times(1),
1951
				)
1952

1953
				assert := assert.New(t)
1954
				assert.NoError(svc.handleRescheduleRequest(context.Background(), peer.ID, []*commonv2.Peer{}))
1955
			},
1956
		},
1957
	}
1958

1959
	for _, tc := range tests {
1960
		t.Run(tc.name, func(t *testing.T) {
1961
			ctl := gomock.NewController(t)
1962
			defer ctl.Finish()
1963
			scheduling := schedulingmocks.NewMockScheduling(ctl)
1964
			res := resource.NewMockResource(ctl)
1965
			dynconfig := configmocks.NewMockDynconfigInterface(ctl)
1966
			storage := storagemocks.NewMockStorage(ctl)
1967
			networkTopology := networktopologymocks.NewMockNetworkTopology(ctl)
1968
			peerManager := resource.NewMockPeerManager(ctl)
1969

1970
			mockHost := resource.NewHost(
1971
				mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
1972
				mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
1973
			mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength))
1974
			peer := resource.NewPeer(mockPeerID, mockResourceConfig, mockTask, mockHost)
1975
			svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage, networkTopology)
1976

1977
			tc.run(t, svc, peer, peerManager, res.EXPECT(), peerManager.EXPECT(), scheduling.EXPECT())
1978
		})
1979
	}
1980
}
1981

1982
func TestServiceV2_handleDownloadPeerFinishedRequest(t *testing.T) {
1983
	tests := []struct {
1984
		name string
1985
		run  func(t *testing.T, svc *V2, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder)
1986
	}{
1987
		{
1988
			name: "peer can not be loaded",
1989
			run: func(t *testing.T, svc *V2, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
1990
				gomock.InOrder(
1991
					mr.PeerManager().Return(peerManager).Times(1),
1992
					mp.Load(gomock.Eq(peer.ID)).Return(nil, false).Times(1),
1993
				)
1994

1995
				assert := assert.New(t)
1996
				assert.ErrorIs(svc.handleDownloadPeerFinishedRequest(context.Background(), peer.ID), status.Errorf(codes.NotFound, "peer %s not found", peer.ID))
1997
			},
1998
		},
1999
		{
2000
			name: "peer state is PeerStateSucceeded",
2001
			run: func(t *testing.T, svc *V2, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
2002
				gomock.InOrder(
2003
					mr.PeerManager().Return(peerManager).Times(1),
2004
					mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
2005
				)
2006

2007
				peer.FSM.SetState(resource.PeerStateSucceeded)
2008

2009
				assert := assert.New(t)
2010
				assert.ErrorIs(svc.handleDownloadPeerFinishedRequest(context.Background(), peer.ID), status.Error(codes.Internal, "event DownloadSucceeded inappropriate in current state Succeeded"))
2011
				assert.NotEqual(peer.Cost.Load(), 0)
2012
			},
2013
		},
2014
		{
2015
			name: "peer state is PeerStateRunning",
2016
			run: func(t *testing.T, svc *V2, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
2017
				gomock.InOrder(
2018
					mr.PeerManager().Return(peerManager).Times(1),
2019
					mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
2020
					md.GetApplications().Return([]*managerv2.Application{}, nil).Times(1),
2021
				)
2022

2023
				peer.FSM.SetState(resource.PeerStateRunning)
2024

2025
				assert := assert.New(t)
2026
				assert.NoError(svc.handleDownloadPeerFinishedRequest(context.Background(), peer.ID))
2027
				assert.Equal(peer.FSM.Current(), resource.PeerStateSucceeded)
2028
				assert.NotEqual(peer.Cost.Load(), 0)
2029
			},
2030
		},
2031
	}
2032

2033
	for _, tc := range tests {
2034
		t.Run(tc.name, func(t *testing.T) {
2035
			ctl := gomock.NewController(t)
2036
			defer ctl.Finish()
2037
			scheduling := schedulingmocks.NewMockScheduling(ctl)
2038
			res := resource.NewMockResource(ctl)
2039
			dynconfig := configmocks.NewMockDynconfigInterface(ctl)
2040
			storage := storagemocks.NewMockStorage(ctl)
2041
			networkTopology := networktopologymocks.NewMockNetworkTopology(ctl)
2042
			peerManager := resource.NewMockPeerManager(ctl)
2043

2044
			mockHost := resource.NewHost(
2045
				mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
2046
				mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
2047
			mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength))
2048
			peer := resource.NewPeer(mockPeerID, mockResourceConfig, mockTask, mockHost)
2049
			svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage, networkTopology)
2050

2051
			tc.run(t, svc, peer, peerManager, res.EXPECT(), peerManager.EXPECT(), dynconfig.EXPECT())
2052
		})
2053
	}
2054
}
2055

2056
func TestServiceV2_handleDownloadPeerBackToSourceFinishedRequest(t *testing.T) {
2057
	s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
2058
		if _, err := w.Write([]byte{1}); err != nil {
2059
			w.WriteHeader(http.StatusInternalServerError)
2060
			return
2061
		}
2062

2063
		w.WriteHeader(http.StatusOK)
2064
	}))
2065
	defer s.Close()
2066

2067
	tests := []struct {
2068
		name string
2069
		req  *schedulerv2.DownloadPeerBackToSourceFinishedRequest
2070
		run  func(t *testing.T, svc *V2, req *schedulerv2.DownloadPeerBackToSourceFinishedRequest, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder,
2071
			mp *resource.MockPeerManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder)
2072
	}{
2073
		{
2074
			name: "peer can not be loaded",
2075
			req:  &schedulerv2.DownloadPeerBackToSourceFinishedRequest{},
2076
			run: func(t *testing.T, svc *V2, req *schedulerv2.DownloadPeerBackToSourceFinishedRequest, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder,
2077
				mp *resource.MockPeerManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
2078
				gomock.InOrder(
2079
					mr.PeerManager().Return(peerManager).Times(1),
2080
					mp.Load(gomock.Eq(peer.ID)).Return(nil, false).Times(1),
2081
				)
2082

2083
				assert := assert.New(t)
2084
				assert.ErrorIs(svc.handleDownloadPeerBackToSourceFinishedRequest(context.Background(), peer.ID, req), status.Errorf(codes.NotFound, "peer %s not found", peer.ID))
2085
				assert.Equal(peer.Task.ContentLength.Load(), int64(-1))
2086
				assert.Equal(peer.Task.TotalPieceCount.Load(), int32(0))
2087
				assert.Equal(len(peer.Task.DirectPiece), 0)
2088
				assert.Equal(peer.Task.FSM.Current(), resource.TaskStatePending)
2089
			},
2090
		},
2091
		{
2092
			name: "peer state is PeerStateSucceeded",
2093
			req:  &schedulerv2.DownloadPeerBackToSourceFinishedRequest{},
2094
			run: func(t *testing.T, svc *V2, req *schedulerv2.DownloadPeerBackToSourceFinishedRequest, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder,
2095
				mp *resource.MockPeerManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
2096
				gomock.InOrder(
2097
					mr.PeerManager().Return(peerManager).Times(1),
2098
					mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
2099
				)
2100

2101
				peer.FSM.SetState(resource.PeerStateSucceeded)
2102

2103
				assert := assert.New(t)
2104
				assert.ErrorIs(svc.handleDownloadPeerBackToSourceFinishedRequest(context.Background(), peer.ID, req), status.Error(codes.Internal, "event DownloadSucceeded inappropriate in current state Succeeded"))
2105
				assert.NotEqual(peer.Cost.Load(), 0)
2106
				assert.Equal(peer.Task.ContentLength.Load(), int64(-1))
2107
				assert.Equal(peer.Task.TotalPieceCount.Load(), int32(0))
2108
				assert.Equal(len(peer.Task.DirectPiece), 0)
2109
				assert.Equal(peer.Task.FSM.Current(), resource.TaskStatePending)
2110
			},
2111
		},
2112
		{
2113
			name: "peer has range",
2114
			req:  &schedulerv2.DownloadPeerBackToSourceFinishedRequest{},
2115
			run: func(t *testing.T, svc *V2, req *schedulerv2.DownloadPeerBackToSourceFinishedRequest, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder,
2116
				mp *resource.MockPeerManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
2117
				gomock.InOrder(
2118
					mr.PeerManager().Return(peerManager).Times(1),
2119
					mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
2120
					md.GetApplications().Return([]*managerv2.Application{}, nil).Times(1),
2121
				)
2122

2123
				peer.FSM.SetState(resource.PeerStateRunning)
2124
				peer.Range = &nethttp.Range{}
2125

2126
				assert := assert.New(t)
2127
				assert.NoError(svc.handleDownloadPeerBackToSourceFinishedRequest(context.Background(), peer.ID, req))
2128
				assert.NotEqual(peer.Cost.Load(), 0)
2129
				assert.Equal(peer.FSM.Current(), resource.PeerStateSucceeded)
2130
				assert.Equal(peer.Task.ContentLength.Load(), int64(-1))
2131
				assert.Equal(peer.Task.TotalPieceCount.Load(), int32(0))
2132
				assert.Equal(len(peer.Task.DirectPiece), 0)
2133
				assert.Equal(peer.Task.FSM.Current(), resource.TaskStatePending)
2134
			},
2135
		},
2136
		{
2137
			name: "task state is TaskStateSucceeded",
2138
			req:  &schedulerv2.DownloadPeerBackToSourceFinishedRequest{},
2139
			run: func(t *testing.T, svc *V2, req *schedulerv2.DownloadPeerBackToSourceFinishedRequest, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder,
2140
				mp *resource.MockPeerManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
2141
				gomock.InOrder(
2142
					mr.PeerManager().Return(peerManager).Times(1),
2143
					mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
2144
					md.GetApplications().Return([]*managerv2.Application{}, nil).Times(1),
2145
				)
2146

2147
				peer.FSM.SetState(resource.PeerStateRunning)
2148
				peer.Task.FSM.SetState(resource.TaskStateSucceeded)
2149

2150
				assert := assert.New(t)
2151
				assert.NoError(svc.handleDownloadPeerBackToSourceFinishedRequest(context.Background(), peer.ID, req))
2152
				assert.NotEqual(peer.Cost.Load(), 0)
2153
				assert.Equal(peer.FSM.Current(), resource.PeerStateSucceeded)
2154
				assert.Equal(peer.Task.ContentLength.Load(), int64(-1))
2155
				assert.Equal(peer.Task.TotalPieceCount.Load(), int32(0))
2156
				assert.Equal(len(peer.Task.DirectPiece), 0)
2157
				assert.Equal(peer.Task.FSM.Current(), resource.TaskStateSucceeded)
2158
			},
2159
		},
2160
		{
2161
			name: "task state is TaskStatePending",
2162
			req: &schedulerv2.DownloadPeerBackToSourceFinishedRequest{
2163
				ContentLength: 1024,
2164
				PieceCount:    10,
2165
			},
2166
			run: func(t *testing.T, svc *V2, req *schedulerv2.DownloadPeerBackToSourceFinishedRequest, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder,
2167
				mp *resource.MockPeerManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
2168
				gomock.InOrder(
2169
					mr.PeerManager().Return(peerManager).Times(1),
2170
					mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
2171
				)
2172

2173
				peer.FSM.SetState(resource.PeerStateRunning)
2174
				peer.Task.FSM.SetState(resource.TaskStatePending)
2175

2176
				assert := assert.New(t)
2177
				assert.ErrorIs(svc.handleDownloadPeerBackToSourceFinishedRequest(context.Background(), peer.ID, req), status.Error(codes.Internal, "event DownloadSucceeded inappropriate in current state Pending"))
2178
				assert.NotEqual(peer.Cost.Load(), 0)
2179
				assert.Equal(peer.FSM.Current(), resource.PeerStateSucceeded)
2180
				assert.Equal(peer.Task.ContentLength.Load(), int64(1024))
2181
				assert.Equal(peer.Task.TotalPieceCount.Load(), int32(10))
2182
				assert.Equal(len(peer.Task.DirectPiece), 0)
2183
				assert.Equal(peer.Task.FSM.Current(), resource.TaskStatePending)
2184
			},
2185
		},
2186
		{
2187
			name: "task state is TaskStateRunning",
2188
			req: &schedulerv2.DownloadPeerBackToSourceFinishedRequest{
2189
				ContentLength: 1024,
2190
				PieceCount:    10,
2191
			},
2192
			run: func(t *testing.T, svc *V2, req *schedulerv2.DownloadPeerBackToSourceFinishedRequest, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder,
2193
				mp *resource.MockPeerManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
2194
				gomock.InOrder(
2195
					mr.PeerManager().Return(peerManager).Times(1),
2196
					mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
2197
					md.GetApplications().Return([]*managerv2.Application{}, nil).Times(1),
2198
				)
2199

2200
				peer.FSM.SetState(resource.PeerStateRunning)
2201
				peer.Task.FSM.SetState(resource.TaskStateRunning)
2202

2203
				assert := assert.New(t)
2204
				assert.NoError(svc.handleDownloadPeerBackToSourceFinishedRequest(context.Background(), peer.ID, req))
2205
				assert.NotEqual(peer.Cost.Load(), 0)
2206
				assert.Equal(peer.FSM.Current(), resource.PeerStateSucceeded)
2207
				assert.Equal(peer.Task.ContentLength.Load(), int64(1024))
2208
				assert.Equal(peer.Task.TotalPieceCount.Load(), int32(10))
2209
				assert.Equal(len(peer.Task.DirectPiece), 0)
2210
				assert.Equal(peer.Task.FSM.Current(), resource.TaskStateSucceeded)
2211
			},
2212
		},
2213
	}
2214

2215
	for _, tc := range tests {
2216
		t.Run(tc.name, func(t *testing.T) {
2217
			ctl := gomock.NewController(t)
2218
			defer ctl.Finish()
2219
			scheduling := schedulingmocks.NewMockScheduling(ctl)
2220
			res := resource.NewMockResource(ctl)
2221
			dynconfig := configmocks.NewMockDynconfigInterface(ctl)
2222
			storage := storagemocks.NewMockStorage(ctl)
2223
			networkTopology := networktopologymocks.NewMockNetworkTopology(ctl)
2224
			peerManager := resource.NewMockPeerManager(ctl)
2225

2226
			url, err := url.Parse(s.URL)
2227
			if err != nil {
2228
				t.Fatal(err)
2229
			}
2230

2231
			ip, rawPort, err := net.SplitHostPort(url.Host)
2232
			if err != nil {
2233
				t.Fatal(err)
2234
			}
2235

2236
			port, err := strconv.ParseInt(rawPort, 10, 32)
2237
			if err != nil {
2238
				t.Fatal(err)
2239
			}
2240

2241
			mockHost := resource.NewHost(
2242
				mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
2243
				mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
2244
			mockHost.IP = ip
2245
			mockHost.DownloadPort = int32(port)
2246

2247
			mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength))
2248
			peer := resource.NewPeer(mockPeerID, mockResourceConfig, mockTask, mockHost)
2249
			svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage, networkTopology)
2250

2251
			tc.run(t, svc, tc.req, peer, peerManager, res.EXPECT(), peerManager.EXPECT(), dynconfig.EXPECT())
2252
		})
2253
	}
2254
}
2255

2256
func TestServiceV2_handleDownloadPeerFailedRequest(t *testing.T) {
2257
	tests := []struct {
2258
		name string
2259
		run  func(t *testing.T, svc *V2, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder)
2260
	}{
2261
		{
2262
			name: "peer can not be loaded",
2263
			run: func(t *testing.T, svc *V2, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
2264
				gomock.InOrder(
2265
					mr.PeerManager().Return(peerManager).Times(1),
2266
					mp.Load(gomock.Eq(peer.ID)).Return(nil, false).Times(1),
2267
				)
2268

2269
				assert := assert.New(t)
2270
				assert.ErrorIs(svc.handleDownloadPeerFailedRequest(context.Background(), peer.ID), status.Errorf(codes.NotFound, "peer %s not found", peer.ID))
2271
			},
2272
		},
2273
		{
2274
			name: "peer state is PeerEventDownloadFailed",
2275
			run: func(t *testing.T, svc *V2, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
2276
				gomock.InOrder(
2277
					mr.PeerManager().Return(peerManager).Times(1),
2278
					mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
2279
				)
2280

2281
				peer.FSM.SetState(resource.PeerEventDownloadFailed)
2282

2283
				assert := assert.New(t)
2284
				assert.ErrorIs(svc.handleDownloadPeerFailedRequest(context.Background(), peer.ID), status.Error(codes.Internal, "event DownloadFailed inappropriate in current state DownloadFailed"))
2285
			},
2286
		},
2287
		{
2288
			name: "peer state is PeerStateRunning",
2289
			run: func(t *testing.T, svc *V2, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
2290
				gomock.InOrder(
2291
					mr.PeerManager().Return(peerManager).Times(1),
2292
					mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
2293
					md.GetApplications().Return([]*managerv2.Application{}, nil).Times(1),
2294
				)
2295

2296
				peer.FSM.SetState(resource.PeerStateRunning)
2297

2298
				assert := assert.New(t)
2299
				assert.NoError(svc.handleDownloadPeerFailedRequest(context.Background(), peer.ID))
2300
				assert.Equal(peer.FSM.Current(), resource.PeerStateFailed)
2301
				assert.NotEqual(peer.UpdatedAt.Load(), 0)
2302
			},
2303
		},
2304
	}
2305

2306
	for _, tc := range tests {
2307
		t.Run(tc.name, func(t *testing.T) {
2308
			ctl := gomock.NewController(t)
2309
			defer ctl.Finish()
2310
			scheduling := schedulingmocks.NewMockScheduling(ctl)
2311
			res := resource.NewMockResource(ctl)
2312
			dynconfig := configmocks.NewMockDynconfigInterface(ctl)
2313
			storage := storagemocks.NewMockStorage(ctl)
2314
			networkTopology := networktopologymocks.NewMockNetworkTopology(ctl)
2315
			peerManager := resource.NewMockPeerManager(ctl)
2316

2317
			mockHost := resource.NewHost(
2318
				mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
2319
				mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
2320
			mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength))
2321
			peer := resource.NewPeer(mockPeerID, mockResourceConfig, mockTask, mockHost)
2322
			svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage, networkTopology)
2323

2324
			tc.run(t, svc, peer, peerManager, res.EXPECT(), peerManager.EXPECT(), dynconfig.EXPECT())
2325
		})
2326
	}
2327
}
2328

2329
func TestServiceV2_handleDownloadPeerBackToSourceFailedRequest(t *testing.T) {
2330
	tests := []struct {
2331
		name string
2332
		run  func(t *testing.T, svc *V2, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder)
2333
	}{
2334
		{
2335
			name: "peer can not be loaded",
2336
			run: func(t *testing.T, svc *V2, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
2337
				gomock.InOrder(
2338
					mr.PeerManager().Return(peerManager).Times(1),
2339
					mp.Load(gomock.Eq(peer.ID)).Return(nil, false).Times(1),
2340
				)
2341

2342
				peer.Task.ContentLength.Store(1)
2343
				peer.Task.TotalPieceCount.Store(1)
2344
				peer.Task.DirectPiece = []byte{1}
2345

2346
				assert := assert.New(t)
2347
				assert.ErrorIs(svc.handleDownloadPeerBackToSourceFailedRequest(context.Background(), peer.ID), status.Errorf(codes.NotFound, "peer %s not found", peer.ID))
2348
				assert.Equal(peer.FSM.Current(), resource.PeerStatePending)
2349
				assert.Equal(peer.Task.FSM.Current(), resource.TaskStatePending)
2350
				assert.Equal(peer.Task.ContentLength.Load(), int64(1))
2351
				assert.Equal(peer.Task.TotalPieceCount.Load(), int32(1))
2352
				assert.Equal(peer.Task.DirectPiece, []byte{1})
2353
			},
2354
		},
2355
		{
2356
			name: "peer state is PeerStateFailed",
2357
			run: func(t *testing.T, svc *V2, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
2358
				gomock.InOrder(
2359
					mr.PeerManager().Return(peerManager).Times(1),
2360
					mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
2361
				)
2362

2363
				peer.FSM.SetState(resource.PeerStateFailed)
2364
				peer.Task.ContentLength.Store(1)
2365
				peer.Task.TotalPieceCount.Store(1)
2366
				peer.Task.DirectPiece = []byte{1}
2367

2368
				assert := assert.New(t)
2369
				assert.ErrorIs(svc.handleDownloadPeerBackToSourceFailedRequest(context.Background(), peer.ID), status.Error(codes.Internal, "event DownloadFailed inappropriate in current state Failed"))
2370
				assert.Equal(peer.FSM.Current(), resource.PeerStateFailed)
2371
				assert.Equal(peer.Task.FSM.Current(), resource.TaskStatePending)
2372
				assert.Equal(peer.Task.ContentLength.Load(), int64(1))
2373
				assert.Equal(peer.Task.TotalPieceCount.Load(), int32(1))
2374
				assert.Equal(peer.Task.DirectPiece, []byte{1})
2375
			},
2376
		},
2377
		{
2378
			name: "task state is TaskStateFailed",
2379
			run: func(t *testing.T, svc *V2, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
2380
				gomock.InOrder(
2381
					mr.PeerManager().Return(peerManager).Times(1),
2382
					mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
2383
				)
2384

2385
				peer.FSM.SetState(resource.PeerStateRunning)
2386
				peer.Task.FSM.SetState(resource.TaskStateFailed)
2387
				peer.Task.ContentLength.Store(1)
2388
				peer.Task.TotalPieceCount.Store(1)
2389
				peer.Task.DirectPiece = []byte{1}
2390

2391
				assert := assert.New(t)
2392
				assert.ErrorIs(svc.handleDownloadPeerBackToSourceFailedRequest(context.Background(), peer.ID), status.Error(codes.Internal, "event DownloadFailed inappropriate in current state Failed"))
2393
				assert.Equal(peer.FSM.Current(), resource.PeerStateFailed)
2394
				assert.Equal(peer.Task.FSM.Current(), resource.TaskStateFailed)
2395
				assert.Equal(peer.Task.ContentLength.Load(), int64(-1))
2396
				assert.Equal(peer.Task.TotalPieceCount.Load(), int32(0))
2397
				assert.Equal(peer.Task.DirectPiece, []byte{})
2398
			},
2399
		},
2400
		{
2401
			name: "task state is TaskStateRunning",
2402
			run: func(t *testing.T, svc *V2, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
2403
				gomock.InOrder(
2404
					mr.PeerManager().Return(peerManager).Times(1),
2405
					mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
2406
					md.GetApplications().Return([]*managerv2.Application{}, nil).Times(1),
2407
				)
2408

2409
				peer.FSM.SetState(resource.PeerStateRunning)
2410
				peer.Task.FSM.SetState(resource.TaskStateRunning)
2411
				peer.Task.ContentLength.Store(1)
2412
				peer.Task.TotalPieceCount.Store(1)
2413
				peer.Task.DirectPiece = []byte{1}
2414

2415
				assert := assert.New(t)
2416
				assert.NoError(svc.handleDownloadPeerBackToSourceFailedRequest(context.Background(), peer.ID))
2417
				assert.Equal(peer.FSM.Current(), resource.PeerStateFailed)
2418
				assert.Equal(peer.Task.FSM.Current(), resource.TaskStateFailed)
2419
				assert.Equal(peer.Task.ContentLength.Load(), int64(-1))
2420
				assert.Equal(peer.Task.TotalPieceCount.Load(), int32(0))
2421
				assert.Equal(peer.Task.DirectPiece, []byte{})
2422
			},
2423
		},
2424
	}
2425

2426
	for _, tc := range tests {
2427
		t.Run(tc.name, func(t *testing.T) {
2428
			ctl := gomock.NewController(t)
2429
			defer ctl.Finish()
2430
			scheduling := schedulingmocks.NewMockScheduling(ctl)
2431
			res := resource.NewMockResource(ctl)
2432
			dynconfig := configmocks.NewMockDynconfigInterface(ctl)
2433
			storage := storagemocks.NewMockStorage(ctl)
2434
			networkTopology := networktopologymocks.NewMockNetworkTopology(ctl)
2435
			peerManager := resource.NewMockPeerManager(ctl)
2436

2437
			mockHost := resource.NewHost(
2438
				mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
2439
				mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
2440
			mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength))
2441
			peer := resource.NewPeer(mockPeerID, mockResourceConfig, mockTask, mockHost)
2442
			svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage, networkTopology)
2443

2444
			tc.run(t, svc, peer, peerManager, res.EXPECT(), peerManager.EXPECT(), dynconfig.EXPECT())
2445
		})
2446
	}
2447
}
2448

2449
func TestServiceV2_handleDownloadPieceFinishedRequest(t *testing.T) {
2450
	tests := []struct {
2451
		name string
2452
		req  *schedulerv2.DownloadPieceFinishedRequest
2453
		run  func(t *testing.T, svc *V2, req *schedulerv2.DownloadPieceFinishedRequest, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder)
2454
	}{
2455
		{
2456
			name: "invalid digest",
2457
			req: &schedulerv2.DownloadPieceFinishedRequest{
2458
				Piece: &commonv2.Piece{
2459
					Number:      uint32(mockPiece.Number),
2460
					ParentId:    &mockPiece.ParentID,
2461
					Offset:      mockPiece.Offset,
2462
					Length:      mockPiece.Length,
2463
					Digest:      "foo",
2464
					TrafficType: &mockPiece.TrafficType,
2465
					Cost:        durationpb.New(mockPiece.Cost),
2466
					CreatedAt:   timestamppb.New(mockPiece.CreatedAt),
2467
				},
2468
			},
2469
			run: func(t *testing.T, svc *V2, req *schedulerv2.DownloadPieceFinishedRequest, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder) {
2470
				assert := assert.New(t)
2471
				assert.ErrorIs(svc.handleDownloadPieceFinishedRequest(context.Background(), peer.ID, req), status.Error(codes.InvalidArgument, "invalid digest"))
2472
			},
2473
		},
2474
		{
2475
			name: "peer can not be loaded",
2476
			req: &schedulerv2.DownloadPieceFinishedRequest{
2477
				Piece: &commonv2.Piece{
2478
					Number:      uint32(mockPiece.Number),
2479
					ParentId:    &mockPiece.ParentID,
2480
					Offset:      mockPiece.Offset,
2481
					Length:      mockPiece.Length,
2482
					Digest:      mockPiece.Digest.String(),
2483
					TrafficType: &mockPiece.TrafficType,
2484
					Cost:        durationpb.New(mockPiece.Cost),
2485
					CreatedAt:   timestamppb.New(mockPiece.CreatedAt),
2486
				},
2487
			},
2488
			run: func(t *testing.T, svc *V2, req *schedulerv2.DownloadPieceFinishedRequest, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder) {
2489
				gomock.InOrder(
2490
					mr.PeerManager().Return(peerManager).Times(1),
2491
					mp.Load(gomock.Eq(peer.ID)).Return(nil, false).Times(1),
2492
				)
2493

2494
				assert := assert.New(t)
2495
				assert.ErrorIs(svc.handleDownloadPieceFinishedRequest(context.Background(), peer.ID, req), status.Errorf(codes.NotFound, "peer %s not found", peer.ID))
2496
			},
2497
		},
2498
		{
2499
			name: "parent can not be loaded",
2500
			req: &schedulerv2.DownloadPieceFinishedRequest{
2501
				Piece: &commonv2.Piece{
2502
					Number:      uint32(mockPiece.Number),
2503
					ParentId:    &mockPiece.ParentID,
2504
					Offset:      mockPiece.Offset,
2505
					Length:      mockPiece.Length,
2506
					Digest:      mockPiece.Digest.String(),
2507
					TrafficType: &mockPiece.TrafficType,
2508
					Cost:        durationpb.New(mockPiece.Cost),
2509
					CreatedAt:   timestamppb.New(mockPiece.CreatedAt),
2510
				},
2511
			},
2512
			run: func(t *testing.T, svc *V2, req *schedulerv2.DownloadPieceFinishedRequest, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder) {
2513
				gomock.InOrder(
2514
					mr.PeerManager().Return(peerManager).Times(1),
2515
					mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
2516
					mr.PeerManager().Return(peerManager).Times(1),
2517
					mp.Load(gomock.Eq(req.Piece.GetParentId())).Return(nil, false).Times(1),
2518
				)
2519

2520
				assert := assert.New(t)
2521
				assert.NoError(svc.handleDownloadPieceFinishedRequest(context.Background(), peer.ID, req))
2522

2523
				piece, loaded := peer.LoadPiece(int32(req.Piece.Number))
2524
				assert.True(loaded)
2525
				assert.Equal(piece.Number, mockPiece.Number)
2526
				assert.Equal(piece.ParentID, mockPiece.ParentID)
2527
				assert.Equal(piece.Offset, mockPiece.Offset)
2528
				assert.Equal(piece.Length, mockPiece.Length)
2529
				assert.Equal(piece.Digest.String(), mockPiece.Digest.String())
2530
				assert.Equal(piece.TrafficType, mockPiece.TrafficType)
2531
				assert.Equal(piece.Cost, mockPiece.Cost)
2532
				assert.True(piece.CreatedAt.Equal(mockPiece.CreatedAt))
2533
				assert.Equal(peer.FinishedPieces.Count(), uint(1))
2534
				assert.Equal(len(peer.PieceCosts()), 1)
2535
				assert.NotEqual(peer.PieceUpdatedAt.Load(), 0)
2536
				assert.NotEqual(peer.UpdatedAt.Load(), 0)
2537
				assert.NotEqual(peer.Task.UpdatedAt.Load(), 0)
2538
			},
2539
		},
2540
		{
2541
			name: "parent can be loaded",
2542
			req: &schedulerv2.DownloadPieceFinishedRequest{
2543
				Piece: &commonv2.Piece{
2544
					Number:      uint32(mockPiece.Number),
2545
					ParentId:    &mockPiece.ParentID,
2546
					Offset:      mockPiece.Offset,
2547
					Length:      mockPiece.Length,
2548
					Digest:      mockPiece.Digest.String(),
2549
					TrafficType: &mockPiece.TrafficType,
2550
					Cost:        durationpb.New(mockPiece.Cost),
2551
					CreatedAt:   timestamppb.New(mockPiece.CreatedAt),
2552
				},
2553
			},
2554
			run: func(t *testing.T, svc *V2, req *schedulerv2.DownloadPieceFinishedRequest, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder) {
2555
				gomock.InOrder(
2556
					mr.PeerManager().Return(peerManager).Times(1),
2557
					mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
2558
					mr.PeerManager().Return(peerManager).Times(1),
2559
					mp.Load(gomock.Eq(req.Piece.GetParentId())).Return(peer, true).Times(1),
2560
				)
2561

2562
				assert := assert.New(t)
2563
				assert.NoError(svc.handleDownloadPieceFinishedRequest(context.Background(), peer.ID, req))
2564

2565
				piece, loaded := peer.LoadPiece(int32(req.Piece.Number))
2566
				assert.True(loaded)
2567
				assert.Equal(piece.Number, mockPiece.Number)
2568
				assert.Equal(piece.ParentID, mockPiece.ParentID)
2569
				assert.Equal(piece.Offset, mockPiece.Offset)
2570
				assert.Equal(piece.Length, mockPiece.Length)
2571
				assert.Equal(piece.Digest.String(), mockPiece.Digest.String())
2572
				assert.Equal(piece.TrafficType, mockPiece.TrafficType)
2573
				assert.Equal(piece.Cost, mockPiece.Cost)
2574
				assert.True(piece.CreatedAt.Equal(mockPiece.CreatedAt))
2575
				assert.Equal(peer.FinishedPieces.Count(), uint(1))
2576
				assert.Equal(len(peer.PieceCosts()), 1)
2577
				assert.NotEqual(peer.PieceUpdatedAt.Load(), 0)
2578
				assert.NotEqual(peer.UpdatedAt.Load(), 0)
2579
				assert.NotEqual(peer.Task.UpdatedAt.Load(), 0)
2580
				assert.NotEqual(peer.Host.UpdatedAt.Load(), 0)
2581
			},
2582
		},
2583
	}
2584

2585
	for _, tc := range tests {
2586
		t.Run(tc.name, func(t *testing.T) {
2587
			ctl := gomock.NewController(t)
2588
			defer ctl.Finish()
2589
			scheduling := schedulingmocks.NewMockScheduling(ctl)
2590
			res := resource.NewMockResource(ctl)
2591
			dynconfig := configmocks.NewMockDynconfigInterface(ctl)
2592
			storage := storagemocks.NewMockStorage(ctl)
2593
			networkTopology := networktopologymocks.NewMockNetworkTopology(ctl)
2594
			peerManager := resource.NewMockPeerManager(ctl)
2595

2596
			mockHost := resource.NewHost(
2597
				mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
2598
				mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
2599
			mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength))
2600
			peer := resource.NewPeer(mockPeerID, mockResourceConfig, mockTask, mockHost)
2601
			svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage, networkTopology)
2602

2603
			tc.run(t, svc, tc.req, peer, peerManager, res.EXPECT(), peerManager.EXPECT())
2604
		})
2605
	}
2606
}
2607

2608
func TestServiceV2_handleDownloadPieceBackToSourceFinishedRequest(t *testing.T) {
2609
	tests := []struct {
2610
		name string
2611
		req  *schedulerv2.DownloadPieceBackToSourceFinishedRequest
2612
		run  func(t *testing.T, svc *V2, req *schedulerv2.DownloadPieceBackToSourceFinishedRequest, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder)
2613
	}{
2614
		{
2615
			name: "invalid digest",
2616
			req: &schedulerv2.DownloadPieceBackToSourceFinishedRequest{
2617
				Piece: &commonv2.Piece{
2618
					Number:      uint32(mockPiece.Number),
2619
					ParentId:    &mockPiece.ParentID,
2620
					Offset:      mockPiece.Offset,
2621
					Length:      mockPiece.Length,
2622
					Digest:      "foo",
2623
					TrafficType: &mockPiece.TrafficType,
2624
					Cost:        durationpb.New(mockPiece.Cost),
2625
					CreatedAt:   timestamppb.New(mockPiece.CreatedAt),
2626
				},
2627
			},
2628
			run: func(t *testing.T, svc *V2, req *schedulerv2.DownloadPieceBackToSourceFinishedRequest, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder) {
2629
				assert := assert.New(t)
2630
				assert.ErrorIs(svc.handleDownloadPieceBackToSourceFinishedRequest(context.Background(), peer.ID, req), status.Error(codes.InvalidArgument, "invalid digest"))
2631
			},
2632
		},
2633
		{
2634
			name: "peer can not be loaded",
2635
			req: &schedulerv2.DownloadPieceBackToSourceFinishedRequest{
2636
				Piece: &commonv2.Piece{
2637
					Number:      uint32(mockPiece.Number),
2638
					ParentId:    &mockPiece.ParentID,
2639
					Offset:      mockPiece.Offset,
2640
					Length:      mockPiece.Length,
2641
					Digest:      mockPiece.Digest.String(),
2642
					TrafficType: &mockPiece.TrafficType,
2643
					Cost:        durationpb.New(mockPiece.Cost),
2644
					CreatedAt:   timestamppb.New(mockPiece.CreatedAt),
2645
				},
2646
			},
2647
			run: func(t *testing.T, svc *V2, req *schedulerv2.DownloadPieceBackToSourceFinishedRequest, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder) {
2648
				gomock.InOrder(
2649
					mr.PeerManager().Return(peerManager).Times(1),
2650
					mp.Load(gomock.Eq(peer.ID)).Return(nil, false).Times(1),
2651
				)
2652

2653
				assert := assert.New(t)
2654
				assert.ErrorIs(svc.handleDownloadPieceBackToSourceFinishedRequest(context.Background(), peer.ID, req), status.Errorf(codes.NotFound, "peer %s not found", peer.ID))
2655
			},
2656
		},
2657
		{
2658
			name: "peer can be loaded",
2659
			req: &schedulerv2.DownloadPieceBackToSourceFinishedRequest{
2660
				Piece: &commonv2.Piece{
2661
					Number:      uint32(mockPiece.Number),
2662
					ParentId:    &mockPiece.ParentID,
2663
					Offset:      mockPiece.Offset,
2664
					Length:      mockPiece.Length,
2665
					Digest:      mockPiece.Digest.String(),
2666
					TrafficType: &mockPiece.TrafficType,
2667
					Cost:        durationpb.New(mockPiece.Cost),
2668
					CreatedAt:   timestamppb.New(mockPiece.CreatedAt),
2669
				},
2670
			},
2671
			run: func(t *testing.T, svc *V2, req *schedulerv2.DownloadPieceBackToSourceFinishedRequest, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder) {
2672
				gomock.InOrder(
2673
					mr.PeerManager().Return(peerManager).Times(1),
2674
					mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
2675
				)
2676

2677
				assert := assert.New(t)
2678
				assert.NoError(svc.handleDownloadPieceBackToSourceFinishedRequest(context.Background(), peer.ID, req))
2679

2680
				piece, loaded := peer.LoadPiece(int32(req.Piece.Number))
2681
				assert.True(loaded)
2682
				assert.Equal(piece.Number, mockPiece.Number)
2683
				assert.Equal(piece.ParentID, mockPiece.ParentID)
2684
				assert.Equal(piece.Offset, mockPiece.Offset)
2685
				assert.Equal(piece.Length, mockPiece.Length)
2686
				assert.Equal(piece.Digest.String(), mockPiece.Digest.String())
2687
				assert.Equal(piece.TrafficType, mockPiece.TrafficType)
2688
				assert.Equal(piece.Cost, mockPiece.Cost)
2689
				assert.True(piece.CreatedAt.Equal(mockPiece.CreatedAt))
2690
				assert.Equal(peer.FinishedPieces.Count(), uint(1))
2691
				assert.Equal(len(peer.PieceCosts()), 1)
2692
				assert.NotEqual(peer.PieceUpdatedAt.Load(), 0)
2693
				assert.NotEqual(peer.UpdatedAt.Load(), 0)
2694

2695
				piece, loaded = peer.Task.LoadPiece(int32(req.Piece.Number))
2696
				assert.True(loaded)
2697
				assert.Equal(piece.Number, mockPiece.Number)
2698
				assert.Equal(piece.ParentID, mockPiece.ParentID)
2699
				assert.Equal(piece.Offset, mockPiece.Offset)
2700
				assert.Equal(piece.Length, mockPiece.Length)
2701
				assert.Equal(piece.Digest.String(), mockPiece.Digest.String())
2702
				assert.Equal(piece.TrafficType, mockPiece.TrafficType)
2703
				assert.Equal(piece.Cost, mockPiece.Cost)
2704
				assert.True(piece.CreatedAt.Equal(mockPiece.CreatedAt))
2705
				assert.NotEqual(peer.Task.UpdatedAt.Load(), 0)
2706
				assert.NotEqual(peer.Host.UpdatedAt.Load(), 0)
2707
			},
2708
		},
2709
	}
2710

2711
	for _, tc := range tests {
2712
		t.Run(tc.name, func(t *testing.T) {
2713
			ctl := gomock.NewController(t)
2714
			defer ctl.Finish()
2715
			scheduling := schedulingmocks.NewMockScheduling(ctl)
2716
			res := resource.NewMockResource(ctl)
2717
			dynconfig := configmocks.NewMockDynconfigInterface(ctl)
2718
			storage := storagemocks.NewMockStorage(ctl)
2719
			networkTopology := networktopologymocks.NewMockNetworkTopology(ctl)
2720
			peerManager := resource.NewMockPeerManager(ctl)
2721

2722
			mockHost := resource.NewHost(
2723
				mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
2724
				mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
2725
			mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength))
2726
			peer := resource.NewPeer(mockPeerID, mockResourceConfig, mockTask, mockHost)
2727
			svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage, networkTopology)
2728

2729
			tc.run(t, svc, tc.req, peer, peerManager, res.EXPECT(), peerManager.EXPECT())
2730
		})
2731
	}
2732
}
2733

2734
func TestServiceV2_handleDownloadPieceFailedRequest(t *testing.T) {
2735
	tests := []struct {
2736
		name string
2737
		req  *schedulerv2.DownloadPieceFailedRequest
2738
		run  func(t *testing.T, svc *V2, req *schedulerv2.DownloadPieceFailedRequest, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder,
2739
			mp *resource.MockPeerManagerMockRecorder)
2740
	}{
2741
		{
2742
			name: "peer can not be loaded",
2743
			req: &schedulerv2.DownloadPieceFailedRequest{
2744
				ParentId:  mockSeedPeerID,
2745
				Temporary: true,
2746
			},
2747
			run: func(t *testing.T, svc *V2, req *schedulerv2.DownloadPieceFailedRequest, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder,
2748
				mp *resource.MockPeerManagerMockRecorder) {
2749
				gomock.InOrder(
2750
					mr.PeerManager().Return(peerManager).Times(1),
2751
					mp.Load(gomock.Eq(peer.ID)).Return(nil, false).Times(1),
2752
				)
2753

2754
				assert := assert.New(t)
2755
				assert.ErrorIs(svc.handleDownloadPieceFailedRequest(context.Background(), peer.ID, req), status.Errorf(codes.NotFound, "peer %s not found", peer.ID))
2756
			},
2757
		},
2758
		{
2759
			name: "temporary is false",
2760
			req: &schedulerv2.DownloadPieceFailedRequest{
2761
				ParentId:  mockSeedPeerID,
2762
				Temporary: false,
2763
			},
2764
			run: func(t *testing.T, svc *V2, req *schedulerv2.DownloadPieceFailedRequest, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder,
2765
				mp *resource.MockPeerManagerMockRecorder) {
2766
				gomock.InOrder(
2767
					mr.PeerManager().Return(peerManager).Times(1),
2768
					mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
2769
				)
2770

2771
				assert := assert.New(t)
2772
				assert.ErrorIs(svc.handleDownloadPieceFailedRequest(context.Background(), peer.ID, req), status.Error(codes.FailedPrecondition, "download piece failed"))
2773
			},
2774
		},
2775
		{
2776
			name: "parent can not be loaded",
2777
			req: &schedulerv2.DownloadPieceFailedRequest{
2778
				ParentId:  mockSeedPeerID,
2779
				Temporary: true,
2780
			},
2781
			run: func(t *testing.T, svc *V2, req *schedulerv2.DownloadPieceFailedRequest, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder,
2782
				mp *resource.MockPeerManagerMockRecorder) {
2783
				gomock.InOrder(
2784
					mr.PeerManager().Return(peerManager).Times(1),
2785
					mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
2786
					mr.PeerManager().Return(peerManager).Times(1),
2787
					mp.Load(gomock.Eq(req.GetParentId())).Return(nil, false).Times(1),
2788
				)
2789

2790
				assert := assert.New(t)
2791
				assert.NoError(svc.handleDownloadPieceFailedRequest(context.Background(), peer.ID, req))
2792
				assert.NotEqual(peer.UpdatedAt.Load(), 0)
2793
				assert.True(peer.BlockParents.Contains(req.GetParentId()))
2794
				assert.NotEqual(peer.Task.UpdatedAt.Load(), 0)
2795
			},
2796
		},
2797
		{
2798
			name: "parent can be loaded",
2799
			req: &schedulerv2.DownloadPieceFailedRequest{
2800
				ParentId:  mockSeedPeerID,
2801
				Temporary: true,
2802
			},
2803
			run: func(t *testing.T, svc *V2, req *schedulerv2.DownloadPieceFailedRequest, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder,
2804
				mp *resource.MockPeerManagerMockRecorder) {
2805
				gomock.InOrder(
2806
					mr.PeerManager().Return(peerManager).Times(1),
2807
					mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
2808
					mr.PeerManager().Return(peerManager).Times(1),
2809
					mp.Load(gomock.Eq(req.GetParentId())).Return(peer, true).Times(1),
2810
				)
2811

2812
				assert := assert.New(t)
2813
				assert.NoError(svc.handleDownloadPieceFailedRequest(context.Background(), peer.ID, req))
2814
				assert.NotEqual(peer.UpdatedAt.Load(), 0)
2815
				assert.True(peer.BlockParents.Contains(req.GetParentId()))
2816
				assert.NotEqual(peer.Task.UpdatedAt.Load(), 0)
2817
				assert.Equal(peer.Host.UploadFailedCount.Load(), int64(1))
2818
			},
2819
		},
2820
	}
2821

2822
	for _, tc := range tests {
2823
		t.Run(tc.name, func(t *testing.T) {
2824
			ctl := gomock.NewController(t)
2825
			defer ctl.Finish()
2826
			scheduling := schedulingmocks.NewMockScheduling(ctl)
2827
			res := resource.NewMockResource(ctl)
2828
			dynconfig := configmocks.NewMockDynconfigInterface(ctl)
2829
			storage := storagemocks.NewMockStorage(ctl)
2830
			networkTopology := networktopologymocks.NewMockNetworkTopology(ctl)
2831
			peerManager := resource.NewMockPeerManager(ctl)
2832

2833
			mockHost := resource.NewHost(
2834
				mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
2835
				mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
2836
			mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength))
2837
			peer := resource.NewPeer(mockPeerID, mockResourceConfig, mockTask, mockHost)
2838
			svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage, networkTopology)
2839

2840
			tc.run(t, svc, tc.req, peer, peerManager, res.EXPECT(), peerManager.EXPECT())
2841
		})
2842
	}
2843
}
2844

2845
func TestServiceV2_handleDownloadPieceBackToSourceFailedRequest(t *testing.T) {
2846
	mockPieceNumber := uint32(mockPiece.Number)
2847

2848
	tests := []struct {
2849
		name string
2850
		req  *schedulerv2.DownloadPieceBackToSourceFailedRequest
2851
		run  func(t *testing.T, svc *V2, req *schedulerv2.DownloadPieceBackToSourceFailedRequest, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder,
2852
			mp *resource.MockPeerManagerMockRecorder)
2853
	}{
2854
		{
2855
			name: "peer can not be loaded",
2856
			req:  &schedulerv2.DownloadPieceBackToSourceFailedRequest{},
2857
			run: func(t *testing.T, svc *V2, req *schedulerv2.DownloadPieceBackToSourceFailedRequest, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder,
2858
				mp *resource.MockPeerManagerMockRecorder) {
2859
				gomock.InOrder(
2860
					mr.PeerManager().Return(peerManager).Times(1),
2861
					mp.Load(gomock.Eq(peer.ID)).Return(nil, false).Times(1),
2862
				)
2863

2864
				assert := assert.New(t)
2865
				assert.ErrorIs(svc.handleDownloadPieceBackToSourceFailedRequest(context.Background(), peer.ID, req), status.Errorf(codes.NotFound, "peer %s not found", peer.ID))
2866
			},
2867
		},
2868
		{
2869
			name: "peer can be loaded",
2870
			req: &schedulerv2.DownloadPieceBackToSourceFailedRequest{
2871
				PieceNumber: &mockPieceNumber,
2872
			},
2873
			run: func(t *testing.T, svc *V2, req *schedulerv2.DownloadPieceBackToSourceFailedRequest, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder,
2874
				mp *resource.MockPeerManagerMockRecorder) {
2875
				gomock.InOrder(
2876
					mr.PeerManager().Return(peerManager).Times(1),
2877
					mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
2878
				)
2879

2880
				assert := assert.New(t)
2881
				assert.ErrorIs(svc.handleDownloadPieceBackToSourceFailedRequest(context.Background(), peer.ID, req), status.Error(codes.Internal, "download piece from source failed"))
2882
				assert.NotEqual(peer.UpdatedAt.Load(), 0)
2883
				assert.NotEqual(peer.Task.UpdatedAt.Load(), 0)
2884
			},
2885
		},
2886
	}
2887

2888
	for _, tc := range tests {
2889
		t.Run(tc.name, func(t *testing.T) {
2890
			ctl := gomock.NewController(t)
2891
			defer ctl.Finish()
2892
			scheduling := schedulingmocks.NewMockScheduling(ctl)
2893
			res := resource.NewMockResource(ctl)
2894
			dynconfig := configmocks.NewMockDynconfigInterface(ctl)
2895
			storage := storagemocks.NewMockStorage(ctl)
2896
			networkTopology := networktopologymocks.NewMockNetworkTopology(ctl)
2897
			peerManager := resource.NewMockPeerManager(ctl)
2898

2899
			mockHost := resource.NewHost(
2900
				mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
2901
				mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
2902
			mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength))
2903
			peer := resource.NewPeer(mockPeerID, mockResourceConfig, mockTask, mockHost)
2904
			svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage, networkTopology)
2905

2906
			tc.run(t, svc, tc.req, peer, peerManager, res.EXPECT(), peerManager.EXPECT())
2907
		})
2908
	}
2909
}
2910

2911
func TestServiceV2_handleResource(t *testing.T) {
2912
	dgst := mockTaskDigest.String()
2913
	mismatchDgst := "foo"
2914

2915
	tests := []struct {
2916
		name     string
2917
		download *commonv2.Download
2918
		run      func(t *testing.T, svc *V2, download *commonv2.Download, stream schedulerv2.Scheduler_AnnouncePeerServer, mockHost *resource.Host, mockTask *resource.Task, mockPeer *resource.Peer,
2919
			hostManager resource.HostManager, taskManager resource.TaskManager, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder,
2920
			mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder)
2921
	}{
2922
		{
2923
			name:     "host can not be loaded",
2924
			download: &commonv2.Download{},
2925
			run: func(t *testing.T, svc *V2, download *commonv2.Download, stream schedulerv2.Scheduler_AnnouncePeerServer, mockHost *resource.Host, mockTask *resource.Task, mockPeer *resource.Peer,
2926
				hostManager resource.HostManager, taskManager resource.TaskManager, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder,
2927
				mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder) {
2928
				gomock.InOrder(
2929
					mr.HostManager().Return(hostManager).Times(1),
2930
					mh.Load(gomock.Eq(mockHost.ID)).Return(nil, false).Times(1),
2931
				)
2932

2933
				assert := assert.New(t)
2934
				_, _, _, err := svc.handleResource(context.Background(), stream, mockHost.ID, mockTask.ID, mockPeer.ID, download)
2935
				assert.ErrorIs(err, status.Errorf(codes.NotFound, "host %s not found", mockHost.ID))
2936
			},
2937
		},
2938
		{
2939
			name: "task can be loaded",
2940
			download: &commonv2.Download{
2941
				Url:                 "foo",
2942
				FilteredQueryParams: []string{"bar"},
2943
				RequestHeader:       map[string]string{"baz": "bas"},
2944
			},
2945
			run: func(t *testing.T, svc *V2, download *commonv2.Download, stream schedulerv2.Scheduler_AnnouncePeerServer, mockHost *resource.Host, mockTask *resource.Task, mockPeer *resource.Peer,
2946
				hostManager resource.HostManager, taskManager resource.TaskManager, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder,
2947
				mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder) {
2948
				gomock.InOrder(
2949
					mr.HostManager().Return(hostManager).Times(1),
2950
					mh.Load(gomock.Eq(mockHost.ID)).Return(mockHost, true).Times(1),
2951
					mr.TaskManager().Return(taskManager).Times(1),
2952
					mt.Load(gomock.Eq(mockTask.ID)).Return(mockTask, true).Times(1),
2953
					mr.PeerManager().Return(peerManager).Times(1),
2954
					mp.Load(gomock.Eq(mockPeer.ID)).Return(mockPeer, true).Times(1),
2955
				)
2956

2957
				assert := assert.New(t)
2958
				host, task, _, err := svc.handleResource(context.Background(), stream, mockHost.ID, mockTask.ID, mockPeer.ID, download)
2959
				assert.NoError(err)
2960
				assert.EqualValues(host, mockHost)
2961
				assert.Equal(task.ID, mockTask.ID)
2962
				assert.Equal(task.URL, download.Url)
2963
				assert.EqualValues(task.FilteredQueryParams, download.FilteredQueryParams)
2964
				assert.EqualValues(task.Header, download.RequestHeader)
2965
			},
2966
		},
2967
		{
2968
			name: "task can not be loaded",
2969
			download: &commonv2.Download{
2970
				Url:                 "foo",
2971
				FilteredQueryParams: []string{"bar"},
2972
				RequestHeader:       map[string]string{"baz": "bas"},
2973
				Digest:              &dgst,
2974
			},
2975
			run: func(t *testing.T, svc *V2, download *commonv2.Download, stream schedulerv2.Scheduler_AnnouncePeerServer, mockHost *resource.Host, mockTask *resource.Task, mockPeer *resource.Peer,
2976
				hostManager resource.HostManager, taskManager resource.TaskManager, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder,
2977
				mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder) {
2978
				gomock.InOrder(
2979
					mr.HostManager().Return(hostManager).Times(1),
2980
					mh.Load(gomock.Eq(mockHost.ID)).Return(mockHost, true).Times(1),
2981
					mr.TaskManager().Return(taskManager).Times(1),
2982
					mt.Load(gomock.Eq(mockTask.ID)).Return(nil, false).Times(1),
2983
					mr.TaskManager().Return(taskManager).Times(1),
2984
					mt.Store(gomock.Any()).Return().Times(1),
2985
					mr.PeerManager().Return(peerManager).Times(1),
2986
					mp.Load(gomock.Eq(mockPeer.ID)).Return(mockPeer, true).Times(1),
2987
				)
2988

2989
				assert := assert.New(t)
2990
				host, task, _, err := svc.handleResource(context.Background(), stream, mockHost.ID, mockTask.ID, mockPeer.ID, download)
2991
				assert.NoError(err)
2992
				assert.EqualValues(host, mockHost)
2993
				assert.Equal(task.ID, mockTask.ID)
2994
				assert.Equal(task.Digest.String(), download.GetDigest())
2995
				assert.Equal(task.URL, download.GetUrl())
2996
				assert.EqualValues(task.FilteredQueryParams, download.GetFilteredQueryParams())
2997
				assert.EqualValues(task.Header, download.RequestHeader)
2998
			},
2999
		},
3000
		{
3001
			name: "invalid digest",
3002
			download: &commonv2.Download{
3003
				Digest: &mismatchDgst,
3004
			},
3005
			run: func(t *testing.T, svc *V2, download *commonv2.Download, stream schedulerv2.Scheduler_AnnouncePeerServer, mockHost *resource.Host, mockTask *resource.Task, mockPeer *resource.Peer,
3006
				hostManager resource.HostManager, taskManager resource.TaskManager, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder,
3007
				mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder) {
3008
				gomock.InOrder(
3009
					mr.HostManager().Return(hostManager).Times(1),
3010
					mh.Load(gomock.Eq(mockHost.ID)).Return(mockHost, true).Times(1),
3011
					mr.TaskManager().Return(taskManager).Times(1),
3012
					mt.Load(gomock.Eq(mockTask.ID)).Return(nil, false).Times(1),
3013
				)
3014

3015
				assert := assert.New(t)
3016
				_, _, _, err := svc.handleResource(context.Background(), stream, mockHost.ID, mockTask.ID, mockPeer.ID, download)
3017
				assert.ErrorIs(err, status.Error(codes.InvalidArgument, "invalid digest"))
3018
			},
3019
		},
3020
		{
3021
			name: "peer can be loaded",
3022
			download: &commonv2.Download{
3023
				Url:                 "foo",
3024
				FilteredQueryParams: []string{"bar"},
3025
				RequestHeader:       map[string]string{"baz": "bas"},
3026
				Digest:              &dgst,
3027
			},
3028
			run: func(t *testing.T, svc *V2, download *commonv2.Download, stream schedulerv2.Scheduler_AnnouncePeerServer, mockHost *resource.Host, mockTask *resource.Task, mockPeer *resource.Peer,
3029
				hostManager resource.HostManager, taskManager resource.TaskManager, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder,
3030
				mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder) {
3031
				gomock.InOrder(
3032
					mr.HostManager().Return(hostManager).Times(1),
3033
					mh.Load(gomock.Eq(mockHost.ID)).Return(mockHost, true).Times(1),
3034
					mr.TaskManager().Return(taskManager).Times(1),
3035
					mt.Load(gomock.Eq(mockTask.ID)).Return(mockTask, true).Times(1),
3036
					mr.PeerManager().Return(peerManager).Times(1),
3037
					mp.Load(gomock.Eq(mockPeer.ID)).Return(mockPeer, true).Times(1),
3038
				)
3039

3040
				assert := assert.New(t)
3041
				host, task, peer, err := svc.handleResource(context.Background(), stream, mockHost.ID, mockTask.ID, mockPeer.ID, download)
3042
				assert.NoError(err)
3043
				assert.EqualValues(host, mockHost)
3044
				assert.Equal(task.ID, mockTask.ID)
3045
				assert.Equal(task.Digest.String(), download.GetDigest())
3046
				assert.Equal(task.URL, download.GetUrl())
3047
				assert.EqualValues(task.FilteredQueryParams, download.GetFilteredQueryParams())
3048
				assert.EqualValues(task.Header, download.RequestHeader)
3049
				assert.EqualValues(peer, mockPeer)
3050
			},
3051
		},
3052
		{
3053
			name: "peer can not be loaded",
3054
			download: &commonv2.Download{
3055
				Url:                 "foo",
3056
				FilteredQueryParams: []string{"bar"},
3057
				RequestHeader:       map[string]string{"baz": "bas"},
3058
				Digest:              &dgst,
3059
				Priority:            commonv2.Priority_LEVEL1,
3060
				Range: &commonv2.Range{
3061
					Start:  uint64(mockPeerRange.Start),
3062
					Length: uint64(mockPeerRange.Length),
3063
				},
3064
			},
3065
			run: func(t *testing.T, svc *V2, download *commonv2.Download, stream schedulerv2.Scheduler_AnnouncePeerServer, mockHost *resource.Host, mockTask *resource.Task, mockPeer *resource.Peer,
3066
				hostManager resource.HostManager, taskManager resource.TaskManager, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder,
3067
				mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder) {
3068
				gomock.InOrder(
3069
					mr.HostManager().Return(hostManager).Times(1),
3070
					mh.Load(gomock.Eq(mockHost.ID)).Return(mockHost, true).Times(1),
3071
					mr.TaskManager().Return(taskManager).Times(1),
3072
					mt.Load(gomock.Eq(mockTask.ID)).Return(mockTask, true).Times(1),
3073
					mr.PeerManager().Return(peerManager).Times(1),
3074
					mp.Load(gomock.Eq(mockPeer.ID)).Return(nil, false).Times(1),
3075
					mr.PeerManager().Return(peerManager).Times(1),
3076
					mp.Store(gomock.Any()).Return().Times(1),
3077
				)
3078

3079
				assert := assert.New(t)
3080
				host, task, peer, err := svc.handleResource(context.Background(), stream, mockHost.ID, mockTask.ID, mockPeer.ID, download)
3081
				assert.NoError(err)
3082
				assert.EqualValues(host, mockHost)
3083
				assert.Equal(task.ID, mockTask.ID)
3084
				assert.Equal(task.Digest.String(), download.GetDigest())
3085
				assert.Equal(task.URL, download.GetUrl())
3086
				assert.EqualValues(task.FilteredQueryParams, download.GetFilteredQueryParams())
3087
				assert.EqualValues(task.Header, download.RequestHeader)
3088
				assert.Equal(peer.ID, mockPeer.ID)
3089
				assert.Equal(peer.Priority, download.Priority)
3090
				assert.Equal(peer.Range.Start, int64(download.Range.Start))
3091
				assert.Equal(peer.Range.Length, int64(download.Range.Length))
3092
				assert.NotNil(peer.AnnouncePeerStream)
3093
				assert.EqualValues(peer.Host, mockHost)
3094
				assert.EqualValues(peer.Task, mockTask)
3095
			},
3096
		},
3097
	}
3098

3099
	for _, tc := range tests {
3100
		t.Run(tc.name, func(t *testing.T) {
3101
			ctl := gomock.NewController(t)
3102
			defer ctl.Finish()
3103
			scheduling := schedulingmocks.NewMockScheduling(ctl)
3104
			res := resource.NewMockResource(ctl)
3105
			dynconfig := configmocks.NewMockDynconfigInterface(ctl)
3106
			storage := storagemocks.NewMockStorage(ctl)
3107
			networkTopology := networktopologymocks.NewMockNetworkTopology(ctl)
3108
			hostManager := resource.NewMockHostManager(ctl)
3109
			taskManager := resource.NewMockTaskManager(ctl)
3110
			peerManager := resource.NewMockPeerManager(ctl)
3111
			stream := schedulerv2mocks.NewMockScheduler_AnnouncePeerServer(ctl)
3112

3113
			mockHost := resource.NewHost(
3114
				mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
3115
				mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
3116
			mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength))
3117
			mockPeer := resource.NewPeer(mockPeerID, mockResourceConfig, mockTask, mockHost)
3118
			svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage, networkTopology)
3119

3120
			tc.run(t, svc, tc.download, stream, mockHost, mockTask, mockPeer, hostManager, taskManager, peerManager, res.EXPECT(), hostManager.EXPECT(), taskManager.EXPECT(), peerManager.EXPECT())
3121
		})
3122
	}
3123
}
3124

3125
func TestServiceV2_downloadTaskBySeedPeer(t *testing.T) {
3126
	tests := []struct {
3127
		name   string
3128
		config config.Config
3129
		run    func(t *testing.T, svc *V2, peer *resource.Peer, seedPeerClient resource.SeedPeer, mr *resource.MockResourceMockRecorder, ms *resource.MockSeedPeerMockRecorder)
3130
	}{
3131
		{
3132
			name: "priority is Priority_LEVEL6 and enable seed peer",
3133
			config: config.Config{
3134
				SeedPeer: config.SeedPeerConfig{
3135
					Enable: true,
3136
				},
3137
			},
3138
			run: func(t *testing.T, svc *V2, peer *resource.Peer, seedPeerClient resource.SeedPeer, mr *resource.MockResourceMockRecorder, ms *resource.MockSeedPeerMockRecorder) {
3139
				var wg sync.WaitGroup
3140
				wg.Add(1)
3141
				defer wg.Wait()
3142

3143
				gomock.InOrder(
3144
					mr.SeedPeer().Return(seedPeerClient).Times(1),
3145
					ms.TriggerDownloadTask(gomock.All(), gomock.Any(), gomock.Any()).Do(func(context.Context, string, *dfdaemonv2.TriggerDownloadTaskRequest) { wg.Done() }).Return(nil).Times(1),
3146
				)
3147

3148
				peer.Priority = commonv2.Priority_LEVEL6
3149

3150
				assert := assert.New(t)
3151
				assert.NoError(svc.downloadTaskBySeedPeer(context.Background(), mockTaskID, &commonv2.Download{}, peer))
3152
				assert.False(peer.NeedBackToSource.Load())
3153
			},
3154
		},
3155
		{
3156
			name: "priority is Priority_LEVEL6, enable seed peer and download task failed",
3157
			config: config.Config{
3158
				SeedPeer: config.SeedPeerConfig{
3159
					Enable: true,
3160
				},
3161
			},
3162
			run: func(t *testing.T, svc *V2, peer *resource.Peer, seedPeerClient resource.SeedPeer, mr *resource.MockResourceMockRecorder, ms *resource.MockSeedPeerMockRecorder) {
3163
				var wg sync.WaitGroup
3164
				wg.Add(1)
3165
				defer wg.Wait()
3166

3167
				gomock.InOrder(
3168
					mr.SeedPeer().Return(seedPeerClient).Times(1),
3169
					ms.TriggerDownloadTask(gomock.All(), gomock.Any(), gomock.Any()).Do(func(context.Context, string, *dfdaemonv2.TriggerDownloadTaskRequest) { wg.Done() }).Return(errors.New("foo")).Times(1),
3170
				)
3171

3172
				peer.Priority = commonv2.Priority_LEVEL6
3173

3174
				assert := assert.New(t)
3175
				assert.NoError(svc.downloadTaskBySeedPeer(context.Background(), mockTaskID, &commonv2.Download{}, peer))
3176
				assert.False(peer.NeedBackToSource.Load())
3177
			},
3178
		},
3179
		{
3180
			name: "priority is Priority_LEVEL6 and disable seed peer",
3181
			config: config.Config{
3182
				SeedPeer: config.SeedPeerConfig{
3183
					Enable: false,
3184
				},
3185
			},
3186
			run: func(t *testing.T, svc *V2, peer *resource.Peer, seedPeerClient resource.SeedPeer, mr *resource.MockResourceMockRecorder, ms *resource.MockSeedPeerMockRecorder) {
3187
				peer.Priority = commonv2.Priority_LEVEL6
3188

3189
				assert := assert.New(t)
3190
				assert.NoError(svc.downloadTaskBySeedPeer(context.Background(), mockTaskID, &commonv2.Download{}, peer))
3191
				assert.True(peer.NeedBackToSource.Load())
3192
			},
3193
		},
3194
		{
3195
			name: "priority is Priority_LEVEL5 and enable seed peer",
3196
			config: config.Config{
3197
				SeedPeer: config.SeedPeerConfig{
3198
					Enable: true,
3199
				},
3200
			},
3201
			run: func(t *testing.T, svc *V2, peer *resource.Peer, seedPeerClient resource.SeedPeer, mr *resource.MockResourceMockRecorder, ms *resource.MockSeedPeerMockRecorder) {
3202
				var wg sync.WaitGroup
3203
				wg.Add(1)
3204
				defer wg.Wait()
3205

3206
				gomock.InOrder(
3207
					mr.SeedPeer().Return(seedPeerClient).Times(1),
3208
					ms.TriggerDownloadTask(gomock.All(), gomock.Any(), gomock.Any()).Do(func(context.Context, string, *dfdaemonv2.TriggerDownloadTaskRequest) { wg.Done() }).Return(nil).Times(1),
3209
				)
3210

3211
				peer.Priority = commonv2.Priority_LEVEL5
3212

3213
				assert := assert.New(t)
3214
				assert.NoError(svc.downloadTaskBySeedPeer(context.Background(), mockTaskID, &commonv2.Download{}, peer))
3215
				assert.False(peer.NeedBackToSource.Load())
3216
			},
3217
		},
3218
		{
3219
			name: "priority is Priority_LEVEL5, enable seed peer and download task failed",
3220
			config: config.Config{
3221
				SeedPeer: config.SeedPeerConfig{
3222
					Enable: true,
3223
				},
3224
			},
3225
			run: func(t *testing.T, svc *V2, peer *resource.Peer, seedPeerClient resource.SeedPeer, mr *resource.MockResourceMockRecorder, ms *resource.MockSeedPeerMockRecorder) {
3226
				var wg sync.WaitGroup
3227
				wg.Add(1)
3228
				defer wg.Wait()
3229

3230
				gomock.InOrder(
3231
					mr.SeedPeer().Return(seedPeerClient).Times(1),
3232
					ms.TriggerDownloadTask(gomock.All(), gomock.Any(), gomock.Any()).Do(func(context.Context, string, *dfdaemonv2.TriggerDownloadTaskRequest) { wg.Done() }).Return(errors.New("foo")).Times(1),
3233
				)
3234

3235
				peer.Priority = commonv2.Priority_LEVEL5
3236

3237
				assert := assert.New(t)
3238
				assert.NoError(svc.downloadTaskBySeedPeer(context.Background(), mockTaskID, &commonv2.Download{}, peer))
3239
				assert.False(peer.NeedBackToSource.Load())
3240
			},
3241
		},
3242
		{
3243
			name: "priority is Priority_LEVEL5 and disable seed peer",
3244
			config: config.Config{
3245
				SeedPeer: config.SeedPeerConfig{
3246
					Enable: false,
3247
				},
3248
			},
3249
			run: func(t *testing.T, svc *V2, peer *resource.Peer, seedPeerClient resource.SeedPeer, mr *resource.MockResourceMockRecorder, ms *resource.MockSeedPeerMockRecorder) {
3250
				peer.Priority = commonv2.Priority_LEVEL5
3251

3252
				assert := assert.New(t)
3253
				assert.NoError(svc.downloadTaskBySeedPeer(context.Background(), mockTaskID, &commonv2.Download{}, peer))
3254
				assert.True(peer.NeedBackToSource.Load())
3255
			},
3256
		},
3257
		{
3258
			name: "priority is Priority_LEVEL4 and enable seed peer",
3259
			config: config.Config{
3260
				SeedPeer: config.SeedPeerConfig{
3261
					Enable: true,
3262
				},
3263
			},
3264
			run: func(t *testing.T, svc *V2, peer *resource.Peer, seedPeerClient resource.SeedPeer, mr *resource.MockResourceMockRecorder, ms *resource.MockSeedPeerMockRecorder) {
3265
				var wg sync.WaitGroup
3266
				wg.Add(1)
3267
				defer wg.Wait()
3268

3269
				gomock.InOrder(
3270
					mr.SeedPeer().Return(seedPeerClient).Times(1),
3271
					ms.TriggerDownloadTask(gomock.All(), gomock.Any(), gomock.Any()).Do(func(context.Context, string, *dfdaemonv2.TriggerDownloadTaskRequest) { wg.Done() }).Return(nil).Times(1),
3272
				)
3273

3274
				peer.Priority = commonv2.Priority_LEVEL4
3275

3276
				assert := assert.New(t)
3277
				assert.NoError(svc.downloadTaskBySeedPeer(context.Background(), mockTaskID, &commonv2.Download{}, peer))
3278
				assert.False(peer.NeedBackToSource.Load())
3279
			},
3280
		},
3281
		{
3282
			name: "priority is Priority_LEVEL4, enable seed peer and download task failed",
3283
			config: config.Config{
3284
				SeedPeer: config.SeedPeerConfig{
3285
					Enable: true,
3286
				},
3287
			},
3288
			run: func(t *testing.T, svc *V2, peer *resource.Peer, seedPeerClient resource.SeedPeer, mr *resource.MockResourceMockRecorder, ms *resource.MockSeedPeerMockRecorder) {
3289
				var wg sync.WaitGroup
3290
				wg.Add(1)
3291
				defer wg.Wait()
3292

3293
				gomock.InOrder(
3294
					mr.SeedPeer().Return(seedPeerClient).Times(1),
3295
					ms.TriggerDownloadTask(gomock.All(), gomock.Any(), gomock.Any()).Do(func(context.Context, string, *dfdaemonv2.TriggerDownloadTaskRequest) { wg.Done() }).Return(errors.New("foo")).Times(1),
3296
				)
3297

3298
				peer.Priority = commonv2.Priority_LEVEL4
3299

3300
				assert := assert.New(t)
3301
				assert.NoError(svc.downloadTaskBySeedPeer(context.Background(), mockTaskID, &commonv2.Download{}, peer))
3302
				assert.False(peer.NeedBackToSource.Load())
3303
			},
3304
		},
3305
		{
3306
			name: "priority is Priority_LEVEL4 and disable seed peer",
3307
			config: config.Config{
3308
				SeedPeer: config.SeedPeerConfig{
3309
					Enable: false,
3310
				},
3311
			},
3312
			run: func(t *testing.T, svc *V2, peer *resource.Peer, seedPeerClient resource.SeedPeer, mr *resource.MockResourceMockRecorder, ms *resource.MockSeedPeerMockRecorder) {
3313
				peer.Priority = commonv2.Priority_LEVEL4
3314

3315
				assert := assert.New(t)
3316
				assert.NoError(svc.downloadTaskBySeedPeer(context.Background(), mockTaskID, &commonv2.Download{}, peer))
3317
				assert.True(peer.NeedBackToSource.Load())
3318
			},
3319
		},
3320
		{
3321
			name: "priority is Priority_LEVEL3",
3322
			config: config.Config{
3323
				SeedPeer: config.SeedPeerConfig{
3324
					Enable: true,
3325
				},
3326
			},
3327
			run: func(t *testing.T, svc *V2, peer *resource.Peer, seedPeerClient resource.SeedPeer, mr *resource.MockResourceMockRecorder, ms *resource.MockSeedPeerMockRecorder) {
3328
				peer.Priority = commonv2.Priority_LEVEL3
3329

3330
				assert := assert.New(t)
3331
				assert.NoError(svc.downloadTaskBySeedPeer(context.Background(), mockTaskID, &commonv2.Download{}, peer))
3332
				assert.True(peer.NeedBackToSource.Load())
3333
			},
3334
		},
3335
		{
3336
			name: "priority is Priority_LEVEL2",
3337
			config: config.Config{
3338
				SeedPeer: config.SeedPeerConfig{
3339
					Enable: true,
3340
				},
3341
			},
3342
			run: func(t *testing.T, svc *V2, peer *resource.Peer, seedPeerClient resource.SeedPeer, mr *resource.MockResourceMockRecorder, ms *resource.MockSeedPeerMockRecorder) {
3343
				peer.Priority = commonv2.Priority_LEVEL2
3344

3345
				assert := assert.New(t)
3346
				assert.ErrorIs(svc.downloadTaskBySeedPeer(context.Background(), mockTaskID, &commonv2.Download{}, peer), status.Errorf(codes.NotFound, "%s peer not found candidate peers", commonv2.Priority_LEVEL2.String()))
3347
			},
3348
		},
3349
		{
3350
			name: "priority is Priority_LEVEL1",
3351
			config: config.Config{
3352
				SeedPeer: config.SeedPeerConfig{
3353
					Enable: true,
3354
				},
3355
			},
3356
			run: func(t *testing.T, svc *V2, peer *resource.Peer, seedPeerClient resource.SeedPeer, mr *resource.MockResourceMockRecorder, ms *resource.MockSeedPeerMockRecorder) {
3357
				peer.Priority = commonv2.Priority_LEVEL1
3358

3359
				assert := assert.New(t)
3360
				assert.ErrorIs(svc.downloadTaskBySeedPeer(context.Background(), mockTaskID, &commonv2.Download{}, peer), status.Errorf(codes.FailedPrecondition, "%s peer is forbidden", commonv2.Priority_LEVEL1.String()))
3361
			},
3362
		},
3363
		{
3364
			name: "priority is Priority_LEVEL0",
3365
			config: config.Config{
3366
				SeedPeer: config.SeedPeerConfig{
3367
					Enable: true,
3368
				},
3369
			},
3370
			run: func(t *testing.T, svc *V2, peer *resource.Peer, seedPeerClient resource.SeedPeer, mr *resource.MockResourceMockRecorder, ms *resource.MockSeedPeerMockRecorder) {
3371
				peer.Priority = commonv2.Priority(100)
3372

3373
				assert := assert.New(t)
3374
				assert.ErrorIs(svc.downloadTaskBySeedPeer(context.Background(), mockTaskID, &commonv2.Download{}, peer), status.Errorf(codes.InvalidArgument, "invalid priority %#v", peer.Priority))
3375
			},
3376
		},
3377
	}
3378

3379
	for _, tc := range tests {
3380
		t.Run(tc.name, func(t *testing.T) {
3381
			ctl := gomock.NewController(t)
3382
			defer ctl.Finish()
3383
			scheduling := schedulingmocks.NewMockScheduling(ctl)
3384
			res := resource.NewMockResource(ctl)
3385
			dynconfig := configmocks.NewMockDynconfigInterface(ctl)
3386
			storage := storagemocks.NewMockStorage(ctl)
3387
			networkTopology := networktopologymocks.NewMockNetworkTopology(ctl)
3388
			seedPeerClient := resource.NewMockSeedPeer(ctl)
3389

3390
			mockHost := resource.NewHost(
3391
				mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
3392
				mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
3393
			mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength))
3394
			peer := resource.NewPeer(mockPeerID, mockResourceConfig, mockTask, mockHost)
3395
			svc := NewV2(&tc.config, res, scheduling, dynconfig, storage, networkTopology)
3396

3397
			tc.run(t, svc, peer, seedPeerClient, res.EXPECT(), seedPeerClient.EXPECT())
3398
		})
3399
	}
3400
}
3401

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.