// Code below is from Dragonfly2, file peertask_conductor.go (web-viewer header removed).
1
/*
2
 *     Copyright 2022 The Dragonfly Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *      http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package peer
18

19
import (
20
	"bytes"
21
	"context"
22
	"errors"
23
	"fmt"
24
	"io"
25
	"runtime/debug"
26
	"sync"
27
	"time"
28

29
	semconv "go.opentelemetry.io/otel/semconv/v1.7.0"
30
	"go.opentelemetry.io/otel/trace"
31
	"go.uber.org/atomic"
32
	"golang.org/x/time/rate"
33
	"google.golang.org/grpc/codes"
34
	"google.golang.org/grpc/credentials"
35
	"google.golang.org/grpc/status"
36

37
	commonv1 "d7y.io/api/v2/pkg/apis/common/v1"
38
	errordetailsv1 "d7y.io/api/v2/pkg/apis/errordetails/v1"
39
	schedulerv1 "d7y.io/api/v2/pkg/apis/scheduler/v1"
40

41
	"d7y.io/dragonfly/v2/client/config"
42
	"d7y.io/dragonfly/v2/client/daemon/metrics"
43
	"d7y.io/dragonfly/v2/client/daemon/storage"
44
	"d7y.io/dragonfly/v2/internal/dferrors"
45
	logger "d7y.io/dragonfly/v2/internal/dflog"
46
	"d7y.io/dragonfly/v2/pkg/digest"
47
	"d7y.io/dragonfly/v2/pkg/idgen"
48
	nethttp "d7y.io/dragonfly/v2/pkg/net/http"
49
	"d7y.io/dragonfly/v2/pkg/rpc/common"
50
	schedulerclient "d7y.io/dragonfly/v2/pkg/rpc/scheduler/client"
51
	"d7y.io/dragonfly/v2/pkg/source"
52
)
53

54
const (
	// TODO implement peer task health check
	// reasonContextCanceled       = "context canceled"
	// reasonReScheduleTimeout     = "wait more available peers from scheduler timeout"
	reasonScheduleTimeout       = "wait first peer packet from scheduler timeout"
	reasonPeerGoneFromScheduler = "scheduler says client should disconnect"
	reasonBackSourceDisabled    = "download from source disabled"

	// failedReasonNotSet is the placeholder used before a real failure reason is recorded.
	failedReasonNotSet = "unknown"
)

// compile-time check that *peerTaskConductor implements the Task interface
var _ Task = (*peerTaskConductor)(nil)
66

67
// peerTaskConductor will fetch all pieces from other peers and send pieces info to broker
68
// peerTaskConductor will fetch all pieces from other peers and send pieces info to broker
type peerTaskConductor struct {
	TaskOption
	*logger.SugaredLoggerOnWith

	// ctx is with span info for tracing
	// we use successCh and failCh mark task success or fail
	ctx       context.Context
	ctxCancel context.CancelFunc
	// piece download uses this context
	pieceDownloadCtx context.Context
	// when back source, cancel all piece download action
	pieceDownloadCancel context.CancelFunc

	// request is the original PeerTaskRequest
	request *schedulerv1.PeerTaskRequest

	// needBackSource indicates downloading resource from source instead of other peers
	needBackSource *atomic.Bool
	seed           bool

	peerTaskManager *peerTaskManager

	// storage is the task storage driver bound to this peer task after registration
	storage storage.TaskStorageDriver

	schedulerClient schedulerclient.V1

	// peer task meta info
	peerID          string
	taskID          string
	totalPiece      *atomic.Int32
	digest          *atomic.String
	contentLength   *atomic.Int64
	completedLength *atomic.Int64
	usedTraffic     *atomic.Uint64
	header          atomic.Value

	broker *pieceBroker

	sizeScope   commonv1.SizeScope
	singlePiece *schedulerv1.SinglePiece
	tinyData    *TinyData

	// peerPacketStream stands schedulerclient.PeerPacketStream from scheduler
	peerPacketStream schedulerv1.Scheduler_ReportPieceResultClient
	legacyPeerCount  *atomic.Int64
	// pieceTaskSyncManager syncs piece task from other peers
	pieceTaskSyncManager *pieceTaskSyncManager

	// same actions must be done only once, like close done channel and so on
	statusOnce sync.Once
	// done channel will be closed when peer task success
	successCh chan struct{}
	// fail channel will be closed after peer task fail
	failCh chan struct{}

	// span stands open telemetry trace span
	span trace.Span

	// failedReason will be set when peer task failed
	failedReason string
	// failedCode will be set when peer task failed
	failedCode commonv1.Code

	// readyPieces stands all downloaded pieces
	readyPieces *Bitmap
	// lock used by piece result manage, when update readyPieces, lock first
	readyPiecesLock sync.RWMutex
	// runningPieces stands all downloading pieces
	runningPieces *Bitmap
	// lock used by piece download worker
	runningPiecesLock sync.Mutex
	// requestedPieces stands all pieces requested from peers
	requestedPieces *Bitmap
	// lock used by piece download worker
	requestedPiecesLock sync.RWMutex
	// lock used by send piece result
	sendPieceResultLock sync.Mutex
	// trafficShaper used to automatically allocate bandwidth for every peer task
	trafficShaper TrafficShaper
	// limiter will be used when enable per peer task rate limit
	limiter *rate.Limiter

	startTime time.Time

	// subtask only
	parent *peerTaskConductor
	rg     *nethttp.Range

	// sourceErrorStatus records the grpc status describing a back-source error, if any
	sourceErrorStatus *status.Status
}
158

159
// TaskOption holds the shared dependencies and settings injected into
// every peer task created by the peer task manager.
type TaskOption struct {
	// PeerHost info about current PeerHost
	PeerHost *schedulerv1.PeerHost
	// PieceManager will be used for downloading piece
	PieceManager   PieceManager
	StorageManager storage.Manager
	// schedule options
	SchedulerOption config.SchedulerOption
	CalculateDigest bool
	GRPCCredentials credentials.TransportCredentials
	GRPCDialTimeout time.Duration
	// WatchdogTimeout > 0 indicates to start watch dog for every single peer task
	WatchdogTimeout time.Duration
}
173

174
// newPeerTaskConductor builds a peerTaskConductor for the given request:
// it derives the task id from the url and url meta, opens a tracing span,
// prepares a task-scoped logger and initializes all counters and channels.
// The conductor is not started; callers must invoke start() separately.
func (ptm *peerTaskManager) newPeerTaskConductor(
	ctx context.Context,
	request *schedulerv1.PeerTaskRequest,
	limit rate.Limit,
	parent *peerTaskConductor,
	rg *nethttp.Range,
	seed bool) *peerTaskConductor {
	// use a new context with span info
	ctx = trace.ContextWithSpan(context.Background(), trace.SpanFromContext(ctx))
	ctx, span := tracer.Start(ctx, config.SpanPeerTask, trace.WithSpanKind(trace.SpanKindClient))
	span.SetAttributes(config.AttributePeerHost.String(ptm.PeerHost.Id))
	span.SetAttributes(semconv.NetHostIPKey.String(ptm.PeerHost.Ip))
	span.SetAttributes(config.AttributePeerID.String(request.PeerId))
	span.SetAttributes(semconv.HTTPURLKey.String(request.Url))

	taskID := idgen.TaskIDV1(request.Url, request.UrlMeta)
	request.TaskId = taskID

	// init log with values
	var (
		log     *logger.SugaredLoggerOnWith
		traceID = span.SpanContext().TraceID()
	)

	logKV := []any{
		"peer", request.PeerId,
		"task", taskID,
		"component", "PeerTask",
	}
	if traceID.IsValid() {
		logKV = append(logKV, "trace", traceID.String())
	}
	log = logger.With(logKV...)

	span.SetAttributes(config.AttributeTaskID.String(taskID))

	ctx, cancel := context.WithCancel(ctx)
	ptc := &peerTaskConductor{
		TaskOption:          ptm.TaskOption,
		peerTaskManager:     ptm,
		request:             request,
		startTime:           time.Now(),
		ctx:                 ctx,
		ctxCancel:           cancel,
		broker:              newPieceBroker(),
		peerID:              request.PeerId,
		taskID:              taskID,
		successCh:           make(chan struct{}),
		failCh:              make(chan struct{}),
		legacyPeerCount:     atomic.NewInt64(0),
		span:                span,
		readyPieces:         NewBitmap(),
		runningPieces:       NewBitmap(),
		requestedPieces:     NewBitmap(),
		failedReason:        failedReasonNotSet,
		failedCode:          commonv1.Code_UnknownError,
		contentLength:       atomic.NewInt64(-1),
		totalPiece:          atomic.NewInt32(-1),
		digest:              atomic.NewString(""),
		trafficShaper:       ptm.trafficShaper,
		limiter:             rate.NewLimiter(limit, int(limit)),
		completedLength:     atomic.NewInt64(0),
		usedTraffic:         atomic.NewUint64(0),
		SugaredLoggerOnWith: log,
		seed:                seed,
		parent:              parent,
		rg:                  rg,
	}

	// piece downloads get their own cancelable context so back source can
	// abort them without killing the whole task context
	ptc.pieceDownloadCtx, ptc.pieceDownloadCancel = context.WithCancel(ptc.ctx)

	return ptc
}
247

248
// register to scheduler, if error and disable auto back source, return error, otherwise return nil.
// On success it also opens the ReportPieceResult stream and records the
// size scope (normal/small/tiny/empty) plus any inline piece data the
// scheduler returned.
func (pt *peerTaskConductor) register() error {
	pt.Debugf("request overview, pid: %s, url: %s, filter: %s, tag: %s, range: %s, digest: %s, header: %#v",
		pt.request.PeerId, pt.request.Url, pt.request.UrlMeta.Filter, pt.request.UrlMeta.Tag, pt.request.UrlMeta.Range, pt.request.UrlMeta.Digest, pt.request.UrlMeta.Header)
	// trace register
	regCtx, cancel := context.WithTimeout(pt.ctx, pt.SchedulerOption.ScheduleTimeout.Duration)
	defer cancel()
	regCtx, regSpan := tracer.Start(regCtx, config.SpanRegisterTask)

	var (
		needBackSource bool
		sizeScope      commonv1.SizeScope
		singlePiece    *schedulerv1.SinglePiece
		tinyData       *TinyData
	)

	pt.Infof("step 1: peer %s start to register", pt.request.PeerId)
	pt.schedulerClient = pt.peerTaskManager.SchedulerClient

	result, err := pt.schedulerClient.RegisterPeerTask(regCtx, pt.request)
	regSpan.RecordError(err)
	regSpan.End()

	if err != nil {
		if errors.Is(err, context.DeadlineExceeded) {
			pt.Errorf("scheduler did not response in %s", pt.SchedulerOption.ScheduleTimeout.Duration)
		}
		pt.Errorf("step 1: peer %s register failed: %s", pt.request.PeerId, err)
		if pt.SchedulerOption.DisableAutoBackSource {
			// when peer register failed, some actions need to do with peerPacketStream
			pt.peerPacketStream = &dummyPeerPacketStream{}
			pt.Errorf("register peer task failed: %s, peer id: %s, auto back source disabled", err, pt.request.PeerId)
			pt.span.RecordError(err)
			pt.cancel(commonv1.Code_SchedError, err.Error())
			return err
		}
		needBackSource = true
		// can not detect source or scheduler error, create a new dummy scheduler client
		pt.schedulerClient = &dummySchedulerClient{}
		result = &schedulerv1.RegisterResult{TaskId: pt.taskID}
		pt.Warnf("register peer task failed: %s, peer id: %s, try to back source", err, pt.request.PeerId)
	} else {
		pt.Infof("register task success, SizeScope: %s", commonv1.SizeScope_name[int32(result.SizeScope)])
	}

	var header map[string]string
	if !needBackSource {
		sizeScope = result.SizeScope
		switch result.SizeScope {
		case commonv1.SizeScope_NORMAL:
			pt.span.SetAttributes(config.AttributePeerTaskSizeScope.String("normal"))
		case commonv1.SizeScope_SMALL:
			// small tasks carry a single piece the peer can fetch directly
			pt.span.SetAttributes(config.AttributePeerTaskSizeScope.String("small"))
			if piece, ok := result.DirectPiece.(*schedulerv1.RegisterResult_SinglePiece); ok {
				singlePiece = piece.SinglePiece
			}
			if result.ExtendAttribute != nil {
				header = result.ExtendAttribute.Header
			}
		case commonv1.SizeScope_TINY:
			// tiny tasks carry the whole content inline in the register result
			pt.span.SetAttributes(config.AttributePeerTaskSizeScope.String("tiny"))
			if piece, ok := result.DirectPiece.(*schedulerv1.RegisterResult_PieceContent); ok {
				tinyData = &TinyData{
					TaskID:  result.TaskId,
					PeerID:  pt.request.PeerId,
					Content: piece.PieceContent,
				}
			} else {
				err = errors.New("scheduler return tiny piece but can not parse piece content")
				// when peer register failed, some actions need to do with peerPacketStream
				pt.peerPacketStream = &dummyPeerPacketStream{}
				pt.span.RecordError(err)
				pt.Errorf("%s", err)
				pt.cancel(commonv1.Code_SchedError, err.Error())
				return err
			}
			if result.ExtendAttribute != nil {
				header = result.ExtendAttribute.Header
			}
		case commonv1.SizeScope_EMPTY:
			tinyData = &TinyData{
				TaskID:  result.TaskId,
				PeerID:  pt.request.PeerId,
				Content: []byte{},
			}
			pt.span.SetAttributes(config.AttributePeerTaskSizeScope.String("empty"))
			if result.ExtendAttribute != nil {
				header = result.ExtendAttribute.Header
			}
		}
	}

	peerPacketStream, err := pt.schedulerClient.ReportPieceResult(pt.ctx, pt.request)
	pt.Infof("step 2: start report piece result")
	if err != nil {
		// when peer register failed, some actions need to do with peerPacketStream
		pt.peerPacketStream = &dummyPeerPacketStream{}
		pt.span.RecordError(err)
		pt.cancel(commonv1.Code_SchedError, err.Error())
		return err
	}

	pt.peerPacketStream = peerPacketStream
	pt.sizeScope = sizeScope
	pt.singlePiece = singlePiece
	pt.tinyData = tinyData
	pt.needBackSource = atomic.NewBool(needBackSource)

	if len(header) > 0 {
		pt.SetHeader(header)
	}
	return nil
}
361

362
// start launches the peer task. Seed tasks skip the scheduler entirely
// and are forced into back-source mode; all other tasks register with
// the scheduler first. On success the piece broker and the piece pulling
// loop run in background goroutines.
func (pt *peerTaskConductor) start() error {
	if !pt.seed {
		// normal task: register to scheduler before pulling pieces
		if err := pt.register(); err != nil {
			return err
		}
	} else {
		// seed task never talks to a real scheduler, set up back source
		pt.peerPacketStream = &dummyPeerPacketStream{}
		pt.schedulerClient = &dummySchedulerClient{}
		pt.sizeScope = commonv1.SizeScope_NORMAL
		pt.needBackSource = atomic.NewBool(true)
	}

	pt.trafficShaper.AddTask(pt.peerTaskManager.getRunningTaskKey(pt.taskID, pt.peerID), pt)
	go pt.broker.Start()
	go pt.pullPieces()
	return nil
}
381

382
// GetPeerID returns the peer id of this peer task.
func (pt *peerTaskConductor) GetPeerID() string {
	return pt.peerID
}

// GetTaskID returns the task id of this peer task.
func (pt *peerTaskConductor) GetTaskID() string {
	return pt.taskID
}

// GetStorage returns the storage driver bound to this peer task.
func (pt *peerTaskConductor) GetStorage() storage.TaskStorageDriver {
	return pt.storage
}

// GetContentLength returns the task content length, -1 when unknown.
func (pt *peerTaskConductor) GetContentLength() int64 {
	return pt.contentLength.Load()
}

// SetContentLength stores the task content length.
func (pt *peerTaskConductor) SetContentLength(i int64) {
	pt.contentLength.Store(i)
}

// AddTraffic adds n bytes to the used traffic counter.
func (pt *peerTaskConductor) AddTraffic(n uint64) {
	pt.usedTraffic.Add(n)
}

// GetTraffic returns the total traffic used by this peer task.
func (pt *peerTaskConductor) GetTraffic() uint64 {
	return pt.usedTraffic.Load()
}

// GetTotalPieces returns the total piece count, -1 when unknown.
func (pt *peerTaskConductor) GetTotalPieces() int32 {
	return pt.totalPiece.Load()
}

// SetTotalPieces stores the total piece count.
func (pt *peerTaskConductor) SetTotalPieces(i int32) {
	pt.totalPiece.Store(i)
}

// SetPieceMd5Sign stores the piece md5 signature of the task.
func (pt *peerTaskConductor) SetPieceMd5Sign(md5 string) {
	pt.digest.Store(md5)
}

// GetPieceMd5Sign returns the piece md5 signature, empty when not set.
func (pt *peerTaskConductor) GetPieceMd5Sign() string {
	return pt.digest.Load()
}

// SetHeader converts the given map into a source.Header and stores it
// atomically for later retrieval via GetHeader.
func (pt *peerTaskConductor) SetHeader(header map[string]string) {
	var hdr = &source.Header{}
	for k, v := range header {
		hdr.Set(k, v)
	}
	pt.header.Store(hdr)
}

// GetHeader returns the stored response header, or nil when none was set.
func (pt *peerTaskConductor) GetHeader() *source.Header {
	hdr := pt.header.Load()
	if hdr != nil {
		return hdr.(*source.Header)
	}
	return nil
}

// Context returns the peer task context (carries tracing span info).
func (pt *peerTaskConductor) Context() context.Context {
	return pt.ctx
}

// Log returns the task-scoped logger.
func (pt *peerTaskConductor) Log() *logger.SugaredLoggerOnWith {
	return pt.SugaredLoggerOnWith
}

// UpdateSourceErrorStatus records the grpc status of a source error.
func (pt *peerTaskConductor) UpdateSourceErrorStatus(st *status.Status) {
	pt.sourceErrorStatus = st
}
453

454
// cancel marks the peer task failed with the given code and reason.
// statusOnce guarantees the terminal state (success or fail) is decided
// exactly once; later calls are no-ops.
func (pt *peerTaskConductor) cancel(code commonv1.Code, reason string) {
	pt.statusOnce.Do(func() {
		pt.failedCode = code
		pt.failedReason = reason
		pt.fail()
	})
}
461

462
// cancelNotRegisterred fails a peer task that never completed scheduler
// registration: it records metrics and span attributes, closes the fail
// channel and tears down piece download and sync, without reporting the
// result back to a scheduler. statusOnce guarantees this runs at most once.
func (pt *peerTaskConductor) cancelNotRegisterred(code commonv1.Code, reason string) {
	pt.statusOnce.Do(func() {
		pt.failedCode = code
		pt.failedReason = reason

		metrics.PeerTaskFailedCount.WithLabelValues(metrics.FailTypeInit).Add(1)

		pt.peerTaskManager.PeerTaskDone(pt.taskID, pt.peerID)
		pt.span.SetAttributes(config.AttributePeerTaskSuccess.Bool(false))
		pt.span.SetAttributes(config.AttributePeerTaskCode.Int(int(pt.failedCode)))
		pt.span.SetAttributes(config.AttributePeerTaskMessage.String(pt.failedReason))

		close(pt.failCh)
		pt.broker.Stop()
		pt.span.End()
		pt.pieceDownloadCancel()
		if pt.pieceTaskSyncManager != nil {
			pt.pieceTaskSyncManager.cancel()
		}
	})
}
483

484
// markBackSource flags the task to download from source;
// only use when receive back source code from scheduler
func (pt *peerTaskConductor) markBackSource() {
	pt.needBackSource.Store(true)
}

// forceBackSource flags the task and immediately starts the back-source
// download; only use when legacy get piece from peers schedule timeout
func (pt *peerTaskConductor) forceBackSource() {
	pt.needBackSource.Store(true)
	pt.backSource()
}
494

495
// backSource downloads the whole task directly from the origin source.
// It first cancels all in-flight piece downloads and piece syncing from
// other peers, then streams the content via PieceManager.DownloadSource.
// On failure the task is cancelled with a back-source specific code when
// applicable; on success the task is marked Done.
func (pt *peerTaskConductor) backSource() {
	// cancel all piece download
	pt.pieceDownloadCancel()
	// cancel all sync pieces
	if pt.pieceTaskSyncManager != nil {
		pt.pieceTaskSyncManager.cancel()
	}

	ctx, span := tracer.Start(pt.ctx, config.SpanBackSource)
	// defer guarantees the span always ends, on both paths
	defer span.End()

	// content length is unknown until the source responds
	pt.SetContentLength(-1)
	if err := pt.PieceManager.DownloadSource(ctx, pt, pt.request, pt.rg); err != nil {
		pt.Errorf("download from source error: %s", err)
		span.SetAttributes(config.AttributePeerTaskSuccess.Bool(false))
		span.RecordError(err)
		if isBackSourceError(err) {
			pt.cancel(commonv1.Code_ClientBackSourceError, err.Error())
		} else {
			pt.cancel(commonv1.Code_ClientError, err.Error())
		}
		return
	}

	pt.Done()
	pt.Infof("download from source ok")
	span.SetAttributes(config.AttributePeerTaskSuccess.Bool(true))
}
524

525
func (pt *peerTaskConductor) pullPieces() {
526
	if pt.needBackSource.Load() {
527
		pt.backSource()
528
		return
529
	}
530
	switch pt.sizeScope {
531
	case commonv1.SizeScope_EMPTY:
532
		pt.storeEmptyPeerTask()
533
	case commonv1.SizeScope_TINY:
534
		pt.storeTinyPeerTask()
535
	case commonv1.SizeScope_SMALL:
536
		pt.pullSinglePiece()
537
	case commonv1.SizeScope_NORMAL:
538
		pt.pullPiecesWithP2P()
539
	default:
540
		pt.cancel(commonv1.Code_ClientError, fmt.Sprintf("unknown size scope: %d", pt.sizeScope))
541
	}
542
}
543

544
func (pt *peerTaskConductor) pullPiecesWithP2P() {
545
	var (
546
		// keep same size with pt.failedPieceCh for avoiding deadlock
547
		pieceRequestQueue = NewPieceDispatcher(config.DefaultPieceDispatcherRandomRatio, pt.Log())
548
	)
549
	ctx, cancel := context.WithCancel(pt.ctx)
550

551
	pt.pieceTaskSyncManager = &pieceTaskSyncManager{
552
		ctx:               ctx,
553
		ctxCancel:         cancel,
554
		peerTaskConductor: pt,
555
		pieceRequestQueue: pieceRequestQueue,
556
		workers:           map[string]*pieceTaskSynchronizer{},
557
	}
558
	pt.receivePeerPacket(pieceRequestQueue)
559
}
560

561
func (pt *peerTaskConductor) storeEmptyPeerTask() {
562
	pt.SetContentLength(0)
563
	pt.SetTotalPieces(0)
564
	ctx := pt.ctx
565
	var err error
566
	storageDriver, err := pt.StorageManager.RegisterTask(ctx,
567
		&storage.RegisterTaskRequest{
568
			PeerTaskMetadata: storage.PeerTaskMetadata{
569
				PeerID: pt.peerID,
570
				TaskID: pt.taskID,
571
			},
572
			DesiredLocation: "",
573
			ContentLength:   0,
574
			TotalPieces:     0,
575
		})
576
	pt.storage = storageDriver
577
	if err != nil {
578
		pt.Errorf("register tiny data storage failed: %s", err)
579
		pt.cancel(commonv1.Code_ClientError, err.Error())
580
		return
581
	}
582

583
	if err = pt.UpdateStorage(); err != nil {
584
		pt.Errorf("update tiny data storage failed: %s", err)
585
		pt.cancel(commonv1.Code_ClientError, err.Error())
586
		return
587
	}
588
	pt.Debug("store empty metadata")
589
	pt.Done()
590
}
591

592
func (pt *peerTaskConductor) storeTinyPeerTask() {
593
	contentLength := int64(len(pt.tinyData.Content))
594
	pt.SetContentLength(contentLength)
595
	pt.SetTotalPieces(1)
596
	ctx := pt.ctx
597
	var err error
598
	storageDriver, err := pt.StorageManager.RegisterTask(ctx,
599
		&storage.RegisterTaskRequest{
600
			PeerTaskMetadata: storage.PeerTaskMetadata{
601
				PeerID: pt.tinyData.PeerID,
602
				TaskID: pt.tinyData.TaskID,
603
			},
604
			DesiredLocation: "",
605
			ContentLength:   contentLength,
606
			TotalPieces:     1,
607
			// TODO check digest
608
		})
609
	pt.storage = storageDriver
610
	if err != nil {
611
		pt.Errorf("register tiny data storage failed: %s", err)
612
		pt.cancel(commonv1.Code_ClientError, err.Error())
613
		return
614
	}
615
	n, err := pt.GetStorage().WritePiece(ctx,
616
		&storage.WritePieceRequest{
617
			PeerTaskMetadata: storage.PeerTaskMetadata{
618
				PeerID: pt.tinyData.PeerID,
619
				TaskID: pt.tinyData.TaskID,
620
			},
621
			PieceMetadata: storage.PieceMetadata{
622
				Num:    0,
623
				Md5:    "",
624
				Offset: 0,
625
				Range: nethttp.Range{
626
					Start:  0,
627
					Length: contentLength,
628
				},
629
				Style: 0,
630
			},
631
			UnknownLength: false,
632
			Reader:        bytes.NewBuffer(pt.tinyData.Content),
633
			NeedGenMetadata: func(n int64) (int32, int64, bool) {
634
				return 1, contentLength, true
635
			},
636
		})
637
	if err != nil {
638
		pt.Errorf("write tiny data storage failed: %s", err)
639
		pt.cancel(commonv1.Code_ClientError, err.Error())
640
		return
641
	}
642
	if n != contentLength {
643
		pt.Errorf("write tiny data storage failed, want: %d, wrote: %d", contentLength, n)
644
		pt.cancel(commonv1.Code_ClientError, err.Error())
645
		return
646
	}
647

648
	err = pt.UpdateStorage()
649
	if err != nil {
650
		pt.Errorf("update tiny data storage failed: %s", err)
651
		pt.cancel(commonv1.Code_ClientError, err.Error())
652
		return
653
	}
654

655
	pt.Debugf("store tiny data, len: %d", contentLength)
656
	pt.PublishPieceInfo(0, uint32(contentLength))
657
}
658

659
func (pt *peerTaskConductor) receivePeerPacket(pieceRequestQueue PieceDispatcher) {
660
	var (
661
		lastNotReadyPiece   int32 = 0
662
		peerPacket          *schedulerv1.PeerPacket
663
		err                 error
664
		firstPacketReceived bool
665
		firstPacketDone     = make(chan bool)
666
	)
667
	// only record first schedule result
668
	// other schedule result will record as an event in peer task span
669
	_, firstPeerSpan := tracer.Start(pt.ctx, config.SpanFirstSchedule)
670
	defer func() {
671
		if !firstPacketReceived {
672
			firstPeerSpan.End()
673
		}
674
		if pt.needBackSource.Load() {
675
			return
676
		}
677
		select {
678
		case <-pt.successCh:
679
		case <-pt.failCh:
680
		default:
681
			pt.Errorf("receivePeerPacket exit, but peer task not success or fail")
682
			pt.Fail()
683
		}
684
	}()
685

686
	go pt.waitFirstPeerPacket(firstPacketDone)
687
loop:
688
	for {
689
		select {
690
		case <-pt.successCh:
691
			pt.Infof("peer task success, stop wait peer packet from scheduler")
692
			break loop
693
		case <-pt.failCh:
694
			pt.Infof("peer task fail, stop wait peer packet from scheduler")
695
			break loop
696
		default:
697
		}
698

699
		peerPacket, err = pt.peerPacketStream.Recv()
700
		if err == io.EOF {
701
			pt.Debugf("peerPacketStream closed")
702
			break loop
703
		}
704
		if err != nil {
705
			// some errors, like commonv1.Code_SchedReregister, after reregister success,
706
			// we can continue to receive peer packet from the new scheduler
707
			cont := pt.confirmReceivePeerPacketError(err)
708
			if cont {
709
				continue
710
			}
711
			if !firstPacketReceived {
712
				firstPeerSpan.RecordError(err)
713
			}
714
			break loop
715
		}
716

717
		pt.Debugf("receive peerPacket %v", peerPacket)
718
		if peerPacket.Code != commonv1.Code_Success {
719
			if peerPacket.Code == commonv1.Code_SchedNeedBackSource {
720
				// fix back source directly, then waitFirstPeerPacket timeout
721
				if !firstPacketReceived {
722
					close(firstPacketDone)
723
				}
724
				pt.forceBackSource()
725
				pt.Infof("receive back source code")
726
				return
727
			}
728
			pt.Errorf("receive peer packet with error: %d", peerPacket.Code)
729
			if pt.isExitPeerPacketCode(peerPacket) {
730
				pt.Errorf(pt.failedReason)
731
				pt.cancel(pt.failedCode, pt.failedReason)
732
				if !firstPacketReceived {
733
					firstPeerSpan.RecordError(fmt.Errorf(pt.failedReason))
734
				}
735
				pt.span.AddEvent("receive exit peer packet",
736
					trace.WithAttributes(config.AttributePeerPacketCode.Int(int(peerPacket.Code))))
737
				pt.span.RecordError(fmt.Errorf(pt.failedReason))
738
				break
739
			} else {
740
				pt.span.AddEvent("receive not success peer packet",
741
					trace.WithAttributes(config.AttributePeerPacketCode.Int(int(peerPacket.Code))))
742
			}
743
			continue
744
		}
745

746
		if peerPacket.MainPeer == nil && peerPacket.CandidatePeers == nil {
747
			pt.Warnf("scheduler client send a peerPacket with empty peers")
748
			continue
749
		}
750
		pt.Infof("receive new peer packet, main peer: %s", peerPacket.MainPeer.PeerId)
751
		pt.span.AddEvent("receive new peer packet",
752
			trace.WithAttributes(config.AttributeMainPeer.String(peerPacket.MainPeer.PeerId)))
753

754
		if !firstPacketReceived {
755
			pt.initDownloadPieceWorkers(pieceRequestQueue)
756
			firstPeerSpan.SetAttributes(config.AttributeMainPeer.String(peerPacket.MainPeer.PeerId))
757
			firstPeerSpan.End()
758
		}
759

760
		lastNotReadyPiece = pt.updateSynchronizers(lastNotReadyPiece, peerPacket)
761
		if !firstPacketReceived {
762
			// trigger legacy get piece once to avoid first schedule timeout
763
			firstPacketReceived = true
764
			close(firstPacketDone)
765
		}
766
	}
767

768
	// double check to avoid waitFirstPeerPacket timeout
769
	if !firstPacketReceived {
770
		close(firstPacketDone)
771
	}
772
}
773

774
// updateSynchronizers will convert peers to synchronizer, if failed, will update failed peers to schedulerv1.PeerPacket
775
func (pt *peerTaskConductor) updateSynchronizers(lastNum int32, p *schedulerv1.PeerPacket) int32 {
776
	desiredPiece, ok := pt.getNextNotReadyPieceNum(lastNum)
777
	if !ok {
778
		pt.Infof("all pieces is ready, peer task completed, skip to synchronize")
779
		p.MainPeer = nil
780
		p.CandidatePeers = nil
781
		return desiredPiece
782
	}
783
	var peers = []*schedulerv1.PeerPacket_DestPeer{p.MainPeer}
784
	peers = append(peers, p.CandidatePeers...)
785

786
	pt.pieceTaskSyncManager.syncPeers(peers, desiredPiece)
787
	return desiredPiece
788
}
789

790
/*
When one scheduler goes away(force killed or broken tcp connection) before the peer task done, we will receive the following error message:

	receive peer packet failed: rpc error: code = Unavailable desc = closing transport due to: connection error: desc = "error reading from server: EOF", received prior goaway: code: NO_ERROR, debug data: "graceful_stop"

The underlay error is "connection error: desc = \"error reading from server: EOF\"", the grpc code is codes.Unavailable.

refer grpc-go link:
https://github.com/grpc/grpc-go/blob/v1.60.1/test/goaway_test.go#L118
https://github.com/grpc/grpc-go/blob/v1.60.1/internal/transport/http2_client.go#L987
*/

// isSchedulerUnavailable reports whether err carries the gRPC Unavailable code.
func isSchedulerUnavailable(err error) bool {
	return status.Code(err) == codes.Unavailable
}
804

805
func (pt *peerTaskConductor) confirmReceivePeerPacketError(err error) (cont bool) {
806
	select {
807
	case <-pt.successCh:
808
		return false
809
	case <-pt.failCh:
810
		return false
811
	default:
812
	}
813
	var (
814
		failedCode   = commonv1.Code_UnknownError
815
		failedReason string
816
	)
817
	// extract DfError for grpc status
818
	de, ok := dferrors.IsGRPCDfError(err)
819
	if ok {
820
		switch de.Code {
821
		case commonv1.Code_SchedNeedBackSource:
822
			pt.forceBackSource()
823
			pt.Infof("receive back source code")
824
			return false
825
		case commonv1.Code_SchedReregister:
826
			pt.Infof("receive reregister code")
827
			regErr := pt.register()
828
			if regErr == nil {
829
				pt.Infof("reregister ok")
830
				return true
831
			}
832
			pt.Errorf("reregister to scheduler error: %s", regErr)
833
			fallthrough
834
		default:
835
			failedCode = de.Code
836
			failedReason = de.Message
837
			pt.Errorf("receive peer packet failed: %s", pt.failedReason)
838
		}
839
	} else {
840
		pt.Errorf("receive peer packet failed: %s", err)
841
		if isSchedulerUnavailable(err) {
842
			regErr := pt.register()
843
			if regErr == nil {
844
				pt.Infof("reregister ok")
845
				return true
846
			}
847
			pt.Errorf("reregister to scheduler error: %s", regErr)
848
		}
849
	}
850
	pt.cancel(failedCode, failedReason)
851
	return false
852
}
853

854
func (pt *peerTaskConductor) isExitPeerPacketCode(pp *schedulerv1.PeerPacket) bool {
855
	switch pp.Code {
856
	case commonv1.Code_ResourceLacked, commonv1.Code_BadRequest,
857
		commonv1.Code_PeerTaskNotFound, commonv1.Code_UnknownError, commonv1.Code_RequestTimeOut:
858
		// 1xxx
859
		pt.failedCode = pp.Code
860
		pt.failedReason = fmt.Sprintf("receive exit peer packet with code %d", pp.Code)
861
		return true
862
	case commonv1.Code_SchedError, commonv1.Code_SchedTaskStatusError, commonv1.Code_SchedPeerNotFound, commonv1.Code_SchedForbidden:
863
		// 5xxx
864
		pt.failedCode = pp.Code
865
		pt.failedReason = fmt.Sprintf("receive exit peer packet with code %d", pp.Code)
866
		return true
867
	case commonv1.Code_SchedPeerGone:
868
		pt.failedReason = reasonPeerGoneFromScheduler
869
		pt.failedCode = commonv1.Code_SchedPeerGone
870
		return true
871
	case commonv1.Code_CDNTaskRegistryFail:
872
		// 6xxx
873
		pt.failedCode = pp.Code
874
		pt.failedReason = fmt.Sprintf("receive exit peer packet with code %d", pp.Code)
875
		return true
876
	case commonv1.Code_BackToSourceAborted:
877
		st := status.Newf(codes.Aborted, "source response is not valid")
878
		st, err := st.WithDetails(pp.GetSourceError())
879
		if err != nil {
880
			pt.Errorf("convert source error details error: %s", err.Error())
881
			return false
882
		}
883

884
		pt.sourceErrorStatus = st
885
		return true
886
	}
887
	return false
888
}
889

890
// pullSinglePiece downloads a SMALL-scope task, whose single piece and
// destination peer were returned directly by the scheduler. On download
// failure it falls back to the normal P2P path.
func (pt *peerTaskConductor) pullSinglePiece() {
	pt.Infof("single piece, dest peer id: %s, piece num: %d, size: %d",
		pt.singlePiece.DstPid, pt.singlePiece.PieceInfo.PieceNum, pt.singlePiece.PieceInfo.RangeSize)

	ctx, span := tracer.Start(pt.ctx, fmt.Sprintf(config.SpanDownloadPiece, pt.singlePiece.PieceInfo.PieceNum))
	span.SetAttributes(config.AttributePiece.Int(int(pt.singlePiece.PieceInfo.PieceNum)))

	// the whole task is exactly this one piece
	pt.SetContentLength(int64(pt.singlePiece.PieceInfo.RangeSize))
	pt.SetTotalPieces(1)
	pt.SetPieceMd5Sign(digest.SHA256FromStrings(pt.singlePiece.PieceInfo.PieceMd5))

	request := &DownloadPieceRequest{
		storage: pt.GetStorage(),
		piece:   pt.singlePiece.PieceInfo,
		log:     pt.Log(),
		TaskID:  pt.GetTaskID(),
		PeerID:  pt.GetPeerID(),
		DstPid:  pt.singlePiece.DstPid,
		DstAddr: pt.singlePiece.DstAddr,
	}

	if result, err := pt.PieceManager.DownloadPiece(ctx, request); err == nil {
		pt.reportSuccessResult(request, result)
		pt.PublishPieceInfo(request.piece.PieceNum, request.piece.RangeSize)

		span.SetAttributes(config.AttributePieceSuccess.Bool(true))
		span.End()
		pt.Infof("single piece download success")
	} else {
		// fallback to download from other peers
		span.RecordError(err)
		span.SetAttributes(config.AttributePieceSuccess.Bool(false))
		span.End()

		pt.Warnf("single piece download failed, switch to download from other peers")
		pt.ReportPieceResult(request, result, err)

		pt.pullPiecesWithP2P()
	}
}
930

931
func (pt *peerTaskConductor) updateMetadata(piecePacket *commonv1.PiecePacket) {
932
	// update total piece
933
	var metadataChanged bool
934
	if piecePacket.TotalPiece > pt.GetTotalPieces() {
935
		metadataChanged = true
936
		pt.SetTotalPieces(piecePacket.TotalPiece)
937
		pt.Debugf("update total piece count: %d, dst peer %s", piecePacket.TotalPiece, piecePacket.DstPid)
938
	}
939

940
	// update digest
941
	if len(piecePacket.PieceMd5Sign) > 0 && len(pt.GetPieceMd5Sign()) == 0 {
942
		metadataChanged = true
943
		pt.SetPieceMd5Sign(piecePacket.PieceMd5Sign)
944
		pt.Debugf("update digest: %s, dst peer %s", piecePacket.PieceMd5Sign, piecePacket.DstPid)
945
	}
946

947
	// update content length
948
	if piecePacket.ContentLength > -1 && pt.GetContentLength() == -1 {
949
		metadataChanged = true
950
		pt.SetContentLength(piecePacket.ContentLength)
951
		pt.span.SetAttributes(config.AttributeTaskContentLength.Int64(piecePacket.ContentLength))
952
		pt.Debugf("update content length: %d, dst peer %s", piecePacket.ContentLength, piecePacket.DstPid)
953
	} else if piecePacket.ContentLength > -1 && piecePacket.ContentLength != pt.GetContentLength() {
954
		// corrupt data check
955
		reason := fmt.Sprintf("corrupt data - content length did not match, current: %d, from piece packet: %d",
956
			pt.GetContentLength(), piecePacket.ContentLength)
957
		pt.Errorf(reason)
958
		pt.cancel(commonv1.Code_ClientError, reason)
959
		return
960
	}
961

962
	if piecePacket.ExtendAttribute != nil && len(piecePacket.ExtendAttribute.Header) > 0 && pt.GetHeader() == nil {
963
		metadataChanged = true
964
		pt.SetHeader(piecePacket.ExtendAttribute.Header)
965
		pt.Debugf("update response header: %#v, dst peer %s", piecePacket.ExtendAttribute.Header, piecePacket.DstPid)
966
	}
967

968
	if metadataChanged {
969
		err := pt.UpdateStorage()
970
		if err != nil {
971
			pt.Errorf("update storage error: %s", err)
972
		}
973
	}
974
}
975

976
// initDownloadPieceWorkers starts the fixed-size pool of piece download
// worker goroutines that consume piece requests from the given dispatcher.
func (pt *peerTaskConductor) initDownloadPieceWorkers(pieceRequestQueue PieceDispatcher) {
	// TODO make the worker count configurable instead of hard-coded.
	const workerCount int32 = 4
	for i := int32(0); i < workerCount; i++ {
		go pt.downloadPieceWorker(i, pieceRequestQueue)
	}
}
982

983
func (pt *peerTaskConductor) waitFirstPeerPacket(done chan bool) {
984
	// wait first available peer
985
	select {
986
	case <-pt.successCh:
987
		pt.Infof("peer task succeed, no need to wait first peer")
988
		return
989
	case <-pt.failCh:
990
		pt.Warnf("peer task failed, no need to wait first peer")
991
		return
992
	case <-done:
993
		pt.Debugf("first peer packet received")
994
		return
995
	case <-time.After(pt.SchedulerOption.ScheduleTimeout.Duration):
996
		if pt.SchedulerOption.DisableAutoBackSource {
997
			pt.cancel(commonv1.Code_ClientScheduleTimeout, reasonBackSourceDisabled)
998
			err := fmt.Errorf("%s, auto back source disabled", pt.failedReason)
999
			pt.span.RecordError(err)
1000
			pt.Errorf(err.Error())
1001
			return
1002
		}
1003
		pt.Warnf("start download from source due to %s", reasonScheduleTimeout)
1004
		pt.span.AddEvent("back source due to schedule timeout")
1005
		pt.forceBackSource()
1006
		return
1007
	}
1008
}
1009

1010
func (pt *peerTaskConductor) downloadPieceWorker(id int32, requests PieceDispatcher) {
1011
	for {
1012
		request, err := requests.Get()
1013
		if errors.Is(err, ErrNoValidPieceTemporarily) {
1014
			continue
1015
		}
1016
		if err != nil {
1017
			pt.Infof("piece download queue cancelled, peer download worker #%d exit, err: %v", id, err)
1018
			return
1019
		}
1020
		pt.readyPiecesLock.RLock()
1021
		if pt.readyPieces.IsSet(request.piece.PieceNum) {
1022
			pt.readyPiecesLock.RUnlock()
1023
			pt.Log().Debugf("piece %d is already downloaded, skip", request.piece.PieceNum)
1024
			continue
1025
		}
1026
		pt.readyPiecesLock.RUnlock()
1027
		result := pt.downloadPiece(id, request)
1028
		if result != nil {
1029
			requests.Report(result)
1030
		}
1031
		select {
1032
		case <-pt.pieceDownloadCtx.Done():
1033
			pt.Infof("piece download cancelled, peer download worker #%d exit", id)
1034
			return
1035
		case <-pt.successCh:
1036
			pt.Infof("peer task success, peer download worker #%d exit", id)
1037
			return
1038
		case <-pt.failCh:
1039
			pt.Errorf("peer task fail, peer download worker #%d exit", id)
1040
			return
1041
		default:
1042
		}
1043
	}
1044
}
1045

1046
// downloadPiece downloads one piece on behalf of worker workerID, guarded so
// that at most one worker downloads a given piece number at a time. It
// returns nil when the piece was skipped (already running, or rate-limit wait
// aborted); otherwise it returns the PieceManager's result, for both success
// and failure.
func (pt *peerTaskConductor) downloadPiece(workerID int32, request *DownloadPieceRequest) *DownloadPieceResult {
	// only downloading piece in one worker at same time
	pt.runningPiecesLock.Lock()
	if pt.runningPieces.IsSet(request.piece.PieceNum) {
		pt.runningPiecesLock.Unlock()
		pt.Log().Debugf("piece %d is downloading, skip", request.piece.PieceNum)
		// TODO save to queue for failed pieces
		return nil
	}
	pt.runningPieces.Set(request.piece.PieceNum)
	pt.runningPiecesLock.Unlock()

	// always release the running mark, whatever the download outcome
	defer func() {
		pt.runningPiecesLock.Lock()
		pt.runningPieces.Clean(request.piece.PieceNum)
		pt.runningPiecesLock.Unlock()
	}()

	ctx, span := tracer.Start(pt.pieceDownloadCtx, fmt.Sprintf(config.SpanDownloadPiece, request.piece.PieceNum))
	span.SetAttributes(config.AttributePiece.Int(int(request.piece.PieceNum)))
	span.SetAttributes(config.AttributePieceWorker.Int(int(workerID)))

	// wait limit: waitLimit reports and cancels on failure, so just bail out
	if pt.limiter != nil && !pt.waitLimit(ctx, request) {
		span.SetAttributes(config.AttributePieceSuccess.Bool(false))
		span.End()
		return nil
	}

	pt.Debugf("peer download worker #%d receive piece task, "+
		"dest peer id: %s, piece num: %d, range start: %d, range size: %d",
		workerID, request.DstPid, request.piece.PieceNum, request.piece.RangeStart, request.piece.RangeSize)
	// download piece
	// result is always not nil, PieceManager will report begin and end time
	result, err := pt.PieceManager.DownloadPiece(ctx, request)
	if err != nil {
		pt.ReportPieceResult(request, result, err)
		span.SetAttributes(config.AttributePieceSuccess.Bool(false))
		span.End()
		// once back-source is decided there is no point re-requesting the piece
		if pt.needBackSource.Load() {
			pt.Infof("switch to back source, skip send failed piece")
			return result
		}
		// ask remote peers for this piece again so it can be rescheduled
		attempt, success := pt.pieceTaskSyncManager.acquire(
			&commonv1.PieceTaskRequest{
				Limit:    1,
				TaskId:   pt.taskID,
				SrcPid:   pt.peerID,
				StartNum: uint32(request.piece.PieceNum),
			})
		pt.Infof("send failed piece %d to remote, attempt: %d, success: %d",
			request.piece.PieceNum, attempt, success)
		return result
	}
	// broadcast success piece
	pt.reportSuccessResult(request, result)
	pt.PublishPieceInfo(request.piece.PieceNum, request.piece.RangeSize)

	span.SetAttributes(config.AttributePieceSuccess.Bool(true))
	span.End()
	return result
}
1108

1109
func (pt *peerTaskConductor) waitLimit(ctx context.Context, request *DownloadPieceRequest) bool {
1110
	_, waitSpan := tracer.Start(ctx, config.SpanWaitPieceLimit)
1111
	pt.trafficShaper.Record(pt.peerTaskManager.getRunningTaskKey(request.TaskID, request.PeerID), int(request.piece.RangeSize))
1112
	err := pt.limiter.WaitN(pt.ctx, int(request.piece.RangeSize))
1113
	if err == nil {
1114
		waitSpan.End()
1115
		return true
1116
	}
1117

1118
	pt.Errorf("request limiter error: %s", err)
1119
	waitSpan.RecordError(err)
1120
	waitSpan.End()
1121

1122
	// send error piece result
1123
	sendError := pt.sendPieceResult(&schedulerv1.PieceResult{
1124
		TaskId:        pt.GetTaskID(),
1125
		SrcPid:        pt.GetPeerID(),
1126
		DstPid:        request.DstPid,
1127
		PieceInfo:     request.piece,
1128
		Success:       false,
1129
		Code:          commonv1.Code_ClientRequestLimitFail,
1130
		FinishedCount: 0, // update by peer task
1131
	})
1132
	if sendError != nil {
1133
		pt.Errorf("report piece result failed %s", err)
1134
	}
1135

1136
	pt.cancel(commonv1.Code_ClientRequestLimitFail, err.Error())
1137
	return false
1138
}
1139

1140
func (pt *peerTaskConductor) isCompleted() bool {
1141
	if pt.completedLength.Load() == pt.GetContentLength() {
1142
		pt.Infof("completed content length: %d", pt.completedLength.Load())
1143
		return true
1144
	}
1145

1146
	// corrupt data check and avoid hang for mismatch completed length
1147
	if pt.readyPieces.Settled() == pt.totalPiece.Load() {
1148
		msg := fmt.Sprintf("corrupt data - ready piece count %d seems finished, but completed length %d is not match with content length: %d",
1149
			pt.totalPiece.Load(), pt.completedLength.Load(), pt.GetContentLength())
1150
		pt.Errorf(msg)
1151
		pt.cancel(commonv1.Code_ClientError, msg)
1152
		return true
1153
	}
1154

1155
	return false
1156
}
1157

1158
// for legacy peers only
1159
func (pt *peerTaskConductor) getNextPieceNum(cur int32) (int32, bool) {
1160
	if pt.isCompleted() {
1161
		return -1, false
1162
	}
1163
	i := cur
1164
	// try to find next not requested piece
1165
	pt.requestedPiecesLock.RLock()
1166
	defer pt.requestedPiecesLock.RUnlock()
1167

1168
	for ; pt.requestedPieces.IsSet(i); i++ {
1169
	}
1170
	totalPiece := pt.GetTotalPieces()
1171
	if totalPiece > 0 && i >= totalPiece {
1172
		// double check, re-search not success or not requested pieces
1173
		for i = int32(0); pt.requestedPieces.IsSet(i); i++ {
1174
		}
1175
		if totalPiece > 0 && i >= totalPiece {
1176
			return -1, false
1177
		}
1178
	}
1179
	return i, true
1180
}
1181

1182
// getNextNotReadyPieceNum returns the next piece number that is not yet
// ready, scanning forward from cur and wrapping to zero for one second pass
// when the forward scan runs past the known total. The second return value is
// false when every piece is ready.
func (pt *peerTaskConductor) getNextNotReadyPieceNum(cur int32) (int32, bool) {
	if pt.isCompleted() {
		return 0, false
	}

	// try to find next not ready piece
	pt.readyPiecesLock.RLock()
	defer pt.readyPiecesLock.RUnlock()

	next := cur
	for pt.readyPieces.IsSet(next) {
		next++
	}
	total := pt.GetTotalPieces()
	if total > 0 && next >= total {
		// double check, re-search
		next = 0
		for pt.readyPieces.IsSet(next) {
			next++
		}
		if next >= total {
			return 0, false
		}
	}
	return next, true
}
1204

1205
// recoverFromPanic is intended to be deferred; it logs a recovered panic with
// its stack trace so a crashing goroutine does not take the daemon down.
func (pt *peerTaskConductor) recoverFromPanic() {
	r := recover()
	if r == nil {
		return
	}
	pt.Errorf("recovered from panic %q. Call stack:\n%v", r, string(debug.Stack()))
}
1210

1211
// ReportPieceResult reports a piece download outcome to the scheduler,
// mapping err (when non-nil) to the most specific failure code available.
func (pt *peerTaskConductor) ReportPieceResult(request *DownloadPieceRequest, result *DownloadPieceResult, err error) {
	if err == nil {
		pt.reportSuccessResult(request, result)
		return
	}

	var code commonv1.Code
	switch {
	case isConnectionError(err):
		code = commonv1.Code_ClientConnectionError
	case isPieceNotFound(err):
		code = commonv1.Code_ClientPieceNotFound
	case isBackSourceError(err):
		code = commonv1.Code_ClientBackSourceError
	default:
		code = commonv1.Code_ClientPieceDownloadFail
	}
	pt.reportFailResult(request, result, code)
}
1226

1227
func (pt *peerTaskConductor) reportSuccessResult(request *DownloadPieceRequest, result *DownloadPieceResult) {
1228
	metrics.PieceTaskCount.Add(1)
1229
	_, span := tracer.Start(pt.ctx, config.SpanReportPieceResult)
1230
	span.SetAttributes(config.AttributeWritePieceSuccess.Bool(true))
1231

1232
	err := pt.sendPieceResult(
1233
		&schedulerv1.PieceResult{
1234
			TaskId:        pt.GetTaskID(),
1235
			SrcPid:        pt.GetPeerID(),
1236
			DstPid:        request.DstPid,
1237
			PieceInfo:     request.piece,
1238
			BeginTime:     uint64(result.BeginTime),
1239
			EndTime:       uint64(result.FinishTime),
1240
			Success:       true,
1241
			Code:          commonv1.Code_Success,
1242
			FinishedCount: pt.readyPieces.Settled(),
1243
			// TODO range_start, range_size, piece_md5, piece_offset, piece_style
1244
		})
1245
	if err != nil {
1246
		pt.Errorf("report piece task error: %v", err)
1247
		span.RecordError(err)
1248
	}
1249

1250
	span.End()
1251
}
1252

1253
func (pt *peerTaskConductor) reportFailResult(request *DownloadPieceRequest, result *DownloadPieceResult, code commonv1.Code) {
1254
	metrics.PieceTaskFailedCount.Add(1)
1255
	_, span := tracer.Start(pt.ctx, config.SpanReportPieceResult)
1256
	span.SetAttributes(config.AttributeWritePieceSuccess.Bool(false))
1257

1258
	err := pt.sendPieceResult(&schedulerv1.PieceResult{
1259
		TaskId:        pt.GetTaskID(),
1260
		SrcPid:        pt.GetPeerID(),
1261
		DstPid:        request.DstPid,
1262
		PieceInfo:     request.piece,
1263
		BeginTime:     uint64(result.BeginTime),
1264
		EndTime:       uint64(result.FinishTime),
1265
		Success:       false,
1266
		Code:          code,
1267
		FinishedCount: pt.readyPieces.Settled(),
1268
	})
1269
	if err != nil {
1270
		pt.Errorf("report piece task error: %v", err)
1271
	}
1272
	span.End()
1273
}
1274

1275
func (pt *peerTaskConductor) initStorage(desiredLocation string) (err error) {
1276
	// prepare storage
1277
	if pt.parent == nil {
1278
		pt.storage, err = pt.StorageManager.RegisterTask(pt.ctx,
1279
			&storage.RegisterTaskRequest{
1280
				PeerTaskMetadata: storage.PeerTaskMetadata{
1281
					PeerID: pt.GetPeerID(),
1282
					TaskID: pt.GetTaskID(),
1283
				},
1284
				DesiredLocation: desiredLocation,
1285
				ContentLength:   pt.GetContentLength(),
1286
				TotalPieces:     pt.GetTotalPieces(),
1287
				PieceMd5Sign:    pt.GetPieceMd5Sign(),
1288
			})
1289
	} else {
1290
		pt.storage, err = pt.StorageManager.RegisterSubTask(pt.ctx,
1291
			&storage.RegisterSubTaskRequest{
1292
				Parent: storage.PeerTaskMetadata{
1293
					PeerID: pt.parent.GetPeerID(),
1294
					TaskID: pt.parent.GetTaskID(),
1295
				},
1296
				SubTask: storage.PeerTaskMetadata{
1297
					PeerID: pt.GetPeerID(),
1298
					TaskID: pt.GetTaskID(),
1299
				},
1300
				Range: pt.rg,
1301
			})
1302
	}
1303
	if err != nil {
1304
		pt.Log().Errorf("register task to storage manager failed: %s", err)
1305
	}
1306
	return err
1307
}
1308

1309
func (pt *peerTaskConductor) UpdateStorage() error {
1310
	// update storage
1311
	err := pt.GetStorage().UpdateTask(pt.ctx,
1312
		&storage.UpdateTaskRequest{
1313
			PeerTaskMetadata: storage.PeerTaskMetadata{
1314
				PeerID: pt.GetPeerID(),
1315
				TaskID: pt.GetTaskID(),
1316
			},
1317
			ContentLength: pt.GetContentLength(),
1318
			TotalPieces:   pt.GetTotalPieces(),
1319
			PieceMd5Sign:  pt.GetPieceMd5Sign(),
1320
			Header:        pt.GetHeader(),
1321
		})
1322
	if err != nil {
1323
		pt.Log().Errorf("update task to storage manager failed: %s", err)
1324
		return err
1325
	}
1326

1327
	return nil
1328
}
1329

1330
// Done marks the peer task as finished successfully. Only the first of Done
// and Fail takes effect; later calls are no-ops via statusOnce.
func (pt *peerTaskConductor) Done() {
	pt.statusOnce.Do(func() {
		pt.done()
	})
}
1333

1334
// done finalizes a successful peer task: it persists metadata, validates the
// digest, closes the success (or, on validation/storage failure, the fail)
// channel, then reports the end-of-piece marker and the peer result to the
// scheduler. It must only run once, via statusOnce (see Done).
func (pt *peerTaskConductor) done() {
	// tear down broker, spans, piece download context and sync manager last,
	// after the success/fail channel has been closed
	defer func() {
		pt.broker.Stop()
		pt.span.End()
		pt.pieceDownloadCancel()
		if pt.pieceTaskSyncManager != nil {
			pt.pieceTaskSyncManager.cancel()
		}
		pt.ctxCancel()
	}()
	var (
		cost    = time.Since(pt.startTime).Milliseconds()
		success = true
		code    = commonv1.Code_Success
	)
	pt.Log().Infof("peer task done, cost: %dms", cost)
	// TODO merge error handle
	// update storage metadata
	if err := pt.UpdateStorage(); err == nil {
		// validate digest
		if err = pt.Validate(); err == nil {
			close(pt.successCh)
			pt.span.SetAttributes(config.AttributePeerTaskSuccess.Bool(true))
		} else {
			// digest mismatch: flip the outcome to failure
			close(pt.failCh)
			success = false
			code = commonv1.Code_ClientError
			pt.failedCode = commonv1.Code_ClientError
			pt.failedReason = err.Error()

			pt.span.SetAttributes(config.AttributePeerTaskSuccess.Bool(false))
			pt.span.SetAttributes(config.AttributePeerTaskCode.Int(int(pt.failedCode)))
			pt.span.SetAttributes(config.AttributePeerTaskMessage.String(pt.failedReason))
			pt.Errorf("validate digest failed: %s", err)
			metrics.PeerTaskFailedCount.WithLabelValues(metrics.FailTypeP2P).Add(1)
		}
	} else {
		// metadata persistence failed: flip the outcome to failure
		close(pt.failCh)
		success = false
		code = commonv1.Code_ClientError
		pt.failedCode = commonv1.Code_ClientError
		pt.failedReason = err.Error()

		pt.span.SetAttributes(config.AttributePeerTaskSuccess.Bool(false))
		pt.span.SetAttributes(config.AttributePeerTaskCode.Int(int(pt.failedCode)))
		pt.span.SetAttributes(config.AttributePeerTaskMessage.String(pt.failedReason))
		pt.Errorf("update storage error: %v", err)
		metrics.PeerTaskFailedCount.WithLabelValues(metrics.FailTypeP2P).Add(1)
	}

	pt.peerTaskManager.PeerTaskDone(pt.taskID, pt.peerID)
	peerResultCtx, peerResultSpan := tracer.Start(pt.ctx, config.SpanReportPeerResult)
	defer peerResultSpan.End()

	// Send EOF piece result to scheduler.
	err := pt.sendPieceResult(
		&schedulerv1.PieceResult{
			TaskId:        pt.taskID,
			SrcPid:        pt.peerID,
			FinishedCount: pt.readyPieces.Settled(),
			PieceInfo: &commonv1.PieceInfo{
				PieceNum: common.EndOfPiece,
			},
		})
	pt.Debugf("peer task finished, end piece result sent result: %v", err)

	err = pt.peerPacketStream.CloseSend()
	pt.Debugf("close stream result: %v", err)

	// report the final peer result; success/code may reflect a late failure
	err = pt.schedulerClient.ReportPeerResult(
		peerResultCtx,
		&schedulerv1.PeerResult{
			TaskId:          pt.GetTaskID(),
			PeerId:          pt.GetPeerID(),
			SrcIp:           pt.PeerHost.Ip,
			Idc:             pt.PeerHost.Idc,
			Url:             pt.request.Url,
			ContentLength:   pt.GetContentLength(),
			Traffic:         pt.GetTraffic(),
			TotalPieceCount: pt.GetTotalPieces(),
			Cost:            uint32(cost),
			Success:         success,
			Code:            code,
		})
	if err != nil {
		peerResultSpan.RecordError(err)
		pt.Errorf("step 3: report successful peer result, error: %v", err)
	} else {
		pt.Infof("step 3: report successful peer result ok")
	}
}
1425

1426
// Fail marks the peer task as failed. Only the first of Done and Fail takes
// effect; later calls are no-ops via statusOnce.
func (pt *peerTaskConductor) Fail() {
	pt.statusOnce.Do(func() {
		pt.fail()
	})
}
1429

1430
// fail finalizes a failed peer task: it bumps failure metrics, closes the
// fail channel, unregisters storage for reclaim, and reports the end-of-piece
// marker plus a failed peer result (with source error details, when present)
// to the scheduler. It must only run once, via statusOnce (see Fail).
func (pt *peerTaskConductor) fail() {
	if pt.failedCode == commonv1.Code_ClientBackSourceError {
		metrics.PeerTaskFailedCount.WithLabelValues(metrics.FailTypeBackSource).Add(1)
	} else {
		metrics.PeerTaskFailedCount.WithLabelValues(metrics.FailTypeP2P).Add(1)
	}
	// tear down channels, broker, spans and storage registration last
	defer func() {
		close(pt.failCh)
		pt.broker.Stop()
		pt.span.End()
		pt.pieceDownloadCancel()
		if pt.pieceTaskSyncManager != nil {
			pt.pieceTaskSyncManager.cancel()
		}
		// mark storage to reclaim
		_ = pt.StorageManager.UnregisterTask(
			pt.ctx,
			storage.CommonTaskRequest{
				PeerID: pt.peerID,
				TaskID: pt.taskID,
			})
		pt.ctxCancel()
	}()
	pt.peerTaskManager.PeerTaskDone(pt.taskID, pt.peerID)
	var end = time.Now()
	pt.Log().Errorf("peer task failed, code: %d, reason: %s", pt.failedCode, pt.failedReason)

	// Send EOF piece result to scheduler.
	err := pt.sendPieceResult(&schedulerv1.PieceResult{
		TaskId:        pt.taskID,
		SrcPid:        pt.peerID,
		FinishedCount: pt.readyPieces.Settled(),
		PieceInfo: &commonv1.PieceInfo{
			PieceNum: common.EndOfPiece,
		},
	})
	pt.Debugf("end piece result sent: %v, peer task finished", err)

	err = pt.peerPacketStream.CloseSend()
	pt.Debugf("close stream result: %v", err)

	// report on a fresh context: pt.ctx may already be cancelled, but keep
	// the original span linkage for tracing
	ctx := trace.ContextWithSpan(context.Background(), trace.SpanFromContext(pt.ctx))
	peerResultCtx, peerResultSpan := tracer.Start(ctx, config.SpanReportPeerResult)
	defer peerResultSpan.End()

	// extract the back-to-source error detail, if one was attached earlier
	var sourceError *errordetailsv1.SourceError
	if pt.sourceErrorStatus != nil {
		for _, detail := range pt.sourceErrorStatus.Details() {
			switch d := detail.(type) {
			case *errordetailsv1.SourceError:
				sourceError = d
			}
		}
	}
	peerResult := &schedulerv1.PeerResult{
		TaskId:          pt.GetTaskID(),
		PeerId:          pt.GetPeerID(),
		SrcIp:           pt.PeerHost.Ip,
		Idc:             pt.PeerHost.Idc,
		Url:             pt.request.Url,
		ContentLength:   pt.GetContentLength(),
		Traffic:         pt.GetTraffic(),
		TotalPieceCount: pt.GetTotalPieces(),
		Cost:            uint32(end.Sub(pt.startTime).Milliseconds()),
		Success:         false,
		Code:            pt.failedCode,
	}
	if sourceError != nil {
		peerResult.Errordetails = &schedulerv1.PeerResult_SourceError{
			SourceError: sourceError,
		}
	}
	err = pt.schedulerClient.ReportPeerResult(peerResultCtx, peerResult)
	if err != nil {
		peerResultSpan.RecordError(err)
		pt.Log().Errorf("step 3: report fail peer result, error: %v", err)
	} else {
		pt.Log().Infof("step 3: report fail peer result ok")
	}

	pt.span.SetAttributes(config.AttributePeerTaskSuccess.Bool(false))
	pt.span.SetAttributes(config.AttributePeerTaskCode.Int(int(pt.failedCode)))
	pt.span.SetAttributes(config.AttributePeerTaskMessage.String(pt.failedReason))
}
1514

1515
// Validate stores metadata and validates digest
1516
func (pt *peerTaskConductor) Validate() error {
1517
	err := pt.GetStorage().Store(pt.ctx,
1518
		&storage.StoreRequest{
1519
			CommonTaskRequest: storage.CommonTaskRequest{
1520
				PeerID: pt.peerID,
1521
				TaskID: pt.taskID,
1522
			},
1523
			MetadataOnly: true,
1524
			TotalPieces:  pt.GetTotalPieces(),
1525
		})
1526
	if err != nil {
1527
		pt.Errorf("store metadata error: %s", err)
1528
		return err
1529
	}
1530

1531
	if !pt.CalculateDigest {
1532
		return nil
1533
	}
1534
	err = pt.GetStorage().ValidateDigest(
1535
		&storage.PeerTaskMetadata{
1536
			PeerID: pt.GetPeerID(),
1537
			TaskID: pt.GetTaskID(),
1538
		})
1539
	if err != nil {
1540
		pt.Errorf("validate digest error: %s", err)
1541
		return err
1542
	}
1543
	pt.Debugf("validate digest ok")
1544

1545
	return err
1546
}
1547

1548
// PublishPieceInfo marks pieceNum as ready, adds size to the completed
// length, and broadcasts the piece to subscribers. Duplicate reports for the
// same piece are ignored. When this piece completes the task, Done is
// triggered before the broadcast so subscribers see Finished on it.
func (pt *peerTaskConductor) PublishPieceInfo(pieceNum int32, size uint32) {
	// mark piece ready
	pt.readyPiecesLock.Lock()
	if pt.readyPieces.IsSet(pieceNum) {
		pt.readyPiecesLock.Unlock()
		pt.Warnf("piece %d is already reported, skipped", pieceNum)
		return
	}
	// mark piece processed
	pt.readyPieces.Set(pieceNum)
	pt.completedLength.Add(int64(size))
	pt.readyPiecesLock.Unlock()

	// isCompleted is evaluated after releasing the lock on purpose
	finished := pt.isCompleted()
	if finished {
		pt.Done()
	}
	pt.broker.Publish(
		&PieceInfo{
			Num:      pieceNum,
			Finished: finished,
		})
}
1571

1572
// sendPieceResult sends a piece result to the scheduler, serializing
// concurrent senders over the shared peer packet stream.
func (pt *peerTaskConductor) sendPieceResult(pr *schedulerv1.PieceResult) error {
	pt.sendPieceResultLock.Lock()
	defer pt.sendPieceResultLock.Unlock()
	return pt.peerPacketStream.Send(pr)
}
1578

1579
// getFailedError returns the task's failure as an error, preferring the
// detailed source error status over the generic code/reason pair.
func (pt *peerTaskConductor) getFailedError() error {
	if st := pt.sourceErrorStatus; st != nil {
		return st.Err()
	}
	return fmt.Errorf("peer task failed: %d/%s", pt.failedCode, pt.failedReason)
}
1585

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.