/*
 * Copyright 2022 The Dragonfly Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package peer

import (
    "bytes"
    "context"
    "errors"
    "fmt"
    "io"
    "runtime/debug"
    "sync"
    "time"

    semconv "go.opentelemetry.io/otel/semconv/v1.7.0"
    "go.opentelemetry.io/otel/trace"
    "go.uber.org/atomic"
    "golang.org/x/time/rate"
    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/credentials"
    "google.golang.org/grpc/status"

    commonv1 "d7y.io/api/v2/pkg/apis/common/v1"
    errordetailsv1 "d7y.io/api/v2/pkg/apis/errordetails/v1"
    schedulerv1 "d7y.io/api/v2/pkg/apis/scheduler/v1"

    "d7y.io/dragonfly/v2/client/config"
    "d7y.io/dragonfly/v2/client/daemon/metrics"
    "d7y.io/dragonfly/v2/client/daemon/storage"
    "d7y.io/dragonfly/v2/internal/dferrors"
    logger "d7y.io/dragonfly/v2/internal/dflog"
    "d7y.io/dragonfly/v2/pkg/digest"
    "d7y.io/dragonfly/v2/pkg/idgen"
    nethttp "d7y.io/dragonfly/v2/pkg/net/http"
    "d7y.io/dragonfly/v2/pkg/rpc/common"
    schedulerclient "d7y.io/dragonfly/v2/pkg/rpc/scheduler/client"
    "d7y.io/dragonfly/v2/pkg/source"
)

const (
    // TODO implement peer task health check
    // reasonContextCanceled   = "context canceled"
    // reasonReScheduleTimeout = "wait more available peers from scheduler timeout"
    reasonScheduleTimeout       = "wait first peer packet from scheduler timeout"
    reasonPeerGoneFromScheduler = "scheduler says client should disconnect"
    reasonBackSourceDisabled    = "download from source disabled"

    failedReasonNotSet = "unknown"
)

var _ Task = (*peerTaskConductor)(nil)

// peerTaskConductor fetches all pieces from other peers and sends piece info to the broker
type peerTaskConductor struct {
    TaskOption
    *logger.SugaredLoggerOnWith

    // ctx carries span info for tracing;
    // successCh and failCh mark whether the task succeeded or failed
    ctx       context.Context
    ctxCancel context.CancelFunc
    // piece download uses this context
    pieceDownloadCtx context.Context
    // when backing to source, cancels all piece download actions
    pieceDownloadCancel context.CancelFunc

    // request is the original PeerTaskRequest
    request *schedulerv1.PeerTaskRequest

    // needBackSource indicates downloading the resource from the source instead of other peers
    needBackSource *atomic.Bool
    seed           bool

    peerTaskManager *peerTaskManager

    storage storage.TaskStorageDriver

    schedulerClient schedulerclient.V1

    // peer task meta info
    peerID          string
    taskID          string
    totalPiece      *atomic.Int32
    digest          *atomic.String
    contentLength   *atomic.Int64
    completedLength *atomic.Int64
    usedTraffic     *atomic.Uint64
    header          atomic.Value

    broker *pieceBroker

    sizeScope   commonv1.SizeScope
    singlePiece *schedulerv1.SinglePiece
    tinyData    *TinyData

    // peerPacketStream stands for the peer packet stream from the scheduler
    peerPacketStream schedulerv1.Scheduler_ReportPieceResultClient
    legacyPeerCount  *atomic.Int64
    // pieceTaskSyncManager syncs piece tasks from other peers
    pieceTaskSyncManager *pieceTaskSyncManager

    // statusOnce guards actions that must be done only once, like closing the success/fail channels
    statusOnce sync.Once
    // successCh will be closed when the peer task succeeds
    successCh chan struct{}
    // failCh will be closed after the peer task fails
    failCh chan struct{}

    // span stands for the OpenTelemetry trace span
    span trace.Span

    // failedReason will be set when the peer task fails
    failedReason string
    // failedCode will be set when the peer task fails
    failedCode commonv1.Code

    // readyPieces stands for all downloaded pieces
    readyPieces *Bitmap
    // lock used by piece result management; lock before updating readyPieces
    readyPiecesLock sync.RWMutex
    // runningPieces stands for all downloading pieces
    runningPieces *Bitmap
    // lock used by piece download workers
    runningPiecesLock sync.Mutex
    // requestedPieces stands for all pieces requested from peers
    requestedPieces *Bitmap
    // lock used by piece download workers
    requestedPiecesLock sync.RWMutex
    // lock used when sending piece results
    sendPieceResultLock sync.Mutex
    // trafficShaper is used to automatically allocate bandwidth for every peer task
    trafficShaper TrafficShaper
    // limiter will be used when per-peer-task rate limiting is enabled
    limiter *rate.Limiter

    startTime time.Time

    // subtask only
    parent *peerTaskConductor
    rg     *nethttp.Range

    sourceErrorStatus *status.Status
}

type TaskOption struct {
    // PeerHost is the info about the current PeerHost
    PeerHost *schedulerv1.PeerHost
    // PieceManager will be used for downloading pieces
    PieceManager   PieceManager
    StorageManager storage.Manager
    // schedule options
    SchedulerOption config.SchedulerOption
    CalculateDigest bool
    GRPCCredentials credentials.TransportCredentials
    GRPCDialTimeout time.Duration
    // WatchdogTimeout > 0 indicates starting a watchdog for every single peer task
    WatchdogTimeout time.Duration
}

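// newPeerTaskConductor builds a peerTaskConductor with its own tracing span and
// cancellable contexts; it does not register with the scheduler or start any
// goroutines until start() is called.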
func (ptm *peerTaskManager) newPeerTaskConductor(
    ctx context.Context,
    request *schedulerv1.PeerTaskRequest,
    limit rate.Limit,
    parent *peerTaskConductor,
    rg *nethttp.Range,
    seed bool) *peerTaskConductor {
    // use a new context with span info
    ctx = trace.ContextWithSpan(context.Background(), trace.SpanFromContext(ctx))
    ctx, span := tracer.Start(ctx, config.SpanPeerTask, trace.WithSpanKind(trace.SpanKindClient))
    span.SetAttributes(config.AttributePeerHost.String(ptm.PeerHost.Id))
    span.SetAttributes(semconv.NetHostIPKey.String(ptm.PeerHost.Ip))
    span.SetAttributes(config.AttributePeerID.String(request.PeerId))
    span.SetAttributes(semconv.HTTPURLKey.String(request.Url))

    taskID := idgen.TaskIDV1(request.Url, request.UrlMeta)
    request.TaskId = taskID

    // init log with values
    var (
        log     *logger.SugaredLoggerOnWith
        traceID = span.SpanContext().TraceID()
    )

    logKV := []any{
        "peer", request.PeerId,
        "task", taskID,
        "component", "PeerTask",
    }
    if traceID.IsValid() {
        logKV = append(logKV, "trace", traceID.String())
    }
    log = logger.With(logKV...)

    span.SetAttributes(config.AttributeTaskID.String(taskID))

    ctx, cancel := context.WithCancel(ctx)
    ptc := &peerTaskConductor{
        TaskOption: ptm.TaskOption,
        peerTaskManager: ptm,
        request: request,
        startTime: time.Now(),
        ctx: ctx,
        ctxCancel: cancel,
        broker: newPieceBroker(),
        peerID: request.PeerId,
        taskID: taskID,
        successCh: make(chan struct{}),
        failCh: make(chan struct{}),
        legacyPeerCount: atomic.NewInt64(0),
        span: span,
        readyPieces: NewBitmap(),
        runningPieces: NewBitmap(),
        requestedPieces: NewBitmap(),
        failedReason: failedReasonNotSet,
        failedCode: commonv1.Code_UnknownError,
        contentLength: atomic.NewInt64(-1),
        totalPiece: atomic.NewInt32(-1),
        digest: atomic.NewString(""),
        trafficShaper: ptm.trafficShaper,
        limiter: rate.NewLimiter(limit, int(limit)),
        completedLength: atomic.NewInt64(0),
        usedTraffic: atomic.NewUint64(0),
        SugaredLoggerOnWith: log,
        seed: seed,
        parent: parent,
        rg: rg,
    }

    ptc.pieceDownloadCtx, ptc.pieceDownloadCancel = context.WithCancel(ptc.ctx)

    return ptc
}

// register registers to the scheduler; if registration fails and auto back source
// is disabled, the error is returned, otherwise nil is returned.
func (pt *peerTaskConductor) register() error {
    pt.Debugf("request overview, pid: %s, url: %s, filter: %s, tag: %s, range: %s, digest: %s, header: %#v",
        pt.request.PeerId, pt.request.Url, pt.request.UrlMeta.Filter, pt.request.UrlMeta.Tag, pt.request.UrlMeta.Range, pt.request.UrlMeta.Digest, pt.request.UrlMeta.Header)
    // trace register
    regCtx, cancel := context.WithTimeout(pt.ctx, pt.SchedulerOption.ScheduleTimeout.Duration)
    defer cancel()
    regCtx, regSpan := tracer.Start(regCtx, config.SpanRegisterTask)

    var (
        needBackSource bool
        sizeScope      commonv1.SizeScope
        singlePiece    *schedulerv1.SinglePiece
        tinyData       *TinyData
    )

    pt.Infof("step 1: peer %s start to register", pt.request.PeerId)
    pt.schedulerClient = pt.peerTaskManager.SchedulerClient

    result, err := pt.schedulerClient.RegisterPeerTask(regCtx, pt.request)
    regSpan.RecordError(err)
    regSpan.End()

    if err != nil {
        if errors.Is(err, context.DeadlineExceeded) {
            pt.Errorf("scheduler did not respond in %s", pt.SchedulerOption.ScheduleTimeout.Duration)
        }
        pt.Errorf("step 1: peer %s register failed: %s", pt.request.PeerId, err)
        if pt.SchedulerOption.DisableAutoBackSource {
            // when peer registration fails, some actions still need a peerPacketStream
            pt.peerPacketStream = &dummyPeerPacketStream{}
            pt.Errorf("register peer task failed: %s, peer id: %s, auto back source disabled", err, pt.request.PeerId)
            pt.span.RecordError(err)
            pt.cancel(commonv1.Code_SchedError, err.Error())
            return err
        }
        needBackSource = true
        // can not tell whether it is a source or scheduler error, create a new dummy scheduler client
        pt.schedulerClient = &dummySchedulerClient{}
        result = &schedulerv1.RegisterResult{TaskId: pt.taskID}
        pt.Warnf("register peer task failed: %s, peer id: %s, try to back source", err, pt.request.PeerId)
    } else {
        pt.Infof("register task success, SizeScope: %s", commonv1.SizeScope_name[int32(result.SizeScope)])
    }

    var header map[string]string
    if !needBackSource {
        sizeScope = result.SizeScope
        switch result.SizeScope {
        case commonv1.SizeScope_NORMAL:
            pt.span.SetAttributes(config.AttributePeerTaskSizeScope.String("normal"))
        case commonv1.SizeScope_SMALL:
            pt.span.SetAttributes(config.AttributePeerTaskSizeScope.String("small"))
            if piece, ok := result.DirectPiece.(*schedulerv1.RegisterResult_SinglePiece); ok {
                singlePiece = piece.SinglePiece
            }
            if result.ExtendAttribute != nil {
                header = result.ExtendAttribute.Header
            }
        case commonv1.SizeScope_TINY:
            pt.span.SetAttributes(config.AttributePeerTaskSizeScope.String("tiny"))
            if piece, ok := result.DirectPiece.(*schedulerv1.RegisterResult_PieceContent); ok {
                tinyData = &TinyData{
                    TaskID: result.TaskId,
                    PeerID: pt.request.PeerId,
                    Content: piece.PieceContent,
                }
            } else {
                err = errors.New("scheduler returned tiny piece but piece content can not be parsed")
                // when registration fails here, some actions still need a peerPacketStream
                pt.peerPacketStream = &dummyPeerPacketStream{}
                pt.span.RecordError(err)
                pt.Errorf("%s", err)
                pt.cancel(commonv1.Code_SchedError, err.Error())
                return err
            }
            if result.ExtendAttribute != nil {
                header = result.ExtendAttribute.Header
            }
        case commonv1.SizeScope_EMPTY:
            tinyData = &TinyData{
                TaskID: result.TaskId,
                PeerID: pt.request.PeerId,
                Content: []byte{},
            }
            pt.span.SetAttributes(config.AttributePeerTaskSizeScope.String("empty"))
            if result.ExtendAttribute != nil {
                header = result.ExtendAttribute.Header
            }
        }
    }

    peerPacketStream, err := pt.schedulerClient.ReportPieceResult(pt.ctx, pt.request)
    pt.Infof("step 2: start report piece result")
    if err != nil {
        // when opening the piece result stream fails, some actions still need a peerPacketStream
        pt.peerPacketStream = &dummyPeerPacketStream{}
        pt.span.RecordError(err)
        pt.cancel(commonv1.Code_SchedError, err.Error())
        return err
    }

    pt.peerPacketStream = peerPacketStream
    pt.sizeScope = sizeScope
    pt.singlePiece = singlePiece
    pt.tinyData = tinyData
    pt.needBackSource = atomic.NewBool(needBackSource)

    if len(header) > 0 {
        pt.SetHeader(header)
    }
    return nil
}

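// start kicks off the peer task: seed tasks go straight to back source, other
// tasks register with the scheduler first; it then registers with the traffic
// shaper and starts the broker and piece pulling goroutines.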
func (pt *peerTaskConductor) start() error {
    // when it is a seed task, set up back source
    if pt.seed {
        pt.peerPacketStream = &dummyPeerPacketStream{}
        pt.schedulerClient = &dummySchedulerClient{}
        pt.sizeScope = commonv1.SizeScope_NORMAL
        pt.needBackSource = atomic.NewBool(true)
    } else {
        // register to scheduler
        if err := pt.register(); err != nil {
            return err
        }
    }

    pt.trafficShaper.AddTask(pt.peerTaskManager.getRunningTaskKey(pt.taskID, pt.peerID), pt)
    go pt.broker.Start()
    go pt.pullPieces()
    return nil
}

func (pt *peerTaskConductor) GetPeerID() string {
    return pt.peerID
}

func (pt *peerTaskConductor) GetTaskID() string {
    return pt.taskID
}

func (pt *peerTaskConductor) GetStorage() storage.TaskStorageDriver {
    return pt.storage
}

func (pt *peerTaskConductor) GetContentLength() int64 {
    return pt.contentLength.Load()
}

func (pt *peerTaskConductor) SetContentLength(i int64) {
    pt.contentLength.Store(i)
}

func (pt *peerTaskConductor) AddTraffic(n uint64) {
    pt.usedTraffic.Add(n)
}

func (pt *peerTaskConductor) GetTraffic() uint64 {
    return pt.usedTraffic.Load()
}

func (pt *peerTaskConductor) GetTotalPieces() int32 {
    return pt.totalPiece.Load()
}

func (pt *peerTaskConductor) SetTotalPieces(i int32) {
    pt.totalPiece.Store(i)
}

func (pt *peerTaskConductor) SetPieceMd5Sign(md5 string) {
    pt.digest.Store(md5)
}

func (pt *peerTaskConductor) GetPieceMd5Sign() string {
    return pt.digest.Load()
}

func (pt *peerTaskConductor) SetHeader(header map[string]string) {
    var hdr = &source.Header{}
    for k, v := range header {
        hdr.Set(k, v)
    }
    pt.header.Store(hdr)
}

func (pt *peerTaskConductor) GetHeader() *source.Header {
    hdr := pt.header.Load()
    if hdr != nil {
        return hdr.(*source.Header)
    }
    return nil
}

func (pt *peerTaskConductor) Context() context.Context {
    return pt.ctx
}

func (pt *peerTaskConductor) Log() *logger.SugaredLoggerOnWith {
    return pt.SugaredLoggerOnWith
}

func (pt *peerTaskConductor) UpdateSourceErrorStatus(st *status.Status) {
    pt.sourceErrorStatus = st
}

func (pt *peerTaskConductor) cancel(code commonv1.Code, reason string) {
    pt.statusOnce.Do(func() {
        pt.failedCode = code
        pt.failedReason = reason
        pt.fail()
    })
}

func (pt *peerTaskConductor) cancelNotRegisterred(code commonv1.Code, reason string) {
    pt.statusOnce.Do(func() {
        pt.failedCode = code
        pt.failedReason = reason

        metrics.PeerTaskFailedCount.WithLabelValues(metrics.FailTypeInit).Add(1)

        pt.peerTaskManager.PeerTaskDone(pt.taskID, pt.peerID)
        pt.span.SetAttributes(config.AttributePeerTaskSuccess.Bool(false))
        pt.span.SetAttributes(config.AttributePeerTaskCode.Int(int(pt.failedCode)))
        pt.span.SetAttributes(config.AttributePeerTaskMessage.String(pt.failedReason))

        close(pt.failCh)
        pt.broker.Stop()
        pt.span.End()
        pt.pieceDownloadCancel()
        if pt.pieceTaskSyncManager != nil {
            pt.pieceTaskSyncManager.cancel()
        }
    })
}

// markBackSource is only used when receiving the back source code from the scheduler
func (pt *peerTaskConductor) markBackSource() {
    pt.needBackSource.Store(true)
}

// forceBackSource is only used when the legacy get-piece-from-peers schedule times out
func (pt *peerTaskConductor) forceBackSource() {
    pt.needBackSource.Store(true)
    pt.backSource()
}

func (pt *peerTaskConductor) backSource() {
    // cancel all piece downloads
    pt.pieceDownloadCancel()
    // cancel all piece syncing
    if pt.pieceTaskSyncManager != nil {
        pt.pieceTaskSyncManager.cancel()
    }

    ctx, span := tracer.Start(pt.ctx, config.SpanBackSource)
    pt.SetContentLength(-1)
    err := pt.PieceManager.DownloadSource(ctx, pt, pt.request, pt.rg)
    if err != nil {
        pt.Errorf("download from source error: %s", err)
        span.SetAttributes(config.AttributePeerTaskSuccess.Bool(false))
        span.RecordError(err)
        if isBackSourceError(err) {
            pt.cancel(commonv1.Code_ClientBackSourceError, err.Error())
        } else {
            pt.cancel(commonv1.Code_ClientError, err.Error())
        }
        span.End()
        return
    }
    pt.Done()
    pt.Infof("download from source ok")
    span.SetAttributes(config.AttributePeerTaskSuccess.Bool(true))
    span.End()
}

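// pullPieces selects the download path for the task: back to source when needed,
// otherwise dispatch by size scope (empty, tiny, single piece, or normal P2P).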
func (pt *peerTaskConductor) pullPieces() {
    if pt.needBackSource.Load() {
        pt.backSource()
        return
    }
    switch pt.sizeScope {
    case commonv1.SizeScope_EMPTY:
        pt.storeEmptyPeerTask()
    case commonv1.SizeScope_TINY:
        pt.storeTinyPeerTask()
    case commonv1.SizeScope_SMALL:
        pt.pullSinglePiece()
    case commonv1.SizeScope_NORMAL:
        pt.pullPiecesWithP2P()
    default:
        pt.cancel(commonv1.Code_ClientError, fmt.Sprintf("unknown size scope: %d", pt.sizeScope))
    }
}

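// pullPiecesWithP2P creates the piece dispatcher and the piece task sync manager,
// then blocks in receivePeerPacket to consume schedule results from the scheduler.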
func (pt *peerTaskConductor) pullPiecesWithP2P() {
    var (
        // keep the same size as pt.failedPieceCh to avoid deadlock
        pieceRequestQueue = NewPieceDispatcher(config.DefaultPieceDispatcherRandomRatio, pt.Log())
    )
    ctx, cancel := context.WithCancel(pt.ctx)

    pt.pieceTaskSyncManager = &pieceTaskSyncManager{
        ctx: ctx,
        ctxCancel: cancel,
        peerTaskConductor: pt,
        pieceRequestQueue: pieceRequestQueue,
        workers: map[string]*pieceTaskSynchronizer{},
    }
    pt.receivePeerPacket(pieceRequestQueue)
}

func (pt *peerTaskConductor) storeEmptyPeerTask() {
    pt.SetContentLength(0)
    pt.SetTotalPieces(0)
    ctx := pt.ctx
    var err error
    storageDriver, err := pt.StorageManager.RegisterTask(ctx,
        &storage.RegisterTaskRequest{
            PeerTaskMetadata: storage.PeerTaskMetadata{
                PeerID: pt.peerID,
                TaskID: pt.taskID,
            },
            DesiredLocation: "",
            ContentLength: 0,
            TotalPieces: 0,
        })
    pt.storage = storageDriver
    if err != nil {
        pt.Errorf("register tiny data storage failed: %s", err)
        pt.cancel(commonv1.Code_ClientError, err.Error())
        return
    }

    if err = pt.UpdateStorage(); err != nil {
        pt.Errorf("update tiny data storage failed: %s", err)
        pt.cancel(commonv1.Code_ClientError, err.Error())
        return
    }
    pt.Debug("store empty metadata")
    pt.Done()
}

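// storeTinyPeerTask writes the tiny task content, which the scheduler returned
// inline during registration, directly into local storage as a single piece.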
func (pt *peerTaskConductor) storeTinyPeerTask() {
    contentLength := int64(len(pt.tinyData.Content))
    pt.SetContentLength(contentLength)
    pt.SetTotalPieces(1)
    ctx := pt.ctx
    var err error
    storageDriver, err := pt.StorageManager.RegisterTask(ctx,
        &storage.RegisterTaskRequest{
            PeerTaskMetadata: storage.PeerTaskMetadata{
                PeerID: pt.tinyData.PeerID,
                TaskID: pt.tinyData.TaskID,
            },
            DesiredLocation: "",
            ContentLength: contentLength,
            TotalPieces: 1,
            // TODO check digest
        })
    pt.storage = storageDriver
    if err != nil {
        pt.Errorf("register tiny data storage failed: %s", err)
        pt.cancel(commonv1.Code_ClientError, err.Error())
        return
    }
    n, err := pt.GetStorage().WritePiece(ctx,
        &storage.WritePieceRequest{
            PeerTaskMetadata: storage.PeerTaskMetadata{
                PeerID: pt.tinyData.PeerID,
                TaskID: pt.tinyData.TaskID,
            },
            PieceMetadata: storage.PieceMetadata{
                Num: 0,
                Md5: "",
                Offset: 0,
                Range: nethttp.Range{
                    Start: 0,
                    Length: contentLength,
                },
                Style: 0,
            },
            UnknownLength: false,
            Reader: bytes.NewBuffer(pt.tinyData.Content),
            NeedGenMetadata: func(n int64) (int32, int64, bool) {
                return 1, contentLength, true
            },
        })
    if err != nil {
        pt.Errorf("write tiny data storage failed: %s", err)
        pt.cancel(commonv1.Code_ClientError, err.Error())
        return
    }
    if n != contentLength {
        // err is nil here, so build the failure reason explicitly instead of calling err.Error()
        reason := fmt.Sprintf("write tiny data storage failed, want: %d, wrote: %d", contentLength, n)
        pt.Errorf(reason)
        pt.cancel(commonv1.Code_ClientError, reason)
        return
    }

    err = pt.UpdateStorage()
    if err != nil {
        pt.Errorf("update tiny data storage failed: %s", err)
        pt.cancel(commonv1.Code_ClientError, err.Error())
        return
    }

    pt.Debugf("store tiny data, len: %d", contentLength)
    pt.PublishPieceInfo(0, uint32(contentLength))
}

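// receivePeerPacket loops on the peer packet stream from the scheduler, turning
// each schedule result into piece synchronizers, and handles back source,
// re-register and exit codes; it returns when the task succeeds, fails, or the
// stream ends.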
func (pt *peerTaskConductor) receivePeerPacket(pieceRequestQueue PieceDispatcher) {
    var (
        lastNotReadyPiece   int32 = 0
        peerPacket          *schedulerv1.PeerPacket
        err                 error
        firstPacketReceived bool
        firstPacketDone     = make(chan bool)
    )
    // only record the first schedule result,
    // other schedule results will be recorded as events in the peer task span
    _, firstPeerSpan := tracer.Start(pt.ctx, config.SpanFirstSchedule)
    defer func() {
        if !firstPacketReceived {
            firstPeerSpan.End()
        }
        if pt.needBackSource.Load() {
            return
        }
        select {
        case <-pt.successCh:
        case <-pt.failCh:
        default:
            pt.Errorf("receivePeerPacket exit, but peer task did not succeed or fail")
            pt.Fail()
        }
    }()

    go pt.waitFirstPeerPacket(firstPacketDone)
loop:
    for {
        select {
        case <-pt.successCh:
            pt.Infof("peer task success, stop waiting for peer packets from scheduler")
            break loop
        case <-pt.failCh:
            pt.Infof("peer task fail, stop waiting for peer packets from scheduler")
            break loop
        default:
        }

        peerPacket, err = pt.peerPacketStream.Recv()
        if err == io.EOF {
            pt.Debugf("peerPacketStream closed")
            break loop
        }
        if err != nil {
            // for some errors, like commonv1.Code_SchedReregister, after a successful reregister
            // we can continue to receive peer packets from the new scheduler
            cont := pt.confirmReceivePeerPacketError(err)
            if cont {
                continue
            }
            if !firstPacketReceived {
                firstPeerSpan.RecordError(err)
            }
            break loop
        }

        pt.Debugf("receive peerPacket %v", peerPacket)
        if peerPacket.Code != commonv1.Code_Success {
            if peerPacket.Code == commonv1.Code_SchedNeedBackSource {
                // back source directly; close firstPacketDone first to avoid a waitFirstPeerPacket timeout
                if !firstPacketReceived {
                    close(firstPacketDone)
                }
                pt.forceBackSource()
                pt.Infof("receive back source code")
                return
            }
            pt.Errorf("receive peer packet with error: %d", peerPacket.Code)
            if pt.isExitPeerPacketCode(peerPacket) {
                pt.Errorf(pt.failedReason)
                pt.cancel(pt.failedCode, pt.failedReason)
                if !firstPacketReceived {
                    firstPeerSpan.RecordError(fmt.Errorf(pt.failedReason))
                }
                pt.span.AddEvent("receive exit peer packet",
                    trace.WithAttributes(config.AttributePeerPacketCode.Int(int(peerPacket.Code))))
                pt.span.RecordError(fmt.Errorf(pt.failedReason))
                break
            } else {
                pt.span.AddEvent("receive not success peer packet",
                    trace.WithAttributes(config.AttributePeerPacketCode.Int(int(peerPacket.Code))))
            }
            continue
        }

        if peerPacket.MainPeer == nil && peerPacket.CandidatePeers == nil {
            pt.Warnf("scheduler client sent a peerPacket with empty peers")
            continue
        }
        pt.Infof("receive new peer packet, main peer: %s", peerPacket.MainPeer.PeerId)
        pt.span.AddEvent("receive new peer packet",
            trace.WithAttributes(config.AttributeMainPeer.String(peerPacket.MainPeer.PeerId)))

        if !firstPacketReceived {
            pt.initDownloadPieceWorkers(pieceRequestQueue)
            firstPeerSpan.SetAttributes(config.AttributeMainPeer.String(peerPacket.MainPeer.PeerId))
            firstPeerSpan.End()
        }

        lastNotReadyPiece = pt.updateSynchronizers(lastNotReadyPiece, peerPacket)
        if !firstPacketReceived {
            // mark the first packet as received to avoid a first schedule timeout
            firstPacketReceived = true
            close(firstPacketDone)
        }
    }

    // double check to avoid a waitFirstPeerPacket timeout
    if !firstPacketReceived {
        close(firstPacketDone)
    }
}

// updateSynchronizers converts peers into synchronizers; peers that failed to
// convert are written back into the schedulerv1.PeerPacket.
func (pt *peerTaskConductor) updateSynchronizers(lastNum int32, p *schedulerv1.PeerPacket) int32 {
    desiredPiece, ok := pt.getNextNotReadyPieceNum(lastNum)
    if !ok {
        pt.Infof("all pieces are ready, peer task completed, skip synchronizing")
        p.MainPeer = nil
        p.CandidatePeers = nil
        return desiredPiece
    }
    var peers = []*schedulerv1.PeerPacket_DestPeer{p.MainPeer}
    peers = append(peers, p.CandidatePeers...)

    pt.pieceTaskSyncManager.syncPeers(peers, desiredPiece)
    return desiredPiece
}

/*
When a scheduler goes away (force killed or broken TCP connection) before the peer task is done, we will receive the following error message:

	receive peer packet failed: rpc error: code = Unavailable desc = closing transport due to: connection error: desc = "error reading from server: EOF", received prior goaway: code: NO_ERROR, debug data: "graceful_stop"

The underlying error is "connection error: desc = \"error reading from server: EOF\"", the grpc code is codes.Unavailable.

Refer to grpc-go:
https://github.com/grpc/grpc-go/blob/v1.60.1/test/goaway_test.go#L118
https://github.com/grpc/grpc-go/blob/v1.60.1/internal/transport/http2_client.go#L987
*/
func isSchedulerUnavailable(err error) bool {
    return status.Code(err) == codes.Unavailable
}

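// confirmReceivePeerPacketError inspects a recv error from the peer packet stream
// and reports whether receiving can continue: back source and exit codes stop the
// loop, while reregister codes and unavailable schedulers trigger a re-register.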
func (pt *peerTaskConductor) confirmReceivePeerPacketError(err error) (cont bool) {
    select {
    case <-pt.successCh:
        return false
    case <-pt.failCh:
        return false
    default:
    }
    var (
        failedCode   = commonv1.Code_UnknownError
        failedReason string
    )
    // extract DfError from the grpc status
    de, ok := dferrors.IsGRPCDfError(err)
    if ok {
        switch de.Code {
        case commonv1.Code_SchedNeedBackSource:
            pt.forceBackSource()
            pt.Infof("receive back source code")
            return false
        case commonv1.Code_SchedReregister:
            pt.Infof("receive reregister code")
            regErr := pt.register()
            if regErr == nil {
                pt.Infof("reregister ok")
                return true
            }
            pt.Errorf("reregister to scheduler error: %s", regErr)
            fallthrough
        default:
            failedCode = de.Code
            failedReason = de.Message
            pt.Errorf("receive peer packet failed: %s", failedReason)
        }
    } else {
        pt.Errorf("receive peer packet failed: %s", err)
        if isSchedulerUnavailable(err) {
            regErr := pt.register()
            if regErr == nil {
                pt.Infof("reregister ok")
                return true
            }
            pt.Errorf("reregister to scheduler error: %s", regErr)
        }
    }
    pt.cancel(failedCode, failedReason)
    return false
}

func (pt *peerTaskConductor) isExitPeerPacketCode(pp *schedulerv1.PeerPacket) bool {
    switch pp.Code {
    case commonv1.Code_ResourceLacked, commonv1.Code_BadRequest,
        commonv1.Code_PeerTaskNotFound, commonv1.Code_UnknownError, commonv1.Code_RequestTimeOut:
        // 1xxx
        pt.failedCode = pp.Code
        pt.failedReason = fmt.Sprintf("receive exit peer packet with code %d", pp.Code)
        return true
    case commonv1.Code_SchedError, commonv1.Code_SchedTaskStatusError, commonv1.Code_SchedPeerNotFound, commonv1.Code_SchedForbidden:
        // 5xxx
        pt.failedCode = pp.Code
        pt.failedReason = fmt.Sprintf("receive exit peer packet with code %d", pp.Code)
        return true
    case commonv1.Code_SchedPeerGone:
        pt.failedReason = reasonPeerGoneFromScheduler
        pt.failedCode = commonv1.Code_SchedPeerGone
        return true
    case commonv1.Code_CDNTaskRegistryFail:
        // 6xxx
        pt.failedCode = pp.Code
        pt.failedReason = fmt.Sprintf("receive exit peer packet with code %d", pp.Code)
        return true
    case commonv1.Code_BackToSourceAborted:
        st := status.Newf(codes.Aborted, "source response is not valid")
        st, err := st.WithDetails(pp.GetSourceError())
        if err != nil {
            pt.Errorf("convert source error details error: %s", err.Error())
            return false
        }

        pt.sourceErrorStatus = st
        return true
    }
    return false
}

func (pt *peerTaskConductor) pullSinglePiece() {
    pt.Infof("single piece, dest peer id: %s, piece num: %d, size: %d",
        pt.singlePiece.DstPid, pt.singlePiece.PieceInfo.PieceNum, pt.singlePiece.PieceInfo.RangeSize)

    ctx, span := tracer.Start(pt.ctx, fmt.Sprintf(config.SpanDownloadPiece, pt.singlePiece.PieceInfo.PieceNum))
    span.SetAttributes(config.AttributePiece.Int(int(pt.singlePiece.PieceInfo.PieceNum)))

    pt.SetContentLength(int64(pt.singlePiece.PieceInfo.RangeSize))
    pt.SetTotalPieces(1)
    pt.SetPieceMd5Sign(digest.SHA256FromStrings(pt.singlePiece.PieceInfo.PieceMd5))

    request := &DownloadPieceRequest{
        storage: pt.GetStorage(),
        piece: pt.singlePiece.PieceInfo,
        log: pt.Log(),
        TaskID: pt.GetTaskID(),
        PeerID: pt.GetPeerID(),
        DstPid: pt.singlePiece.DstPid,
        DstAddr: pt.singlePiece.DstAddr,
    }

    if result, err := pt.PieceManager.DownloadPiece(ctx, request); err == nil {
        pt.reportSuccessResult(request, result)
        pt.PublishPieceInfo(request.piece.PieceNum, request.piece.RangeSize)

        span.SetAttributes(config.AttributePieceSuccess.Bool(true))
        span.End()
        pt.Infof("single piece download success")
    } else {
        // fall back to downloading from other peers
        span.RecordError(err)
        span.SetAttributes(config.AttributePieceSuccess.Bool(false))
        span.End()

        pt.Warnf("single piece download failed, switch to download from other peers")
        pt.ReportPieceResult(request, result, err)

        pt.pullPiecesWithP2P()
    }
}

func (pt *peerTaskConductor) updateMetadata(piecePacket *commonv1.PiecePacket) {
    // update total piece count
    var metadataChanged bool
    if piecePacket.TotalPiece > pt.GetTotalPieces() {
        metadataChanged = true
        pt.SetTotalPieces(piecePacket.TotalPiece)
        pt.Debugf("update total piece count: %d, dst peer %s", piecePacket.TotalPiece, piecePacket.DstPid)
    }

    // update digest
    if len(piecePacket.PieceMd5Sign) > 0 && len(pt.GetPieceMd5Sign()) == 0 {
        metadataChanged = true
        pt.SetPieceMd5Sign(piecePacket.PieceMd5Sign)
        pt.Debugf("update digest: %s, dst peer %s", piecePacket.PieceMd5Sign, piecePacket.DstPid)
    }

    // update content length
    if piecePacket.ContentLength > -1 && pt.GetContentLength() == -1 {
        metadataChanged = true
        pt.SetContentLength(piecePacket.ContentLength)
        pt.span.SetAttributes(config.AttributeTaskContentLength.Int64(piecePacket.ContentLength))
        pt.Debugf("update content length: %d, dst peer %s", piecePacket.ContentLength, piecePacket.DstPid)
    } else if piecePacket.ContentLength > -1 && piecePacket.ContentLength != pt.GetContentLength() {
        // corrupt data check
        reason := fmt.Sprintf("corrupt data - content length did not match, current: %d, from piece packet: %d",
            pt.GetContentLength(), piecePacket.ContentLength)
        pt.Errorf(reason)
        pt.cancel(commonv1.Code_ClientError, reason)
        return
    }

    if piecePacket.ExtendAttribute != nil && len(piecePacket.ExtendAttribute.Header) > 0 && pt.GetHeader() == nil {
        metadataChanged = true
        pt.SetHeader(piecePacket.ExtendAttribute.Header)
        pt.Debugf("update response header: %#v, dst peer %s", piecePacket.ExtendAttribute.Header, piecePacket.DstPid)
    }

    if metadataChanged {
        err := pt.UpdateStorage()
        if err != nil {
            pt.Errorf("update storage error: %s", err)
        }
    }
}

func (pt *peerTaskConductor) initDownloadPieceWorkers(pieceRequestQueue PieceDispatcher) {
    count := 4
    for i := int32(0); i < int32(count); i++ {
        go pt.downloadPieceWorker(i, pieceRequestQueue)
    }
}

func (pt *peerTaskConductor) waitFirstPeerPacket(done chan bool) {
    // wait for the first available peer
    select {
    case <-pt.successCh:
        pt.Infof("peer task succeeded, no need to wait for the first peer")
        return
    case <-pt.failCh:
        pt.Warnf("peer task failed, no need to wait for the first peer")
        return
    case <-done:
        pt.Debugf("first peer packet received")
        return
    case <-time.After(pt.SchedulerOption.ScheduleTimeout.Duration):
        if pt.SchedulerOption.DisableAutoBackSource {
            pt.cancel(commonv1.Code_ClientScheduleTimeout, reasonBackSourceDisabled)
            err := fmt.Errorf("%s, auto back source disabled", pt.failedReason)
            pt.span.RecordError(err)
            pt.Errorf(err.Error())
            return
        }
        pt.Warnf("start download from source due to %s", reasonScheduleTimeout)
        pt.span.AddEvent("back source due to schedule timeout")
        pt.forceBackSource()
        return
    }
}

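// downloadPieceWorker consumes piece requests from the dispatcher in a loop,
// skips pieces that are already downloaded, reports each result back to the
// dispatcher, and exits when the download context, success or fail channel fires.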
func (pt *peerTaskConductor) downloadPieceWorker(id int32, requests PieceDispatcher) {
    for {
        request, err := requests.Get()
        if errors.Is(err, ErrNoValidPieceTemporarily) {
            continue
        }
        if err != nil {
            pt.Infof("piece download queue cancelled, peer download worker #%d exit, err: %v", id, err)
            return
        }
        pt.readyPiecesLock.RLock()
        if pt.readyPieces.IsSet(request.piece.PieceNum) {
            pt.readyPiecesLock.RUnlock()
            pt.Log().Debugf("piece %d is already downloaded, skip", request.piece.PieceNum)
            continue
        }
        pt.readyPiecesLock.RUnlock()
        result := pt.downloadPiece(id, request)
        if result != nil {
            requests.Report(result)
        }
        select {
        case <-pt.pieceDownloadCtx.Done():
            pt.Infof("piece download cancelled, peer download worker #%d exit", id)
            return
        case <-pt.successCh:
            pt.Infof("peer task success, peer download worker #%d exit", id)
            return
        case <-pt.failCh:
            pt.Errorf("peer task fail, peer download worker #%d exit", id)
            return
        default:
        }
    }
}

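// downloadPiece downloads one piece from the destination peer under the rate
// limiter and publishes successful results; on failure it asks remote peers for
// the piece again through the piece task sync manager, unless the task has
// already switched to back source.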
func (pt *peerTaskConductor) downloadPiece(workerID int32, request *DownloadPieceRequest) *DownloadPieceResult {
    // only download a given piece in one worker at a time
    pt.runningPiecesLock.Lock()
    if pt.runningPieces.IsSet(request.piece.PieceNum) {
        pt.runningPiecesLock.Unlock()
        pt.Log().Debugf("piece %d is downloading, skip", request.piece.PieceNum)
        // TODO save to queue for failed pieces
        return nil
    }
    pt.runningPieces.Set(request.piece.PieceNum)
    pt.runningPiecesLock.Unlock()

    defer func() {
        pt.runningPiecesLock.Lock()
        pt.runningPieces.Clean(request.piece.PieceNum)
        pt.runningPiecesLock.Unlock()
    }()

    ctx, span := tracer.Start(pt.pieceDownloadCtx, fmt.Sprintf(config.SpanDownloadPiece, request.piece.PieceNum))
    span.SetAttributes(config.AttributePiece.Int(int(request.piece.PieceNum)))
    span.SetAttributes(config.AttributePieceWorker.Int(int(workerID)))

    // wait for the rate limiter
    if pt.limiter != nil && !pt.waitLimit(ctx, request) {
        span.SetAttributes(config.AttributePieceSuccess.Bool(false))
        span.End()
        return nil
    }

    pt.Debugf("peer download worker #%d receive piece task, "+
        "dest peer id: %s, piece num: %d, range start: %d, range size: %d",
        workerID, request.DstPid, request.piece.PieceNum, request.piece.RangeStart, request.piece.RangeSize)
    // download piece
    // result is always not nil, PieceManager will report begin and end time
    result, err := pt.PieceManager.DownloadPiece(ctx, request)
    if err != nil {
        pt.ReportPieceResult(request, result, err)
        span.SetAttributes(config.AttributePieceSuccess.Bool(false))
        span.End()
        if pt.needBackSource.Load() {
            pt.Infof("switch to back source, skip sending failed piece")
            return result
        }
        attempt, success := pt.pieceTaskSyncManager.acquire(
            &commonv1.PieceTaskRequest{
                Limit: 1,
                TaskId: pt.taskID,
                SrcPid: pt.peerID,
                StartNum: uint32(request.piece.PieceNum),
            })
        pt.Infof("send failed piece %d to remote, attempt: %d, success: %d",
            request.piece.PieceNum, attempt, success)
        return result
    }
    // broadcast the successful piece
    pt.reportSuccessResult(request, result)
    pt.PublishPieceInfo(request.piece.PieceNum, request.piece.RangeSize)

    span.SetAttributes(config.AttributePieceSuccess.Bool(true))
    span.End()
    return result
}

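// waitLimit blocks on the per-task rate limiter for the piece size; when waiting
// fails, it reports a Code_ClientRequestLimitFail piece result and cancels the task.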
func (pt *peerTaskConductor) waitLimit(ctx context.Context, request *DownloadPieceRequest) bool {
    _, waitSpan := tracer.Start(ctx, config.SpanWaitPieceLimit)
    pt.trafficShaper.Record(pt.peerTaskManager.getRunningTaskKey(request.TaskID, request.PeerID), int(request.piece.RangeSize))
    err := pt.limiter.WaitN(pt.ctx, int(request.piece.RangeSize))
    if err == nil {
        waitSpan.End()
        return true
    }

    pt.Errorf("request limiter error: %s", err)
    waitSpan.RecordError(err)
    waitSpan.End()

    // send error piece result
    sendError := pt.sendPieceResult(&schedulerv1.PieceResult{
        TaskId: pt.GetTaskID(),
        SrcPid: pt.GetPeerID(),
        DstPid: request.DstPid,
        PieceInfo: request.piece,
        Success: false,
        Code: commonv1.Code_ClientRequestLimitFail,
        FinishedCount: 0, // update by peer task
    })
    if sendError != nil {
        pt.Errorf("report piece result failed %s", sendError)
    }

    pt.cancel(commonv1.Code_ClientRequestLimitFail, err.Error())
    return false
}

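// isCompleted reports whether the downloaded length matches the content length;
// it also cancels the task when all pieces are marked ready but the lengths
// disagree, to avoid hanging on corrupt data.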
func (pt *peerTaskConductor) isCompleted() bool {
    if pt.completedLength.Load() == pt.GetContentLength() {
        pt.Infof("completed content length: %d", pt.completedLength.Load())
        return true
    }

    // corrupt data check, to avoid hanging on a mismatched completed length
    if pt.readyPieces.Settled() == pt.totalPiece.Load() {
        msg := fmt.Sprintf("corrupt data - ready piece count %d seems finished, but completed length %d does not match content length: %d",
            pt.totalPiece.Load(), pt.completedLength.Load(), pt.GetContentLength())
        pt.Errorf(msg)
        pt.cancel(commonv1.Code_ClientError, msg)
        return true
    }

    return false
}

// for legacy peers only
func (pt *peerTaskConductor) getNextPieceNum(cur int32) (int32, bool) {
    if pt.isCompleted() {
        return -1, false
    }
    i := cur
    // try to find next not requested piece
    pt.requestedPiecesLock.RLock()
    defer pt.requestedPiecesLock.RUnlock()

    for ; pt.requestedPieces.IsSet(i); i++ {
    }
    totalPiece := pt.GetTotalPieces()
    if totalPiece > 0 && i >= totalPiece {
        // double check, re-search not success or not requested pieces
        for i = int32(0); pt.requestedPieces.IsSet(i); i++ {
        }
        if totalPiece > 0 && i >= totalPiece {
            return -1, false
        }
    }
    return i, true
}

func (pt *peerTaskConductor) getNextNotReadyPieceNum(cur int32) (int32, bool) {
    if pt.isCompleted() {
        return 0, false
    }
    i := cur
    // try to find next not ready piece
    pt.readyPiecesLock.RLock()
    defer pt.readyPiecesLock.RUnlock()

    for ; pt.readyPieces.IsSet(i); i++ {
    }
    totalPiece := pt.GetTotalPieces()
    if totalPiece > 0 && i >= totalPiece {
        // double check, re-search
        for i = int32(0); pt.readyPieces.IsSet(i); i++ {
        }
        if totalPiece > 0 && i >= totalPiece {
            return 0, false
        }
    }
    return i, true
}

func (pt *peerTaskConductor) recoverFromPanic() {
    if r := recover(); r != nil {
        pt.Errorf("recovered from panic %q. Call stack:\n%v", r, string(debug.Stack()))
    }
}

func (pt *peerTaskConductor) ReportPieceResult(request *DownloadPieceRequest, result *DownloadPieceResult, err error) {
    if err == nil {
        pt.reportSuccessResult(request, result)
        return
    }
    code := commonv1.Code_ClientPieceDownloadFail
    if isConnectionError(err) {
        code = commonv1.Code_ClientConnectionError
    } else if isPieceNotFound(err) {
        code = commonv1.Code_ClientPieceNotFound
    } else if isBackSourceError(err) {
        code = commonv1.Code_ClientBackSourceError
    }
    pt.reportFailResult(request, result, code)
}

func (pt *peerTaskConductor) reportSuccessResult(request *DownloadPieceRequest, result *DownloadPieceResult) {
    metrics.PieceTaskCount.Add(1)
    _, span := tracer.Start(pt.ctx, config.SpanReportPieceResult)
    span.SetAttributes(config.AttributeWritePieceSuccess.Bool(true))

    err := pt.sendPieceResult(
        &schedulerv1.PieceResult{
            TaskId: pt.GetTaskID(),
            SrcPid: pt.GetPeerID(),
            DstPid: request.DstPid,
            PieceInfo: request.piece,
            BeginTime: uint64(result.BeginTime),
            EndTime: uint64(result.FinishTime),
            Success: true,
            Code: commonv1.Code_Success,
            FinishedCount: pt.readyPieces.Settled(),
            // TODO range_start, range_size, piece_md5, piece_offset, piece_style
        })
    if err != nil {
        pt.Errorf("report piece task error: %v", err)
        span.RecordError(err)
    }

    span.End()
}

func (pt *peerTaskConductor) reportFailResult(request *DownloadPieceRequest, result *DownloadPieceResult, code commonv1.Code) {
    metrics.PieceTaskFailedCount.Add(1)
    _, span := tracer.Start(pt.ctx, config.SpanReportPieceResult)
    span.SetAttributes(config.AttributeWritePieceSuccess.Bool(false))

    err := pt.sendPieceResult(&schedulerv1.PieceResult{
        TaskId: pt.GetTaskID(),
        SrcPid: pt.GetPeerID(),
        DstPid: request.DstPid,
        PieceInfo: request.piece,
        BeginTime: uint64(result.BeginTime),
        EndTime: uint64(result.FinishTime),
        Success: false,
        Code: code,
        FinishedCount: pt.readyPieces.Settled(),
    })
    if err != nil {
        pt.Errorf("report piece task error: %v", err)
    }
    span.End()
}

func (pt *peerTaskConductor) initStorage(desiredLocation string) (err error) {
    // prepare storage
    if pt.parent == nil {
        pt.storage, err = pt.StorageManager.RegisterTask(pt.ctx,
            &storage.RegisterTaskRequest{
                PeerTaskMetadata: storage.PeerTaskMetadata{
                    PeerID: pt.GetPeerID(),
                    TaskID: pt.GetTaskID(),
                },
                DesiredLocation: desiredLocation,
                ContentLength: pt.GetContentLength(),
                TotalPieces: pt.GetTotalPieces(),
                PieceMd5Sign: pt.GetPieceMd5Sign(),
            })
    } else {
        pt.storage, err = pt.StorageManager.RegisterSubTask(pt.ctx,
            &storage.RegisterSubTaskRequest{
                Parent: storage.PeerTaskMetadata{
                    PeerID: pt.parent.GetPeerID(),
                    TaskID: pt.parent.GetTaskID(),
                },
                SubTask: storage.PeerTaskMetadata{
                    PeerID: pt.GetPeerID(),
                    TaskID: pt.GetTaskID(),
                },
                Range: pt.rg,
            })
    }
    if err != nil {
        pt.Log().Errorf("register task to storage manager failed: %s", err)
    }
    return err
}

func (pt *peerTaskConductor) UpdateStorage() error {
    // update storage
    err := pt.GetStorage().UpdateTask(pt.ctx,
        &storage.UpdateTaskRequest{
            PeerTaskMetadata: storage.PeerTaskMetadata{
                PeerID: pt.GetPeerID(),
                TaskID: pt.GetTaskID(),
            },
            ContentLength: pt.GetContentLength(),
            TotalPieces: pt.GetTotalPieces(),
            PieceMd5Sign: pt.GetPieceMd5Sign(),
            Header: pt.GetHeader(),
        })
    if err != nil {
        pt.Log().Errorf("update task to storage manager failed: %s", err)
        return err
    }

    return nil
}

func (pt *peerTaskConductor) Done() {
    pt.statusOnce.Do(pt.done)
}

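// done finalizes a successful peer task: it stores and validates metadata, closes
// the success (or fail) channel, reports the end-of-piece result and the peer
// result to the scheduler, and tears down the broker, spans and contexts.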
func (pt *peerTaskConductor) done() {
    defer func() {
        pt.broker.Stop()
        pt.span.End()
        pt.pieceDownloadCancel()
        if pt.pieceTaskSyncManager != nil {
            pt.pieceTaskSyncManager.cancel()
        }
        pt.ctxCancel()
    }()
    var (
        cost = time.Since(pt.startTime).Milliseconds()
        success = true
        code = commonv1.Code_Success
    )
    pt.Log().Infof("peer task done, cost: %dms", cost)
    // TODO merge error handle
    // update storage metadata
    if err := pt.UpdateStorage(); err == nil {
        // validate digest
        if err = pt.Validate(); err == nil {
            close(pt.successCh)
            pt.span.SetAttributes(config.AttributePeerTaskSuccess.Bool(true))
        } else {
            close(pt.failCh)
            success = false
            code = commonv1.Code_ClientError
            pt.failedCode = commonv1.Code_ClientError
            pt.failedReason = err.Error()

            pt.span.SetAttributes(config.AttributePeerTaskSuccess.Bool(false))
            pt.span.SetAttributes(config.AttributePeerTaskCode.Int(int(pt.failedCode)))
            pt.span.SetAttributes(config.AttributePeerTaskMessage.String(pt.failedReason))
            pt.Errorf("validate digest failed: %s", err)
            metrics.PeerTaskFailedCount.WithLabelValues(metrics.FailTypeP2P).Add(1)
        }
    } else {
        close(pt.failCh)
        success = false
        code = commonv1.Code_ClientError
        pt.failedCode = commonv1.Code_ClientError
        pt.failedReason = err.Error()

        pt.span.SetAttributes(config.AttributePeerTaskSuccess.Bool(false))
        pt.span.SetAttributes(config.AttributePeerTaskCode.Int(int(pt.failedCode)))
        pt.span.SetAttributes(config.AttributePeerTaskMessage.String(pt.failedReason))
        pt.Errorf("update storage error: %v", err)
        metrics.PeerTaskFailedCount.WithLabelValues(metrics.FailTypeP2P).Add(1)
    }

    pt.peerTaskManager.PeerTaskDone(pt.taskID, pt.peerID)
    peerResultCtx, peerResultSpan := tracer.Start(pt.ctx, config.SpanReportPeerResult)
    defer peerResultSpan.End()

    // Send EOF piece result to scheduler.
    err := pt.sendPieceResult(
        &schedulerv1.PieceResult{
            TaskId: pt.taskID,
            SrcPid: pt.peerID,
            FinishedCount: pt.readyPieces.Settled(),
            PieceInfo: &commonv1.PieceInfo{
                PieceNum: common.EndOfPiece,
            },
        })
    pt.Debugf("peer task finished, end piece result sent result: %v", err)

    err = pt.peerPacketStream.CloseSend()
    pt.Debugf("close stream result: %v", err)

    err = pt.schedulerClient.ReportPeerResult(
        peerResultCtx,
        &schedulerv1.PeerResult{
            TaskId: pt.GetTaskID(),
            PeerId: pt.GetPeerID(),
            SrcIp: pt.PeerHost.Ip,
            Idc: pt.PeerHost.Idc,
            Url: pt.request.Url,
            ContentLength: pt.GetContentLength(),
            Traffic: pt.GetTraffic(),
            TotalPieceCount: pt.GetTotalPieces(),
            Cost: uint32(cost),
            Success: success,
            Code: code,
        })
    if err != nil {
        peerResultSpan.RecordError(err)
        pt.Errorf("step 3: report successful peer result, error: %v", err)
    } else {
        pt.Infof("step 3: report successful peer result ok")
    }
}

func (pt *peerTaskConductor) Fail() {
    pt.statusOnce.Do(pt.fail)
}

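// fail finalizes a failed peer task: it records the failure metric, closes the
// fail channel, reports the end-of-piece result and the (optionally source-error
// annotated) peer result to the scheduler, and marks storage for reclaim.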
func (pt *peerTaskConductor) fail() {
    if pt.failedCode == commonv1.Code_ClientBackSourceError {
        metrics.PeerTaskFailedCount.WithLabelValues(metrics.FailTypeBackSource).Add(1)
    } else {
        metrics.PeerTaskFailedCount.WithLabelValues(metrics.FailTypeP2P).Add(1)
    }
    defer func() {
        close(pt.failCh)
        pt.broker.Stop()
        pt.span.End()
        pt.pieceDownloadCancel()
        if pt.pieceTaskSyncManager != nil {
            pt.pieceTaskSyncManager.cancel()
        }
        // mark storage to reclaim
        _ = pt.StorageManager.UnregisterTask(
            pt.ctx,
            storage.CommonTaskRequest{
                PeerID: pt.peerID,
                TaskID: pt.taskID,
            })
        pt.ctxCancel()
    }()
    pt.peerTaskManager.PeerTaskDone(pt.taskID, pt.peerID)
    var end = time.Now()
    pt.Log().Errorf("peer task failed, code: %d, reason: %s", pt.failedCode, pt.failedReason)

    // Send EOF piece result to scheduler.
    err := pt.sendPieceResult(&schedulerv1.PieceResult{
        TaskId: pt.taskID,
        SrcPid: pt.peerID,
        FinishedCount: pt.readyPieces.Settled(),
        PieceInfo: &commonv1.PieceInfo{
            PieceNum: common.EndOfPiece,
        },
    })
    pt.Debugf("end piece result sent: %v, peer task finished", err)

    err = pt.peerPacketStream.CloseSend()
    pt.Debugf("close stream result: %v", err)

    ctx := trace.ContextWithSpan(context.Background(), trace.SpanFromContext(pt.ctx))
    peerResultCtx, peerResultSpan := tracer.Start(ctx, config.SpanReportPeerResult)
    defer peerResultSpan.End()

    var sourceError *errordetailsv1.SourceError
    if pt.sourceErrorStatus != nil {
        for _, detail := range pt.sourceErrorStatus.Details() {
            switch d := detail.(type) {
            case *errordetailsv1.SourceError:
                sourceError = d
            }
        }
    }
    peerResult := &schedulerv1.PeerResult{
        TaskId: pt.GetTaskID(),
        PeerId: pt.GetPeerID(),
        SrcIp: pt.PeerHost.Ip,
        Idc: pt.PeerHost.Idc,
        Url: pt.request.Url,
        ContentLength: pt.GetContentLength(),
        Traffic: pt.GetTraffic(),
        TotalPieceCount: pt.GetTotalPieces(),
        Cost: uint32(end.Sub(pt.startTime).Milliseconds()),
        Success: false,
        Code: pt.failedCode,
    }
    if sourceError != nil {
        peerResult.Errordetails = &schedulerv1.PeerResult_SourceError{
            SourceError: sourceError,
        }
    }
    err = pt.schedulerClient.ReportPeerResult(peerResultCtx, peerResult)
    if err != nil {
        peerResultSpan.RecordError(err)
        pt.Log().Errorf("step 3: report fail peer result, error: %v", err)
    } else {
        pt.Log().Infof("step 3: report fail peer result ok")
    }

    pt.span.SetAttributes(config.AttributePeerTaskSuccess.Bool(false))
    pt.span.SetAttributes(config.AttributePeerTaskCode.Int(int(pt.failedCode)))
    pt.span.SetAttributes(config.AttributePeerTaskMessage.String(pt.failedReason))
}

// Validate stores metadata and validates the digest
func (pt *peerTaskConductor) Validate() error {
    err := pt.GetStorage().Store(pt.ctx,
        &storage.StoreRequest{
            CommonTaskRequest: storage.CommonTaskRequest{
                PeerID: pt.peerID,
                TaskID: pt.taskID,
            },
            MetadataOnly: true,
            TotalPieces: pt.GetTotalPieces(),
        })
    if err != nil {
        pt.Errorf("store metadata error: %s", err)
        return err
    }

    if !pt.CalculateDigest {
        return nil
    }
    err = pt.GetStorage().ValidateDigest(
        &storage.PeerTaskMetadata{
            PeerID: pt.GetPeerID(),
            TaskID: pt.GetTaskID(),
        })
    if err != nil {
        pt.Errorf("validate digest error: %s", err)
        return err
    }
    pt.Debugf("validate digest ok")

    return err
}

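// PublishPieceInfo marks a piece as ready, accumulates the completed length,
// finishes the task when everything is downloaded, and publishes the piece to
// the broker for any subscribed readers.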
func (pt *peerTaskConductor) PublishPieceInfo(pieceNum int32, size uint32) {
    // mark piece ready
    pt.readyPiecesLock.Lock()
    if pt.readyPieces.IsSet(pieceNum) {
        pt.readyPiecesLock.Unlock()
        pt.Warnf("piece %d is already reported, skipped", pieceNum)
        return
    }
    // mark piece processed
    pt.readyPieces.Set(pieceNum)
    pt.completedLength.Add(int64(size))
    pt.readyPiecesLock.Unlock()

    finished := pt.isCompleted()
    if finished {
        pt.Done()
    }
    pt.broker.Publish(
        &PieceInfo{
            Num: pieceNum,
            Finished: finished,
        })
}

func (pt *peerTaskConductor) sendPieceResult(pr *schedulerv1.PieceResult) error {
    pt.sendPieceResultLock.Lock()
    err := pt.peerPacketStream.Send(pr)
    pt.sendPieceResultLock.Unlock()
    return err
}

func (pt *peerTaskConductor) getFailedError() error {
    if pt.sourceErrorStatus != nil {
        return pt.sourceErrorStatus.Err()
    }
    return fmt.Errorf("peer task failed: %d/%s", pt.failedCode, pt.failedReason)
}