Dragonfly2
3400 строк · 151.7 Кб
1/*
2* Copyright 2023 The Dragonfly Authors
3*
4* Licensed under the Apache License, Version 2.0 (the "License");
5* you may not use this file except in compliance with the License.
6* You may obtain a copy of the License at
7*
8* http://www.apache.org/licenses/LICENSE-2.0
9*
10* Unless required by applicable law or agreed to in writing, software
11* distributed under the License is distributed on an "AS IS" BASIS,
12* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13* See the License for the specific language governing permissions and
14* limitations under the License.
15*/
16
17package service
18
19import (
20"context"
21"errors"
22"io"
23"net"
24"net/http"
25"net/http/httptest"
26"net/url"
27"reflect"
28"strconv"
29"sync"
30"testing"
31
32"github.com/stretchr/testify/assert"
33"go.uber.org/mock/gomock"
34"google.golang.org/grpc/codes"
35"google.golang.org/grpc/status"
36"google.golang.org/protobuf/types/known/durationpb"
37"google.golang.org/protobuf/types/known/timestamppb"
38
39commonv2 "d7y.io/api/v2/pkg/apis/common/v2"
40dfdaemonv2 "d7y.io/api/v2/pkg/apis/dfdaemon/v2"
41managerv2 "d7y.io/api/v2/pkg/apis/manager/v2"
42schedulerv2 "d7y.io/api/v2/pkg/apis/scheduler/v2"
43schedulerv2mocks "d7y.io/api/v2/pkg/apis/scheduler/v2/mocks"
44
45managertypes "d7y.io/dragonfly/v2/manager/types"
46nethttp "d7y.io/dragonfly/v2/pkg/net/http"
47pkgtypes "d7y.io/dragonfly/v2/pkg/types"
48"d7y.io/dragonfly/v2/scheduler/config"
49configmocks "d7y.io/dragonfly/v2/scheduler/config/mocks"
50"d7y.io/dragonfly/v2/scheduler/networktopology"
51networktopologymocks "d7y.io/dragonfly/v2/scheduler/networktopology/mocks"
52"d7y.io/dragonfly/v2/scheduler/resource"
53"d7y.io/dragonfly/v2/scheduler/scheduling/mocks"
54schedulingmocks "d7y.io/dragonfly/v2/scheduler/scheduling/mocks"
55storagemocks "d7y.io/dragonfly/v2/scheduler/storage/mocks"
56)
57
58func TestService_NewV2(t *testing.T) {
59tests := []struct {
60name string
61expect func(t *testing.T, s any)
62}{
63{
64name: "new service",
65expect: func(t *testing.T, s any) {
66assert := assert.New(t)
67assert.Equal(reflect.TypeOf(s).Elem().Name(), "V2")
68},
69},
70}
71
72for _, tc := range tests {
73t.Run(tc.name, func(t *testing.T) {
74ctl := gomock.NewController(t)
75defer ctl.Finish()
76scheduling := schedulingmocks.NewMockScheduling(ctl)
77resource := resource.NewMockResource(ctl)
78dynconfig := configmocks.NewMockDynconfigInterface(ctl)
79storage := storagemocks.NewMockStorage(ctl)
80networkTopology := networktopologymocks.NewMockNetworkTopology(ctl)
81
82tc.expect(t, NewV2(&config.Config{Scheduler: mockSchedulerConfig}, resource, scheduling, dynconfig, storage, networkTopology))
83})
84}
85}
86
87func TestServiceV2_StatPeer(t *testing.T) {
88tests := []struct {
89name string
90mock func(peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder)
91expect func(t *testing.T, peer *resource.Peer, resp *commonv2.Peer, err error)
92}{
93{
94name: "peer not found",
95mock: func(peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder) {
96gomock.InOrder(
97mr.PeerManager().Return(peerManager).Times(1),
98mp.Load(gomock.Any()).Return(nil, false).Times(1),
99)
100},
101expect: func(t *testing.T, peer *resource.Peer, resp *commonv2.Peer, err error) {
102assert := assert.New(t)
103assert.ErrorIs(err, status.Errorf(codes.NotFound, "peer %s not found", mockPeerID))
104},
105},
106{
107name: "peer has been loaded",
108mock: func(peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder) {
109peer.StorePiece(&mockPiece)
110peer.Task.StorePiece(&mockPiece)
111gomock.InOrder(
112mr.PeerManager().Return(peerManager).Times(1),
113mp.Load(gomock.Any()).Return(peer, true).Times(1),
114)
115},
116expect: func(t *testing.T, peer *resource.Peer, resp *commonv2.Peer, err error) {
117dgst := peer.Task.Digest.String()
118
119assert := assert.New(t)
120assert.EqualValues(resp, &commonv2.Peer{
121Id: peer.ID,
122Range: &commonv2.Range{
123Start: uint64(peer.Range.Start),
124Length: uint64(peer.Range.Length),
125},
126Priority: peer.Priority,
127Pieces: []*commonv2.Piece{
128{
129Number: uint32(mockPiece.Number),
130ParentId: &mockPiece.ParentID,
131Offset: mockPiece.Offset,
132Length: mockPiece.Length,
133Digest: mockPiece.Digest.String(),
134TrafficType: &mockPiece.TrafficType,
135Cost: durationpb.New(mockPiece.Cost),
136CreatedAt: timestamppb.New(mockPiece.CreatedAt),
137},
138},
139Cost: durationpb.New(peer.Cost.Load()),
140State: peer.FSM.Current(),
141Task: &commonv2.Task{
142Id: peer.Task.ID,
143Type: peer.Task.Type,
144Url: peer.Task.URL,
145Digest: &dgst,
146Tag: &peer.Task.Tag,
147Application: &peer.Task.Application,
148FilteredQueryParams: peer.Task.FilteredQueryParams,
149RequestHeader: peer.Task.Header,
150PieceLength: uint32(peer.Task.PieceLength),
151ContentLength: uint64(peer.Task.ContentLength.Load()),
152PieceCount: uint32(peer.Task.TotalPieceCount.Load()),
153SizeScope: peer.Task.SizeScope(),
154Pieces: []*commonv2.Piece{
155{
156Number: uint32(mockPiece.Number),
157ParentId: &mockPiece.ParentID,
158Offset: mockPiece.Offset,
159Length: mockPiece.Length,
160Digest: mockPiece.Digest.String(),
161TrafficType: &mockPiece.TrafficType,
162Cost: durationpb.New(mockPiece.Cost),
163CreatedAt: timestamppb.New(mockPiece.CreatedAt),
164},
165},
166State: peer.Task.FSM.Current(),
167PeerCount: uint32(peer.Task.PeerCount()),
168CreatedAt: timestamppb.New(peer.Task.CreatedAt.Load()),
169UpdatedAt: timestamppb.New(peer.Task.UpdatedAt.Load()),
170},
171Host: &commonv2.Host{
172Id: peer.Host.ID,
173Type: uint32(peer.Host.Type),
174Hostname: peer.Host.Hostname,
175Ip: peer.Host.IP,
176Port: peer.Host.Port,
177DownloadPort: peer.Host.DownloadPort,
178Os: peer.Host.OS,
179Platform: peer.Host.Platform,
180PlatformFamily: peer.Host.PlatformFamily,
181PlatformVersion: peer.Host.PlatformVersion,
182KernelVersion: peer.Host.KernelVersion,
183Cpu: &commonv2.CPU{
184LogicalCount: peer.Host.CPU.LogicalCount,
185PhysicalCount: peer.Host.CPU.PhysicalCount,
186Percent: peer.Host.CPU.Percent,
187ProcessPercent: peer.Host.CPU.ProcessPercent,
188Times: &commonv2.CPUTimes{
189User: peer.Host.CPU.Times.User,
190System: peer.Host.CPU.Times.System,
191Idle: peer.Host.CPU.Times.Idle,
192Nice: peer.Host.CPU.Times.Nice,
193Iowait: peer.Host.CPU.Times.Iowait,
194Irq: peer.Host.CPU.Times.Irq,
195Softirq: peer.Host.CPU.Times.Softirq,
196Steal: peer.Host.CPU.Times.Steal,
197Guest: peer.Host.CPU.Times.Guest,
198GuestNice: peer.Host.CPU.Times.GuestNice,
199},
200},
201Memory: &commonv2.Memory{
202Total: peer.Host.Memory.Total,
203Available: peer.Host.Memory.Available,
204Used: peer.Host.Memory.Used,
205UsedPercent: peer.Host.Memory.UsedPercent,
206ProcessUsedPercent: peer.Host.Memory.ProcessUsedPercent,
207Free: peer.Host.Memory.Free,
208},
209Network: &commonv2.Network{
210TcpConnectionCount: peer.Host.Network.TCPConnectionCount,
211UploadTcpConnectionCount: peer.Host.Network.UploadTCPConnectionCount,
212Location: &peer.Host.Network.Location,
213Idc: &peer.Host.Network.IDC,
214},
215Disk: &commonv2.Disk{
216Total: peer.Host.Disk.Total,
217Free: peer.Host.Disk.Free,
218Used: peer.Host.Disk.Used,
219UsedPercent: peer.Host.Disk.UsedPercent,
220InodesTotal: peer.Host.Disk.InodesTotal,
221InodesUsed: peer.Host.Disk.InodesUsed,
222InodesFree: peer.Host.Disk.InodesFree,
223InodesUsedPercent: peer.Host.Disk.InodesUsedPercent,
224},
225Build: &commonv2.Build{
226GitVersion: peer.Host.Build.GitVersion,
227GitCommit: &peer.Host.Build.GitCommit,
228GoVersion: &peer.Host.Build.GoVersion,
229Platform: &peer.Host.Build.Platform,
230},
231},
232NeedBackToSource: peer.NeedBackToSource.Load(),
233CreatedAt: timestamppb.New(peer.CreatedAt.Load()),
234UpdatedAt: timestamppb.New(peer.UpdatedAt.Load()),
235})
236},
237},
238}
239
240for _, tc := range tests {
241t.Run(tc.name, func(t *testing.T) {
242ctl := gomock.NewController(t)
243defer ctl.Finish()
244scheduling := schedulingmocks.NewMockScheduling(ctl)
245res := resource.NewMockResource(ctl)
246dynconfig := configmocks.NewMockDynconfigInterface(ctl)
247storage := storagemocks.NewMockStorage(ctl)
248networkTopology := networktopologymocks.NewMockNetworkTopology(ctl)
249peerManager := resource.NewMockPeerManager(ctl)
250mockHost := resource.NewHost(
251mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
252mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
253mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength))
254peer := resource.NewPeer(mockSeedPeerID, mockResourceConfig, mockTask, mockHost, resource.WithRange(mockPeerRange))
255svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig, storage, networkTopology)
256
257tc.mock(peer, peerManager, res.EXPECT(), peerManager.EXPECT())
258resp, err := svc.StatPeer(context.Background(), &schedulerv2.StatPeerRequest{TaskId: mockTaskID, PeerId: mockPeerID})
259tc.expect(t, peer, resp, err)
260})
261}
262}
263
264func TestServiceV2_LeavePeer(t *testing.T) {
265tests := []struct {
266name string
267mock func(peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder)
268expect func(t *testing.T, err error)
269}{
270{
271name: "peer not found",
272mock: func(peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder) {
273gomock.InOrder(
274mr.PeerManager().Return(peerManager).Times(1),
275mp.Load(gomock.Any()).Return(nil, false).Times(1),
276)
277},
278expect: func(t *testing.T, err error) {
279assert := assert.New(t)
280assert.ErrorIs(err, status.Errorf(codes.NotFound, "peer %s not found", mockPeerID))
281},
282},
283{
284name: "peer fsm event failed",
285mock: func(peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder) {
286peer.FSM.SetState(resource.PeerStateLeave)
287gomock.InOrder(
288mr.PeerManager().Return(peerManager).Times(1),
289mp.Load(gomock.Any()).Return(peer, true).Times(1),
290)
291},
292expect: func(t *testing.T, err error) {
293assert := assert.New(t)
294assert.ErrorIs(err, status.Error(codes.FailedPrecondition, "peer fsm event failed: event Leave inappropriate in current state Leave"))
295},
296},
297{
298name: "peer leaves succeeded",
299mock: func(peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder) {
300gomock.InOrder(
301mr.PeerManager().Return(peerManager).Times(1),
302mp.Load(gomock.Any()).Return(peer, true).Times(1),
303)
304},
305expect: func(t *testing.T, err error) {
306assert := assert.New(t)
307assert.NoError(err)
308},
309},
310}
311
312for _, tc := range tests {
313t.Run(tc.name, func(t *testing.T) {
314ctl := gomock.NewController(t)
315defer ctl.Finish()
316scheduling := schedulingmocks.NewMockScheduling(ctl)
317res := resource.NewMockResource(ctl)
318dynconfig := configmocks.NewMockDynconfigInterface(ctl)
319storage := storagemocks.NewMockStorage(ctl)
320networkTopology := networktopologymocks.NewMockNetworkTopology(ctl)
321peerManager := resource.NewMockPeerManager(ctl)
322mockHost := resource.NewHost(
323mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
324mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
325mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength))
326peer := resource.NewPeer(mockSeedPeerID, mockResourceConfig, mockTask, mockHost, resource.WithRange(mockPeerRange))
327svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig, storage, networkTopology)
328
329tc.mock(peer, peerManager, res.EXPECT(), peerManager.EXPECT())
330tc.expect(t, svc.LeavePeer(context.Background(), &schedulerv2.LeavePeerRequest{TaskId: mockTaskID, PeerId: mockPeerID}))
331})
332}
333}
334
335func TestServiceV2_StatTask(t *testing.T) {
336tests := []struct {
337name string
338mock func(task *resource.Task, taskManager resource.TaskManager, mr *resource.MockResourceMockRecorder, mt *resource.MockTaskManagerMockRecorder)
339expect func(t *testing.T, task *resource.Task, resp *commonv2.Task, err error)
340}{
341{
342name: "task not found",
343mock: func(task *resource.Task, taskManager resource.TaskManager, mr *resource.MockResourceMockRecorder, mt *resource.MockTaskManagerMockRecorder) {
344gomock.InOrder(
345mr.TaskManager().Return(taskManager).Times(1),
346mt.Load(gomock.Any()).Return(nil, false).Times(1),
347)
348},
349expect: func(t *testing.T, task *resource.Task, resp *commonv2.Task, err error) {
350assert := assert.New(t)
351assert.ErrorIs(err, status.Errorf(codes.NotFound, "task %s not found", mockTaskID))
352},
353},
354{
355name: "task has been loaded",
356mock: func(task *resource.Task, taskManager resource.TaskManager, mr *resource.MockResourceMockRecorder, mt *resource.MockTaskManagerMockRecorder) {
357task.StorePiece(&mockPiece)
358gomock.InOrder(
359mr.TaskManager().Return(taskManager).Times(1),
360mt.Load(gomock.Any()).Return(task, true).Times(1),
361)
362},
363expect: func(t *testing.T, task *resource.Task, resp *commonv2.Task, err error) {
364dgst := task.Digest.String()
365
366assert := assert.New(t)
367assert.EqualValues(resp, &commonv2.Task{
368Id: task.ID,
369Type: task.Type,
370Url: task.URL,
371Digest: &dgst,
372Tag: &task.Tag,
373Application: &task.Application,
374FilteredQueryParams: task.FilteredQueryParams,
375RequestHeader: task.Header,
376PieceLength: uint32(task.PieceLength),
377ContentLength: uint64(task.ContentLength.Load()),
378PieceCount: uint32(task.TotalPieceCount.Load()),
379SizeScope: task.SizeScope(),
380Pieces: []*commonv2.Piece{
381{
382Number: uint32(mockPiece.Number),
383ParentId: &mockPiece.ParentID,
384Offset: mockPiece.Offset,
385Length: mockPiece.Length,
386Digest: mockPiece.Digest.String(),
387TrafficType: &mockPiece.TrafficType,
388Cost: durationpb.New(mockPiece.Cost),
389CreatedAt: timestamppb.New(mockPiece.CreatedAt),
390},
391},
392State: task.FSM.Current(),
393PeerCount: uint32(task.PeerCount()),
394CreatedAt: timestamppb.New(task.CreatedAt.Load()),
395UpdatedAt: timestamppb.New(task.UpdatedAt.Load()),
396})
397},
398},
399}
400
401for _, tc := range tests {
402t.Run(tc.name, func(t *testing.T) {
403ctl := gomock.NewController(t)
404defer ctl.Finish()
405scheduling := schedulingmocks.NewMockScheduling(ctl)
406res := resource.NewMockResource(ctl)
407dynconfig := configmocks.NewMockDynconfigInterface(ctl)
408storage := storagemocks.NewMockStorage(ctl)
409networkTopology := networktopologymocks.NewMockNetworkTopology(ctl)
410taskManager := resource.NewMockTaskManager(ctl)
411task := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength))
412svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig, storage, networkTopology)
413
414tc.mock(task, taskManager, res.EXPECT(), taskManager.EXPECT())
415resp, err := svc.StatTask(context.Background(), &schedulerv2.StatTaskRequest{Id: mockTaskID})
416tc.expect(t, task, resp, err)
417})
418}
419}
420
421func TestServiceV2_AnnounceHost(t *testing.T) {
422tests := []struct {
423name string
424req *schedulerv2.AnnounceHostRequest
425run func(t *testing.T, svc *V2, req *schedulerv2.AnnounceHostRequest, host *resource.Host, hostManager resource.HostManager, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder)
426}{
427{
428name: "host not found",
429req: &schedulerv2.AnnounceHostRequest{
430Host: &commonv2.Host{
431Id: mockHostID,
432Type: uint32(pkgtypes.HostTypeNormal),
433Hostname: "hostname",
434Ip: "127.0.0.1",
435Port: 8003,
436DownloadPort: 8001,
437Os: "darwin",
438Platform: "darwin",
439PlatformFamily: "Standalone Workstation",
440PlatformVersion: "11.1",
441KernelVersion: "20.2.0",
442Cpu: &commonv2.CPU{
443LogicalCount: mockCPU.LogicalCount,
444PhysicalCount: mockCPU.PhysicalCount,
445Percent: mockCPU.Percent,
446ProcessPercent: mockCPU.ProcessPercent,
447Times: &commonv2.CPUTimes{
448User: mockCPU.Times.User,
449System: mockCPU.Times.System,
450Idle: mockCPU.Times.Idle,
451Nice: mockCPU.Times.Nice,
452Iowait: mockCPU.Times.Iowait,
453Irq: mockCPU.Times.Irq,
454Softirq: mockCPU.Times.Softirq,
455Steal: mockCPU.Times.Steal,
456Guest: mockCPU.Times.Guest,
457GuestNice: mockCPU.Times.GuestNice,
458},
459},
460Memory: &commonv2.Memory{
461Total: mockMemory.Total,
462Available: mockMemory.Available,
463Used: mockMemory.Used,
464UsedPercent: mockMemory.UsedPercent,
465ProcessUsedPercent: mockMemory.ProcessUsedPercent,
466Free: mockMemory.Free,
467},
468Network: &commonv2.Network{
469TcpConnectionCount: mockNetwork.TCPConnectionCount,
470UploadTcpConnectionCount: mockNetwork.UploadTCPConnectionCount,
471Location: &mockNetwork.Location,
472Idc: &mockNetwork.IDC,
473},
474Disk: &commonv2.Disk{
475Total: mockDisk.Total,
476Free: mockDisk.Free,
477Used: mockDisk.Used,
478UsedPercent: mockDisk.UsedPercent,
479InodesTotal: mockDisk.InodesTotal,
480InodesUsed: mockDisk.InodesUsed,
481InodesFree: mockDisk.InodesFree,
482InodesUsedPercent: mockDisk.InodesUsedPercent,
483},
484Build: &commonv2.Build{
485GitVersion: mockBuild.GitVersion,
486GitCommit: &mockBuild.GitCommit,
487GoVersion: &mockBuild.GoVersion,
488Platform: &mockBuild.Platform,
489},
490},
491},
492run: func(t *testing.T, svc *V2, req *schedulerv2.AnnounceHostRequest, host *resource.Host, hostManager resource.HostManager, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
493gomock.InOrder(
494md.GetSchedulerClusterClientConfig().Return(managertypes.SchedulerClusterClientConfig{LoadLimit: 10}, nil).Times(1),
495mr.HostManager().Return(hostManager).Times(1),
496mh.Load(gomock.Any()).Return(nil, false).Times(1),
497mr.HostManager().Return(hostManager).Times(1),
498mh.Store(gomock.Any()).Do(func(host *resource.Host) {
499assert := assert.New(t)
500assert.Equal(host.ID, req.Host.Id)
501assert.Equal(host.Type, pkgtypes.HostType(req.Host.Type))
502assert.Equal(host.Hostname, req.Host.Hostname)
503assert.Equal(host.IP, req.Host.Ip)
504assert.Equal(host.Port, req.Host.Port)
505assert.Equal(host.DownloadPort, req.Host.DownloadPort)
506assert.Equal(host.OS, req.Host.Os)
507assert.Equal(host.Platform, req.Host.Platform)
508assert.Equal(host.PlatformVersion, req.Host.PlatformVersion)
509assert.Equal(host.KernelVersion, req.Host.KernelVersion)
510assert.EqualValues(host.CPU, mockCPU)
511assert.EqualValues(host.Memory, mockMemory)
512assert.EqualValues(host.Network, mockNetwork)
513assert.EqualValues(host.Disk, mockDisk)
514assert.EqualValues(host.Build, mockBuild)
515assert.Equal(host.ConcurrentUploadLimit.Load(), int32(10))
516assert.Equal(host.ConcurrentUploadCount.Load(), int32(0))
517assert.Equal(host.UploadCount.Load(), int64(0))
518assert.Equal(host.UploadFailedCount.Load(), int64(0))
519assert.NotNil(host.Peers)
520assert.Equal(host.PeerCount.Load(), int32(0))
521assert.NotEqual(host.CreatedAt.Load().Nanosecond(), 0)
522assert.NotEqual(host.UpdatedAt.Load().Nanosecond(), 0)
523assert.NotNil(host.Log)
524}).Return().Times(1),
525)
526
527assert := assert.New(t)
528assert.NoError(svc.AnnounceHost(context.Background(), req))
529},
530},
531{
532name: "host not found and dynconfig returns error",
533req: &schedulerv2.AnnounceHostRequest{
534Host: &commonv2.Host{
535Id: mockHostID,
536Type: uint32(pkgtypes.HostTypeNormal),
537Hostname: "hostname",
538Ip: "127.0.0.1",
539Port: 8003,
540DownloadPort: 8001,
541Os: "darwin",
542Platform: "darwin",
543PlatformFamily: "Standalone Workstation",
544PlatformVersion: "11.1",
545KernelVersion: "20.2.0",
546Cpu: &commonv2.CPU{
547LogicalCount: mockCPU.LogicalCount,
548PhysicalCount: mockCPU.PhysicalCount,
549Percent: mockCPU.Percent,
550ProcessPercent: mockCPU.ProcessPercent,
551Times: &commonv2.CPUTimes{
552User: mockCPU.Times.User,
553System: mockCPU.Times.System,
554Idle: mockCPU.Times.Idle,
555Nice: mockCPU.Times.Nice,
556Iowait: mockCPU.Times.Iowait,
557Irq: mockCPU.Times.Irq,
558Softirq: mockCPU.Times.Softirq,
559Steal: mockCPU.Times.Steal,
560Guest: mockCPU.Times.Guest,
561GuestNice: mockCPU.Times.GuestNice,
562},
563},
564Memory: &commonv2.Memory{
565Total: mockMemory.Total,
566Available: mockMemory.Available,
567Used: mockMemory.Used,
568UsedPercent: mockMemory.UsedPercent,
569ProcessUsedPercent: mockMemory.ProcessUsedPercent,
570Free: mockMemory.Free,
571},
572Network: &commonv2.Network{
573TcpConnectionCount: mockNetwork.TCPConnectionCount,
574UploadTcpConnectionCount: mockNetwork.UploadTCPConnectionCount,
575Location: &mockNetwork.Location,
576Idc: &mockNetwork.IDC,
577},
578Disk: &commonv2.Disk{
579Total: mockDisk.Total,
580Free: mockDisk.Free,
581Used: mockDisk.Used,
582UsedPercent: mockDisk.UsedPercent,
583InodesTotal: mockDisk.InodesTotal,
584InodesUsed: mockDisk.InodesUsed,
585InodesFree: mockDisk.InodesFree,
586InodesUsedPercent: mockDisk.InodesUsedPercent,
587},
588Build: &commonv2.Build{
589GitVersion: mockBuild.GitVersion,
590GitCommit: &mockBuild.GitCommit,
591GoVersion: &mockBuild.GoVersion,
592Platform: &mockBuild.Platform,
593},
594},
595},
596run: func(t *testing.T, svc *V2, req *schedulerv2.AnnounceHostRequest, host *resource.Host, hostManager resource.HostManager, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
597gomock.InOrder(
598md.GetSchedulerClusterClientConfig().Return(managertypes.SchedulerClusterClientConfig{}, errors.New("foo")).Times(1),
599mr.HostManager().Return(hostManager).Times(1),
600mh.Load(gomock.Any()).Return(nil, false).Times(1),
601mr.HostManager().Return(hostManager).Times(1),
602mh.Store(gomock.Any()).Do(func(host *resource.Host) {
603assert := assert.New(t)
604assert.Equal(host.ID, req.Host.Id)
605assert.Equal(host.Type, pkgtypes.HostType(req.Host.Type))
606assert.Equal(host.Hostname, req.Host.Hostname)
607assert.Equal(host.IP, req.Host.Ip)
608assert.Equal(host.Port, req.Host.Port)
609assert.Equal(host.DownloadPort, req.Host.DownloadPort)
610assert.Equal(host.OS, req.Host.Os)
611assert.Equal(host.Platform, req.Host.Platform)
612assert.Equal(host.PlatformVersion, req.Host.PlatformVersion)
613assert.Equal(host.KernelVersion, req.Host.KernelVersion)
614assert.EqualValues(host.CPU, mockCPU)
615assert.EqualValues(host.Memory, mockMemory)
616assert.EqualValues(host.Network, mockNetwork)
617assert.EqualValues(host.Disk, mockDisk)
618assert.EqualValues(host.Build, mockBuild)
619assert.Equal(host.ConcurrentUploadLimit.Load(), int32(200))
620assert.Equal(host.ConcurrentUploadCount.Load(), int32(0))
621assert.Equal(host.UploadCount.Load(), int64(0))
622assert.Equal(host.UploadFailedCount.Load(), int64(0))
623assert.NotNil(host.Peers)
624assert.Equal(host.PeerCount.Load(), int32(0))
625assert.NotEqual(host.CreatedAt.Load().Nanosecond(), 0)
626assert.NotEqual(host.UpdatedAt.Load().Nanosecond(), 0)
627assert.NotNil(host.Log)
628}).Return().Times(1),
629)
630
631assert := assert.New(t)
632assert.NoError(svc.AnnounceHost(context.Background(), req))
633},
634},
635{
636name: "host already exists",
637req: &schedulerv2.AnnounceHostRequest{
638Host: &commonv2.Host{
639Id: mockHostID,
640Type: uint32(pkgtypes.HostTypeNormal),
641Hostname: "foo",
642Ip: "127.0.0.1",
643Port: 8003,
644DownloadPort: 8001,
645Os: "darwin",
646Platform: "darwin",
647PlatformFamily: "Standalone Workstation",
648PlatformVersion: "11.1",
649KernelVersion: "20.2.0",
650Cpu: &commonv2.CPU{
651LogicalCount: mockCPU.LogicalCount,
652PhysicalCount: mockCPU.PhysicalCount,
653Percent: mockCPU.Percent,
654ProcessPercent: mockCPU.ProcessPercent,
655Times: &commonv2.CPUTimes{
656User: mockCPU.Times.User,
657System: mockCPU.Times.System,
658Idle: mockCPU.Times.Idle,
659Nice: mockCPU.Times.Nice,
660Iowait: mockCPU.Times.Iowait,
661Irq: mockCPU.Times.Irq,
662Softirq: mockCPU.Times.Softirq,
663Steal: mockCPU.Times.Steal,
664Guest: mockCPU.Times.Guest,
665GuestNice: mockCPU.Times.GuestNice,
666},
667},
668Memory: &commonv2.Memory{
669Total: mockMemory.Total,
670Available: mockMemory.Available,
671Used: mockMemory.Used,
672UsedPercent: mockMemory.UsedPercent,
673ProcessUsedPercent: mockMemory.ProcessUsedPercent,
674Free: mockMemory.Free,
675},
676Network: &commonv2.Network{
677TcpConnectionCount: mockNetwork.TCPConnectionCount,
678UploadTcpConnectionCount: mockNetwork.UploadTCPConnectionCount,
679Location: &mockNetwork.Location,
680Idc: &mockNetwork.IDC,
681},
682Disk: &commonv2.Disk{
683Total: mockDisk.Total,
684Free: mockDisk.Free,
685Used: mockDisk.Used,
686UsedPercent: mockDisk.UsedPercent,
687InodesTotal: mockDisk.InodesTotal,
688InodesUsed: mockDisk.InodesUsed,
689InodesFree: mockDisk.InodesFree,
690InodesUsedPercent: mockDisk.InodesUsedPercent,
691},
692Build: &commonv2.Build{
693GitVersion: mockBuild.GitVersion,
694GitCommit: &mockBuild.GitCommit,
695GoVersion: &mockBuild.GoVersion,
696Platform: &mockBuild.Platform,
697},
698},
699},
700run: func(t *testing.T, svc *V2, req *schedulerv2.AnnounceHostRequest, host *resource.Host, hostManager resource.HostManager, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
701gomock.InOrder(
702md.GetSchedulerClusterClientConfig().Return(managertypes.SchedulerClusterClientConfig{LoadLimit: 10}, nil).Times(1),
703mr.HostManager().Return(hostManager).Times(1),
704mh.Load(gomock.Any()).Return(host, true).Times(1),
705)
706
707assert := assert.New(t)
708assert.NoError(svc.AnnounceHost(context.Background(), req))
709assert.Equal(host.ID, req.Host.Id)
710assert.Equal(host.Type, pkgtypes.HostType(req.Host.Type))
711assert.Equal(host.Hostname, req.Host.Hostname)
712assert.Equal(host.IP, req.Host.Ip)
713assert.Equal(host.Port, req.Host.Port)
714assert.Equal(host.DownloadPort, req.Host.DownloadPort)
715assert.Equal(host.OS, req.Host.Os)
716assert.Equal(host.Platform, req.Host.Platform)
717assert.Equal(host.PlatformVersion, req.Host.PlatformVersion)
718assert.Equal(host.KernelVersion, req.Host.KernelVersion)
719assert.EqualValues(host.CPU, mockCPU)
720assert.EqualValues(host.Memory, mockMemory)
721assert.EqualValues(host.Network, mockNetwork)
722assert.EqualValues(host.Disk, mockDisk)
723assert.EqualValues(host.Build, mockBuild)
724assert.Equal(host.ConcurrentUploadLimit.Load(), int32(10))
725assert.Equal(host.ConcurrentUploadCount.Load(), int32(0))
726assert.Equal(host.UploadCount.Load(), int64(0))
727assert.Equal(host.UploadFailedCount.Load(), int64(0))
728assert.NotNil(host.Peers)
729assert.Equal(host.PeerCount.Load(), int32(0))
730assert.NotEqual(host.CreatedAt.Load().Nanosecond(), 0)
731assert.NotEqual(host.UpdatedAt.Load().Nanosecond(), 0)
732assert.NotNil(host.Log)
733},
734},
735{
736name: "host already exists and dynconfig returns error",
737req: &schedulerv2.AnnounceHostRequest{
738Host: &commonv2.Host{
739Id: mockHostID,
740Type: uint32(pkgtypes.HostTypeNormal),
741Hostname: "foo",
742Ip: "127.0.0.1",
743Port: 8003,
744DownloadPort: 8001,
745Os: "darwin",
746Platform: "darwin",
747PlatformFamily: "Standalone Workstation",
748PlatformVersion: "11.1",
749KernelVersion: "20.2.0",
750Cpu: &commonv2.CPU{
751LogicalCount: mockCPU.LogicalCount,
752PhysicalCount: mockCPU.PhysicalCount,
753Percent: mockCPU.Percent,
754ProcessPercent: mockCPU.ProcessPercent,
755Times: &commonv2.CPUTimes{
756User: mockCPU.Times.User,
757System: mockCPU.Times.System,
758Idle: mockCPU.Times.Idle,
759Nice: mockCPU.Times.Nice,
760Iowait: mockCPU.Times.Iowait,
761Irq: mockCPU.Times.Irq,
762Softirq: mockCPU.Times.Softirq,
763Steal: mockCPU.Times.Steal,
764Guest: mockCPU.Times.Guest,
765GuestNice: mockCPU.Times.GuestNice,
766},
767},
768Memory: &commonv2.Memory{
769Total: mockMemory.Total,
770Available: mockMemory.Available,
771Used: mockMemory.Used,
772UsedPercent: mockMemory.UsedPercent,
773ProcessUsedPercent: mockMemory.ProcessUsedPercent,
774Free: mockMemory.Free,
775},
776Network: &commonv2.Network{
777TcpConnectionCount: mockNetwork.TCPConnectionCount,
778UploadTcpConnectionCount: mockNetwork.UploadTCPConnectionCount,
779Location: &mockNetwork.Location,
780Idc: &mockNetwork.IDC,
781},
782Disk: &commonv2.Disk{
783Total: mockDisk.Total,
784Free: mockDisk.Free,
785Used: mockDisk.Used,
786UsedPercent: mockDisk.UsedPercent,
787InodesTotal: mockDisk.InodesTotal,
788InodesUsed: mockDisk.InodesUsed,
789InodesFree: mockDisk.InodesFree,
790InodesUsedPercent: mockDisk.InodesUsedPercent,
791},
792Build: &commonv2.Build{
793GitVersion: mockBuild.GitVersion,
794GitCommit: &mockBuild.GitCommit,
795GoVersion: &mockBuild.GoVersion,
796Platform: &mockBuild.Platform,
797},
798},
799},
800run: func(t *testing.T, svc *V2, req *schedulerv2.AnnounceHostRequest, host *resource.Host, hostManager resource.HostManager, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
801gomock.InOrder(
802md.GetSchedulerClusterClientConfig().Return(managertypes.SchedulerClusterClientConfig{}, errors.New("foo")).Times(1),
803mr.HostManager().Return(hostManager).Times(1),
804mh.Load(gomock.Any()).Return(host, true).Times(1),
805)
806
807assert := assert.New(t)
808assert.NoError(svc.AnnounceHost(context.Background(), req))
809assert.Equal(host.ID, req.Host.Id)
810assert.Equal(host.Type, pkgtypes.HostType(req.Host.Type))
811assert.Equal(host.Hostname, req.Host.Hostname)
812assert.Equal(host.IP, req.Host.Ip)
813assert.Equal(host.Port, req.Host.Port)
814assert.Equal(host.DownloadPort, req.Host.DownloadPort)
815assert.Equal(host.OS, req.Host.Os)
816assert.Equal(host.Platform, req.Host.Platform)
817assert.Equal(host.PlatformVersion, req.Host.PlatformVersion)
818assert.Equal(host.KernelVersion, req.Host.KernelVersion)
819assert.EqualValues(host.CPU, mockCPU)
820assert.EqualValues(host.Memory, mockMemory)
821assert.EqualValues(host.Network, mockNetwork)
822assert.EqualValues(host.Disk, mockDisk)
823assert.EqualValues(host.Build, mockBuild)
824assert.Equal(host.ConcurrentUploadLimit.Load(), int32(200))
825assert.Equal(host.ConcurrentUploadCount.Load(), int32(0))
826assert.Equal(host.UploadCount.Load(), int64(0))
827assert.Equal(host.UploadFailedCount.Load(), int64(0))
828assert.NotNil(host.Peers)
829assert.Equal(host.PeerCount.Load(), int32(0))
830assert.NotEqual(host.CreatedAt.Load().Nanosecond(), 0)
831assert.NotEqual(host.UpdatedAt.Load().Nanosecond(), 0)
832assert.NotNil(host.Log)
833},
834},
835}
836
837for _, tc := range tests {
838t.Run(tc.name, func(t *testing.T) {
839ctl := gomock.NewController(t)
840defer ctl.Finish()
841scheduling := schedulingmocks.NewMockScheduling(ctl)
842res := resource.NewMockResource(ctl)
843dynconfig := configmocks.NewMockDynconfigInterface(ctl)
844storage := storagemocks.NewMockStorage(ctl)
845networkTopology := networktopologymocks.NewMockNetworkTopology(ctl)
846hostManager := resource.NewMockHostManager(ctl)
847host := resource.NewHost(
848mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
849mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
850svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig, storage, networkTopology)
851
852tc.run(t, svc, tc.req, host, hostManager, res.EXPECT(), hostManager.EXPECT(), dynconfig.EXPECT())
853})
854}
855}
856
857func TestServiceV2_LeaveHost(t *testing.T) {
858tests := []struct {
859name string
860mock func(host *resource.Host, mockPeer *resource.Peer, hostManager resource.HostManager, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, mnt *networktopologymocks.MockNetworkTopologyMockRecorder)
861expect func(t *testing.T, peer *resource.Peer, err error)
862}{
863{
864name: "host not found",
865mock: func(host *resource.Host, mockPeer *resource.Peer, hostManager resource.HostManager, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, mnt *networktopologymocks.MockNetworkTopologyMockRecorder) {
866gomock.InOrder(
867mr.HostManager().Return(hostManager).Times(1),
868mh.Load(gomock.Any()).Return(nil, false).Times(1),
869)
870},
871expect: func(t *testing.T, peer *resource.Peer, err error) {
872assert := assert.New(t)
873assert.Error(err)
874},
875},
876{
877name: "host has not peers",
878mock: func(host *resource.Host, mockPeer *resource.Peer, hostManager resource.HostManager, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, mnt *networktopologymocks.MockNetworkTopologyMockRecorder) {
879gomock.InOrder(
880mr.HostManager().Return(hostManager).Times(1),
881mh.Load(gomock.Any()).Return(host, true).Times(1),
882mnt.DeleteHost(host.ID).Return(nil).Times(1),
883)
884},
885expect: func(t *testing.T, peer *resource.Peer, err error) {
886assert := assert.New(t)
887assert.NoError(err)
888},
889},
890{
891name: "peer leaves succeeded",
892mock: func(host *resource.Host, mockPeer *resource.Peer, hostManager resource.HostManager, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, mnt *networktopologymocks.MockNetworkTopologyMockRecorder) {
893host.Peers.Store(mockPeer.ID, mockPeer)
894mockPeer.FSM.SetState(resource.PeerStatePending)
895gomock.InOrder(
896mr.HostManager().Return(hostManager).Times(1),
897mh.Load(gomock.Any()).Return(host, true).Times(1),
898mnt.DeleteHost(host.ID).Return(nil).Times(1),
899)
900},
901expect: func(t *testing.T, peer *resource.Peer, err error) {
902assert := assert.New(t)
903assert.NoError(err)
904assert.Equal(peer.FSM.Current(), resource.PeerStateLeave)
905},
906},
907}
908
909for _, tc := range tests {
910t.Run(tc.name, func(t *testing.T) {
911ctl := gomock.NewController(t)
912defer ctl.Finish()
913scheduling := schedulingmocks.NewMockScheduling(ctl)
914res := resource.NewMockResource(ctl)
915dynconfig := configmocks.NewMockDynconfigInterface(ctl)
916storage := storagemocks.NewMockStorage(ctl)
917networkTopology := networktopologymocks.NewMockNetworkTopology(ctl)
918hostManager := resource.NewMockHostManager(ctl)
919host := resource.NewHost(
920mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
921mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
922mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength))
923mockPeer := resource.NewPeer(mockSeedPeerID, mockResourceConfig, mockTask, host)
924svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig, storage, networkTopology)
925
926tc.mock(host, mockPeer, hostManager, res.EXPECT(), hostManager.EXPECT(), networkTopology.EXPECT())
927tc.expect(t, mockPeer, svc.LeaveHost(context.Background(), &schedulerv2.LeaveHostRequest{Id: mockHostID}))
928})
929}
930}
931
932func TestServiceV2_SyncProbes(t *testing.T) {
933tests := []struct {
934name string
935mock func(svc *V2, mr *resource.MockResourceMockRecorder, probes *networktopologymocks.MockProbes, mp *networktopologymocks.MockProbesMockRecorder,
936mn *networktopologymocks.MockNetworkTopologyMockRecorder, hostManager resource.HostManager, mh *resource.MockHostManagerMockRecorder,
937ms *schedulerv2mocks.MockScheduler_SyncProbesServerMockRecorder)
938expect func(t *testing.T, err error)
939}{
940{
941name: "network topology is not enabled",
942mock: func(svc *V2, mr *resource.MockResourceMockRecorder, probes *networktopologymocks.MockProbes, mp *networktopologymocks.MockProbesMockRecorder,
943mn *networktopologymocks.MockNetworkTopologyMockRecorder, hostManager resource.HostManager, mh *resource.MockHostManagerMockRecorder,
944ms *schedulerv2mocks.MockScheduler_SyncProbesServerMockRecorder) {
945svc.networkTopology = nil
946},
947expect: func(t *testing.T, err error) {
948assert := assert.New(t)
949assert.EqualError(err, "rpc error: code = Unimplemented desc = network topology is not enabled")
950},
951},
952{
953name: "synchronize probes when receive ProbeStartedRequest",
954mock: func(svc *V2, mr *resource.MockResourceMockRecorder, probes *networktopologymocks.MockProbes, mp *networktopologymocks.MockProbesMockRecorder,
955mn *networktopologymocks.MockNetworkTopologyMockRecorder, hostManager resource.HostManager, mh *resource.MockHostManagerMockRecorder,
956ms *schedulerv2mocks.MockScheduler_SyncProbesServerMockRecorder) {
957gomock.InOrder(
958ms.Recv().Return(&schedulerv2.SyncProbesRequest{
959Host: &commonv2.Host{
960Id: mockSeedHostID,
961Type: uint32(pkgtypes.HostTypeSuperSeed),
962Hostname: "bar",
963Ip: "127.0.0.1",
964Port: 8003,
965DownloadPort: 8001,
966Os: "darwin",
967Platform: "darwin",
968PlatformFamily: "Standalone Workstation",
969PlatformVersion: "11.1",
970KernelVersion: "20.2.0",
971Cpu: mockV2Probe.Host.Cpu,
972Memory: mockV2Probe.Host.Memory,
973Network: mockV2Probe.Host.Network,
974Disk: mockV2Probe.Host.Disk,
975Build: mockV2Probe.Host.Build,
976},
977Request: &schedulerv2.SyncProbesRequest_ProbeStartedRequest{
978ProbeStartedRequest: &schedulerv2.ProbeStartedRequest{},
979},
980}, nil).Times(1),
981mn.FindProbedHosts(gomock.Eq(mockRawSeedHost.ID)).Return([]*resource.Host{&mockRawHost}, nil).Times(1),
982ms.Send(gomock.Eq(&schedulerv2.SyncProbesResponse{
983Hosts: []*commonv2.Host{
984{
985Id: mockRawHost.ID,
986Type: uint32(mockRawHost.Type),
987Hostname: mockRawHost.Hostname,
988Ip: mockRawHost.IP,
989Port: mockRawHost.Port,
990DownloadPort: mockRawHost.DownloadPort,
991Os: mockRawHost.OS,
992Platform: mockRawHost.Platform,
993PlatformFamily: mockRawHost.PlatformFamily,
994PlatformVersion: mockRawHost.PlatformVersion,
995KernelVersion: mockRawHost.KernelVersion,
996Cpu: mockV2Probe.Host.Cpu,
997Memory: mockV2Probe.Host.Memory,
998Network: mockV2Probe.Host.Network,
999Disk: mockV2Probe.Host.Disk,
1000Build: mockV2Probe.Host.Build,
1001},
1002},
1003})).Return(nil).Times(1),
1004ms.Recv().Return(nil, io.EOF).Times(1),
1005)
1006},
1007expect: func(t *testing.T, err error) {
1008assert := assert.New(t)
1009assert.NoError(err)
1010},
1011},
1012{
1013name: "synchronize probes when receive ProbeFinishedRequest",
1014mock: func(svc *V2, mr *resource.MockResourceMockRecorder, probes *networktopologymocks.MockProbes, mp *networktopologymocks.MockProbesMockRecorder,
1015mn *networktopologymocks.MockNetworkTopologyMockRecorder, hostManager resource.HostManager, mh *resource.MockHostManagerMockRecorder,
1016ms *schedulerv2mocks.MockScheduler_SyncProbesServerMockRecorder) {
1017gomock.InOrder(
1018ms.Recv().Return(&schedulerv2.SyncProbesRequest{
1019Host: &commonv2.Host{
1020Id: mockSeedHostID,
1021Type: uint32(pkgtypes.HostTypeSuperSeed),
1022Hostname: "bar",
1023Ip: "127.0.0.1",
1024Port: 8003,
1025DownloadPort: 8001,
1026Os: "darwin",
1027Platform: "darwin",
1028PlatformFamily: "Standalone Workstation",
1029PlatformVersion: "11.1",
1030KernelVersion: "20.2.0",
1031Cpu: mockV2Probe.Host.Cpu,
1032Memory: mockV2Probe.Host.Memory,
1033Network: mockV2Probe.Host.Network,
1034Disk: mockV2Probe.Host.Disk,
1035Build: mockV2Probe.Host.Build,
1036},
1037Request: &schedulerv2.SyncProbesRequest_ProbeFinishedRequest{
1038ProbeFinishedRequest: &schedulerv2.ProbeFinishedRequest{
1039Probes: []*schedulerv2.Probe{mockV2Probe},
1040},
1041},
1042}, nil).Times(1),
1043mr.HostManager().Return(hostManager).Times(1),
1044mh.Load(gomock.Eq(mockRawHost.ID)).Return(&mockRawHost, true),
1045mn.Store(gomock.Eq(mockRawSeedHost.ID), gomock.Eq(mockRawHost.ID)).Return(nil).Times(1),
1046mn.Probes(gomock.Eq(mockRawSeedHost.ID), gomock.Eq(mockRawHost.ID)).Return(probes).Times(1),
1047mp.Enqueue(gomock.Eq(&networktopology.Probe{
1048Host: &mockRawHost,
1049RTT: mockV2Probe.Rtt.AsDuration(),
1050CreatedAt: mockV2Probe.CreatedAt.AsTime(),
1051})).Return(nil).Times(1),
1052ms.Recv().Return(nil, io.EOF).Times(1),
1053)
1054},
1055expect: func(t *testing.T, err error) {
1056assert := assert.New(t)
1057assert.NoError(err)
1058},
1059},
1060{
1061name: "synchronize probes when receive ProbeFailedRequest",
1062mock: func(svc *V2, mr *resource.MockResourceMockRecorder, probes *networktopologymocks.MockProbes, mp *networktopologymocks.MockProbesMockRecorder,
1063mn *networktopologymocks.MockNetworkTopologyMockRecorder, hostManager resource.HostManager, mh *resource.MockHostManagerMockRecorder,
1064ms *schedulerv2mocks.MockScheduler_SyncProbesServerMockRecorder) {
1065gomock.InOrder(
1066ms.Recv().Return(&schedulerv2.SyncProbesRequest{
1067Host: &commonv2.Host{
1068Id: mockSeedHostID,
1069Type: uint32(pkgtypes.HostTypeSuperSeed),
1070Hostname: "bar",
1071Ip: "127.0.0.1",
1072Port: 8003,
1073DownloadPort: 8001,
1074Os: "darwin",
1075Platform: "darwin",
1076PlatformFamily: "Standalone Workstation",
1077PlatformVersion: "11.1",
1078KernelVersion: "20.2.0",
1079Cpu: mockV2Probe.Host.Cpu,
1080Memory: mockV2Probe.Host.Memory,
1081Network: mockV2Probe.Host.Network,
1082Disk: mockV2Probe.Host.Disk,
1083Build: mockV2Probe.Host.Build,
1084},
1085Request: &schedulerv2.SyncProbesRequest_ProbeFailedRequest{
1086ProbeFailedRequest: &schedulerv2.ProbeFailedRequest{
1087Probes: []*schedulerv2.FailedProbe{
1088{
1089Host: &commonv2.Host{
1090Id: mockRawHost.ID,
1091Type: uint32(mockRawHost.Type),
1092Hostname: mockRawHost.Hostname,
1093Ip: mockRawHost.IP,
1094Port: mockRawHost.Port,
1095DownloadPort: mockRawHost.DownloadPort,
1096Os: mockRawHost.OS,
1097Platform: mockRawHost.Platform,
1098PlatformFamily: mockRawHost.PlatformFamily,
1099PlatformVersion: mockRawHost.PlatformVersion,
1100KernelVersion: mockRawHost.KernelVersion,
1101Cpu: mockV2Probe.Host.Cpu,
1102Memory: mockV2Probe.Host.Memory,
1103Network: mockV2Probe.Host.Network,
1104Disk: mockV2Probe.Host.Disk,
1105Build: mockV2Probe.Host.Build,
1106},
1107},
1108},
1109},
1110},
1111}, nil).Times(1),
1112ms.Recv().Return(nil, io.EOF).Times(1),
1113)
1114},
1115expect: func(t *testing.T, err error) {
1116assert := assert.New(t)
1117assert.NoError(err)
1118},
1119},
1120{
1121name: "synchronize probes when receive fail type request",
1122mock: func(svc *V2, mr *resource.MockResourceMockRecorder, probes *networktopologymocks.MockProbes, mp *networktopologymocks.MockProbesMockRecorder,
1123mn *networktopologymocks.MockNetworkTopologyMockRecorder, hostManager resource.HostManager, mh *resource.MockHostManagerMockRecorder,
1124ms *schedulerv2mocks.MockScheduler_SyncProbesServerMockRecorder) {
1125ms.Recv().Return(&schedulerv2.SyncProbesRequest{
1126Host: &commonv2.Host{
1127Id: mockSeedHostID,
1128Type: uint32(pkgtypes.HostTypeSuperSeed),
1129Hostname: "bar",
1130Ip: "127.0.0.1",
1131Port: 8003,
1132DownloadPort: 8001,
1133Os: "darwin",
1134Platform: "darwin",
1135PlatformFamily: "Standalone Workstation",
1136PlatformVersion: "11.1",
1137KernelVersion: "20.2.0",
1138Cpu: mockV2Probe.Host.Cpu,
1139Memory: mockV2Probe.Host.Memory,
1140Network: mockV2Probe.Host.Network,
1141Disk: mockV2Probe.Host.Disk,
1142Build: mockV2Probe.Host.Build,
1143},
1144Request: nil,
1145}, nil).Times(1)
1146},
1147expect: func(t *testing.T, err error) {
1148assert := assert.New(t)
1149assert.EqualError(err, "rpc error: code = FailedPrecondition desc = receive unknow request: <nil>")
1150},
1151},
1152{
1153name: "receive error",
1154mock: func(svc *V2, mr *resource.MockResourceMockRecorder, probes *networktopologymocks.MockProbes, mp *networktopologymocks.MockProbesMockRecorder,
1155mn *networktopologymocks.MockNetworkTopologyMockRecorder, hostManager resource.HostManager, mh *resource.MockHostManagerMockRecorder,
1156ms *schedulerv2mocks.MockScheduler_SyncProbesServerMockRecorder) {
1157ms.Recv().Return(nil, errors.New("receive error")).Times(1)
1158},
1159expect: func(t *testing.T, err error) {
1160assert := assert.New(t)
1161assert.EqualError(err, "receive error")
1162},
1163},
1164{
1165name: "receive end of file",
1166mock: func(svc *V2, mr *resource.MockResourceMockRecorder, probes *networktopologymocks.MockProbes, mp *networktopologymocks.MockProbesMockRecorder,
1167mn *networktopologymocks.MockNetworkTopologyMockRecorder, hostManager resource.HostManager, mh *resource.MockHostManagerMockRecorder,
1168ms *schedulerv2mocks.MockScheduler_SyncProbesServerMockRecorder) {
1169ms.Recv().Return(nil, io.EOF).Times(1)
1170},
1171expect: func(t *testing.T, err error) {
1172assert := assert.New(t)
1173assert.NoError(err)
1174},
1175},
1176{
1177name: "find probed host ids error",
1178mock: func(svc *V2, mr *resource.MockResourceMockRecorder, probes *networktopologymocks.MockProbes, mp *networktopologymocks.MockProbesMockRecorder,
1179mn *networktopologymocks.MockNetworkTopologyMockRecorder, hostManager resource.HostManager, mh *resource.MockHostManagerMockRecorder,
1180ms *schedulerv2mocks.MockScheduler_SyncProbesServerMockRecorder) {
1181gomock.InOrder(
1182ms.Recv().Return(&schedulerv2.SyncProbesRequest{
1183Host: &commonv2.Host{
1184Id: mockSeedHostID,
1185Type: uint32(pkgtypes.HostTypeSuperSeed),
1186Hostname: "bar",
1187Ip: "127.0.0.1",
1188Port: 8003,
1189DownloadPort: 8001,
1190Os: "darwin",
1191Platform: "darwin",
1192PlatformFamily: "Standalone Workstation",
1193PlatformVersion: "11.1",
1194KernelVersion: "20.2.0",
1195Cpu: mockV2Probe.Host.Cpu,
1196Memory: mockV2Probe.Host.Memory,
1197Network: mockV2Probe.Host.Network,
1198Disk: mockV2Probe.Host.Disk,
1199Build: mockV2Probe.Host.Build,
1200},
1201Request: &schedulerv2.SyncProbesRequest_ProbeStartedRequest{
1202ProbeStartedRequest: &schedulerv2.ProbeStartedRequest{},
1203},
1204}, nil).Times(1),
1205mn.FindProbedHosts(gomock.Eq(mockRawSeedHost.ID)).Return(nil, errors.New("find probed host ids error")).Times(1),
1206)
1207},
1208expect: func(t *testing.T, err error) {
1209assert := assert.New(t)
1210assert.EqualError(err, "rpc error: code = FailedPrecondition desc = find probed host ids error")
1211},
1212},
1213{
1214name: "send synchronize probes response error",
1215mock: func(svc *V2, mr *resource.MockResourceMockRecorder, probes *networktopologymocks.MockProbes, mp *networktopologymocks.MockProbesMockRecorder,
1216mn *networktopologymocks.MockNetworkTopologyMockRecorder, hostManager resource.HostManager, mh *resource.MockHostManagerMockRecorder,
1217ms *schedulerv2mocks.MockScheduler_SyncProbesServerMockRecorder) {
1218gomock.InOrder(
1219ms.Recv().Return(&schedulerv2.SyncProbesRequest{
1220Host: &commonv2.Host{
1221Id: mockSeedHostID,
1222Type: uint32(pkgtypes.HostTypeSuperSeed),
1223Hostname: "bar",
1224Ip: "127.0.0.1",
1225Port: 8003,
1226DownloadPort: 8001,
1227Os: "darwin",
1228Platform: "darwin",
1229PlatformFamily: "Standalone Workstation",
1230PlatformVersion: "11.1",
1231KernelVersion: "20.2.0",
1232Cpu: mockV2Probe.Host.Cpu,
1233Memory: mockV2Probe.Host.Memory,
1234Network: mockV2Probe.Host.Network,
1235Disk: mockV2Probe.Host.Disk,
1236Build: mockV2Probe.Host.Build,
1237},
1238Request: &schedulerv2.SyncProbesRequest_ProbeStartedRequest{
1239ProbeStartedRequest: &schedulerv2.ProbeStartedRequest{},
1240},
1241}, nil).Times(1),
1242mn.FindProbedHosts(gomock.Eq(mockRawSeedHost.ID)).Return([]*resource.Host{&mockRawHost}, nil).Times(1),
1243ms.Send(gomock.Eq(&schedulerv2.SyncProbesResponse{
1244Hosts: []*commonv2.Host{
1245{
1246Id: mockRawHost.ID,
1247Type: uint32(mockRawHost.Type),
1248Hostname: mockRawHost.Hostname,
1249Ip: mockRawHost.IP,
1250Port: mockRawHost.Port,
1251DownloadPort: mockRawHost.DownloadPort,
1252Os: mockRawHost.OS,
1253Platform: mockRawHost.Platform,
1254PlatformFamily: mockRawHost.PlatformFamily,
1255PlatformVersion: mockRawHost.PlatformVersion,
1256KernelVersion: mockRawHost.KernelVersion,
1257Cpu: mockV2Probe.Host.Cpu,
1258Memory: mockV2Probe.Host.Memory,
1259Network: mockV2Probe.Host.Network,
1260Disk: mockV2Probe.Host.Disk,
1261Build: mockV2Probe.Host.Build,
1262},
1263},
1264})).Return(errors.New("send synchronize probes response error")).Times(1),
1265)
1266},
1267expect: func(t *testing.T, err error) {
1268assert := assert.New(t)
1269assert.EqualError(err, "send synchronize probes response error")
1270},
1271},
1272{
1273name: "load host error when receive ProbeFinishedRequest",
1274mock: func(svc *V2, mr *resource.MockResourceMockRecorder, probes *networktopologymocks.MockProbes, mp *networktopologymocks.MockProbesMockRecorder,
1275mn *networktopologymocks.MockNetworkTopologyMockRecorder, hostManager resource.HostManager, mh *resource.MockHostManagerMockRecorder,
1276ms *schedulerv2mocks.MockScheduler_SyncProbesServerMockRecorder) {
1277gomock.InOrder(
1278ms.Recv().Return(&schedulerv2.SyncProbesRequest{
1279Host: &commonv2.Host{
1280Id: mockSeedHostID,
1281Type: uint32(pkgtypes.HostTypeSuperSeed),
1282Hostname: "bar",
1283Ip: "127.0.0.1",
1284Port: 8003,
1285DownloadPort: 8001,
1286Os: "darwin",
1287Platform: "darwin",
1288PlatformFamily: "Standalone Workstation",
1289PlatformVersion: "11.1",
1290KernelVersion: "20.2.0",
1291Cpu: mockV2Probe.Host.Cpu,
1292Memory: mockV2Probe.Host.Memory,
1293Network: mockV2Probe.Host.Network,
1294Disk: mockV2Probe.Host.Disk,
1295Build: mockV2Probe.Host.Build,
1296},
1297Request: &schedulerv2.SyncProbesRequest_ProbeFinishedRequest{
1298ProbeFinishedRequest: &schedulerv2.ProbeFinishedRequest{
1299Probes: []*schedulerv2.Probe{mockV2Probe},
1300},
1301},
1302}, nil).Times(1),
1303mr.HostManager().Return(hostManager).Times(1),
1304mh.Load(gomock.Eq(mockRawHost.ID)).Return(nil, false),
1305ms.Recv().Return(nil, io.EOF).Times(1),
1306)
1307},
1308expect: func(t *testing.T, err error) {
1309assert := assert.New(t)
1310assert.NoError(err)
1311},
1312},
1313{
1314name: "store error when receive ProbeFinishedRequest",
1315mock: func(svc *V2, mr *resource.MockResourceMockRecorder, probes *networktopologymocks.MockProbes, mp *networktopologymocks.MockProbesMockRecorder,
1316mn *networktopologymocks.MockNetworkTopologyMockRecorder, hostManager resource.HostManager, mh *resource.MockHostManagerMockRecorder,
1317ms *schedulerv2mocks.MockScheduler_SyncProbesServerMockRecorder) {
1318gomock.InOrder(
1319ms.Recv().Return(&schedulerv2.SyncProbesRequest{
1320Host: &commonv2.Host{
1321Id: mockSeedHostID,
1322Type: uint32(pkgtypes.HostTypeSuperSeed),
1323Hostname: "bar",
1324Ip: "127.0.0.1",
1325Port: 8003,
1326DownloadPort: 8001,
1327Os: "darwin",
1328Platform: "darwin",
1329PlatformFamily: "Standalone Workstation",
1330PlatformVersion: "11.1",
1331KernelVersion: "20.2.0",
1332Cpu: mockV2Probe.Host.Cpu,
1333Memory: mockV2Probe.Host.Memory,
1334Network: mockV2Probe.Host.Network,
1335Disk: mockV2Probe.Host.Disk,
1336Build: mockV2Probe.Host.Build,
1337},
1338Request: &schedulerv2.SyncProbesRequest_ProbeFinishedRequest{
1339ProbeFinishedRequest: &schedulerv2.ProbeFinishedRequest{
1340Probes: []*schedulerv2.Probe{mockV2Probe},
1341},
1342},
1343}, nil).Times(1),
1344mr.HostManager().Return(hostManager).Times(1),
1345mh.Load(gomock.Eq(mockRawHost.ID)).Return(&mockRawHost, true),
1346mn.Store(gomock.Eq(mockRawSeedHost.ID), gomock.Eq(mockRawHost.ID)).Return(errors.New("store error")).Times(1),
1347ms.Recv().Return(nil, io.EOF).Times(1),
1348)
1349},
1350expect: func(t *testing.T, err error) {
1351assert := assert.New(t)
1352assert.NoError(err)
1353},
1354},
1355{
1356name: "enqueue probe error when receive ProbeFinishedRequest",
1357mock: func(svc *V2, mr *resource.MockResourceMockRecorder, probes *networktopologymocks.MockProbes, mp *networktopologymocks.MockProbesMockRecorder,
1358mn *networktopologymocks.MockNetworkTopologyMockRecorder, hostManager resource.HostManager, mh *resource.MockHostManagerMockRecorder,
1359ms *schedulerv2mocks.MockScheduler_SyncProbesServerMockRecorder) {
1360gomock.InOrder(
1361ms.Recv().Return(&schedulerv2.SyncProbesRequest{
1362Host: &commonv2.Host{
1363Id: mockSeedHostID,
1364Type: uint32(pkgtypes.HostTypeSuperSeed),
1365Hostname: "bar",
1366Ip: "127.0.0.1",
1367Port: 8003,
1368DownloadPort: 8001,
1369Os: "darwin",
1370Platform: "darwin",
1371PlatformFamily: "Standalone Workstation",
1372PlatformVersion: "11.1",
1373KernelVersion: "20.2.0",
1374Cpu: mockV2Probe.Host.Cpu,
1375Memory: mockV2Probe.Host.Memory,
1376Network: mockV2Probe.Host.Network,
1377Disk: mockV2Probe.Host.Disk,
1378Build: mockV2Probe.Host.Build,
1379},
1380Request: &schedulerv2.SyncProbesRequest_ProbeFinishedRequest{
1381ProbeFinishedRequest: &schedulerv2.ProbeFinishedRequest{
1382Probes: []*schedulerv2.Probe{mockV2Probe},
1383},
1384},
1385}, nil).Times(1),
1386mr.HostManager().Return(hostManager).Times(1),
1387mh.Load(gomock.Eq(mockRawHost.ID)).Return(&mockRawHost, true),
1388mn.Store(gomock.Eq(mockRawSeedHost.ID), gomock.Eq(mockRawHost.ID)).Return(nil).Times(1),
1389mn.Probes(gomock.Eq(mockRawSeedHost.ID), gomock.Eq(mockRawHost.ID)).Return(probes).Times(1),
1390mp.Enqueue(gomock.Any()).Return(errors.New("enqueue probe error")).Times(1),
1391ms.Recv().Return(nil, io.EOF).Times(1),
1392)
1393},
1394expect: func(t *testing.T, err error) {
1395assert := assert.New(t)
1396assert.NoError(err)
1397},
1398},
1399}
1400
1401for _, tc := range tests {
1402t.Run(tc.name, func(t *testing.T) {
1403ctl := gomock.NewController(t)
1404defer ctl.Finish()
1405
1406scheduling := mocks.NewMockScheduling(ctl)
1407res := resource.NewMockResource(ctl)
1408dynconfig := configmocks.NewMockDynconfigInterface(ctl)
1409storage := storagemocks.NewMockStorage(ctl)
1410probes := networktopologymocks.NewMockProbes(ctl)
1411networkTopology := networktopologymocks.NewMockNetworkTopology(ctl)
1412hostManager := resource.NewMockHostManager(ctl)
1413stream := schedulerv2mocks.NewMockScheduler_SyncProbesServer(ctl)
1414svc := NewV2(&config.Config{Scheduler: config.SchedulerConfig{NetworkTopology: mockNetworkTopologyConfig}, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig, storage, networkTopology)
1415
1416tc.mock(svc, res.EXPECT(), probes, probes.EXPECT(), networkTopology.EXPECT(), hostManager, hostManager.EXPECT(), stream.EXPECT())
1417tc.expect(t, svc.SyncProbes(stream))
1418})
1419}
1420}
1421
1422func TestServiceV2_handleRegisterPeerRequest(t *testing.T) {
1423dgst := mockTaskDigest.String()
1424
1425tests := []struct {
1426name string
1427req *schedulerv2.RegisterPeerRequest
1428run func(t *testing.T, svc *V2, req *schedulerv2.RegisterPeerRequest, peer *resource.Peer, seedPeer *resource.Peer, hostManager resource.HostManager, taskManager resource.TaskManager,
1429peerManager resource.PeerManager, stream schedulerv2.Scheduler_AnnouncePeerServer, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder,
1430mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder, ma *schedulerv2mocks.MockScheduler_AnnouncePeerServerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder)
1431}{
1432{
1433name: "host not found",
1434req: &schedulerv2.RegisterPeerRequest{},
1435run: func(t *testing.T, svc *V2, req *schedulerv2.RegisterPeerRequest, peer *resource.Peer, seedPeer *resource.Peer, hostManager resource.HostManager, taskManager resource.TaskManager,
1436peerManager resource.PeerManager, stream schedulerv2.Scheduler_AnnouncePeerServer, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder,
1437mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder, ma *schedulerv2mocks.MockScheduler_AnnouncePeerServerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder) {
1438gomock.InOrder(
1439mr.HostManager().Return(hostManager).Times(1),
1440mh.Load(gomock.Eq(peer.Host.ID)).Return(nil, false).Times(1),
1441)
1442
1443assert := assert.New(t)
1444assert.ErrorIs(svc.handleRegisterPeerRequest(context.Background(), stream, peer.Host.ID, peer.Task.ID, peer.ID, req),
1445status.Errorf(codes.NotFound, "host %s not found", peer.Host.ID))
1446},
1447},
1448{
1449name: "can not found available peer and download task failed",
1450req: &schedulerv2.RegisterPeerRequest{
1451Download: &commonv2.Download{
1452Digest: &dgst,
1453},
1454},
1455run: func(t *testing.T, svc *V2, req *schedulerv2.RegisterPeerRequest, peer *resource.Peer, seedPeer *resource.Peer, hostManager resource.HostManager, taskManager resource.TaskManager,
1456peerManager resource.PeerManager, stream schedulerv2.Scheduler_AnnouncePeerServer, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder,
1457mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder, ma *schedulerv2mocks.MockScheduler_AnnouncePeerServerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder) {
1458gomock.InOrder(
1459mr.HostManager().Return(hostManager).Times(1),
1460mh.Load(gomock.Eq(peer.Host.ID)).Return(peer.Host, true).Times(1),
1461mr.TaskManager().Return(taskManager).Times(1),
1462mt.Load(gomock.Eq(peer.Task.ID)).Return(peer.Task, true).Times(1),
1463mr.PeerManager().Return(peerManager).Times(1),
1464mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
1465)
1466
1467peer.Priority = commonv2.Priority_LEVEL1
1468
1469assert := assert.New(t)
1470assert.ErrorIs(svc.handleRegisterPeerRequest(context.Background(), stream, peer.Host.ID, peer.Task.ID, peer.ID, req),
1471status.Errorf(codes.FailedPrecondition, "%s peer is forbidden", commonv2.Priority_LEVEL1.String()))
1472},
1473},
1474{
1475name: "task state is TaskStateFailed and download task failed",
1476req: &schedulerv2.RegisterPeerRequest{
1477Download: &commonv2.Download{
1478Digest: &dgst,
1479},
1480},
1481run: func(t *testing.T, svc *V2, req *schedulerv2.RegisterPeerRequest, peer *resource.Peer, seedPeer *resource.Peer, hostManager resource.HostManager, taskManager resource.TaskManager,
1482peerManager resource.PeerManager, stream schedulerv2.Scheduler_AnnouncePeerServer, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder,
1483mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder, ma *schedulerv2mocks.MockScheduler_AnnouncePeerServerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder) {
1484gomock.InOrder(
1485mr.HostManager().Return(hostManager).Times(1),
1486mh.Load(gomock.Eq(peer.Host.ID)).Return(peer.Host, true).Times(1),
1487mr.TaskManager().Return(taskManager).Times(1),
1488mt.Load(gomock.Eq(peer.Task.ID)).Return(peer.Task, true).Times(1),
1489mr.PeerManager().Return(peerManager).Times(1),
1490mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
1491)
1492
1493peer.Priority = commonv2.Priority_LEVEL1
1494peer.Task.FSM.SetState(resource.TaskStateFailed)
1495peer.Task.StorePeer(peer)
1496peer.Task.StorePeer(seedPeer)
1497seedPeer.FSM.SetState(resource.PeerStateRunning)
1498
1499assert := assert.New(t)
1500assert.ErrorIs(svc.handleRegisterPeerRequest(context.Background(), stream, peer.Host.ID, peer.Task.ID, peer.ID, req),
1501status.Errorf(codes.FailedPrecondition, "%s peer is forbidden", commonv2.Priority_LEVEL1.String()))
1502},
1503},
1504{
1505name: "size scope is SizeScope_EMPTY and load AnnouncePeerStream failed",
1506req: &schedulerv2.RegisterPeerRequest{
1507Download: &commonv2.Download{
1508Digest: &dgst,
1509},
1510},
1511run: func(t *testing.T, svc *V2, req *schedulerv2.RegisterPeerRequest, peer *resource.Peer, seedPeer *resource.Peer, hostManager resource.HostManager, taskManager resource.TaskManager,
1512peerManager resource.PeerManager, stream schedulerv2.Scheduler_AnnouncePeerServer, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder,
1513mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder, ma *schedulerv2mocks.MockScheduler_AnnouncePeerServerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder) {
1514gomock.InOrder(
1515mr.HostManager().Return(hostManager).Times(1),
1516mh.Load(gomock.Eq(peer.Host.ID)).Return(peer.Host, true).Times(1),
1517mr.TaskManager().Return(taskManager).Times(1),
1518mt.Load(gomock.Eq(peer.Task.ID)).Return(peer.Task, true).Times(1),
1519mr.PeerManager().Return(peerManager).Times(1),
1520mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
1521)
1522
1523peer.Task.ContentLength.Store(0)
1524peer.Priority = commonv2.Priority_LEVEL6
1525
1526assert := assert.New(t)
1527assert.ErrorIs(svc.handleRegisterPeerRequest(context.Background(), nil, peer.Host.ID, peer.Task.ID, peer.ID, req),
1528status.Error(codes.NotFound, "AnnouncePeerStream not found"))
1529assert.Equal(peer.FSM.Current(), resource.PeerStatePending)
1530assert.Equal(peer.Task.FSM.Current(), resource.TaskStateRunning)
1531},
1532},
1533{
1534name: "size scope is SizeScope_EMPTY and event PeerEventRegisterEmpty failed",
1535req: &schedulerv2.RegisterPeerRequest{
1536Download: &commonv2.Download{
1537Digest: &dgst,
1538},
1539},
1540run: func(t *testing.T, svc *V2, req *schedulerv2.RegisterPeerRequest, peer *resource.Peer, seedPeer *resource.Peer, hostManager resource.HostManager, taskManager resource.TaskManager,
1541peerManager resource.PeerManager, stream schedulerv2.Scheduler_AnnouncePeerServer, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder,
1542mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder, ma *schedulerv2mocks.MockScheduler_AnnouncePeerServerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder) {
1543gomock.InOrder(
1544mr.HostManager().Return(hostManager).Times(1),
1545mh.Load(gomock.Eq(peer.Host.ID)).Return(peer.Host, true).Times(1),
1546mr.TaskManager().Return(taskManager).Times(1),
1547mt.Load(gomock.Eq(peer.Task.ID)).Return(peer.Task, true).Times(1),
1548mr.PeerManager().Return(peerManager).Times(1),
1549mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
1550)
1551
1552peer.Task.ContentLength.Store(0)
1553peer.Priority = commonv2.Priority_LEVEL6
1554peer.StoreAnnouncePeerStream(stream)
1555peer.FSM.SetState(resource.PeerStateReceivedEmpty)
1556
1557assert := assert.New(t)
1558assert.ErrorIs(svc.handleRegisterPeerRequest(context.Background(), nil, peer.Host.ID, peer.Task.ID, peer.ID, req),
1559status.Errorf(codes.Internal, "event RegisterEmpty inappropriate in current state ReceivedEmpty"))
1560assert.Equal(peer.Task.FSM.Current(), resource.TaskStateRunning)
1561},
1562},
1563{
1564name: "size scope is SizeScope_EMPTY and send EmptyTaskResponse failed",
1565req: &schedulerv2.RegisterPeerRequest{
1566Download: &commonv2.Download{
1567Digest: &dgst,
1568},
1569},
1570run: func(t *testing.T, svc *V2, req *schedulerv2.RegisterPeerRequest, peer *resource.Peer, seedPeer *resource.Peer, hostManager resource.HostManager, taskManager resource.TaskManager,
1571peerManager resource.PeerManager, stream schedulerv2.Scheduler_AnnouncePeerServer, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder,
1572mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder, ma *schedulerv2mocks.MockScheduler_AnnouncePeerServerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder) {
1573gomock.InOrder(
1574mr.HostManager().Return(hostManager).Times(1),
1575mh.Load(gomock.Eq(peer.Host.ID)).Return(peer.Host, true).Times(1),
1576mr.TaskManager().Return(taskManager).Times(1),
1577mt.Load(gomock.Eq(peer.Task.ID)).Return(peer.Task, true).Times(1),
1578mr.PeerManager().Return(peerManager).Times(1),
1579mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
1580ma.Send(gomock.Eq(&schedulerv2.AnnouncePeerResponse{
1581Response: &schedulerv2.AnnouncePeerResponse_EmptyTaskResponse{
1582EmptyTaskResponse: &schedulerv2.EmptyTaskResponse{},
1583},
1584})).Return(errors.New("foo")).Times(1),
1585)
1586
1587peer.Task.ContentLength.Store(0)
1588peer.Priority = commonv2.Priority_LEVEL6
1589peer.StoreAnnouncePeerStream(stream)
1590
1591assert := assert.New(t)
1592assert.ErrorIs(svc.handleRegisterPeerRequest(context.Background(), nil, peer.Host.ID, peer.Task.ID, peer.ID, req),
1593status.Errorf(codes.Internal, "foo"))
1594assert.Equal(peer.FSM.Current(), resource.PeerStateReceivedEmpty)
1595assert.Equal(peer.Task.FSM.Current(), resource.TaskStateRunning)
1596},
1597},
1598{
1599name: "size scope is SizeScope_NORMAL and need back-to-source",
1600req: &schedulerv2.RegisterPeerRequest{
1601Download: &commonv2.Download{
1602Digest: &dgst,
1603NeedBackToSource: true,
1604},
1605},
1606run: func(t *testing.T, svc *V2, req *schedulerv2.RegisterPeerRequest, peer *resource.Peer, seedPeer *resource.Peer, hostManager resource.HostManager, taskManager resource.TaskManager,
1607peerManager resource.PeerManager, stream schedulerv2.Scheduler_AnnouncePeerServer, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder,
1608mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder, ma *schedulerv2mocks.MockScheduler_AnnouncePeerServerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder) {
1609gomock.InOrder(
1610mr.HostManager().Return(hostManager).Times(1),
1611mh.Load(gomock.Eq(peer.Host.ID)).Return(peer.Host, true).Times(1),
1612mr.TaskManager().Return(taskManager).Times(1),
1613mt.Load(gomock.Eq(peer.Task.ID)).Return(peer.Task, true).Times(1),
1614mr.PeerManager().Return(peerManager).Times(1),
1615mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
1616ms.ScheduleCandidateParents(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).Times(1),
1617)
1618
1619peer.Task.ContentLength.Store(129)
1620peer.Task.TotalPieceCount.Store(2)
1621peer.Task.StorePeer(peer)
1622peer.Task.StorePeer(seedPeer)
1623peer.Priority = commonv2.Priority_LEVEL6
1624peer.NeedBackToSource.Store(true)
1625peer.StoreAnnouncePeerStream(stream)
1626
1627assert := assert.New(t)
1628assert.NoError(svc.handleRegisterPeerRequest(context.Background(), nil, peer.Host.ID, peer.Task.ID, peer.ID, req))
1629assert.Equal(peer.FSM.Current(), resource.PeerStateReceivedNormal)
1630assert.Equal(peer.NeedBackToSource.Load(), true)
1631assert.Equal(peer.Task.FSM.Current(), resource.TaskStateRunning)
1632},
1633},
1634{
1635name: "size scope is SizeScope_NORMAL",
1636req: &schedulerv2.RegisterPeerRequest{
1637Download: &commonv2.Download{
1638Digest: &dgst,
1639},
1640},
1641run: func(t *testing.T, svc *V2, req *schedulerv2.RegisterPeerRequest, peer *resource.Peer, seedPeer *resource.Peer, hostManager resource.HostManager, taskManager resource.TaskManager,
1642peerManager resource.PeerManager, stream schedulerv2.Scheduler_AnnouncePeerServer, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder,
1643mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder, ma *schedulerv2mocks.MockScheduler_AnnouncePeerServerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder) {
1644gomock.InOrder(
1645mr.HostManager().Return(hostManager).Times(1),
1646mh.Load(gomock.Eq(peer.Host.ID)).Return(peer.Host, true).Times(1),
1647mr.TaskManager().Return(taskManager).Times(1),
1648mt.Load(gomock.Eq(peer.Task.ID)).Return(peer.Task, true).Times(1),
1649mr.PeerManager().Return(peerManager).Times(1),
1650mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
1651ms.ScheduleCandidateParents(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).Times(1),
1652)
1653
1654peer.Task.ContentLength.Store(129)
1655peer.Task.TotalPieceCount.Store(2)
1656peer.Task.StorePeer(peer)
1657peer.Task.StorePeer(seedPeer)
1658peer.Priority = commonv2.Priority_LEVEL6
1659peer.StoreAnnouncePeerStream(stream)
1660
1661assert := assert.New(t)
1662assert.NoError(svc.handleRegisterPeerRequest(context.Background(), nil, peer.Host.ID, peer.Task.ID, peer.ID, req))
1663assert.Equal(peer.FSM.Current(), resource.PeerStateReceivedNormal)
1664assert.Equal(peer.Task.FSM.Current(), resource.TaskStateRunning)
1665},
1666},
1667{
1668name: "size scope is SizeScope_UNKNOW",
1669req: &schedulerv2.RegisterPeerRequest{
1670Download: &commonv2.Download{
1671Digest: &dgst,
1672},
1673},
1674run: func(t *testing.T, svc *V2, req *schedulerv2.RegisterPeerRequest, peer *resource.Peer, seedPeer *resource.Peer, hostManager resource.HostManager, taskManager resource.TaskManager,
1675peerManager resource.PeerManager, stream schedulerv2.Scheduler_AnnouncePeerServer, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder,
1676mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder, ma *schedulerv2mocks.MockScheduler_AnnouncePeerServerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder) {
1677gomock.InOrder(
1678mr.HostManager().Return(hostManager).Times(1),
1679mh.Load(gomock.Eq(peer.Host.ID)).Return(peer.Host, true).Times(1),
1680mr.TaskManager().Return(taskManager).Times(1),
1681mt.Load(gomock.Eq(peer.Task.ID)).Return(peer.Task, true).Times(1),
1682mr.PeerManager().Return(peerManager).Times(1),
1683mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
1684ms.ScheduleCandidateParents(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).Times(1),
1685)
1686
1687peer.Priority = commonv2.Priority_LEVEL6
1688
1689assert := assert.New(t)
1690assert.NoError(svc.handleRegisterPeerRequest(context.Background(), nil, peer.Host.ID, peer.Task.ID, peer.ID, req))
1691assert.Equal(peer.FSM.Current(), resource.PeerStateReceivedNormal)
1692assert.Equal(peer.Task.FSM.Current(), resource.TaskStateRunning)
1693},
1694},
1695}
1696
1697for _, tc := range tests {
1698t.Run(tc.name, func(t *testing.T) {
1699ctl := gomock.NewController(t)
1700defer ctl.Finish()
1701scheduling := schedulingmocks.NewMockScheduling(ctl)
1702res := resource.NewMockResource(ctl)
1703dynconfig := configmocks.NewMockDynconfigInterface(ctl)
1704storage := storagemocks.NewMockStorage(ctl)
1705networkTopology := networktopologymocks.NewMockNetworkTopology(ctl)
1706hostManager := resource.NewMockHostManager(ctl)
1707peerManager := resource.NewMockPeerManager(ctl)
1708taskManager := resource.NewMockTaskManager(ctl)
1709stream := schedulerv2mocks.NewMockScheduler_AnnouncePeerServer(ctl)
1710
1711mockHost := resource.NewHost(
1712mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
1713mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
1714mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength))
1715peer := resource.NewPeer(mockPeerID, mockResourceConfig, mockTask, mockHost)
1716seedPeer := resource.NewPeer(mockSeedPeerID, mockResourceConfig, mockTask, mockHost)
1717svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage, networkTopology)
1718
1719tc.run(t, svc, tc.req, peer, seedPeer, hostManager, taskManager, peerManager, stream, res.EXPECT(), hostManager.EXPECT(), taskManager.EXPECT(), peerManager.EXPECT(), stream.EXPECT(), scheduling.EXPECT())
1720})
1721}
1722}
1723
1724func TestServiceV2_handleDownloadPeerStartedRequest(t *testing.T) {
1725tests := []struct {
1726name string
1727run func(t *testing.T, svc *V2, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder)
1728}{
1729{
1730name: "peer can not be loaded",
1731run: func(t *testing.T, svc *V2, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
1732gomock.InOrder(
1733mr.PeerManager().Return(peerManager).Times(1),
1734mp.Load(gomock.Eq(peer.ID)).Return(nil, false).Times(1),
1735)
1736
1737assert := assert.New(t)
1738assert.ErrorIs(svc.handleDownloadPeerStartedRequest(context.Background(), peer.ID), status.Errorf(codes.NotFound, "peer %s not found", peer.ID))
1739},
1740},
1741{
1742name: "peer state is PeerStateRunning",
1743run: func(t *testing.T, svc *V2, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
1744gomock.InOrder(
1745mr.PeerManager().Return(peerManager).Times(1),
1746mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
1747md.GetApplications().Return([]*managerv2.Application{}, nil).Times(1),
1748)
1749
1750peer.FSM.SetState(resource.PeerStateRunning)
1751
1752assert := assert.New(t)
1753assert.NoError(svc.handleDownloadPeerStartedRequest(context.Background(), peer.ID))
1754},
1755},
1756{
1757name: "task state is TaskStateRunning",
1758run: func(t *testing.T, svc *V2, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
1759gomock.InOrder(
1760mr.PeerManager().Return(peerManager).Times(1),
1761mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
1762md.GetApplications().Return([]*managerv2.Application{}, nil).Times(1),
1763)
1764
1765peer.FSM.SetState(resource.PeerStateReceivedNormal)
1766peer.Task.FSM.SetState(resource.TaskStateRunning)
1767
1768assert := assert.New(t)
1769assert.NoError(svc.handleDownloadPeerStartedRequest(context.Background(), peer.ID))
1770assert.NotEqual(peer.UpdatedAt.Load(), 0)
1771assert.NotEqual(peer.Task.UpdatedAt.Load(), 0)
1772},
1773},
1774{
1775name: "task state is TaskStatePending",
1776run: func(t *testing.T, svc *V2, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
1777gomock.InOrder(
1778mr.PeerManager().Return(peerManager).Times(1),
1779mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
1780md.GetApplications().Return([]*managerv2.Application{}, nil).Times(1),
1781)
1782
1783peer.FSM.SetState(resource.PeerStateReceivedNormal)
1784peer.Task.FSM.SetState(resource.TaskStatePending)
1785
1786assert := assert.New(t)
1787assert.NoError(svc.handleDownloadPeerStartedRequest(context.Background(), peer.ID))
1788assert.NotEqual(peer.UpdatedAt.Load(), 0)
1789assert.NotEqual(peer.Task.UpdatedAt.Load(), 0)
1790},
1791},
1792}
1793
1794for _, tc := range tests {
1795t.Run(tc.name, func(t *testing.T) {
1796ctl := gomock.NewController(t)
1797defer ctl.Finish()
1798scheduling := schedulingmocks.NewMockScheduling(ctl)
1799res := resource.NewMockResource(ctl)
1800dynconfig := configmocks.NewMockDynconfigInterface(ctl)
1801storage := storagemocks.NewMockStorage(ctl)
1802networkTopology := networktopologymocks.NewMockNetworkTopology(ctl)
1803peerManager := resource.NewMockPeerManager(ctl)
1804
1805mockHost := resource.NewHost(
1806mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
1807mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
1808mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength))
1809peer := resource.NewPeer(mockPeerID, mockResourceConfig, mockTask, mockHost)
1810svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage, networkTopology)
1811
1812tc.run(t, svc, peer, peerManager, res.EXPECT(), peerManager.EXPECT(), dynconfig.EXPECT())
1813})
1814}
1815}
1816
1817func TestServiceV2_handleDownloadPeerBackToSourceStartedRequest(t *testing.T) {
1818tests := []struct {
1819name string
1820run func(t *testing.T, svc *V2, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder)
1821}{
1822{
1823name: "peer can not be loaded",
1824run: func(t *testing.T, svc *V2, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
1825gomock.InOrder(
1826mr.PeerManager().Return(peerManager).Times(1),
1827mp.Load(gomock.Eq(peer.ID)).Return(nil, false).Times(1),
1828)
1829
1830assert := assert.New(t)
1831assert.ErrorIs(svc.handleDownloadPeerBackToSourceStartedRequest(context.Background(), peer.ID), status.Errorf(codes.NotFound, "peer %s not found", peer.ID))
1832},
1833},
1834{
1835name: "peer state is PeerStateRunning",
1836run: func(t *testing.T, svc *V2, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
1837gomock.InOrder(
1838mr.PeerManager().Return(peerManager).Times(1),
1839mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
1840md.GetApplications().Return([]*managerv2.Application{}, nil).Times(1),
1841)
1842
1843peer.FSM.SetState(resource.PeerStateBackToSource)
1844
1845assert := assert.New(t)
1846assert.ErrorIs(svc.handleDownloadPeerBackToSourceStartedRequest(context.Background(), peer.ID), status.Error(codes.Internal, "event DownloadBackToSource inappropriate in current state BackToSource"))
1847},
1848},
1849{
1850name: "task state is TaskStateRunning",
1851run: func(t *testing.T, svc *V2, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
1852gomock.InOrder(
1853mr.PeerManager().Return(peerManager).Times(1),
1854mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
1855md.GetApplications().Return([]*managerv2.Application{}, nil).Times(1),
1856)
1857
1858peer.FSM.SetState(resource.PeerStateReceivedNormal)
1859peer.Task.FSM.SetState(resource.TaskStateRunning)
1860
1861assert := assert.New(t)
1862assert.NoError(svc.handleDownloadPeerBackToSourceStartedRequest(context.Background(), peer.ID))
1863assert.NotEqual(peer.UpdatedAt.Load(), 0)
1864assert.NotEqual(peer.Task.UpdatedAt.Load(), 0)
1865},
1866},
1867{
1868name: "task state is TaskStatePending",
1869run: func(t *testing.T, svc *V2, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
1870gomock.InOrder(
1871mr.PeerManager().Return(peerManager).Times(1),
1872mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
1873md.GetApplications().Return([]*managerv2.Application{}, nil).Times(1),
1874)
1875
1876peer.FSM.SetState(resource.PeerStateReceivedNormal)
1877peer.Task.FSM.SetState(resource.TaskStatePending)
1878
1879assert := assert.New(t)
1880assert.NoError(svc.handleDownloadPeerBackToSourceStartedRequest(context.Background(), peer.ID))
1881assert.NotEqual(peer.UpdatedAt.Load(), 0)
1882assert.NotEqual(peer.Task.UpdatedAt.Load(), 0)
1883},
1884},
1885}
1886
1887for _, tc := range tests {
1888t.Run(tc.name, func(t *testing.T) {
1889ctl := gomock.NewController(t)
1890defer ctl.Finish()
1891scheduling := schedulingmocks.NewMockScheduling(ctl)
1892res := resource.NewMockResource(ctl)
1893dynconfig := configmocks.NewMockDynconfigInterface(ctl)
1894storage := storagemocks.NewMockStorage(ctl)
1895networkTopology := networktopologymocks.NewMockNetworkTopology(ctl)
1896peerManager := resource.NewMockPeerManager(ctl)
1897
1898mockHost := resource.NewHost(
1899mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
1900mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
1901mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength))
1902peer := resource.NewPeer(mockPeerID, mockResourceConfig, mockTask, mockHost)
1903svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage, networkTopology)
1904
1905tc.run(t, svc, peer, peerManager, res.EXPECT(), peerManager.EXPECT(), dynconfig.EXPECT())
1906})
1907}
1908}
1909
1910func TestServiceV2_handleRescheduleRequest(t *testing.T) {
1911tests := []struct {
1912name string
1913run func(t *testing.T, svc *V2, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder,
1914mp *resource.MockPeerManagerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder)
1915}{
1916{
1917name: "peer can not be loaded",
1918run: func(t *testing.T, svc *V2, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder,
1919mp *resource.MockPeerManagerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder) {
1920gomock.InOrder(
1921mr.PeerManager().Return(peerManager).Times(1),
1922mp.Load(gomock.Eq(peer.ID)).Return(nil, false).Times(1),
1923)
1924
1925assert := assert.New(t)
1926assert.ErrorIs(svc.handleRescheduleRequest(context.Background(), peer.ID, []*commonv2.Peer{}), status.Errorf(codes.NotFound, "peer %s not found", peer.ID))
1927},
1928},
1929{
1930name: "reschedule failed",
1931run: func(t *testing.T, svc *V2, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder,
1932mp *resource.MockPeerManagerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder) {
1933gomock.InOrder(
1934mr.PeerManager().Return(peerManager).Times(1),
1935mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
1936ms.ScheduleCandidateParents(gomock.Any(), gomock.Any(), gomock.Any()).Return(errors.New("foo")).Times(1),
1937)
1938
1939assert := assert.New(t)
1940assert.ErrorIs(svc.handleRescheduleRequest(context.Background(), peer.ID, []*commonv2.Peer{}), status.Error(codes.FailedPrecondition, "foo"))
1941},
1942},
1943{
1944name: "reschedule succeeded",
1945run: func(t *testing.T, svc *V2, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder,
1946mp *resource.MockPeerManagerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder) {
1947gomock.InOrder(
1948mr.PeerManager().Return(peerManager).Times(1),
1949mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
1950ms.ScheduleCandidateParents(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).Times(1),
1951)
1952
1953assert := assert.New(t)
1954assert.NoError(svc.handleRescheduleRequest(context.Background(), peer.ID, []*commonv2.Peer{}))
1955},
1956},
1957}
1958
1959for _, tc := range tests {
1960t.Run(tc.name, func(t *testing.T) {
1961ctl := gomock.NewController(t)
1962defer ctl.Finish()
1963scheduling := schedulingmocks.NewMockScheduling(ctl)
1964res := resource.NewMockResource(ctl)
1965dynconfig := configmocks.NewMockDynconfigInterface(ctl)
1966storage := storagemocks.NewMockStorage(ctl)
1967networkTopology := networktopologymocks.NewMockNetworkTopology(ctl)
1968peerManager := resource.NewMockPeerManager(ctl)
1969
1970mockHost := resource.NewHost(
1971mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
1972mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
1973mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength))
1974peer := resource.NewPeer(mockPeerID, mockResourceConfig, mockTask, mockHost)
1975svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage, networkTopology)
1976
1977tc.run(t, svc, peer, peerManager, res.EXPECT(), peerManager.EXPECT(), scheduling.EXPECT())
1978})
1979}
1980}
1981
1982func TestServiceV2_handleDownloadPeerFinishedRequest(t *testing.T) {
1983tests := []struct {
1984name string
1985run func(t *testing.T, svc *V2, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder)
1986}{
1987{
1988name: "peer can not be loaded",
1989run: func(t *testing.T, svc *V2, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
1990gomock.InOrder(
1991mr.PeerManager().Return(peerManager).Times(1),
1992mp.Load(gomock.Eq(peer.ID)).Return(nil, false).Times(1),
1993)
1994
1995assert := assert.New(t)
1996assert.ErrorIs(svc.handleDownloadPeerFinishedRequest(context.Background(), peer.ID), status.Errorf(codes.NotFound, "peer %s not found", peer.ID))
1997},
1998},
1999{
2000name: "peer state is PeerStateSucceeded",
2001run: func(t *testing.T, svc *V2, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
2002gomock.InOrder(
2003mr.PeerManager().Return(peerManager).Times(1),
2004mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
2005)
2006
2007peer.FSM.SetState(resource.PeerStateSucceeded)
2008
2009assert := assert.New(t)
2010assert.ErrorIs(svc.handleDownloadPeerFinishedRequest(context.Background(), peer.ID), status.Error(codes.Internal, "event DownloadSucceeded inappropriate in current state Succeeded"))
2011assert.NotEqual(peer.Cost.Load(), 0)
2012},
2013},
2014{
2015name: "peer state is PeerStateRunning",
2016run: func(t *testing.T, svc *V2, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
2017gomock.InOrder(
2018mr.PeerManager().Return(peerManager).Times(1),
2019mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
2020md.GetApplications().Return([]*managerv2.Application{}, nil).Times(1),
2021)
2022
2023peer.FSM.SetState(resource.PeerStateRunning)
2024
2025assert := assert.New(t)
2026assert.NoError(svc.handleDownloadPeerFinishedRequest(context.Background(), peer.ID))
2027assert.Equal(peer.FSM.Current(), resource.PeerStateSucceeded)
2028assert.NotEqual(peer.Cost.Load(), 0)
2029},
2030},
2031}
2032
2033for _, tc := range tests {
2034t.Run(tc.name, func(t *testing.T) {
2035ctl := gomock.NewController(t)
2036defer ctl.Finish()
2037scheduling := schedulingmocks.NewMockScheduling(ctl)
2038res := resource.NewMockResource(ctl)
2039dynconfig := configmocks.NewMockDynconfigInterface(ctl)
2040storage := storagemocks.NewMockStorage(ctl)
2041networkTopology := networktopologymocks.NewMockNetworkTopology(ctl)
2042peerManager := resource.NewMockPeerManager(ctl)
2043
2044mockHost := resource.NewHost(
2045mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
2046mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
2047mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength))
2048peer := resource.NewPeer(mockPeerID, mockResourceConfig, mockTask, mockHost)
2049svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage, networkTopology)
2050
2051tc.run(t, svc, peer, peerManager, res.EXPECT(), peerManager.EXPECT(), dynconfig.EXPECT())
2052})
2053}
2054}
2055
2056func TestServiceV2_handleDownloadPeerBackToSourceFinishedRequest(t *testing.T) {
2057s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
2058if _, err := w.Write([]byte{1}); err != nil {
2059w.WriteHeader(http.StatusInternalServerError)
2060return
2061}
2062
2063w.WriteHeader(http.StatusOK)
2064}))
2065defer s.Close()
2066
2067tests := []struct {
2068name string
2069req *schedulerv2.DownloadPeerBackToSourceFinishedRequest
2070run func(t *testing.T, svc *V2, req *schedulerv2.DownloadPeerBackToSourceFinishedRequest, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder,
2071mp *resource.MockPeerManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder)
2072}{
2073{
2074name: "peer can not be loaded",
2075req: &schedulerv2.DownloadPeerBackToSourceFinishedRequest{},
2076run: func(t *testing.T, svc *V2, req *schedulerv2.DownloadPeerBackToSourceFinishedRequest, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder,
2077mp *resource.MockPeerManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
2078gomock.InOrder(
2079mr.PeerManager().Return(peerManager).Times(1),
2080mp.Load(gomock.Eq(peer.ID)).Return(nil, false).Times(1),
2081)
2082
2083assert := assert.New(t)
2084assert.ErrorIs(svc.handleDownloadPeerBackToSourceFinishedRequest(context.Background(), peer.ID, req), status.Errorf(codes.NotFound, "peer %s not found", peer.ID))
2085assert.Equal(peer.Task.ContentLength.Load(), int64(-1))
2086assert.Equal(peer.Task.TotalPieceCount.Load(), int32(0))
2087assert.Equal(len(peer.Task.DirectPiece), 0)
2088assert.Equal(peer.Task.FSM.Current(), resource.TaskStatePending)
2089},
2090},
2091{
2092name: "peer state is PeerStateSucceeded",
2093req: &schedulerv2.DownloadPeerBackToSourceFinishedRequest{},
2094run: func(t *testing.T, svc *V2, req *schedulerv2.DownloadPeerBackToSourceFinishedRequest, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder,
2095mp *resource.MockPeerManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
2096gomock.InOrder(
2097mr.PeerManager().Return(peerManager).Times(1),
2098mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
2099)
2100
2101peer.FSM.SetState(resource.PeerStateSucceeded)
2102
2103assert := assert.New(t)
2104assert.ErrorIs(svc.handleDownloadPeerBackToSourceFinishedRequest(context.Background(), peer.ID, req), status.Error(codes.Internal, "event DownloadSucceeded inappropriate in current state Succeeded"))
2105assert.NotEqual(peer.Cost.Load(), 0)
2106assert.Equal(peer.Task.ContentLength.Load(), int64(-1))
2107assert.Equal(peer.Task.TotalPieceCount.Load(), int32(0))
2108assert.Equal(len(peer.Task.DirectPiece), 0)
2109assert.Equal(peer.Task.FSM.Current(), resource.TaskStatePending)
2110},
2111},
2112{
2113name: "peer has range",
2114req: &schedulerv2.DownloadPeerBackToSourceFinishedRequest{},
2115run: func(t *testing.T, svc *V2, req *schedulerv2.DownloadPeerBackToSourceFinishedRequest, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder,
2116mp *resource.MockPeerManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
2117gomock.InOrder(
2118mr.PeerManager().Return(peerManager).Times(1),
2119mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
2120md.GetApplications().Return([]*managerv2.Application{}, nil).Times(1),
2121)
2122
2123peer.FSM.SetState(resource.PeerStateRunning)
2124peer.Range = &nethttp.Range{}
2125
2126assert := assert.New(t)
2127assert.NoError(svc.handleDownloadPeerBackToSourceFinishedRequest(context.Background(), peer.ID, req))
2128assert.NotEqual(peer.Cost.Load(), 0)
2129assert.Equal(peer.FSM.Current(), resource.PeerStateSucceeded)
2130assert.Equal(peer.Task.ContentLength.Load(), int64(-1))
2131assert.Equal(peer.Task.TotalPieceCount.Load(), int32(0))
2132assert.Equal(len(peer.Task.DirectPiece), 0)
2133assert.Equal(peer.Task.FSM.Current(), resource.TaskStatePending)
2134},
2135},
2136{
2137name: "task state is TaskStateSucceeded",
2138req: &schedulerv2.DownloadPeerBackToSourceFinishedRequest{},
2139run: func(t *testing.T, svc *V2, req *schedulerv2.DownloadPeerBackToSourceFinishedRequest, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder,
2140mp *resource.MockPeerManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
2141gomock.InOrder(
2142mr.PeerManager().Return(peerManager).Times(1),
2143mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
2144md.GetApplications().Return([]*managerv2.Application{}, nil).Times(1),
2145)
2146
2147peer.FSM.SetState(resource.PeerStateRunning)
2148peer.Task.FSM.SetState(resource.TaskStateSucceeded)
2149
2150assert := assert.New(t)
2151assert.NoError(svc.handleDownloadPeerBackToSourceFinishedRequest(context.Background(), peer.ID, req))
2152assert.NotEqual(peer.Cost.Load(), 0)
2153assert.Equal(peer.FSM.Current(), resource.PeerStateSucceeded)
2154assert.Equal(peer.Task.ContentLength.Load(), int64(-1))
2155assert.Equal(peer.Task.TotalPieceCount.Load(), int32(0))
2156assert.Equal(len(peer.Task.DirectPiece), 0)
2157assert.Equal(peer.Task.FSM.Current(), resource.TaskStateSucceeded)
2158},
2159},
2160{
2161name: "task state is TaskStatePending",
2162req: &schedulerv2.DownloadPeerBackToSourceFinishedRequest{
2163ContentLength: 1024,
2164PieceCount: 10,
2165},
2166run: func(t *testing.T, svc *V2, req *schedulerv2.DownloadPeerBackToSourceFinishedRequest, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder,
2167mp *resource.MockPeerManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
2168gomock.InOrder(
2169mr.PeerManager().Return(peerManager).Times(1),
2170mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
2171)
2172
2173peer.FSM.SetState(resource.PeerStateRunning)
2174peer.Task.FSM.SetState(resource.TaskStatePending)
2175
2176assert := assert.New(t)
2177assert.ErrorIs(svc.handleDownloadPeerBackToSourceFinishedRequest(context.Background(), peer.ID, req), status.Error(codes.Internal, "event DownloadSucceeded inappropriate in current state Pending"))
2178assert.NotEqual(peer.Cost.Load(), 0)
2179assert.Equal(peer.FSM.Current(), resource.PeerStateSucceeded)
2180assert.Equal(peer.Task.ContentLength.Load(), int64(1024))
2181assert.Equal(peer.Task.TotalPieceCount.Load(), int32(10))
2182assert.Equal(len(peer.Task.DirectPiece), 0)
2183assert.Equal(peer.Task.FSM.Current(), resource.TaskStatePending)
2184},
2185},
2186{
2187name: "task state is TaskStateRunning",
2188req: &schedulerv2.DownloadPeerBackToSourceFinishedRequest{
2189ContentLength: 1024,
2190PieceCount: 10,
2191},
2192run: func(t *testing.T, svc *V2, req *schedulerv2.DownloadPeerBackToSourceFinishedRequest, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder,
2193mp *resource.MockPeerManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
2194gomock.InOrder(
2195mr.PeerManager().Return(peerManager).Times(1),
2196mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
2197md.GetApplications().Return([]*managerv2.Application{}, nil).Times(1),
2198)
2199
2200peer.FSM.SetState(resource.PeerStateRunning)
2201peer.Task.FSM.SetState(resource.TaskStateRunning)
2202
2203assert := assert.New(t)
2204assert.NoError(svc.handleDownloadPeerBackToSourceFinishedRequest(context.Background(), peer.ID, req))
2205assert.NotEqual(peer.Cost.Load(), 0)
2206assert.Equal(peer.FSM.Current(), resource.PeerStateSucceeded)
2207assert.Equal(peer.Task.ContentLength.Load(), int64(1024))
2208assert.Equal(peer.Task.TotalPieceCount.Load(), int32(10))
2209assert.Equal(len(peer.Task.DirectPiece), 0)
2210assert.Equal(peer.Task.FSM.Current(), resource.TaskStateSucceeded)
2211},
2212},
2213}
2214
2215for _, tc := range tests {
2216t.Run(tc.name, func(t *testing.T) {
2217ctl := gomock.NewController(t)
2218defer ctl.Finish()
2219scheduling := schedulingmocks.NewMockScheduling(ctl)
2220res := resource.NewMockResource(ctl)
2221dynconfig := configmocks.NewMockDynconfigInterface(ctl)
2222storage := storagemocks.NewMockStorage(ctl)
2223networkTopology := networktopologymocks.NewMockNetworkTopology(ctl)
2224peerManager := resource.NewMockPeerManager(ctl)
2225
2226url, err := url.Parse(s.URL)
2227if err != nil {
2228t.Fatal(err)
2229}
2230
2231ip, rawPort, err := net.SplitHostPort(url.Host)
2232if err != nil {
2233t.Fatal(err)
2234}
2235
2236port, err := strconv.ParseInt(rawPort, 10, 32)
2237if err != nil {
2238t.Fatal(err)
2239}
2240
2241mockHost := resource.NewHost(
2242mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
2243mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
2244mockHost.IP = ip
2245mockHost.DownloadPort = int32(port)
2246
2247mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength))
2248peer := resource.NewPeer(mockPeerID, mockResourceConfig, mockTask, mockHost)
2249svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage, networkTopology)
2250
2251tc.run(t, svc, tc.req, peer, peerManager, res.EXPECT(), peerManager.EXPECT(), dynconfig.EXPECT())
2252})
2253}
2254}
2255
2256func TestServiceV2_handleDownloadPeerFailedRequest(t *testing.T) {
2257tests := []struct {
2258name string
2259run func(t *testing.T, svc *V2, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder)
2260}{
2261{
2262name: "peer can not be loaded",
2263run: func(t *testing.T, svc *V2, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
2264gomock.InOrder(
2265mr.PeerManager().Return(peerManager).Times(1),
2266mp.Load(gomock.Eq(peer.ID)).Return(nil, false).Times(1),
2267)
2268
2269assert := assert.New(t)
2270assert.ErrorIs(svc.handleDownloadPeerFailedRequest(context.Background(), peer.ID), status.Errorf(codes.NotFound, "peer %s not found", peer.ID))
2271},
2272},
2273{
2274name: "peer state is PeerEventDownloadFailed",
2275run: func(t *testing.T, svc *V2, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
2276gomock.InOrder(
2277mr.PeerManager().Return(peerManager).Times(1),
2278mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
2279)
2280
2281peer.FSM.SetState(resource.PeerEventDownloadFailed)
2282
2283assert := assert.New(t)
2284assert.ErrorIs(svc.handleDownloadPeerFailedRequest(context.Background(), peer.ID), status.Error(codes.Internal, "event DownloadFailed inappropriate in current state DownloadFailed"))
2285},
2286},
2287{
2288name: "peer state is PeerStateRunning",
2289run: func(t *testing.T, svc *V2, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
2290gomock.InOrder(
2291mr.PeerManager().Return(peerManager).Times(1),
2292mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
2293md.GetApplications().Return([]*managerv2.Application{}, nil).Times(1),
2294)
2295
2296peer.FSM.SetState(resource.PeerStateRunning)
2297
2298assert := assert.New(t)
2299assert.NoError(svc.handleDownloadPeerFailedRequest(context.Background(), peer.ID))
2300assert.Equal(peer.FSM.Current(), resource.PeerStateFailed)
2301assert.NotEqual(peer.UpdatedAt.Load(), 0)
2302},
2303},
2304}
2305
2306for _, tc := range tests {
2307t.Run(tc.name, func(t *testing.T) {
2308ctl := gomock.NewController(t)
2309defer ctl.Finish()
2310scheduling := schedulingmocks.NewMockScheduling(ctl)
2311res := resource.NewMockResource(ctl)
2312dynconfig := configmocks.NewMockDynconfigInterface(ctl)
2313storage := storagemocks.NewMockStorage(ctl)
2314networkTopology := networktopologymocks.NewMockNetworkTopology(ctl)
2315peerManager := resource.NewMockPeerManager(ctl)
2316
2317mockHost := resource.NewHost(
2318mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
2319mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
2320mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength))
2321peer := resource.NewPeer(mockPeerID, mockResourceConfig, mockTask, mockHost)
2322svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage, networkTopology)
2323
2324tc.run(t, svc, peer, peerManager, res.EXPECT(), peerManager.EXPECT(), dynconfig.EXPECT())
2325})
2326}
2327}
2328
2329func TestServiceV2_handleDownloadPeerBackToSourceFailedRequest(t *testing.T) {
2330tests := []struct {
2331name string
2332run func(t *testing.T, svc *V2, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder)
2333}{
2334{
2335name: "peer can not be loaded",
2336run: func(t *testing.T, svc *V2, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
2337gomock.InOrder(
2338mr.PeerManager().Return(peerManager).Times(1),
2339mp.Load(gomock.Eq(peer.ID)).Return(nil, false).Times(1),
2340)
2341
2342peer.Task.ContentLength.Store(1)
2343peer.Task.TotalPieceCount.Store(1)
2344peer.Task.DirectPiece = []byte{1}
2345
2346assert := assert.New(t)
2347assert.ErrorIs(svc.handleDownloadPeerBackToSourceFailedRequest(context.Background(), peer.ID), status.Errorf(codes.NotFound, "peer %s not found", peer.ID))
2348assert.Equal(peer.FSM.Current(), resource.PeerStatePending)
2349assert.Equal(peer.Task.FSM.Current(), resource.TaskStatePending)
2350assert.Equal(peer.Task.ContentLength.Load(), int64(1))
2351assert.Equal(peer.Task.TotalPieceCount.Load(), int32(1))
2352assert.Equal(peer.Task.DirectPiece, []byte{1})
2353},
2354},
2355{
2356name: "peer state is PeerStateFailed",
2357run: func(t *testing.T, svc *V2, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
2358gomock.InOrder(
2359mr.PeerManager().Return(peerManager).Times(1),
2360mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
2361)
2362
2363peer.FSM.SetState(resource.PeerStateFailed)
2364peer.Task.ContentLength.Store(1)
2365peer.Task.TotalPieceCount.Store(1)
2366peer.Task.DirectPiece = []byte{1}
2367
2368assert := assert.New(t)
2369assert.ErrorIs(svc.handleDownloadPeerBackToSourceFailedRequest(context.Background(), peer.ID), status.Error(codes.Internal, "event DownloadFailed inappropriate in current state Failed"))
2370assert.Equal(peer.FSM.Current(), resource.PeerStateFailed)
2371assert.Equal(peer.Task.FSM.Current(), resource.TaskStatePending)
2372assert.Equal(peer.Task.ContentLength.Load(), int64(1))
2373assert.Equal(peer.Task.TotalPieceCount.Load(), int32(1))
2374assert.Equal(peer.Task.DirectPiece, []byte{1})
2375},
2376},
2377{
2378name: "task state is TaskStateFailed",
2379run: func(t *testing.T, svc *V2, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
2380gomock.InOrder(
2381mr.PeerManager().Return(peerManager).Times(1),
2382mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
2383)
2384
2385peer.FSM.SetState(resource.PeerStateRunning)
2386peer.Task.FSM.SetState(resource.TaskStateFailed)
2387peer.Task.ContentLength.Store(1)
2388peer.Task.TotalPieceCount.Store(1)
2389peer.Task.DirectPiece = []byte{1}
2390
2391assert := assert.New(t)
2392assert.ErrorIs(svc.handleDownloadPeerBackToSourceFailedRequest(context.Background(), peer.ID), status.Error(codes.Internal, "event DownloadFailed inappropriate in current state Failed"))
2393assert.Equal(peer.FSM.Current(), resource.PeerStateFailed)
2394assert.Equal(peer.Task.FSM.Current(), resource.TaskStateFailed)
2395assert.Equal(peer.Task.ContentLength.Load(), int64(-1))
2396assert.Equal(peer.Task.TotalPieceCount.Load(), int32(0))
2397assert.Equal(peer.Task.DirectPiece, []byte{})
2398},
2399},
2400{
2401name: "task state is TaskStateRunning",
2402run: func(t *testing.T, svc *V2, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
2403gomock.InOrder(
2404mr.PeerManager().Return(peerManager).Times(1),
2405mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
2406md.GetApplications().Return([]*managerv2.Application{}, nil).Times(1),
2407)
2408
2409peer.FSM.SetState(resource.PeerStateRunning)
2410peer.Task.FSM.SetState(resource.TaskStateRunning)
2411peer.Task.ContentLength.Store(1)
2412peer.Task.TotalPieceCount.Store(1)
2413peer.Task.DirectPiece = []byte{1}
2414
2415assert := assert.New(t)
2416assert.NoError(svc.handleDownloadPeerBackToSourceFailedRequest(context.Background(), peer.ID))
2417assert.Equal(peer.FSM.Current(), resource.PeerStateFailed)
2418assert.Equal(peer.Task.FSM.Current(), resource.TaskStateFailed)
2419assert.Equal(peer.Task.ContentLength.Load(), int64(-1))
2420assert.Equal(peer.Task.TotalPieceCount.Load(), int32(0))
2421assert.Equal(peer.Task.DirectPiece, []byte{})
2422},
2423},
2424}
2425
2426for _, tc := range tests {
2427t.Run(tc.name, func(t *testing.T) {
2428ctl := gomock.NewController(t)
2429defer ctl.Finish()
2430scheduling := schedulingmocks.NewMockScheduling(ctl)
2431res := resource.NewMockResource(ctl)
2432dynconfig := configmocks.NewMockDynconfigInterface(ctl)
2433storage := storagemocks.NewMockStorage(ctl)
2434networkTopology := networktopologymocks.NewMockNetworkTopology(ctl)
2435peerManager := resource.NewMockPeerManager(ctl)
2436
2437mockHost := resource.NewHost(
2438mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
2439mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
2440mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength))
2441peer := resource.NewPeer(mockPeerID, mockResourceConfig, mockTask, mockHost)
2442svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage, networkTopology)
2443
2444tc.run(t, svc, peer, peerManager, res.EXPECT(), peerManager.EXPECT(), dynconfig.EXPECT())
2445})
2446}
2447}
2448
2449func TestServiceV2_handleDownloadPieceFinishedRequest(t *testing.T) {
2450tests := []struct {
2451name string
2452req *schedulerv2.DownloadPieceFinishedRequest
2453run func(t *testing.T, svc *V2, req *schedulerv2.DownloadPieceFinishedRequest, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder)
2454}{
2455{
2456name: "invalid digest",
2457req: &schedulerv2.DownloadPieceFinishedRequest{
2458Piece: &commonv2.Piece{
2459Number: uint32(mockPiece.Number),
2460ParentId: &mockPiece.ParentID,
2461Offset: mockPiece.Offset,
2462Length: mockPiece.Length,
2463Digest: "foo",
2464TrafficType: &mockPiece.TrafficType,
2465Cost: durationpb.New(mockPiece.Cost),
2466CreatedAt: timestamppb.New(mockPiece.CreatedAt),
2467},
2468},
2469run: func(t *testing.T, svc *V2, req *schedulerv2.DownloadPieceFinishedRequest, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder) {
2470assert := assert.New(t)
2471assert.ErrorIs(svc.handleDownloadPieceFinishedRequest(context.Background(), peer.ID, req), status.Error(codes.InvalidArgument, "invalid digest"))
2472},
2473},
2474{
2475name: "peer can not be loaded",
2476req: &schedulerv2.DownloadPieceFinishedRequest{
2477Piece: &commonv2.Piece{
2478Number: uint32(mockPiece.Number),
2479ParentId: &mockPiece.ParentID,
2480Offset: mockPiece.Offset,
2481Length: mockPiece.Length,
2482Digest: mockPiece.Digest.String(),
2483TrafficType: &mockPiece.TrafficType,
2484Cost: durationpb.New(mockPiece.Cost),
2485CreatedAt: timestamppb.New(mockPiece.CreatedAt),
2486},
2487},
2488run: func(t *testing.T, svc *V2, req *schedulerv2.DownloadPieceFinishedRequest, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder) {
2489gomock.InOrder(
2490mr.PeerManager().Return(peerManager).Times(1),
2491mp.Load(gomock.Eq(peer.ID)).Return(nil, false).Times(1),
2492)
2493
2494assert := assert.New(t)
2495assert.ErrorIs(svc.handleDownloadPieceFinishedRequest(context.Background(), peer.ID, req), status.Errorf(codes.NotFound, "peer %s not found", peer.ID))
2496},
2497},
2498{
2499name: "parent can not be loaded",
2500req: &schedulerv2.DownloadPieceFinishedRequest{
2501Piece: &commonv2.Piece{
2502Number: uint32(mockPiece.Number),
2503ParentId: &mockPiece.ParentID,
2504Offset: mockPiece.Offset,
2505Length: mockPiece.Length,
2506Digest: mockPiece.Digest.String(),
2507TrafficType: &mockPiece.TrafficType,
2508Cost: durationpb.New(mockPiece.Cost),
2509CreatedAt: timestamppb.New(mockPiece.CreatedAt),
2510},
2511},
2512run: func(t *testing.T, svc *V2, req *schedulerv2.DownloadPieceFinishedRequest, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder) {
2513gomock.InOrder(
2514mr.PeerManager().Return(peerManager).Times(1),
2515mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
2516mr.PeerManager().Return(peerManager).Times(1),
2517mp.Load(gomock.Eq(req.Piece.GetParentId())).Return(nil, false).Times(1),
2518)
2519
2520assert := assert.New(t)
2521assert.NoError(svc.handleDownloadPieceFinishedRequest(context.Background(), peer.ID, req))
2522
2523piece, loaded := peer.LoadPiece(int32(req.Piece.Number))
2524assert.True(loaded)
2525assert.Equal(piece.Number, mockPiece.Number)
2526assert.Equal(piece.ParentID, mockPiece.ParentID)
2527assert.Equal(piece.Offset, mockPiece.Offset)
2528assert.Equal(piece.Length, mockPiece.Length)
2529assert.Equal(piece.Digest.String(), mockPiece.Digest.String())
2530assert.Equal(piece.TrafficType, mockPiece.TrafficType)
2531assert.Equal(piece.Cost, mockPiece.Cost)
2532assert.True(piece.CreatedAt.Equal(mockPiece.CreatedAt))
2533assert.Equal(peer.FinishedPieces.Count(), uint(1))
2534assert.Equal(len(peer.PieceCosts()), 1)
2535assert.NotEqual(peer.PieceUpdatedAt.Load(), 0)
2536assert.NotEqual(peer.UpdatedAt.Load(), 0)
2537assert.NotEqual(peer.Task.UpdatedAt.Load(), 0)
2538},
2539},
2540{
2541name: "parent can be loaded",
2542req: &schedulerv2.DownloadPieceFinishedRequest{
2543Piece: &commonv2.Piece{
2544Number: uint32(mockPiece.Number),
2545ParentId: &mockPiece.ParentID,
2546Offset: mockPiece.Offset,
2547Length: mockPiece.Length,
2548Digest: mockPiece.Digest.String(),
2549TrafficType: &mockPiece.TrafficType,
2550Cost: durationpb.New(mockPiece.Cost),
2551CreatedAt: timestamppb.New(mockPiece.CreatedAt),
2552},
2553},
2554run: func(t *testing.T, svc *V2, req *schedulerv2.DownloadPieceFinishedRequest, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder) {
2555gomock.InOrder(
2556mr.PeerManager().Return(peerManager).Times(1),
2557mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
2558mr.PeerManager().Return(peerManager).Times(1),
2559mp.Load(gomock.Eq(req.Piece.GetParentId())).Return(peer, true).Times(1),
2560)
2561
2562assert := assert.New(t)
2563assert.NoError(svc.handleDownloadPieceFinishedRequest(context.Background(), peer.ID, req))
2564
2565piece, loaded := peer.LoadPiece(int32(req.Piece.Number))
2566assert.True(loaded)
2567assert.Equal(piece.Number, mockPiece.Number)
2568assert.Equal(piece.ParentID, mockPiece.ParentID)
2569assert.Equal(piece.Offset, mockPiece.Offset)
2570assert.Equal(piece.Length, mockPiece.Length)
2571assert.Equal(piece.Digest.String(), mockPiece.Digest.String())
2572assert.Equal(piece.TrafficType, mockPiece.TrafficType)
2573assert.Equal(piece.Cost, mockPiece.Cost)
2574assert.True(piece.CreatedAt.Equal(mockPiece.CreatedAt))
2575assert.Equal(peer.FinishedPieces.Count(), uint(1))
2576assert.Equal(len(peer.PieceCosts()), 1)
2577assert.NotEqual(peer.PieceUpdatedAt.Load(), 0)
2578assert.NotEqual(peer.UpdatedAt.Load(), 0)
2579assert.NotEqual(peer.Task.UpdatedAt.Load(), 0)
2580assert.NotEqual(peer.Host.UpdatedAt.Load(), 0)
2581},
2582},
2583}
2584
2585for _, tc := range tests {
2586t.Run(tc.name, func(t *testing.T) {
2587ctl := gomock.NewController(t)
2588defer ctl.Finish()
2589scheduling := schedulingmocks.NewMockScheduling(ctl)
2590res := resource.NewMockResource(ctl)
2591dynconfig := configmocks.NewMockDynconfigInterface(ctl)
2592storage := storagemocks.NewMockStorage(ctl)
2593networkTopology := networktopologymocks.NewMockNetworkTopology(ctl)
2594peerManager := resource.NewMockPeerManager(ctl)
2595
2596mockHost := resource.NewHost(
2597mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
2598mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
2599mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength))
2600peer := resource.NewPeer(mockPeerID, mockResourceConfig, mockTask, mockHost)
2601svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage, networkTopology)
2602
2603tc.run(t, svc, tc.req, peer, peerManager, res.EXPECT(), peerManager.EXPECT())
2604})
2605}
2606}
2607
2608func TestServiceV2_handleDownloadPieceBackToSourceFinishedRequest(t *testing.T) {
2609tests := []struct {
2610name string
2611req *schedulerv2.DownloadPieceBackToSourceFinishedRequest
2612run func(t *testing.T, svc *V2, req *schedulerv2.DownloadPieceBackToSourceFinishedRequest, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder)
2613}{
2614{
2615name: "invalid digest",
2616req: &schedulerv2.DownloadPieceBackToSourceFinishedRequest{
2617Piece: &commonv2.Piece{
2618Number: uint32(mockPiece.Number),
2619ParentId: &mockPiece.ParentID,
2620Offset: mockPiece.Offset,
2621Length: mockPiece.Length,
2622Digest: "foo",
2623TrafficType: &mockPiece.TrafficType,
2624Cost: durationpb.New(mockPiece.Cost),
2625CreatedAt: timestamppb.New(mockPiece.CreatedAt),
2626},
2627},
2628run: func(t *testing.T, svc *V2, req *schedulerv2.DownloadPieceBackToSourceFinishedRequest, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder) {
2629assert := assert.New(t)
2630assert.ErrorIs(svc.handleDownloadPieceBackToSourceFinishedRequest(context.Background(), peer.ID, req), status.Error(codes.InvalidArgument, "invalid digest"))
2631},
2632},
2633{
2634name: "peer can not be loaded",
2635req: &schedulerv2.DownloadPieceBackToSourceFinishedRequest{
2636Piece: &commonv2.Piece{
2637Number: uint32(mockPiece.Number),
2638ParentId: &mockPiece.ParentID,
2639Offset: mockPiece.Offset,
2640Length: mockPiece.Length,
2641Digest: mockPiece.Digest.String(),
2642TrafficType: &mockPiece.TrafficType,
2643Cost: durationpb.New(mockPiece.Cost),
2644CreatedAt: timestamppb.New(mockPiece.CreatedAt),
2645},
2646},
2647run: func(t *testing.T, svc *V2, req *schedulerv2.DownloadPieceBackToSourceFinishedRequest, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder) {
2648gomock.InOrder(
2649mr.PeerManager().Return(peerManager).Times(1),
2650mp.Load(gomock.Eq(peer.ID)).Return(nil, false).Times(1),
2651)
2652
2653assert := assert.New(t)
2654assert.ErrorIs(svc.handleDownloadPieceBackToSourceFinishedRequest(context.Background(), peer.ID, req), status.Errorf(codes.NotFound, "peer %s not found", peer.ID))
2655},
2656},
2657{
2658name: "peer can be loaded",
2659req: &schedulerv2.DownloadPieceBackToSourceFinishedRequest{
2660Piece: &commonv2.Piece{
2661Number: uint32(mockPiece.Number),
2662ParentId: &mockPiece.ParentID,
2663Offset: mockPiece.Offset,
2664Length: mockPiece.Length,
2665Digest: mockPiece.Digest.String(),
2666TrafficType: &mockPiece.TrafficType,
2667Cost: durationpb.New(mockPiece.Cost),
2668CreatedAt: timestamppb.New(mockPiece.CreatedAt),
2669},
2670},
2671run: func(t *testing.T, svc *V2, req *schedulerv2.DownloadPieceBackToSourceFinishedRequest, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder) {
2672gomock.InOrder(
2673mr.PeerManager().Return(peerManager).Times(1),
2674mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
2675)
2676
2677assert := assert.New(t)
2678assert.NoError(svc.handleDownloadPieceBackToSourceFinishedRequest(context.Background(), peer.ID, req))
2679
2680piece, loaded := peer.LoadPiece(int32(req.Piece.Number))
2681assert.True(loaded)
2682assert.Equal(piece.Number, mockPiece.Number)
2683assert.Equal(piece.ParentID, mockPiece.ParentID)
2684assert.Equal(piece.Offset, mockPiece.Offset)
2685assert.Equal(piece.Length, mockPiece.Length)
2686assert.Equal(piece.Digest.String(), mockPiece.Digest.String())
2687assert.Equal(piece.TrafficType, mockPiece.TrafficType)
2688assert.Equal(piece.Cost, mockPiece.Cost)
2689assert.True(piece.CreatedAt.Equal(mockPiece.CreatedAt))
2690assert.Equal(peer.FinishedPieces.Count(), uint(1))
2691assert.Equal(len(peer.PieceCosts()), 1)
2692assert.NotEqual(peer.PieceUpdatedAt.Load(), 0)
2693assert.NotEqual(peer.UpdatedAt.Load(), 0)
2694
2695piece, loaded = peer.Task.LoadPiece(int32(req.Piece.Number))
2696assert.True(loaded)
2697assert.Equal(piece.Number, mockPiece.Number)
2698assert.Equal(piece.ParentID, mockPiece.ParentID)
2699assert.Equal(piece.Offset, mockPiece.Offset)
2700assert.Equal(piece.Length, mockPiece.Length)
2701assert.Equal(piece.Digest.String(), mockPiece.Digest.String())
2702assert.Equal(piece.TrafficType, mockPiece.TrafficType)
2703assert.Equal(piece.Cost, mockPiece.Cost)
2704assert.True(piece.CreatedAt.Equal(mockPiece.CreatedAt))
2705assert.NotEqual(peer.Task.UpdatedAt.Load(), 0)
2706assert.NotEqual(peer.Host.UpdatedAt.Load(), 0)
2707},
2708},
2709}
2710
2711for _, tc := range tests {
2712t.Run(tc.name, func(t *testing.T) {
2713ctl := gomock.NewController(t)
2714defer ctl.Finish()
2715scheduling := schedulingmocks.NewMockScheduling(ctl)
2716res := resource.NewMockResource(ctl)
2717dynconfig := configmocks.NewMockDynconfigInterface(ctl)
2718storage := storagemocks.NewMockStorage(ctl)
2719networkTopology := networktopologymocks.NewMockNetworkTopology(ctl)
2720peerManager := resource.NewMockPeerManager(ctl)
2721
2722mockHost := resource.NewHost(
2723mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
2724mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
2725mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength))
2726peer := resource.NewPeer(mockPeerID, mockResourceConfig, mockTask, mockHost)
2727svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage, networkTopology)
2728
2729tc.run(t, svc, tc.req, peer, peerManager, res.EXPECT(), peerManager.EXPECT())
2730})
2731}
2732}
2733
2734func TestServiceV2_handleDownloadPieceFailedRequest(t *testing.T) {
2735tests := []struct {
2736name string
2737req *schedulerv2.DownloadPieceFailedRequest
2738run func(t *testing.T, svc *V2, req *schedulerv2.DownloadPieceFailedRequest, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder,
2739mp *resource.MockPeerManagerMockRecorder)
2740}{
2741{
2742name: "peer can not be loaded",
2743req: &schedulerv2.DownloadPieceFailedRequest{
2744ParentId: mockSeedPeerID,
2745Temporary: true,
2746},
2747run: func(t *testing.T, svc *V2, req *schedulerv2.DownloadPieceFailedRequest, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder,
2748mp *resource.MockPeerManagerMockRecorder) {
2749gomock.InOrder(
2750mr.PeerManager().Return(peerManager).Times(1),
2751mp.Load(gomock.Eq(peer.ID)).Return(nil, false).Times(1),
2752)
2753
2754assert := assert.New(t)
2755assert.ErrorIs(svc.handleDownloadPieceFailedRequest(context.Background(), peer.ID, req), status.Errorf(codes.NotFound, "peer %s not found", peer.ID))
2756},
2757},
2758{
2759name: "temporary is false",
2760req: &schedulerv2.DownloadPieceFailedRequest{
2761ParentId: mockSeedPeerID,
2762Temporary: false,
2763},
2764run: func(t *testing.T, svc *V2, req *schedulerv2.DownloadPieceFailedRequest, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder,
2765mp *resource.MockPeerManagerMockRecorder) {
2766gomock.InOrder(
2767mr.PeerManager().Return(peerManager).Times(1),
2768mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
2769)
2770
2771assert := assert.New(t)
2772assert.ErrorIs(svc.handleDownloadPieceFailedRequest(context.Background(), peer.ID, req), status.Error(codes.FailedPrecondition, "download piece failed"))
2773},
2774},
2775{
2776name: "parent can not be loaded",
2777req: &schedulerv2.DownloadPieceFailedRequest{
2778ParentId: mockSeedPeerID,
2779Temporary: true,
2780},
2781run: func(t *testing.T, svc *V2, req *schedulerv2.DownloadPieceFailedRequest, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder,
2782mp *resource.MockPeerManagerMockRecorder) {
2783gomock.InOrder(
2784mr.PeerManager().Return(peerManager).Times(1),
2785mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
2786mr.PeerManager().Return(peerManager).Times(1),
2787mp.Load(gomock.Eq(req.GetParentId())).Return(nil, false).Times(1),
2788)
2789
2790assert := assert.New(t)
2791assert.NoError(svc.handleDownloadPieceFailedRequest(context.Background(), peer.ID, req))
2792assert.NotEqual(peer.UpdatedAt.Load(), 0)
2793assert.True(peer.BlockParents.Contains(req.GetParentId()))
2794assert.NotEqual(peer.Task.UpdatedAt.Load(), 0)
2795},
2796},
2797{
2798name: "parent can be loaded",
2799req: &schedulerv2.DownloadPieceFailedRequest{
2800ParentId: mockSeedPeerID,
2801Temporary: true,
2802},
2803run: func(t *testing.T, svc *V2, req *schedulerv2.DownloadPieceFailedRequest, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder,
2804mp *resource.MockPeerManagerMockRecorder) {
2805gomock.InOrder(
2806mr.PeerManager().Return(peerManager).Times(1),
2807mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
2808mr.PeerManager().Return(peerManager).Times(1),
2809mp.Load(gomock.Eq(req.GetParentId())).Return(peer, true).Times(1),
2810)
2811
2812assert := assert.New(t)
2813assert.NoError(svc.handleDownloadPieceFailedRequest(context.Background(), peer.ID, req))
2814assert.NotEqual(peer.UpdatedAt.Load(), 0)
2815assert.True(peer.BlockParents.Contains(req.GetParentId()))
2816assert.NotEqual(peer.Task.UpdatedAt.Load(), 0)
2817assert.Equal(peer.Host.UploadFailedCount.Load(), int64(1))
2818},
2819},
2820}
2821
2822for _, tc := range tests {
2823t.Run(tc.name, func(t *testing.T) {
2824ctl := gomock.NewController(t)
2825defer ctl.Finish()
2826scheduling := schedulingmocks.NewMockScheduling(ctl)
2827res := resource.NewMockResource(ctl)
2828dynconfig := configmocks.NewMockDynconfigInterface(ctl)
2829storage := storagemocks.NewMockStorage(ctl)
2830networkTopology := networktopologymocks.NewMockNetworkTopology(ctl)
2831peerManager := resource.NewMockPeerManager(ctl)
2832
2833mockHost := resource.NewHost(
2834mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
2835mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
2836mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength))
2837peer := resource.NewPeer(mockPeerID, mockResourceConfig, mockTask, mockHost)
2838svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage, networkTopology)
2839
2840tc.run(t, svc, tc.req, peer, peerManager, res.EXPECT(), peerManager.EXPECT())
2841})
2842}
2843}
2844
2845func TestServiceV2_handleDownloadPieceBackToSourceFailedRequest(t *testing.T) {
2846mockPieceNumber := uint32(mockPiece.Number)
2847
2848tests := []struct {
2849name string
2850req *schedulerv2.DownloadPieceBackToSourceFailedRequest
2851run func(t *testing.T, svc *V2, req *schedulerv2.DownloadPieceBackToSourceFailedRequest, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder,
2852mp *resource.MockPeerManagerMockRecorder)
2853}{
2854{
2855name: "peer can not be loaded",
2856req: &schedulerv2.DownloadPieceBackToSourceFailedRequest{},
2857run: func(t *testing.T, svc *V2, req *schedulerv2.DownloadPieceBackToSourceFailedRequest, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder,
2858mp *resource.MockPeerManagerMockRecorder) {
2859gomock.InOrder(
2860mr.PeerManager().Return(peerManager).Times(1),
2861mp.Load(gomock.Eq(peer.ID)).Return(nil, false).Times(1),
2862)
2863
2864assert := assert.New(t)
2865assert.ErrorIs(svc.handleDownloadPieceBackToSourceFailedRequest(context.Background(), peer.ID, req), status.Errorf(codes.NotFound, "peer %s not found", peer.ID))
2866},
2867},
2868{
2869name: "peer can be loaded",
2870req: &schedulerv2.DownloadPieceBackToSourceFailedRequest{
2871PieceNumber: &mockPieceNumber,
2872},
2873run: func(t *testing.T, svc *V2, req *schedulerv2.DownloadPieceBackToSourceFailedRequest, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder,
2874mp *resource.MockPeerManagerMockRecorder) {
2875gomock.InOrder(
2876mr.PeerManager().Return(peerManager).Times(1),
2877mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
2878)
2879
2880assert := assert.New(t)
2881assert.ErrorIs(svc.handleDownloadPieceBackToSourceFailedRequest(context.Background(), peer.ID, req), status.Error(codes.Internal, "download piece from source failed"))
2882assert.NotEqual(peer.UpdatedAt.Load(), 0)
2883assert.NotEqual(peer.Task.UpdatedAt.Load(), 0)
2884},
2885},
2886}
2887
2888for _, tc := range tests {
2889t.Run(tc.name, func(t *testing.T) {
2890ctl := gomock.NewController(t)
2891defer ctl.Finish()
2892scheduling := schedulingmocks.NewMockScheduling(ctl)
2893res := resource.NewMockResource(ctl)
2894dynconfig := configmocks.NewMockDynconfigInterface(ctl)
2895storage := storagemocks.NewMockStorage(ctl)
2896networkTopology := networktopologymocks.NewMockNetworkTopology(ctl)
2897peerManager := resource.NewMockPeerManager(ctl)
2898
2899mockHost := resource.NewHost(
2900mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
2901mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
2902mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength))
2903peer := resource.NewPeer(mockPeerID, mockResourceConfig, mockTask, mockHost)
2904svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage, networkTopology)
2905
2906tc.run(t, svc, tc.req, peer, peerManager, res.EXPECT(), peerManager.EXPECT())
2907})
2908}
2909}
2910
2911func TestServiceV2_handleResource(t *testing.T) {
2912dgst := mockTaskDigest.String()
2913mismatchDgst := "foo"
2914
2915tests := []struct {
2916name string
2917download *commonv2.Download
2918run func(t *testing.T, svc *V2, download *commonv2.Download, stream schedulerv2.Scheduler_AnnouncePeerServer, mockHost *resource.Host, mockTask *resource.Task, mockPeer *resource.Peer,
2919hostManager resource.HostManager, taskManager resource.TaskManager, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder,
2920mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder)
2921}{
2922{
2923name: "host can not be loaded",
2924download: &commonv2.Download{},
2925run: func(t *testing.T, svc *V2, download *commonv2.Download, stream schedulerv2.Scheduler_AnnouncePeerServer, mockHost *resource.Host, mockTask *resource.Task, mockPeer *resource.Peer,
2926hostManager resource.HostManager, taskManager resource.TaskManager, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder,
2927mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder) {
2928gomock.InOrder(
2929mr.HostManager().Return(hostManager).Times(1),
2930mh.Load(gomock.Eq(mockHost.ID)).Return(nil, false).Times(1),
2931)
2932
2933assert := assert.New(t)
2934_, _, _, err := svc.handleResource(context.Background(), stream, mockHost.ID, mockTask.ID, mockPeer.ID, download)
2935assert.ErrorIs(err, status.Errorf(codes.NotFound, "host %s not found", mockHost.ID))
2936},
2937},
2938{
2939name: "task can be loaded",
2940download: &commonv2.Download{
2941Url: "foo",
2942FilteredQueryParams: []string{"bar"},
2943RequestHeader: map[string]string{"baz": "bas"},
2944},
2945run: func(t *testing.T, svc *V2, download *commonv2.Download, stream schedulerv2.Scheduler_AnnouncePeerServer, mockHost *resource.Host, mockTask *resource.Task, mockPeer *resource.Peer,
2946hostManager resource.HostManager, taskManager resource.TaskManager, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder,
2947mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder) {
2948gomock.InOrder(
2949mr.HostManager().Return(hostManager).Times(1),
2950mh.Load(gomock.Eq(mockHost.ID)).Return(mockHost, true).Times(1),
2951mr.TaskManager().Return(taskManager).Times(1),
2952mt.Load(gomock.Eq(mockTask.ID)).Return(mockTask, true).Times(1),
2953mr.PeerManager().Return(peerManager).Times(1),
2954mp.Load(gomock.Eq(mockPeer.ID)).Return(mockPeer, true).Times(1),
2955)
2956
2957assert := assert.New(t)
2958host, task, _, err := svc.handleResource(context.Background(), stream, mockHost.ID, mockTask.ID, mockPeer.ID, download)
2959assert.NoError(err)
2960assert.EqualValues(host, mockHost)
2961assert.Equal(task.ID, mockTask.ID)
2962assert.Equal(task.URL, download.Url)
2963assert.EqualValues(task.FilteredQueryParams, download.FilteredQueryParams)
2964assert.EqualValues(task.Header, download.RequestHeader)
2965},
2966},
2967{
2968name: "task can not be loaded",
2969download: &commonv2.Download{
2970Url: "foo",
2971FilteredQueryParams: []string{"bar"},
2972RequestHeader: map[string]string{"baz": "bas"},
2973Digest: &dgst,
2974},
2975run: func(t *testing.T, svc *V2, download *commonv2.Download, stream schedulerv2.Scheduler_AnnouncePeerServer, mockHost *resource.Host, mockTask *resource.Task, mockPeer *resource.Peer,
2976hostManager resource.HostManager, taskManager resource.TaskManager, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder,
2977mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder) {
2978gomock.InOrder(
2979mr.HostManager().Return(hostManager).Times(1),
2980mh.Load(gomock.Eq(mockHost.ID)).Return(mockHost, true).Times(1),
2981mr.TaskManager().Return(taskManager).Times(1),
2982mt.Load(gomock.Eq(mockTask.ID)).Return(nil, false).Times(1),
2983mr.TaskManager().Return(taskManager).Times(1),
2984mt.Store(gomock.Any()).Return().Times(1),
2985mr.PeerManager().Return(peerManager).Times(1),
2986mp.Load(gomock.Eq(mockPeer.ID)).Return(mockPeer, true).Times(1),
2987)
2988
2989assert := assert.New(t)
2990host, task, _, err := svc.handleResource(context.Background(), stream, mockHost.ID, mockTask.ID, mockPeer.ID, download)
2991assert.NoError(err)
2992assert.EqualValues(host, mockHost)
2993assert.Equal(task.ID, mockTask.ID)
2994assert.Equal(task.Digest.String(), download.GetDigest())
2995assert.Equal(task.URL, download.GetUrl())
2996assert.EqualValues(task.FilteredQueryParams, download.GetFilteredQueryParams())
2997assert.EqualValues(task.Header, download.RequestHeader)
2998},
2999},
3000{
3001name: "invalid digest",
3002download: &commonv2.Download{
3003Digest: &mismatchDgst,
3004},
3005run: func(t *testing.T, svc *V2, download *commonv2.Download, stream schedulerv2.Scheduler_AnnouncePeerServer, mockHost *resource.Host, mockTask *resource.Task, mockPeer *resource.Peer,
3006hostManager resource.HostManager, taskManager resource.TaskManager, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder,
3007mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder) {
3008gomock.InOrder(
3009mr.HostManager().Return(hostManager).Times(1),
3010mh.Load(gomock.Eq(mockHost.ID)).Return(mockHost, true).Times(1),
3011mr.TaskManager().Return(taskManager).Times(1),
3012mt.Load(gomock.Eq(mockTask.ID)).Return(nil, false).Times(1),
3013)
3014
3015assert := assert.New(t)
3016_, _, _, err := svc.handleResource(context.Background(), stream, mockHost.ID, mockTask.ID, mockPeer.ID, download)
3017assert.ErrorIs(err, status.Error(codes.InvalidArgument, "invalid digest"))
3018},
3019},
3020{
3021name: "peer can be loaded",
3022download: &commonv2.Download{
3023Url: "foo",
3024FilteredQueryParams: []string{"bar"},
3025RequestHeader: map[string]string{"baz": "bas"},
3026Digest: &dgst,
3027},
3028run: func(t *testing.T, svc *V2, download *commonv2.Download, stream schedulerv2.Scheduler_AnnouncePeerServer, mockHost *resource.Host, mockTask *resource.Task, mockPeer *resource.Peer,
3029hostManager resource.HostManager, taskManager resource.TaskManager, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder,
3030mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder) {
3031gomock.InOrder(
3032mr.HostManager().Return(hostManager).Times(1),
3033mh.Load(gomock.Eq(mockHost.ID)).Return(mockHost, true).Times(1),
3034mr.TaskManager().Return(taskManager).Times(1),
3035mt.Load(gomock.Eq(mockTask.ID)).Return(mockTask, true).Times(1),
3036mr.PeerManager().Return(peerManager).Times(1),
3037mp.Load(gomock.Eq(mockPeer.ID)).Return(mockPeer, true).Times(1),
3038)
3039
3040assert := assert.New(t)
3041host, task, peer, err := svc.handleResource(context.Background(), stream, mockHost.ID, mockTask.ID, mockPeer.ID, download)
3042assert.NoError(err)
3043assert.EqualValues(host, mockHost)
3044assert.Equal(task.ID, mockTask.ID)
3045assert.Equal(task.Digest.String(), download.GetDigest())
3046assert.Equal(task.URL, download.GetUrl())
3047assert.EqualValues(task.FilteredQueryParams, download.GetFilteredQueryParams())
3048assert.EqualValues(task.Header, download.RequestHeader)
3049assert.EqualValues(peer, mockPeer)
3050},
3051},
3052{
3053name: "peer can not be loaded",
3054download: &commonv2.Download{
3055Url: "foo",
3056FilteredQueryParams: []string{"bar"},
3057RequestHeader: map[string]string{"baz": "bas"},
3058Digest: &dgst,
3059Priority: commonv2.Priority_LEVEL1,
3060Range: &commonv2.Range{
3061Start: uint64(mockPeerRange.Start),
3062Length: uint64(mockPeerRange.Length),
3063},
3064},
3065run: func(t *testing.T, svc *V2, download *commonv2.Download, stream schedulerv2.Scheduler_AnnouncePeerServer, mockHost *resource.Host, mockTask *resource.Task, mockPeer *resource.Peer,
3066hostManager resource.HostManager, taskManager resource.TaskManager, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder,
3067mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder) {
3068gomock.InOrder(
3069mr.HostManager().Return(hostManager).Times(1),
3070mh.Load(gomock.Eq(mockHost.ID)).Return(mockHost, true).Times(1),
3071mr.TaskManager().Return(taskManager).Times(1),
3072mt.Load(gomock.Eq(mockTask.ID)).Return(mockTask, true).Times(1),
3073mr.PeerManager().Return(peerManager).Times(1),
3074mp.Load(gomock.Eq(mockPeer.ID)).Return(nil, false).Times(1),
3075mr.PeerManager().Return(peerManager).Times(1),
3076mp.Store(gomock.Any()).Return().Times(1),
3077)
3078
3079assert := assert.New(t)
3080host, task, peer, err := svc.handleResource(context.Background(), stream, mockHost.ID, mockTask.ID, mockPeer.ID, download)
3081assert.NoError(err)
3082assert.EqualValues(host, mockHost)
3083assert.Equal(task.ID, mockTask.ID)
3084assert.Equal(task.Digest.String(), download.GetDigest())
3085assert.Equal(task.URL, download.GetUrl())
3086assert.EqualValues(task.FilteredQueryParams, download.GetFilteredQueryParams())
3087assert.EqualValues(task.Header, download.RequestHeader)
3088assert.Equal(peer.ID, mockPeer.ID)
3089assert.Equal(peer.Priority, download.Priority)
3090assert.Equal(peer.Range.Start, int64(download.Range.Start))
3091assert.Equal(peer.Range.Length, int64(download.Range.Length))
3092assert.NotNil(peer.AnnouncePeerStream)
3093assert.EqualValues(peer.Host, mockHost)
3094assert.EqualValues(peer.Task, mockTask)
3095},
3096},
3097}
3098
3099for _, tc := range tests {
3100t.Run(tc.name, func(t *testing.T) {
3101ctl := gomock.NewController(t)
3102defer ctl.Finish()
3103scheduling := schedulingmocks.NewMockScheduling(ctl)
3104res := resource.NewMockResource(ctl)
3105dynconfig := configmocks.NewMockDynconfigInterface(ctl)
3106storage := storagemocks.NewMockStorage(ctl)
3107networkTopology := networktopologymocks.NewMockNetworkTopology(ctl)
3108hostManager := resource.NewMockHostManager(ctl)
3109taskManager := resource.NewMockTaskManager(ctl)
3110peerManager := resource.NewMockPeerManager(ctl)
3111stream := schedulerv2mocks.NewMockScheduler_AnnouncePeerServer(ctl)
3112
3113mockHost := resource.NewHost(
3114mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
3115mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
3116mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength))
3117mockPeer := resource.NewPeer(mockPeerID, mockResourceConfig, mockTask, mockHost)
3118svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage, networkTopology)
3119
3120tc.run(t, svc, tc.download, stream, mockHost, mockTask, mockPeer, hostManager, taskManager, peerManager, res.EXPECT(), hostManager.EXPECT(), taskManager.EXPECT(), peerManager.EXPECT())
3121})
3122}
3123}
3124
3125func TestServiceV2_downloadTaskBySeedPeer(t *testing.T) {
3126tests := []struct {
3127name string
3128config config.Config
3129run func(t *testing.T, svc *V2, peer *resource.Peer, seedPeerClient resource.SeedPeer, mr *resource.MockResourceMockRecorder, ms *resource.MockSeedPeerMockRecorder)
3130}{
3131{
3132name: "priority is Priority_LEVEL6 and enable seed peer",
3133config: config.Config{
3134SeedPeer: config.SeedPeerConfig{
3135Enable: true,
3136},
3137},
3138run: func(t *testing.T, svc *V2, peer *resource.Peer, seedPeerClient resource.SeedPeer, mr *resource.MockResourceMockRecorder, ms *resource.MockSeedPeerMockRecorder) {
3139var wg sync.WaitGroup
3140wg.Add(1)
3141defer wg.Wait()
3142
3143gomock.InOrder(
3144mr.SeedPeer().Return(seedPeerClient).Times(1),
3145ms.TriggerDownloadTask(gomock.All(), gomock.Any(), gomock.Any()).Do(func(context.Context, string, *dfdaemonv2.TriggerDownloadTaskRequest) { wg.Done() }).Return(nil).Times(1),
3146)
3147
3148peer.Priority = commonv2.Priority_LEVEL6
3149
3150assert := assert.New(t)
3151assert.NoError(svc.downloadTaskBySeedPeer(context.Background(), mockTaskID, &commonv2.Download{}, peer))
3152assert.False(peer.NeedBackToSource.Load())
3153},
3154},
3155{
3156name: "priority is Priority_LEVEL6, enable seed peer and download task failed",
3157config: config.Config{
3158SeedPeer: config.SeedPeerConfig{
3159Enable: true,
3160},
3161},
3162run: func(t *testing.T, svc *V2, peer *resource.Peer, seedPeerClient resource.SeedPeer, mr *resource.MockResourceMockRecorder, ms *resource.MockSeedPeerMockRecorder) {
3163var wg sync.WaitGroup
3164wg.Add(1)
3165defer wg.Wait()
3166
3167gomock.InOrder(
3168mr.SeedPeer().Return(seedPeerClient).Times(1),
3169ms.TriggerDownloadTask(gomock.All(), gomock.Any(), gomock.Any()).Do(func(context.Context, string, *dfdaemonv2.TriggerDownloadTaskRequest) { wg.Done() }).Return(errors.New("foo")).Times(1),
3170)
3171
3172peer.Priority = commonv2.Priority_LEVEL6
3173
3174assert := assert.New(t)
3175assert.NoError(svc.downloadTaskBySeedPeer(context.Background(), mockTaskID, &commonv2.Download{}, peer))
3176assert.False(peer.NeedBackToSource.Load())
3177},
3178},
3179{
3180name: "priority is Priority_LEVEL6 and disable seed peer",
3181config: config.Config{
3182SeedPeer: config.SeedPeerConfig{
3183Enable: false,
3184},
3185},
3186run: func(t *testing.T, svc *V2, peer *resource.Peer, seedPeerClient resource.SeedPeer, mr *resource.MockResourceMockRecorder, ms *resource.MockSeedPeerMockRecorder) {
3187peer.Priority = commonv2.Priority_LEVEL6
3188
3189assert := assert.New(t)
3190assert.NoError(svc.downloadTaskBySeedPeer(context.Background(), mockTaskID, &commonv2.Download{}, peer))
3191assert.True(peer.NeedBackToSource.Load())
3192},
3193},
3194{
3195name: "priority is Priority_LEVEL5 and enable seed peer",
3196config: config.Config{
3197SeedPeer: config.SeedPeerConfig{
3198Enable: true,
3199},
3200},
3201run: func(t *testing.T, svc *V2, peer *resource.Peer, seedPeerClient resource.SeedPeer, mr *resource.MockResourceMockRecorder, ms *resource.MockSeedPeerMockRecorder) {
3202var wg sync.WaitGroup
3203wg.Add(1)
3204defer wg.Wait()
3205
3206gomock.InOrder(
3207mr.SeedPeer().Return(seedPeerClient).Times(1),
3208ms.TriggerDownloadTask(gomock.All(), gomock.Any(), gomock.Any()).Do(func(context.Context, string, *dfdaemonv2.TriggerDownloadTaskRequest) { wg.Done() }).Return(nil).Times(1),
3209)
3210
3211peer.Priority = commonv2.Priority_LEVEL5
3212
3213assert := assert.New(t)
3214assert.NoError(svc.downloadTaskBySeedPeer(context.Background(), mockTaskID, &commonv2.Download{}, peer))
3215assert.False(peer.NeedBackToSource.Load())
3216},
3217},
3218{
3219name: "priority is Priority_LEVEL5, enable seed peer and download task failed",
3220config: config.Config{
3221SeedPeer: config.SeedPeerConfig{
3222Enable: true,
3223},
3224},
3225run: func(t *testing.T, svc *V2, peer *resource.Peer, seedPeerClient resource.SeedPeer, mr *resource.MockResourceMockRecorder, ms *resource.MockSeedPeerMockRecorder) {
3226var wg sync.WaitGroup
3227wg.Add(1)
3228defer wg.Wait()
3229
3230gomock.InOrder(
3231mr.SeedPeer().Return(seedPeerClient).Times(1),
3232ms.TriggerDownloadTask(gomock.All(), gomock.Any(), gomock.Any()).Do(func(context.Context, string, *dfdaemonv2.TriggerDownloadTaskRequest) { wg.Done() }).Return(errors.New("foo")).Times(1),
3233)
3234
3235peer.Priority = commonv2.Priority_LEVEL5
3236
3237assert := assert.New(t)
3238assert.NoError(svc.downloadTaskBySeedPeer(context.Background(), mockTaskID, &commonv2.Download{}, peer))
3239assert.False(peer.NeedBackToSource.Load())
3240},
3241},
3242{
3243name: "priority is Priority_LEVEL5 and disable seed peer",
3244config: config.Config{
3245SeedPeer: config.SeedPeerConfig{
3246Enable: false,
3247},
3248},
3249run: func(t *testing.T, svc *V2, peer *resource.Peer, seedPeerClient resource.SeedPeer, mr *resource.MockResourceMockRecorder, ms *resource.MockSeedPeerMockRecorder) {
3250peer.Priority = commonv2.Priority_LEVEL5
3251
3252assert := assert.New(t)
3253assert.NoError(svc.downloadTaskBySeedPeer(context.Background(), mockTaskID, &commonv2.Download{}, peer))
3254assert.True(peer.NeedBackToSource.Load())
3255},
3256},
3257{
3258name: "priority is Priority_LEVEL4 and enable seed peer",
3259config: config.Config{
3260SeedPeer: config.SeedPeerConfig{
3261Enable: true,
3262},
3263},
3264run: func(t *testing.T, svc *V2, peer *resource.Peer, seedPeerClient resource.SeedPeer, mr *resource.MockResourceMockRecorder, ms *resource.MockSeedPeerMockRecorder) {
3265var wg sync.WaitGroup
3266wg.Add(1)
3267defer wg.Wait()
3268
3269gomock.InOrder(
3270mr.SeedPeer().Return(seedPeerClient).Times(1),
3271ms.TriggerDownloadTask(gomock.All(), gomock.Any(), gomock.Any()).Do(func(context.Context, string, *dfdaemonv2.TriggerDownloadTaskRequest) { wg.Done() }).Return(nil).Times(1),
3272)
3273
3274peer.Priority = commonv2.Priority_LEVEL4
3275
3276assert := assert.New(t)
3277assert.NoError(svc.downloadTaskBySeedPeer(context.Background(), mockTaskID, &commonv2.Download{}, peer))
3278assert.False(peer.NeedBackToSource.Load())
3279},
3280},
3281{
3282name: "priority is Priority_LEVEL4, enable seed peer and download task failed",
3283config: config.Config{
3284SeedPeer: config.SeedPeerConfig{
3285Enable: true,
3286},
3287},
3288run: func(t *testing.T, svc *V2, peer *resource.Peer, seedPeerClient resource.SeedPeer, mr *resource.MockResourceMockRecorder, ms *resource.MockSeedPeerMockRecorder) {
3289var wg sync.WaitGroup
3290wg.Add(1)
3291defer wg.Wait()
3292
3293gomock.InOrder(
3294mr.SeedPeer().Return(seedPeerClient).Times(1),
3295ms.TriggerDownloadTask(gomock.All(), gomock.Any(), gomock.Any()).Do(func(context.Context, string, *dfdaemonv2.TriggerDownloadTaskRequest) { wg.Done() }).Return(errors.New("foo")).Times(1),
3296)
3297
3298peer.Priority = commonv2.Priority_LEVEL4
3299
3300assert := assert.New(t)
3301assert.NoError(svc.downloadTaskBySeedPeer(context.Background(), mockTaskID, &commonv2.Download{}, peer))
3302assert.False(peer.NeedBackToSource.Load())
3303},
3304},
3305{
3306name: "priority is Priority_LEVEL4 and disable seed peer",
3307config: config.Config{
3308SeedPeer: config.SeedPeerConfig{
3309Enable: false,
3310},
3311},
3312run: func(t *testing.T, svc *V2, peer *resource.Peer, seedPeerClient resource.SeedPeer, mr *resource.MockResourceMockRecorder, ms *resource.MockSeedPeerMockRecorder) {
3313peer.Priority = commonv2.Priority_LEVEL4
3314
3315assert := assert.New(t)
3316assert.NoError(svc.downloadTaskBySeedPeer(context.Background(), mockTaskID, &commonv2.Download{}, peer))
3317assert.True(peer.NeedBackToSource.Load())
3318},
3319},
3320{
3321name: "priority is Priority_LEVEL3",
3322config: config.Config{
3323SeedPeer: config.SeedPeerConfig{
3324Enable: true,
3325},
3326},
3327run: func(t *testing.T, svc *V2, peer *resource.Peer, seedPeerClient resource.SeedPeer, mr *resource.MockResourceMockRecorder, ms *resource.MockSeedPeerMockRecorder) {
3328peer.Priority = commonv2.Priority_LEVEL3
3329
3330assert := assert.New(t)
3331assert.NoError(svc.downloadTaskBySeedPeer(context.Background(), mockTaskID, &commonv2.Download{}, peer))
3332assert.True(peer.NeedBackToSource.Load())
3333},
3334},
3335{
3336name: "priority is Priority_LEVEL2",
3337config: config.Config{
3338SeedPeer: config.SeedPeerConfig{
3339Enable: true,
3340},
3341},
3342run: func(t *testing.T, svc *V2, peer *resource.Peer, seedPeerClient resource.SeedPeer, mr *resource.MockResourceMockRecorder, ms *resource.MockSeedPeerMockRecorder) {
3343peer.Priority = commonv2.Priority_LEVEL2
3344
3345assert := assert.New(t)
3346assert.ErrorIs(svc.downloadTaskBySeedPeer(context.Background(), mockTaskID, &commonv2.Download{}, peer), status.Errorf(codes.NotFound, "%s peer not found candidate peers", commonv2.Priority_LEVEL2.String()))
3347},
3348},
3349{
3350name: "priority is Priority_LEVEL1",
3351config: config.Config{
3352SeedPeer: config.SeedPeerConfig{
3353Enable: true,
3354},
3355},
3356run: func(t *testing.T, svc *V2, peer *resource.Peer, seedPeerClient resource.SeedPeer, mr *resource.MockResourceMockRecorder, ms *resource.MockSeedPeerMockRecorder) {
3357peer.Priority = commonv2.Priority_LEVEL1
3358
3359assert := assert.New(t)
3360assert.ErrorIs(svc.downloadTaskBySeedPeer(context.Background(), mockTaskID, &commonv2.Download{}, peer), status.Errorf(codes.FailedPrecondition, "%s peer is forbidden", commonv2.Priority_LEVEL1.String()))
3361},
3362},
3363{
3364name: "priority is Priority_LEVEL0",
3365config: config.Config{
3366SeedPeer: config.SeedPeerConfig{
3367Enable: true,
3368},
3369},
3370run: func(t *testing.T, svc *V2, peer *resource.Peer, seedPeerClient resource.SeedPeer, mr *resource.MockResourceMockRecorder, ms *resource.MockSeedPeerMockRecorder) {
3371peer.Priority = commonv2.Priority(100)
3372
3373assert := assert.New(t)
3374assert.ErrorIs(svc.downloadTaskBySeedPeer(context.Background(), mockTaskID, &commonv2.Download{}, peer), status.Errorf(codes.InvalidArgument, "invalid priority %#v", peer.Priority))
3375},
3376},
3377}
3378
3379for _, tc := range tests {
3380t.Run(tc.name, func(t *testing.T) {
3381ctl := gomock.NewController(t)
3382defer ctl.Finish()
3383scheduling := schedulingmocks.NewMockScheduling(ctl)
3384res := resource.NewMockResource(ctl)
3385dynconfig := configmocks.NewMockDynconfigInterface(ctl)
3386storage := storagemocks.NewMockStorage(ctl)
3387networkTopology := networktopologymocks.NewMockNetworkTopology(ctl)
3388seedPeerClient := resource.NewMockSeedPeer(ctl)
3389
3390mockHost := resource.NewHost(
3391mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
3392mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
3393mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength))
3394peer := resource.NewPeer(mockPeerID, mockResourceConfig, mockTask, mockHost)
3395svc := NewV2(&tc.config, res, scheduling, dynconfig, storage, networkTopology)
3396
3397tc.run(t, svc, peer, seedPeerClient, res.EXPECT(), seedPeerClient.EXPECT())
3398})
3399}
3400}
3401