// Dragonfly2
// 494 lines · 15.4 KB
/*
 * Copyright 2022 The Dragonfly Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17package peer
18
19import (
20"context"
21"errors"
22"fmt"
23"io"
24"sync"
25"time"
26
27"go.opentelemetry.io/otel/trace"
28"go.uber.org/atomic"
29"google.golang.org/grpc"
30"google.golang.org/grpc/codes"
31"google.golang.org/grpc/status"
32
33commonv1 "d7y.io/api/v2/pkg/apis/common/v1"
34dfdaemonv1 "d7y.io/api/v2/pkg/apis/dfdaemon/v1"
35schedulerv1 "d7y.io/api/v2/pkg/apis/scheduler/v1"
36
37"d7y.io/dragonfly/v2/client/config"
38"d7y.io/dragonfly/v2/internal/dferrors"
39logger "d7y.io/dragonfly/v2/internal/dflog"
40"d7y.io/dragonfly/v2/pkg/dfnet"
41"d7y.io/dragonfly/v2/pkg/net/ip"
42dfdaemonclient "d7y.io/dragonfly/v2/pkg/rpc/dfdaemon/client"
43)
44
// pieceTaskSyncManager manages one pieceTaskSynchronizer per destination peer
// and reconciles that worker set against the scheduler's peer updates.
type pieceTaskSyncManager struct {
	sync.RWMutex
	ctx               context.Context
	ctxCancel         context.CancelFunc
	peerTaskConductor *peerTaskConductor
	// pieceRequestQueue receives the piece download requests produced by workers.
	pieceRequestQueue PieceDispatcher
	// workers maps destination peer id -> synchronizer; guarded by the embedded RWMutex.
	workers map[string]*pieceTaskSynchronizer
	// watchdog, when enabled, reports to the scheduler if no piece arrives in time.
	watchdog *synchronizerWatchdog
}
54
// pieceTaskSynchronizer owns a single SyncPieceTasks grpc stream to one
// destination peer and pumps received piece packets into the request queue.
type pieceTaskSynchronizer struct {
	*logger.SugaredLoggerOnWith
	ctx              context.Context
	ctxCancel        context.CancelFunc
	span             trace.Span
	syncPiecesStream dfdaemonv1.Daemon_SyncPieceTasksClient
	grpcClient       dfdaemonclient.V1
	dstPeer          *schedulerv1.PeerPacket_DestPeer
	// error holds a *pieceTaskSynchronizerError once the running stream failed.
	error atomic.Value
	// grpcInitialized is true after dial + stream setup succeeded.
	grpcInitialized *atomic.Bool
	// grpcInitError holds a *pieceTaskSynchronizerError when dial/stream setup failed.
	grpcInitError     atomic.Value
	peerTaskConductor *peerTaskConductor
	pieceRequestQueue PieceDispatcher
}
69
// synchronizerWatchdog checks whether any piece was received within a timeout
// and, if not, reports the main peer to the scheduler (see watch).
type synchronizerWatchdog struct {
	// done is closed when this watchdog is replaced by a newer one.
	done        chan struct{}
	mainPeer    atomic.Value // save *schedulerv1.PeerPacket_DestPeer
	syncSuccess *atomic.Bool
	peerTaskConductor *peerTaskConductor
}
76
// pieceTaskSynchronizerError wraps an error so a single concrete type is
// stored in atomic.Value (atomic.Value requires consistent Store types).
type pieceTaskSynchronizerError struct {
	err error
}
80
81// FIXME for compatibility, sync will be called after the dfdaemonclient.GetPieceTasks deprecated and the pieceTaskPoller removed
82func (s *pieceTaskSyncManager) syncPeers(destPeers []*schedulerv1.PeerPacket_DestPeer, desiredPiece int32) {
83s.Lock()
84defer func() {
85if s.peerTaskConductor.WatchdogTimeout > 0 {
86s.resetWatchdog(destPeers[0])
87}
88s.Unlock()
89}()
90
91peersToKeep, peersToAdd, peersToClose := s.diffPeers(destPeers)
92
93for _, peer := range peersToAdd {
94s.newPieceTaskSynchronizer(s.ctx, peer, desiredPiece)
95}
96
97for _, peer := range peersToKeep {
98worker := s.workers[peer.PeerId]
99// worker is working, keep it going on
100if worker.error.Load() == nil {
101s.peerTaskConductor.Infof("reuse working PieceTaskSynchronizer %s", peer.PeerId)
102} else {
103s.peerTaskConductor.Infof("close stale PieceTaskSynchronizer %s and re-initialize it", peer.PeerId)
104// clean error worker
105worker.close()
106delete(s.workers, peer.PeerId)
107// reconnect and retry
108s.newPieceTaskSynchronizer(s.ctx, peer, desiredPiece)
109}
110}
111
112// close stale workers
113for _, p := range peersToClose {
114s.workers[p].close()
115delete(s.workers, p)
116}
117
118return
119}
120
121func (s *pieceTaskSyncManager) diffPeers(peers []*schedulerv1.PeerPacket_DestPeer) (
122peersToKeep []*schedulerv1.PeerPacket_DestPeer, peersToAdd []*schedulerv1.PeerPacket_DestPeer, peersToClose []string) {
123if len(s.workers) == 0 {
124return nil, peers, nil
125}
126
127cache := make(map[string]bool)
128for _, p := range peers {
129cache[p.PeerId] = true
130if _, ok := s.workers[p.PeerId]; ok {
131peersToKeep = append(peersToKeep, p)
132} else {
133peersToAdd = append(peersToAdd, p)
134}
135}
136
137for p := range s.workers {
138if !cache[p] {
139peersToClose = append(peersToClose, p)
140}
141}
142return
143}
144
145func (s *pieceTaskSyncManager) newPieceTaskSynchronizer(
146ctx context.Context,
147dstPeer *schedulerv1.PeerPacket_DestPeer,
148desiredPiece int32) {
149_, span := tracer.Start(s.ctx, config.SpanSyncPieceTasks)
150span.SetAttributes(config.AttributeTargetPeerID.String(dstPeer.PeerId))
151request := &commonv1.PieceTaskRequest{
152TaskId: s.peerTaskConductor.taskID,
153SrcPid: s.peerTaskConductor.peerID,
154DstPid: dstPeer.PeerId,
155StartNum: uint32(desiredPiece),
156Limit: 16,
157}
158ctx, cancel := context.WithCancel(ctx)
159synchronizer := &pieceTaskSynchronizer{
160ctx: ctx,
161ctxCancel: cancel,
162span: span,
163peerTaskConductor: s.peerTaskConductor,
164pieceRequestQueue: s.pieceRequestQueue,
165dstPeer: dstPeer,
166error: atomic.Value{},
167grpcInitialized: atomic.NewBool(false),
168grpcInitError: atomic.Value{},
169SugaredLoggerOnWith: s.peerTaskConductor.With("targetPeerID", request.DstPid),
170}
171s.workers[dstPeer.PeerId] = synchronizer
172go synchronizer.start(request, dstPeer)
173return
174}
175
176func (s *pieceTaskSyncManager) resetWatchdog(mainPeer *schedulerv1.PeerPacket_DestPeer) {
177if s.watchdog != nil {
178close(s.watchdog.done)
179s.peerTaskConductor.Debugf("close old watchdog")
180}
181s.watchdog = &synchronizerWatchdog{
182done: make(chan struct{}),
183mainPeer: atomic.Value{},
184syncSuccess: atomic.NewBool(false),
185peerTaskConductor: s.peerTaskConductor,
186}
187s.watchdog.mainPeer.Store(mainPeer)
188s.peerTaskConductor.Infof("start new watchdog")
189go s.watchdog.watch(s.peerTaskConductor.WatchdogTimeout)
190}
191
192func compositePieceResult(peerTaskConductor *peerTaskConductor, destPeer *schedulerv1.PeerPacket_DestPeer, code commonv1.Code) *schedulerv1.PieceResult {
193return &schedulerv1.PieceResult{
194TaskId: peerTaskConductor.taskID,
195SrcPid: peerTaskConductor.peerID,
196DstPid: destPeer.PeerId,
197PieceInfo: &commonv1.PieceInfo{},
198Success: false,
199Code: code,
200FinishedCount: peerTaskConductor.readyPieces.Settled(),
201}
202}
203
204func (s *pieceTaskSyncManager) reportInvalidPeer(destPeer *schedulerv1.PeerPacket_DestPeer, code commonv1.Code) {
205sendError := s.peerTaskConductor.sendPieceResult(compositePieceResult(s.peerTaskConductor, destPeer, code))
206if sendError != nil {
207s.peerTaskConductor.Errorf("connect peer %s failed and send piece result with error: %s", destPeer.PeerId, sendError)
208go s.peerTaskConductor.cancel(commonv1.Code_SchedError, sendError.Error())
209} else {
210s.peerTaskConductor.Debugf("report invalid peer %s/%d to scheduler", destPeer.PeerId, code)
211}
212}
213
214// acquire send the target piece to other peers
215func (s *pieceTaskSyncManager) acquire(request *commonv1.PieceTaskRequest) (attempt int, success int) {
216s.RLock()
217for _, p := range s.workers {
218attempt++
219if p.grpcInitialized.Load() && p.acquire(request) == nil {
220success++
221}
222}
223s.RUnlock()
224return
225}
226
227func (s *pieceTaskSyncManager) cancel() {
228s.ctxCancel()
229s.pieceRequestQueue.Close()
230s.Lock()
231for _, p := range s.workers {
232p.close()
233}
234s.workers = map[string]*pieceTaskSynchronizer{}
235s.Unlock()
236}
237
// start dials the destination peer, opens the SyncPieceTasks stream and then
// blocks in receive() consuming piece packets. Any failure before the stream
// is up is recorded in grpcInitError and reported to the scheduler so it can
// supply alternative peers.
func (s *pieceTaskSynchronizer) start(request *commonv1.PieceTaskRequest, dstPeer *schedulerv1.PeerPacket_DestPeer) {
	var startError error
	defer func() {
		if startError != nil {
			// record the init failure so close()/waitAndClose() know grpc never came up
			s.grpcInitError.Store(&pieceTaskSynchronizerError{startError})
			s.peerTaskConductor.Errorf("connect peer %s error: %s", dstPeer.PeerId, startError)
			if errors.Is(startError, context.DeadlineExceeded) {
				// connect timeout error, report to scheduler to get more available peers
				s.peerTaskConductor.pieceTaskSyncManager.reportInvalidPeer(dstPeer, commonv1.Code_ClientConnectionError)
			} else {
				// other errors, report to scheduler to get more available peers
				s.peerTaskConductor.pieceTaskSyncManager.reportInvalidPeer(dstPeer, commonv1.Code_ClientPieceRequestFail)
			}
		}
	}()

	formatIP, ok := ip.FormatIP(dstPeer.Ip)
	if !ok {
		startError = errors.New("format ip failed")
		return
	}

	netAddr := &dfnet.NetAddr{
		Type: dfnet.TCP,
		Addr: fmt.Sprintf("%s:%d", formatIP, dstPeer.RpcPort),
	}

	credentialOpt := grpc.WithTransportCredentials(s.peerTaskConductor.GRPCCredentials)

	// WithBlock makes the dial synchronous so connection failures surface here,
	// bounded by GRPCDialTimeout.
	dialCtx, cancel := context.WithTimeout(s.ctx, s.peerTaskConductor.GRPCDialTimeout)
	grpcClient, err := dfdaemonclient.GetV1(dialCtx, netAddr.String(), credentialOpt, grpc.WithBlock())
	cancel()

	if err != nil {
		startError = err
		return
	}

	stream, err := grpcClient.SyncPieceTasks(s.ctx, request)
	// Refer: https://github.com/grpc/grpc-go/blob/v1.44.0/stream.go#L104
	// When receive io.EOF, the real error should be discovered using RecvMsg, here is client.Recv()
	if err == io.EOF && stream != nil {
		_, err = stream.Recv()
	}
	if err != nil {
		// grpc client must be close, Refer: https://github.com/grpc/grpc-go/issues/5321
		_ = grpcClient.Close()
		if stream != nil {
			_ = stream.CloseSend()
		}
		s.peerTaskConductor.Errorf("call SyncPieceTasks error: %s, dest peer: %s", err, dstPeer.PeerId)
		startError = err
		return
	}

	s.syncPiecesStream = stream
	s.grpcClient = grpcClient

	// mark ready for acquire()/close(), then consume the stream until it ends
	s.grpcInitialized.Store(true)
	s.receive()
}
299
300func (s *pieceTaskSynchronizer) close() {
301s.ctxCancel()
302if s.grpcInitialized.Load() {
303s.closeGRPC()
304s.Infof("pieceTaskSynchronizer grpc closed")
305} else {
306go s.waitAndClose()
307}
308}
309
310// one of grpcInitialized and grpcInitError must be true, otherwise the pieceTaskSynchronizer is initializing, wait it
311func (s *pieceTaskSynchronizer) waitAndClose() {
312for {
313// grpc is ready, just close
314if s.grpcInitialized.Load() {
315s.closeGRPC()
316s.Infof("pieceTaskSynchronizer grpc closed and exit in background")
317return
318}
319// grpc init error
320if s.grpcInitError.Load() != nil {
321s.Infof("pieceTaskSynchronizer grpc init error and exit in background")
322return
323}
324s.Infof("pieceTaskSynchronizer grpc is initializing, wait it completed in background")
325time.Sleep(time.Minute)
326}
327}
328
329func (s *pieceTaskSynchronizer) closeGRPC() {
330if err := s.syncPiecesStream.CloseSend(); err != nil {
331s.error.Store(&pieceTaskSynchronizerError{err})
332s.Debugf("close send error: %s, dest peer: %s", err, s.dstPeer.PeerId)
333s.span.RecordError(err)
334}
335if err := s.grpcClient.Close(); err != nil {
336s.error.Store(&pieceTaskSynchronizerError{err})
337s.Debugf("close grpc client error: %s, dest peer: %s", err, s.dstPeer.PeerId)
338s.span.RecordError(err)
339}
340s.span.End()
341}
342
343func (s *pieceTaskSynchronizer) dispatchPieceRequest(piecePacket *commonv1.PiecePacket) {
344s.peerTaskConductor.updateMetadata(piecePacket)
345
346pieceCount := len(piecePacket.PieceInfos)
347s.Debugf("dispatch piece request, piece count: %d, dest peer: %s", pieceCount, s.dstPeer.PeerId)
348// peers maybe send zero piece info, but with total piece count and content length
349if pieceCount == 0 {
350finished := s.peerTaskConductor.isCompleted()
351if finished {
352s.peerTaskConductor.Done()
353}
354return
355}
356for _, piece := range piecePacket.PieceInfos {
357s.Infof("got piece %d from %s/%s, digest: %s, start: %d, size: %d",
358piece.PieceNum, piecePacket.DstAddr, piecePacket.DstPid, piece.PieceMd5, piece.RangeStart, piece.RangeSize)
359// FIXME when set total piece but no total digest, fetch again
360s.peerTaskConductor.requestedPiecesLock.Lock()
361if !s.peerTaskConductor.requestedPieces.IsSet(piece.PieceNum) {
362s.peerTaskConductor.requestedPieces.Set(piece.PieceNum)
363}
364s.peerTaskConductor.requestedPiecesLock.Unlock()
365req := &DownloadPieceRequest{
366storage: s.peerTaskConductor.GetStorage(),
367piece: piece,
368log: s.peerTaskConductor.Log(),
369TaskID: s.peerTaskConductor.GetTaskID(),
370PeerID: s.peerTaskConductor.GetPeerID(),
371DstPid: piecePacket.DstPid,
372DstAddr: piecePacket.DstAddr,
373}
374
375s.pieceRequestQueue.Put(req)
376s.span.AddEvent(fmt.Sprintf("send piece #%d request to piece download queue", piece.PieceNum))
377
378select {
379case <-s.peerTaskConductor.successCh:
380s.Infof("peer task success, stop dispatch piece request, dest peer: %s", s.dstPeer.PeerId)
381case <-s.peerTaskConductor.failCh:
382s.Warnf("peer task fail, stop dispatch piece request, dest peer: %s", s.dstPeer.PeerId)
383default:
384}
385}
386}
387
388func (s *pieceTaskSynchronizer) receive() {
389var (
390piecePacket *commonv1.PiecePacket
391err error
392)
393for {
394piecePacket, err = s.syncPiecesStream.Recv()
395if err != nil {
396break
397}
398s.dispatchPieceRequest(piecePacket)
399}
400
401if err == io.EOF {
402s.Debugf("synchronizer receives io.EOF")
403} else if s.canceled(err) {
404s.Debugf("synchronizer receives canceled")
405s.error.Store(&pieceTaskSynchronizerError{err})
406} else {
407s.Errorf("synchronizer receives with error: %s", err)
408s.error.Store(&pieceTaskSynchronizerError{err})
409s.reportError(err)
410}
411}
412
413func (s *pieceTaskSynchronizer) acquire(request *commonv1.PieceTaskRequest) error {
414if s.error.Load() != nil {
415err := s.error.Load().(*pieceTaskSynchronizerError).err
416s.Debugf("synchronizer already error %s, skip acquire more pieces", err)
417return err
418}
419request.DstPid = s.dstPeer.PeerId
420err := s.syncPiecesStream.Send(request)
421s.span.AddEvent(fmt.Sprintf("send piece #%d request", request.StartNum))
422if err != nil {
423// send should always ok
424s.error.Store(&pieceTaskSynchronizerError{err})
425s.Errorf("synchronizer sends with error: %s", err)
426s.reportError(err)
427}
428return err
429}
430
431func (s *pieceTaskSynchronizer) reportError(err error) {
432s.span.RecordError(err)
433errCode := commonv1.Code_ClientPieceRequestFail
434
435// extract DfError for grpc status
436de, ok := dferrors.IsGRPCDfError(err)
437if ok {
438errCode = de.Code
439s.Errorf("report error with convert code from grpc error, code: %d, message: %s", de.Code, de.Message)
440}
441
442sendError := s.peerTaskConductor.sendPieceResult(compositePieceResult(s.peerTaskConductor, s.dstPeer, errCode))
443if sendError != nil {
444s.Errorf("sync piece info failed and send piece result with error: %s", sendError)
445go s.peerTaskConductor.cancel(commonv1.Code_SchedError, sendError.Error())
446} else {
447s.Debugf("report sync piece error to scheduler")
448}
449}
450
451func (s *pieceTaskSynchronizer) canceled(err error) bool {
452if errors.Is(err, context.Canceled) {
453s.Debugf("context canceled, dst peer: %s", s.dstPeer.PeerId)
454return true
455}
456if stat, ok := err.(interface{ GRPCStatus() *status.Status }); ok {
457if stat.GRPCStatus().Code() == codes.Canceled {
458s.Debugf("grpc canceled, dst peer: %s", s.dstPeer.PeerId)
459return true
460}
461}
462return false
463}
464
// watch waits up to timeout for the first piece to arrive; if none has settled
// by then, it reports a failure to the scheduler to trigger rescheduling. It
// exits early on task success/failure or when this watchdog is replaced (done
// closed by resetWatchdog).
func (s *synchronizerWatchdog) watch(timeout time.Duration) {
	select {
	case <-time.After(timeout):
		if s.peerTaskConductor.readyPieces.Settled() == 0 {
			s.peerTaskConductor.Warnf("watch sync pieces timeout, may be a bug, " +
				"please file a issue in https://github.com/dragonflyoss/Dragonfly2/issues")
			s.syncSuccess.Store(false)
			s.reportWatchFailed()
		} else {
			s.peerTaskConductor.Infof("watch sync pieces ok")
		}
	case <-s.peerTaskConductor.successCh:
		s.peerTaskConductor.Debugf("peer task success, watchdog exit")
	case <-s.peerTaskConductor.failCh:
		s.peerTaskConductor.Debugf("peer task fail, watchdog exit")
	case <-s.done:
		s.peerTaskConductor.Debugf("watchdog done, exit")
	}
}
484
485func (s *synchronizerWatchdog) reportWatchFailed() {
486sendError := s.peerTaskConductor.sendPieceResult(compositePieceResult(
487s.peerTaskConductor, s.mainPeer.Load().(*schedulerv1.PeerPacket_DestPeer), commonv1.Code_ClientPieceRequestFail))
488if sendError != nil {
489s.peerTaskConductor.Errorf("watchdog sync piece info failed and send piece result with error: %s", sendError)
490go s.peerTaskConductor.cancel(commonv1.Code_SchedError, sendError.Error())
491} else {
492s.peerTaskConductor.Debugf("report watchdog sync piece error to scheduler")
493}
494}
495