Dragonfly2

Форк
0
/
peertask_piecetask_synchronizer_test.go 
294 строки · 5.7 Кб
1
/*
2
 *     Copyright 2022 The Dragonfly Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *      http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package peer
18

19
import (
20
	"sync"
21
	"testing"
22
	"time"
23

24
	testifyassert "github.com/stretchr/testify/assert"
25
	"go.uber.org/atomic"
26
	"go.uber.org/mock/gomock"
27

28
	schedulerv1 "d7y.io/api/v2/pkg/apis/scheduler/v1"
29
	"d7y.io/api/v2/pkg/apis/scheduler/v1/mocks"
30

31
	logger "d7y.io/dragonfly/v2/internal/dflog"
32
)
33

34
func Test_watchdog(t *testing.T) {
35
	ctrl := gomock.NewController(t)
36
	assert := testifyassert.New(t)
37

38
	var testCases = []struct {
39
		name    string
40
		timeout time.Duration
41
		ok      bool
42
	}{
43
		{
44
			name:    "watchdog ok",
45
			timeout: time.Millisecond,
46
			ok:      true,
47
		},
48
		{
49
			name:    "watchdog failed",
50
			timeout: time.Millisecond,
51
			ok:      false,
52
		},
53
	}
54

55
	for _, tt := range testCases {
56
		t.Run(tt.name, func(t *testing.T) {
57
			peer := &schedulerv1.PeerPacket_DestPeer{}
58
			pps := mocks.NewMockScheduler_ReportPieceResultClient(ctrl)
59
			watchdog := &synchronizerWatchdog{
60
				done:        make(chan struct{}),
61
				mainPeer:    atomic.Value{},
62
				syncSuccess: atomic.NewBool(false),
63
				peerTaskConductor: &peerTaskConductor{
64
					SugaredLoggerOnWith: logger.With(
65
						"peer", "test",
66
						"task", "test",
67
						"component", "PeerTask"),
68
					readyPieces:      NewBitmap(),
69
					peerPacketStream: pps,
70
				},
71
			}
72
			if tt.ok {
73
				watchdog.peerTaskConductor.readyPieces.Set(0)
74
			} else {
75
				pps.EXPECT().Send(gomock.Any()).DoAndReturn(func(pr *schedulerv1.PieceResult) error {
76
					assert.Equal(peer.PeerId, pr.DstPid)
77
					return nil
78
				})
79
			}
80
			watchdog.mainPeer.Store(peer)
81

82
			wg := sync.WaitGroup{}
83
			wg.Add(1)
84
			go func() {
85
				watchdog.watch(tt.timeout)
86
				wg.Done()
87
			}()
88

89
			wg.Wait()
90
		})
91
	}
92
}
93

94
func Test_diffPeers(t *testing.T) {
95
	assert := testifyassert.New(t)
96

97
	var testCases = []struct {
98
		name         string
99
		workers      map[string]*pieceTaskSynchronizer
100
		peers        []*schedulerv1.PeerPacket_DestPeer
101
		peersToKeep  []*schedulerv1.PeerPacket_DestPeer
102
		peersToAdd   []*schedulerv1.PeerPacket_DestPeer
103
		peersToClose []string
104
	}{
105
		{
106
			name:    "add new peers with empty workers",
107
			workers: map[string]*pieceTaskSynchronizer{},
108
			peers: []*schedulerv1.PeerPacket_DestPeer{
109
				{
110
					PeerId: "peer-0",
111
				},
112
				{
113
					PeerId: "peer-1",
114
				},
115
				{
116
					PeerId: "peer-2",
117
				},
118
			},
119
			peersToKeep: []*schedulerv1.PeerPacket_DestPeer{},
120
			peersToAdd: []*schedulerv1.PeerPacket_DestPeer{
121
				{
122
					PeerId: "peer-0",
123
				},
124
				{
125
					PeerId: "peer-1",
126
				},
127
				{
128
					PeerId: "peer-2",
129
				},
130
			},
131
			peersToClose: []string{},
132
		},
133
		{
134
			name: "add new peers with some workers",
135
			workers: map[string]*pieceTaskSynchronizer{
136
				"peer-1": {},
137
			},
138
			peers: []*schedulerv1.PeerPacket_DestPeer{
139
				{
140
					PeerId: "peer-0",
141
				},
142
				{
143
					PeerId: "peer-1",
144
				},
145
				{
146
					PeerId: "peer-2",
147
				},
148
			},
149
			peersToKeep: []*schedulerv1.PeerPacket_DestPeer{
150
				{
151
					PeerId: "peer-1",
152
				},
153
			},
154
			peersToAdd: []*schedulerv1.PeerPacket_DestPeer{
155
				{
156
					PeerId: "peer-0",
157
				},
158
				{
159
					PeerId: "peer-2",
160
				},
161
			},
162
			peersToClose: []string{},
163
		},
164
		{
165
			name: "keep peers",
166
			workers: map[string]*pieceTaskSynchronizer{
167
				"peer-0": {},
168
				"peer-1": {},
169
				"peer-2": {},
170
			},
171
			peers: []*schedulerv1.PeerPacket_DestPeer{
172
				{
173
					PeerId: "peer-0",
174
				},
175
				{
176
					PeerId: "peer-1",
177
				},
178
				{
179
					PeerId: "peer-2",
180
				},
181
			},
182
			peersToKeep: []*schedulerv1.PeerPacket_DestPeer{
183
				{
184
					PeerId: "peer-0",
185
				},
186
				{
187
					PeerId: "peer-1",
188
				},
189
				{
190
					PeerId: "peer-2",
191
				},
192
			},
193
			peersToAdd:   []*schedulerv1.PeerPacket_DestPeer{},
194
			peersToClose: []string{},
195
		},
196
		{
197
			name: "close peers",
198
			workers: map[string]*pieceTaskSynchronizer{
199
				"peer-0": {},
200
				"peer-1": {},
201
				"peer-2": {},
202
			},
203
			peers: []*schedulerv1.PeerPacket_DestPeer{
204
				{
205
					PeerId: "peer-3",
206
				},
207
				{
208
					PeerId: "peer-4",
209
				},
210
				{
211
					PeerId: "peer-5",
212
				},
213
			},
214
			peersToKeep: []*schedulerv1.PeerPacket_DestPeer{},
215
			peersToAdd: []*schedulerv1.PeerPacket_DestPeer{
216

217
				{
218
					PeerId: "peer-3",
219
				},
220
				{
221
					PeerId: "peer-4",
222
				},
223
				{
224
					PeerId: "peer-5",
225
				},
226
			},
227
			peersToClose: []string{
228
				"peer-0",
229
				"peer-1",
230
				"peer-2",
231
			},
232
		},
233
		{
234
			name: "mix peers",
235
			workers: map[string]*pieceTaskSynchronizer{
236
				"peer-0": {},
237
				"peer-1": {},
238
				"peer-2": {},
239
			},
240
			peers: []*schedulerv1.PeerPacket_DestPeer{
241
				{
242
					PeerId: "peer-1",
243
				},
244
				{
245
					PeerId: "peer-2",
246
				},
247
				{
248
					PeerId: "peer-3",
249
				},
250
				{
251
					PeerId: "peer-4",
252
				},
253
				{
254
					PeerId: "peer-5",
255
				},
256
			},
257
			peersToKeep: []*schedulerv1.PeerPacket_DestPeer{
258
				{
259
					PeerId: "peer-1",
260
				},
261
				{
262
					PeerId: "peer-2",
263
				},
264
			},
265
			peersToAdd: []*schedulerv1.PeerPacket_DestPeer{
266

267
				{
268
					PeerId: "peer-3",
269
				},
270
				{
271
					PeerId: "peer-4",
272
				},
273
				{
274
					PeerId: "peer-5",
275
				},
276
			},
277
			peersToClose: []string{
278
				"peer-0",
279
			},
280
		},
281
	}
282

283
	for _, tt := range testCases {
284
		t.Run(tt.name, func(t *testing.T) {
285
			s := &pieceTaskSyncManager{
286
				workers: tt.workers,
287
			}
288
			peersToKeep, peersToAdd, peersToClose := s.diffPeers(tt.peers)
289
			assert.ElementsMatch(tt.peersToKeep, peersToKeep)
290
			assert.ElementsMatch(tt.peersToAdd, peersToAdd)
291
			assert.ElementsMatch(tt.peersToClose, peersToClose)
292
		})
293
	}
294
}
295

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.