// Dragonfly2
// 294 lines · 5.7 KB
1/*
2* Copyright 2022 The Dragonfly Authors
3*
4* Licensed under the Apache License, Version 2.0 (the "License");
5* you may not use this file except in compliance with the License.
6* You may obtain a copy of the License at
7*
8* http://www.apache.org/licenses/LICENSE-2.0
9*
10* Unless required by applicable law or agreed to in writing, software
11* distributed under the License is distributed on an "AS IS" BASIS,
12* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13* See the License for the specific language governing permissions and
14* limitations under the License.
15*/
16
17package peer
18
19import (
20"sync"
21"testing"
22"time"
23
24testifyassert "github.com/stretchr/testify/assert"
25"go.uber.org/atomic"
26"go.uber.org/mock/gomock"
27
28schedulerv1 "d7y.io/api/v2/pkg/apis/scheduler/v1"
29"d7y.io/api/v2/pkg/apis/scheduler/v1/mocks"
30
31logger "d7y.io/dragonfly/v2/internal/dflog"
32)
33
34func Test_watchdog(t *testing.T) {
35ctrl := gomock.NewController(t)
36assert := testifyassert.New(t)
37
38var testCases = []struct {
39name string
40timeout time.Duration
41ok bool
42}{
43{
44name: "watchdog ok",
45timeout: time.Millisecond,
46ok: true,
47},
48{
49name: "watchdog failed",
50timeout: time.Millisecond,
51ok: false,
52},
53}
54
55for _, tt := range testCases {
56t.Run(tt.name, func(t *testing.T) {
57peer := &schedulerv1.PeerPacket_DestPeer{}
58pps := mocks.NewMockScheduler_ReportPieceResultClient(ctrl)
59watchdog := &synchronizerWatchdog{
60done: make(chan struct{}),
61mainPeer: atomic.Value{},
62syncSuccess: atomic.NewBool(false),
63peerTaskConductor: &peerTaskConductor{
64SugaredLoggerOnWith: logger.With(
65"peer", "test",
66"task", "test",
67"component", "PeerTask"),
68readyPieces: NewBitmap(),
69peerPacketStream: pps,
70},
71}
72if tt.ok {
73watchdog.peerTaskConductor.readyPieces.Set(0)
74} else {
75pps.EXPECT().Send(gomock.Any()).DoAndReturn(func(pr *schedulerv1.PieceResult) error {
76assert.Equal(peer.PeerId, pr.DstPid)
77return nil
78})
79}
80watchdog.mainPeer.Store(peer)
81
82wg := sync.WaitGroup{}
83wg.Add(1)
84go func() {
85watchdog.watch(tt.timeout)
86wg.Done()
87}()
88
89wg.Wait()
90})
91}
92}
93
94func Test_diffPeers(t *testing.T) {
95assert := testifyassert.New(t)
96
97var testCases = []struct {
98name string
99workers map[string]*pieceTaskSynchronizer
100peers []*schedulerv1.PeerPacket_DestPeer
101peersToKeep []*schedulerv1.PeerPacket_DestPeer
102peersToAdd []*schedulerv1.PeerPacket_DestPeer
103peersToClose []string
104}{
105{
106name: "add new peers with empty workers",
107workers: map[string]*pieceTaskSynchronizer{},
108peers: []*schedulerv1.PeerPacket_DestPeer{
109{
110PeerId: "peer-0",
111},
112{
113PeerId: "peer-1",
114},
115{
116PeerId: "peer-2",
117},
118},
119peersToKeep: []*schedulerv1.PeerPacket_DestPeer{},
120peersToAdd: []*schedulerv1.PeerPacket_DestPeer{
121{
122PeerId: "peer-0",
123},
124{
125PeerId: "peer-1",
126},
127{
128PeerId: "peer-2",
129},
130},
131peersToClose: []string{},
132},
133{
134name: "add new peers with some workers",
135workers: map[string]*pieceTaskSynchronizer{
136"peer-1": {},
137},
138peers: []*schedulerv1.PeerPacket_DestPeer{
139{
140PeerId: "peer-0",
141},
142{
143PeerId: "peer-1",
144},
145{
146PeerId: "peer-2",
147},
148},
149peersToKeep: []*schedulerv1.PeerPacket_DestPeer{
150{
151PeerId: "peer-1",
152},
153},
154peersToAdd: []*schedulerv1.PeerPacket_DestPeer{
155{
156PeerId: "peer-0",
157},
158{
159PeerId: "peer-2",
160},
161},
162peersToClose: []string{},
163},
164{
165name: "keep peers",
166workers: map[string]*pieceTaskSynchronizer{
167"peer-0": {},
168"peer-1": {},
169"peer-2": {},
170},
171peers: []*schedulerv1.PeerPacket_DestPeer{
172{
173PeerId: "peer-0",
174},
175{
176PeerId: "peer-1",
177},
178{
179PeerId: "peer-2",
180},
181},
182peersToKeep: []*schedulerv1.PeerPacket_DestPeer{
183{
184PeerId: "peer-0",
185},
186{
187PeerId: "peer-1",
188},
189{
190PeerId: "peer-2",
191},
192},
193peersToAdd: []*schedulerv1.PeerPacket_DestPeer{},
194peersToClose: []string{},
195},
196{
197name: "close peers",
198workers: map[string]*pieceTaskSynchronizer{
199"peer-0": {},
200"peer-1": {},
201"peer-2": {},
202},
203peers: []*schedulerv1.PeerPacket_DestPeer{
204{
205PeerId: "peer-3",
206},
207{
208PeerId: "peer-4",
209},
210{
211PeerId: "peer-5",
212},
213},
214peersToKeep: []*schedulerv1.PeerPacket_DestPeer{},
215peersToAdd: []*schedulerv1.PeerPacket_DestPeer{
216
217{
218PeerId: "peer-3",
219},
220{
221PeerId: "peer-4",
222},
223{
224PeerId: "peer-5",
225},
226},
227peersToClose: []string{
228"peer-0",
229"peer-1",
230"peer-2",
231},
232},
233{
234name: "mix peers",
235workers: map[string]*pieceTaskSynchronizer{
236"peer-0": {},
237"peer-1": {},
238"peer-2": {},
239},
240peers: []*schedulerv1.PeerPacket_DestPeer{
241{
242PeerId: "peer-1",
243},
244{
245PeerId: "peer-2",
246},
247{
248PeerId: "peer-3",
249},
250{
251PeerId: "peer-4",
252},
253{
254PeerId: "peer-5",
255},
256},
257peersToKeep: []*schedulerv1.PeerPacket_DestPeer{
258{
259PeerId: "peer-1",
260},
261{
262PeerId: "peer-2",
263},
264},
265peersToAdd: []*schedulerv1.PeerPacket_DestPeer{
266
267{
268PeerId: "peer-3",
269},
270{
271PeerId: "peer-4",
272},
273{
274PeerId: "peer-5",
275},
276},
277peersToClose: []string{
278"peer-0",
279},
280},
281}
282
283for _, tt := range testCases {
284t.Run(tt.name, func(t *testing.T) {
285s := &pieceTaskSyncManager{
286workers: tt.workers,
287}
288peersToKeep, peersToAdd, peersToClose := s.diffPeers(tt.peers)
289assert.ElementsMatch(tt.peersToKeep, peersToKeep)
290assert.ElementsMatch(tt.peersToAdd, peersToAdd)
291assert.ElementsMatch(tt.peersToClose, peersToClose)
292})
293}
294}
295