Dragonfly2
peertask_piecetask_synchronizer.go · 494 lines · 15.4 KB
/*
 *     Copyright 2022 The Dragonfly Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package peer

import (
	"context"
	"errors"
	"fmt"
	"io"
	"sync"
	"time"

	"go.opentelemetry.io/otel/trace"
	"go.uber.org/atomic"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"

	commonv1 "d7y.io/api/v2/pkg/apis/common/v1"
	dfdaemonv1 "d7y.io/api/v2/pkg/apis/dfdaemon/v1"
	schedulerv1 "d7y.io/api/v2/pkg/apis/scheduler/v1"

	"d7y.io/dragonfly/v2/client/config"
	"d7y.io/dragonfly/v2/internal/dferrors"
	logger "d7y.io/dragonfly/v2/internal/dflog"
	"d7y.io/dragonfly/v2/pkg/dfnet"
	"d7y.io/dragonfly/v2/pkg/net/ip"
	dfdaemonclient "d7y.io/dragonfly/v2/pkg/rpc/dfdaemon/client"
)

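// pieceTaskSyncManager keeps one pieceTaskSynchronizer per destination peer and
// fans piece task requests out to them.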
type pieceTaskSyncManager struct {
	sync.RWMutex
	ctx               context.Context
	ctxCancel         context.CancelFunc
	peerTaskConductor *peerTaskConductor
	pieceRequestQueue PieceDispatcher
	workers           map[string]*pieceTaskSynchronizer
	watchdog          *synchronizerWatchdog
}

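// pieceTaskSynchronizer keeps a SyncPieceTasks stream open to a single
// destination peer and feeds the received piece infos into the piece request queue.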
type pieceTaskSynchronizer struct {
	*logger.SugaredLoggerOnWith
	ctx               context.Context
	ctxCancel         context.CancelFunc
	span              trace.Span
	syncPiecesStream  dfdaemonv1.Daemon_SyncPieceTasksClient
	grpcClient        dfdaemonclient.V1
	dstPeer           *schedulerv1.PeerPacket_DestPeer
	error             atomic.Value
	grpcInitialized   *atomic.Bool
	grpcInitError     atomic.Value
	peerTaskConductor *peerTaskConductor
	pieceRequestQueue PieceDispatcher
}

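// synchronizerWatchdog reports the main peer to the scheduler when no piece has
// been downloaded within the configured timeout.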
type synchronizerWatchdog struct {
	done              chan struct{}
	mainPeer          atomic.Value // save *schedulerv1.PeerPacket_DestPeer
	syncSuccess       *atomic.Bool
	peerTaskConductor *peerTaskConductor
}

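// pieceTaskSynchronizerError wraps an error so it can be stored in an atomic.Value.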
type pieceTaskSynchronizerError struct {
	err error
}

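// syncPeers reconciles the running synchronizers with the destination peers
// returned by the scheduler: new peers get a synchronizer, failed workers are
// closed and re-created, and peers no longer scheduled are shut down.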
// FIXME for compatibility; sync will be called once dfdaemonclient.GetPieceTasks is deprecated and the pieceTaskPoller is removed
func (s *pieceTaskSyncManager) syncPeers(destPeers []*schedulerv1.PeerPacket_DestPeer, desiredPiece int32) {
	s.Lock()
	defer func() {
		if s.peerTaskConductor.WatchdogTimeout > 0 {
			s.resetWatchdog(destPeers[0])
		}
		s.Unlock()
	}()

	peersToKeep, peersToAdd, peersToClose := s.diffPeers(destPeers)

	for _, peer := range peersToAdd {
		s.newPieceTaskSynchronizer(s.ctx, peer, desiredPiece)
	}

	for _, peer := range peersToKeep {
		worker := s.workers[peer.PeerId]
		// worker is healthy, keep it running
		if worker.error.Load() == nil {
			s.peerTaskConductor.Infof("reuse working PieceTaskSynchronizer %s", peer.PeerId)
		} else {
			s.peerTaskConductor.Infof("close stale PieceTaskSynchronizer %s and re-initialize it", peer.PeerId)
			// clean up the failed worker
			worker.close()
			delete(s.workers, peer.PeerId)
			// reconnect and retry
			s.newPieceTaskSynchronizer(s.ctx, peer, desiredPiece)
		}
	}

	// close stale workers
	for _, p := range peersToClose {
		s.workers[p].close()
		delete(s.workers, p)
	}

	return
}

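// diffPeers splits the scheduled peers into those already tracked by a worker
// (peersToKeep), new ones (peersToAdd), and worker peer IDs that are no longer
// scheduled (peersToClose).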
func (s *pieceTaskSyncManager) diffPeers(peers []*schedulerv1.PeerPacket_DestPeer) (
	peersToKeep []*schedulerv1.PeerPacket_DestPeer, peersToAdd []*schedulerv1.PeerPacket_DestPeer, peersToClose []string) {
	if len(s.workers) == 0 {
		return nil, peers, nil
	}

	cache := make(map[string]bool)
	for _, p := range peers {
		cache[p.PeerId] = true
		if _, ok := s.workers[p.PeerId]; ok {
			peersToKeep = append(peersToKeep, p)
		} else {
			peersToAdd = append(peersToAdd, p)
		}
	}

	for p := range s.workers {
		if !cache[p] {
			peersToClose = append(peersToClose, p)
		}
	}
	return
}

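// newPieceTaskSynchronizer registers a synchronizer for one destination peer and
// starts it in a background goroutine; callers are expected to hold the manager
// lock since s.workers is updated here.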
func (s *pieceTaskSyncManager) newPieceTaskSynchronizer(
	ctx context.Context,
	dstPeer *schedulerv1.PeerPacket_DestPeer,
	desiredPiece int32) {
	_, span := tracer.Start(s.ctx, config.SpanSyncPieceTasks)
	span.SetAttributes(config.AttributeTargetPeerID.String(dstPeer.PeerId))
	request := &commonv1.PieceTaskRequest{
		TaskId:   s.peerTaskConductor.taskID,
		SrcPid:   s.peerTaskConductor.peerID,
		DstPid:   dstPeer.PeerId,
		StartNum: uint32(desiredPiece),
		Limit:    16,
	}
	ctx, cancel := context.WithCancel(ctx)
	synchronizer := &pieceTaskSynchronizer{
		ctx:                 ctx,
		ctxCancel:           cancel,
		span:                span,
		peerTaskConductor:   s.peerTaskConductor,
		pieceRequestQueue:   s.pieceRequestQueue,
		dstPeer:             dstPeer,
		error:               atomic.Value{},
		grpcInitialized:     atomic.NewBool(false),
		grpcInitError:       atomic.Value{},
		SugaredLoggerOnWith: s.peerTaskConductor.With("targetPeerID", request.DstPid),
	}
	s.workers[dstPeer.PeerId] = synchronizer
	go synchronizer.start(request, dstPeer)
	return
}

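// resetWatchdog stops the previous watchdog, if any, and starts a new one
// tracking the given main peer.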
func (s *pieceTaskSyncManager) resetWatchdog(mainPeer *schedulerv1.PeerPacket_DestPeer) {
	if s.watchdog != nil {
		close(s.watchdog.done)
		s.peerTaskConductor.Debugf("close old watchdog")
	}
	s.watchdog = &synchronizerWatchdog{
		done:              make(chan struct{}),
		mainPeer:          atomic.Value{},
		syncSuccess:       atomic.NewBool(false),
		peerTaskConductor: s.peerTaskConductor,
	}
	s.watchdog.mainPeer.Store(mainPeer)
	s.peerTaskConductor.Infof("start new watchdog")
	go s.watchdog.watch(s.peerTaskConductor.WatchdogTimeout)
}

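// compositePieceResult builds a failed PieceResult for the given destination
// peer and error code, ready to be reported to the scheduler.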
func compositePieceResult(peerTaskConductor *peerTaskConductor, destPeer *schedulerv1.PeerPacket_DestPeer, code commonv1.Code) *schedulerv1.PieceResult {
	return &schedulerv1.PieceResult{
		TaskId:        peerTaskConductor.taskID,
		SrcPid:        peerTaskConductor.peerID,
		DstPid:        destPeer.PeerId,
		PieceInfo:     &commonv1.PieceInfo{},
		Success:       false,
		Code:          code,
		FinishedCount: peerTaskConductor.readyPieces.Settled(),
	}
}

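// reportInvalidPeer tells the scheduler that the destination peer is unusable
// so that more candidate peers can be scheduled.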
func (s *pieceTaskSyncManager) reportInvalidPeer(destPeer *schedulerv1.PeerPacket_DestPeer, code commonv1.Code) {
	sendError := s.peerTaskConductor.sendPieceResult(compositePieceResult(s.peerTaskConductor, destPeer, code))
	if sendError != nil {
		s.peerTaskConductor.Errorf("connect peer %s failed and send piece result with error: %s", destPeer.PeerId, sendError)
		go s.peerTaskConductor.cancel(commonv1.Code_SchedError, sendError.Error())
	} else {
		s.peerTaskConductor.Debugf("report invalid peer %s/%d to scheduler", destPeer.PeerId, code)
	}
}

// acquire broadcasts the piece task request to all initialized synchronizers,
// returning how many were attempted and how many succeeded.
func (s *pieceTaskSyncManager) acquire(request *commonv1.PieceTaskRequest) (attempt int, success int) {
	s.RLock()
	for _, p := range s.workers {
		attempt++
		if p.grpcInitialized.Load() && p.acquire(request) == nil {
			success++
		}
	}
	s.RUnlock()
	return
}

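// cancel stops all synchronizers, closes the piece request queue and clears the
// worker map.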
func (s *pieceTaskSyncManager) cancel() {
	s.ctxCancel()
	s.pieceRequestQueue.Close()
	s.Lock()
	for _, p := range s.workers {
		p.close()
	}
	s.workers = map[string]*pieceTaskSynchronizer{}
	s.Unlock()
}

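// start dials the destination peer, opens the SyncPieceTasks stream and blocks
// in receive() until the stream ends. Setup failures are stored in grpcInitError
// and reported to the scheduler.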
func (s *pieceTaskSynchronizer) start(request *commonv1.PieceTaskRequest, dstPeer *schedulerv1.PeerPacket_DestPeer) {
	var startError error
	defer func() {
		if startError != nil {
			s.grpcInitError.Store(&pieceTaskSynchronizerError{startError})
			s.peerTaskConductor.Errorf("connect peer %s error: %s", dstPeer.PeerId, startError)
			if errors.Is(startError, context.DeadlineExceeded) {
				// connect timeout error, report to scheduler to get more available peers
				s.peerTaskConductor.pieceTaskSyncManager.reportInvalidPeer(dstPeer, commonv1.Code_ClientConnectionError)
			} else {
				// other errors, report to scheduler to get more available peers
				s.peerTaskConductor.pieceTaskSyncManager.reportInvalidPeer(dstPeer, commonv1.Code_ClientPieceRequestFail)
			}
		}
	}()

	formatIP, ok := ip.FormatIP(dstPeer.Ip)
	if !ok {
		startError = errors.New("format ip failed")
		return
	}

	netAddr := &dfnet.NetAddr{
		Type: dfnet.TCP,
		Addr: fmt.Sprintf("%s:%d", formatIP, dstPeer.RpcPort),
	}

	credentialOpt := grpc.WithTransportCredentials(s.peerTaskConductor.GRPCCredentials)

	dialCtx, cancel := context.WithTimeout(s.ctx, s.peerTaskConductor.GRPCDialTimeout)
	grpcClient, err := dfdaemonclient.GetV1(dialCtx, netAddr.String(), credentialOpt, grpc.WithBlock())
	cancel()

	if err != nil {
		startError = err
		return
	}

	stream, err := grpcClient.SyncPieceTasks(s.ctx, request)
	// Refer: https://github.com/grpc/grpc-go/blob/v1.44.0/stream.go#L104
	// When io.EOF is received, the real error should be discovered with RecvMsg; here that is stream.Recv()
	if err == io.EOF && stream != nil {
		_, err = stream.Recv()
	}
	if err != nil {
		// the grpc client must be closed, Refer: https://github.com/grpc/grpc-go/issues/5321
		_ = grpcClient.Close()
		if stream != nil {
			_ = stream.CloseSend()
		}
		s.peerTaskConductor.Errorf("call SyncPieceTasks error: %s, dest peer: %s", err, dstPeer.PeerId)
		startError = err
		return
	}

	s.syncPiecesStream = stream
	s.grpcClient = grpcClient

	s.grpcInitialized.Store(true)
	s.receive()
}

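// close cancels the synchronizer context and releases gRPC resources; if the
// connection is still being set up, cleanup is deferred to a background goroutine.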
func (s *pieceTaskSynchronizer) close() {
	s.ctxCancel()
	if s.grpcInitialized.Load() {
		s.closeGRPC()
		s.Infof("pieceTaskSynchronizer grpc closed")
	} else {
		go s.waitAndClose()
	}
}

// Either grpcInitialized or grpcInitError is eventually set; until then the
// pieceTaskSynchronizer is still initializing, so wait for it to finish.
func (s *pieceTaskSynchronizer) waitAndClose() {
	for {
		// grpc is ready, just close
		if s.grpcInitialized.Load() {
			s.closeGRPC()
			s.Infof("pieceTaskSynchronizer grpc closed and exit in background")
			return
		}
		// grpc init error
		if s.grpcInitError.Load() != nil {
			s.Infof("pieceTaskSynchronizer grpc init error and exit in background")
			return
		}
		s.Infof("pieceTaskSynchronizer grpc is initializing, wait it completed in background")
		time.Sleep(time.Minute)
	}
}

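// closeGRPC closes the SyncPieceTasks stream and the underlying gRPC client,
// recording any errors on the tracing span.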
func (s *pieceTaskSynchronizer) closeGRPC() {
	if err := s.syncPiecesStream.CloseSend(); err != nil {
		s.error.Store(&pieceTaskSynchronizerError{err})
		s.Debugf("close send error: %s, dest peer: %s", err, s.dstPeer.PeerId)
		s.span.RecordError(err)
	}
	if err := s.grpcClient.Close(); err != nil {
		s.error.Store(&pieceTaskSynchronizerError{err})
		s.Debugf("close grpc client error: %s, dest peer: %s", err, s.dstPeer.PeerId)
		s.span.RecordError(err)
	}
	s.span.End()
}

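// dispatchPieceRequest updates task metadata from a received piece packet and
// enqueues a download request for every piece it carries.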
func (s *pieceTaskSynchronizer) dispatchPieceRequest(piecePacket *commonv1.PiecePacket) {
	s.peerTaskConductor.updateMetadata(piecePacket)

	pieceCount := len(piecePacket.PieceInfos)
	s.Debugf("dispatch piece request, piece count: %d, dest peer: %s", pieceCount, s.dstPeer.PeerId)
	// peers may send zero piece infos, but still carry the total piece count and content length
	if pieceCount == 0 {
		finished := s.peerTaskConductor.isCompleted()
		if finished {
			s.peerTaskConductor.Done()
		}
		return
	}
	for _, piece := range piecePacket.PieceInfos {
		s.Infof("got piece %d from %s/%s, digest: %s, start: %d, size: %d",
			piece.PieceNum, piecePacket.DstAddr, piecePacket.DstPid, piece.PieceMd5, piece.RangeStart, piece.RangeSize)
		// FIXME when the total piece count is set but there is no total digest, fetch again
		s.peerTaskConductor.requestedPiecesLock.Lock()
		if !s.peerTaskConductor.requestedPieces.IsSet(piece.PieceNum) {
			s.peerTaskConductor.requestedPieces.Set(piece.PieceNum)
		}
		s.peerTaskConductor.requestedPiecesLock.Unlock()
		req := &DownloadPieceRequest{
			storage: s.peerTaskConductor.GetStorage(),
			piece:   piece,
			log:     s.peerTaskConductor.Log(),
			TaskID:  s.peerTaskConductor.GetTaskID(),
			PeerID:  s.peerTaskConductor.GetPeerID(),
			DstPid:  piecePacket.DstPid,
			DstAddr: piecePacket.DstAddr,
		}

		s.pieceRequestQueue.Put(req)
		s.span.AddEvent(fmt.Sprintf("send piece #%d request to piece download queue", piece.PieceNum))

		select {
		case <-s.peerTaskConductor.successCh:
			s.Infof("peer task success, stop dispatch piece request, dest peer: %s", s.dstPeer.PeerId)
		case <-s.peerTaskConductor.failCh:
			s.Warnf("peer task fail, stop dispatch piece request, dest peer: %s", s.dstPeer.PeerId)
		default:
		}
	}
}

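// receive loops over the SyncPieceTasks stream and dispatches every received
// piece packet until the stream ends with io.EOF, cancellation or an error.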
func (s *pieceTaskSynchronizer) receive() {
	var (
		piecePacket *commonv1.PiecePacket
		err         error
	)
	for {
		piecePacket, err = s.syncPiecesStream.Recv()
		if err != nil {
			break
		}
		s.dispatchPieceRequest(piecePacket)
	}

	if err == io.EOF {
		s.Debugf("synchronizer receives io.EOF")
	} else if s.canceled(err) {
		s.Debugf("synchronizer receives canceled")
		s.error.Store(&pieceTaskSynchronizerError{err})
	} else {
		s.Errorf("synchronizer receives with error: %s", err)
		s.error.Store(&pieceTaskSynchronizerError{err})
		s.reportError(err)
	}
}

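// acquire sends a piece task request on the stream; once the synchronizer has
// recorded an error, further requests are skipped.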
func (s *pieceTaskSynchronizer) acquire(request *commonv1.PieceTaskRequest) error {
	if s.error.Load() != nil {
		err := s.error.Load().(*pieceTaskSynchronizerError).err
		s.Debugf("synchronizer already error %s, skip acquire more pieces", err)
		return err
	}
	request.DstPid = s.dstPeer.PeerId
	err := s.syncPiecesStream.Send(request)
	s.span.AddEvent(fmt.Sprintf("send piece #%d request", request.StartNum))
	if err != nil {
		// send should always succeed
		s.error.Store(&pieceTaskSynchronizerError{err})
		s.Errorf("synchronizer sends with error: %s", err)
		s.reportError(err)
	}
	return err
}

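// reportError records the error on the span and reports a failed piece result to
// the scheduler, using the DfError code from the gRPC status when available.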
func (s *pieceTaskSynchronizer) reportError(err error) {
	s.span.RecordError(err)
	errCode := commonv1.Code_ClientPieceRequestFail

	// extract DfError from the grpc status
	de, ok := dferrors.IsGRPCDfError(err)
	if ok {
		errCode = de.Code
		s.Errorf("report error with convert code from grpc error, code: %d, message: %s", de.Code, de.Message)
	}

	sendError := s.peerTaskConductor.sendPieceResult(compositePieceResult(s.peerTaskConductor, s.dstPeer, errCode))
	if sendError != nil {
		s.Errorf("sync piece info failed and send piece result with error: %s", sendError)
		go s.peerTaskConductor.cancel(commonv1.Code_SchedError, sendError.Error())
	} else {
		s.Debugf("report sync piece error to scheduler")
	}
}

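// canceled reports whether err was caused by context cancellation or a gRPC
// Canceled status.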
func (s *pieceTaskSynchronizer) canceled(err error) bool {
	if errors.Is(err, context.Canceled) {
		s.Debugf("context canceled, dst peer: %s", s.dstPeer.PeerId)
		return true
	}
	if stat, ok := err.(interface{ GRPCStatus() *status.Status }); ok {
		if stat.GRPCStatus().Code() == codes.Canceled {
			s.Debugf("grpc canceled, dst peer: %s", s.dstPeer.PeerId)
			return true
		}
	}
	return false
}

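// watch waits for the given timeout; if no piece has been downloaded by then,
// the main peer is reported to the scheduler as failed.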
func (s *synchronizerWatchdog) watch(timeout time.Duration) {
	select {
	case <-time.After(timeout):
		if s.peerTaskConductor.readyPieces.Settled() == 0 {
			s.peerTaskConductor.Warnf("watch sync pieces timeout, may be a bug, " +
				"please file an issue in https://github.com/dragonflyoss/Dragonfly2/issues")
			s.syncSuccess.Store(false)
			s.reportWatchFailed()
		} else {
			s.peerTaskConductor.Infof("watch sync pieces ok")
		}
	case <-s.peerTaskConductor.successCh:
		s.peerTaskConductor.Debugf("peer task success, watchdog exit")
	case <-s.peerTaskConductor.failCh:
		s.peerTaskConductor.Debugf("peer task fail, watchdog exit")
	case <-s.done:
		s.peerTaskConductor.Debugf("watchdog done, exit")
	}
}

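// reportWatchFailed reports the watchdog timeout for the main peer back to the
// scheduler.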
func (s *synchronizerWatchdog) reportWatchFailed() {
	sendError := s.peerTaskConductor.sendPieceResult(compositePieceResult(
		s.peerTaskConductor, s.mainPeer.Load().(*schedulerv1.PeerPacket_DestPeer), commonv1.Code_ClientPieceRequestFail))
	if sendError != nil {
		s.peerTaskConductor.Errorf("watchdog sync piece info failed and send piece result with error: %s", sendError)
		go s.peerTaskConductor.cancel(commonv1.Code_SchedError, sendError.Error())
	} else {
		s.peerTaskConductor.Debugf("report watchdog sync piece error to scheduler")
	}
}
