/*
 * Copyright 2020 The Dragonfly Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

//go:generate mockgen -destination peertask_manager_mock.go -source peertask_manager.go -package peer

package peer

import (
	"context"
	"errors"
	"fmt"
	"io"
	"sync"

	"github.com/go-http-utils/headers"
	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/trace"
	"golang.org/x/time/rate"
	"google.golang.org/grpc/status"

	commonv1 "d7y.io/api/v2/pkg/apis/common/v1"
	schedulerv1 "d7y.io/api/v2/pkg/apis/scheduler/v1"

	"d7y.io/dragonfly/v2/client/daemon/metrics"
	"d7y.io/dragonfly/v2/client/daemon/storage"
	logger "d7y.io/dragonfly/v2/internal/dflog"
	"d7y.io/dragonfly/v2/internal/util"
	"d7y.io/dragonfly/v2/pkg/idgen"
	nethttp "d7y.io/dragonfly/v2/pkg/net/http"
	schedulerclient "d7y.io/dragonfly/v2/pkg/rpc/scheduler/client"
)

// TaskManager processes all peer task requests.
type TaskManager interface {
	// StartFileTask starts a peer task to download a file.
	// It returns a progress channel for watching the download progress.
	// A tiny task means the task file is tiny and the task is already done.
	StartFileTask(ctx context.Context, req *FileTaskRequest) (
		progress chan *FileTaskProgress, err error)
	// StartStreamTask starts a peer task with stream io.
	StartStreamTask(ctx context.Context, req *StreamTaskRequest) (
		readCloser io.ReadCloser, attribute map[string]string, err error)
	// StartSeedTask starts a seed peer task.
	StartSeedTask(ctx context.Context, req *SeedTaskRequest) (
		seedTaskResult *SeedTaskResponse, reuse bool, err error)

	Subscribe(request *commonv1.PieceTaskRequest) (*SubscribeResponse, bool)

	IsPeerTaskRunning(taskID string, peerID string) (Task, bool)

	// StatTask checks whether the given task exists in the P2P network.
	StatTask(ctx context.Context, taskID string) (*schedulerv1.Task, error)

	// AnnouncePeerTask announces peer task info to the P2P network.
	AnnouncePeerTask(ctx context.Context, meta storage.PeerTaskMetadata, url string, taskType commonv1.TaskType, urlMeta *commonv1.UrlMeta) error

	GetPieceManager() PieceManager

	// Stop stops the PeerTaskManager.
	Stop(ctx context.Context) error
}

// Task represents a common interface to operate a peer task.
type Task interface {
	Logger
	Context() context.Context
	Log() *logger.SugaredLoggerOnWith

	GetStorage() storage.TaskStorageDriver

	GetPeerID() string
	GetTaskID() string

	GetTotalPieces() int32
	SetTotalPieces(int32)

	GetContentLength() int64
	SetContentLength(int64)

	AddTraffic(uint64)
	GetTraffic() uint64

	SetPieceMd5Sign(string)
	GetPieceMd5Sign() string

	PublishPieceInfo(pieceNum int32, size uint32)
	ReportPieceResult(request *DownloadPieceRequest, result *DownloadPieceResult, err error)

	UpdateSourceErrorStatus(st *status.Status)
}

type Logger interface {
	Log() *logger.SugaredLoggerOnWith
}

type TinyData struct {
	TaskID  string
	PeerID  string
	Content []byte
}

var tracer trace.Tracer

func init() {
	tracer = otel.Tracer("dfget-daemon")
}

type peerTaskManager struct {
	TaskManagerOption
	conductorLock    sync.Locker
	runningPeerTasks sync.Map
	trafficShaper    TrafficShaper
}

type TaskManagerOption struct {
	TaskOption
	SchedulerClient   schedulerclient.V1
	PerPeerRateLimit  rate.Limit
	TotalRateLimit    rate.Limit
	TrafficShaperType string
	// Multiplex indicates reusing the data of completed peer tasks.
	Multiplex bool
	// Prefetch indicates prefetching the whole file for ranged requests.
	Prefetch          bool
	GetPiecesMaxRetry int
	SplitRunningTasks bool
}

func NewPeerTaskManager(opt *TaskManagerOption) (TaskManager, error) {
	ptm := &peerTaskManager{
		TaskManagerOption: *opt,
		runningPeerTasks:  sync.Map{},
		conductorLock:     &sync.Mutex{},
		trafficShaper:     NewTrafficShaper(opt.TrafficShaperType, opt.TotalRateLimit, util.ComputePieceSize),
	}
	ptm.trafficShaper.Start()
	return ptm, nil
}
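
// A construction sketch, assuming the embedded TaskOption and the scheduler
// client have been set up elsewhere; the rate limits and shaper type below
// are illustrative values, not recommendations:
//
//	tm, err := NewPeerTaskManager(&TaskManagerOption{
//		TaskOption:        taskOption,
//		SchedulerClient:   schedulerClient,
//		PerPeerRateLimit:  rate.Limit(16 * 1024 * 1024), // 16 MiB/s per peer
//		TotalRateLimit:    rate.Limit(64 * 1024 * 1024), // 64 MiB/s overall
//		TrafficShaperType: "plain",
//		Multiplex:         true,
//	})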

func (ptm *peerTaskManager) findPeerTaskConductor(key string) (*peerTaskConductor, bool) {
	pt, ok := ptm.runningPeerTasks.Load(key)
	if !ok {
		return nil, false
	}
	return pt.(*peerTaskConductor), true
}

func (ptm *peerTaskManager) getPeerTaskConductor(ctx context.Context,
	taskID string,
	request *schedulerv1.PeerTaskRequest,
	limit rate.Limit,
	parent *peerTaskConductor,
	rg *nethttp.Range,
	desiredLocation string,
	seed bool) (*peerTaskConductor, error) {
	var (
		ptc     *peerTaskConductor
		created bool
		err     error
	)

	if ptm.SplitRunningTasks {
		ptc, created, err = ptm.createSplitedPeerTaskConductor(
			ctx, taskID, request, limit, parent, rg, desiredLocation, seed)
	} else {
		ptc, created, err = ptm.getOrCreatePeerTaskConductor(
			ctx, taskID, request, limit, parent, rg, desiredLocation, seed)
	}

	if err != nil {
		return nil, err
	}

	if created {
		if err = ptc.start(); err != nil {
			return nil, err
		}
	}
	return ptc, err
}

// getOrCreatePeerTaskConductor will get or create a peerTaskConductor.
// If one was created, it returns (ptc, true, nil); otherwise (ptc, false, nil).
func (ptm *peerTaskManager) getOrCreatePeerTaskConductor(
	ctx context.Context,
	taskID string,
	request *schedulerv1.PeerTaskRequest,
	limit rate.Limit,
	parent *peerTaskConductor,
	rg *nethttp.Range,
	desiredLocation string,
	seed bool) (*peerTaskConductor, bool, error) {
	if ptc, ok := ptm.findPeerTaskConductor(taskID); ok {
		logger.Debugf("peer task found: %s/%s", ptc.taskID, ptc.peerID)
		return ptc, false, nil
	}
	ptc := ptm.newPeerTaskConductor(ctx, request, limit, parent, rg, seed)

	ptm.conductorLock.Lock()
	// double check: another goroutine may have registered a conductor
	// for this task while we were constructing ours
	if p, ok := ptm.findPeerTaskConductor(taskID); ok {
		ptm.conductorLock.Unlock()
		logger.Debugf("peer task found: %s/%s", p.taskID, p.peerID)
		if seed && !p.seed && !p.needBackSource.Load() {
			p.Warnf("new seed request received, switch to back source, may be produced by multiple schedulers")
			p.markBackSource()
		}
		metrics.PeerTaskCacheHitCount.Add(1)
		return p, false, nil
	}
	ptm.runningPeerTasks.Store(taskID, ptc)
	ptm.conductorLock.Unlock()
	metrics.PeerTaskCount.Add(1)
	logger.Debugf("peer task created: %s/%s", ptc.taskID, ptc.peerID)

	err := ptc.initStorage(desiredLocation)
	if err != nil {
		ptc.Errorf("init storage error: %s", err)
		ptc.cancelNotRegisterred(commonv1.Code_ClientError, err.Error())
		return nil, false, err
	}
	return ptc, true, nil
}

func (ptm *peerTaskManager) createSplitedPeerTaskConductor(
	ctx context.Context,
	taskID string,
	request *schedulerv1.PeerTaskRequest,
	limit rate.Limit,
	parent *peerTaskConductor,
	rg *nethttp.Range,
	desiredLocation string,
	seed bool) (*peerTaskConductor, bool, error) {
	ptc := ptm.newPeerTaskConductor(ctx, request, limit, parent, rg, seed)

	ptm.runningPeerTasks.Store(taskID+"/"+ptc.peerID, ptc)
	metrics.PeerTaskCount.Add(1)
	logger.Debugf("standalone peer task created: %s/%s", ptc.taskID, ptc.peerID)

	err := ptc.initStorage(desiredLocation)
	if err != nil {
		ptc.Errorf("init storage error: %s", err)
		ptc.cancelNotRegisterred(commonv1.Code_ClientError, err.Error())
		return nil, false, err
	}
	return ptc, true, nil
}

func (ptm *peerTaskManager) enabledPrefetch(rg *nethttp.Range) bool {
	return ptm.Prefetch && rg != nil
}

func (ptm *peerTaskManager) prefetchParentTask(request *schedulerv1.PeerTaskRequest, desiredLocation string) *peerTaskConductor {
	req := &schedulerv1.PeerTaskRequest{
		Url:         request.Url,
		PeerId:      request.PeerId,
		PeerHost:    ptm.PeerHost,
		IsMigrating: request.IsMigrating,
		UrlMeta: &commonv1.UrlMeta{
			Digest: request.UrlMeta.Digest,
			Tag:    request.UrlMeta.Tag,
			Filter: request.UrlMeta.Filter,
			Header: map[string]string{},
		},
	}
	// copy the original headers, but drop Range so the prefetch task
	// downloads the whole file
	for k, v := range request.UrlMeta.Header {
		if k == headers.Range {
			continue
		}
		req.UrlMeta.Header[k] = v
	}
	taskID := idgen.TaskIDV1(req.Url, req.UrlMeta)
	req.PeerId = idgen.PeerIDV1(req.PeerHost.Ip)

	var limit = rate.Inf
	if ptm.PerPeerRateLimit > 0 {
		limit = ptm.PerPeerRateLimit
	}

	logger.Infof("prefetch peer task %s/%s", taskID, req.PeerId)
	prefetch, err := ptm.getPeerTaskConductor(context.Background(), taskID, req, limit, nil, nil, desiredLocation, false)
	if err != nil {
		logger.Errorf("prefetch peer task %s error: %s", taskID, err)
		return nil
	}

	if prefetch != nil && prefetch.peerID == req.PeerId {
		metrics.PrefetchTaskCount.Add(1)
	}
	return prefetch
}

func (ptm *peerTaskManager) StartFileTask(ctx context.Context, req *FileTaskRequest) (chan *FileTaskProgress, error) {
	if req.KeepOriginalOffset && !ptm.Prefetch {
		return nil, fmt.Errorf("please enable prefetch when using the original offset feature")
	}
	if ptm.Multiplex {
		progress, ok := ptm.tryReuseFilePeerTask(ctx, req)
		if ok {
			metrics.PeerTaskCacheHitCount.Add(1)
			return progress, nil
		}
	}
	// TODO ensure scheduler is ok first
	var limit = rate.Inf
	if ptm.PerPeerRateLimit > 0 {
		limit = ptm.PerPeerRateLimit
	}
	if req.Limit > 0 {
		limit = rate.Limit(req.Limit)
	}
	ctx, pt, err := ptm.newFileTask(ctx, req, limit)
	if err != nil {
		return nil, err
	}

	// FIXME when failed due to SchedulerClient error, relocate SchedulerClient and retry
	progress, err := pt.Start(ctx)
	return progress, err
}

func (ptm *peerTaskManager) StartStreamTask(ctx context.Context, req *StreamTaskRequest) (io.ReadCloser, map[string]string, error) {
	peerTaskRequest := &schedulerv1.PeerTaskRequest{
		Url:         req.URL,
		UrlMeta:     req.URLMeta,
		PeerId:      req.PeerID,
		PeerHost:    ptm.PeerHost,
		IsMigrating: false,
	}

	taskID := idgen.TaskIDV1(req.URL, req.URLMeta)
	if ptm.Multiplex {
		// try breakpoint resume for tasks with a range header
		if req.Range != nil && !ptm.SplitRunningTasks {
			// find the running parent task
			parentTaskID := idgen.ParentTaskIDV1(req.URL, req.URLMeta)
			parentTask, ok := ptm.findPeerTaskConductor(parentTaskID)
			if ok && parentTask.GetContentLength() > 0 {
				// only allow resume for a range from the breakpoint to the end
				if req.Range.Start+req.Range.Length == parentTask.GetContentLength() {
					pt := ptm.newResumeStreamTask(ctx, parentTask, req.Range)
					return pt.Start(ctx)
				}
			}
		}

		// reuse data from a completed task
		r, attr, ok := ptm.tryReuseStreamPeerTask(ctx, taskID, req)
		if ok {
			metrics.PeerTaskCacheHitCount.Add(1)
			return r, attr, nil
		}
	}

	pt, err := ptm.newStreamTask(ctx, taskID, peerTaskRequest, req.Range)
	if err != nil {
		return nil, nil, err
	}

	// FIXME when failed due to SchedulerClient error, relocate SchedulerClient and retry
	readCloser, attribute, err := pt.Start(ctx)
	return readCloser, attribute, err
}
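
// A worked example of the breakpoint-resume condition above: if a running
// parent task reports GetContentLength() == 100 and a new stream request
// carries Range{Start: 60, Length: 40}, then 60+40 == 100, so the request is
// exactly the tail of the file and is served by resuming the parent task.
// Any other range (for example Start: 60, Length: 20) falls through to the
// normal reuse-or-download path.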

func (ptm *peerTaskManager) StartSeedTask(ctx context.Context, req *SeedTaskRequest) (response *SeedTaskResponse, reuse bool, err error) {
	response, ok := ptm.tryReuseSeedPeerTask(ctx, req)
	if ok {
		metrics.PeerTaskCacheHitCount.Add(1)
		return response, true, nil
	}

	var limit = rate.Inf
	if ptm.PerPeerRateLimit > 0 {
		limit = ptm.PerPeerRateLimit
	}
	if req.Limit > 0 {
		limit = rate.Limit(req.Limit)
	}

	response, err = ptm.newSeedTask(ctx, req, limit)
	if err != nil {
		return nil, false, err
	}

	return response, false, nil
}

type SubscribeResponse struct {
	Storage          storage.TaskStorageDriver
	PieceInfoChannel chan *PieceInfo
	Success          chan struct{}
	Fail             chan struct{}
	FailReason       func() error
}

func (ptm *peerTaskManager) getRunningTaskKey(taskID, peerID string) string {
	if ptm.SplitRunningTasks {
		return taskID + "/" + peerID
	}
	return taskID
}
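
// For example, with SplitRunningTasks disabled the running key for task
// "abc" is just "abc", so concurrent requests share a single conductor;
// with it enabled the key becomes "abc/peer-1" and every peer gets its own
// entry (values illustrative):
//
//	ptm.getRunningTaskKey("abc", "peer-1") // "abc" or "abc/peer-1"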

func (ptm *peerTaskManager) Subscribe(request *commonv1.PieceTaskRequest) (*SubscribeResponse, bool) {
	ptc, ok := ptm.findPeerTaskConductor(ptm.getRunningTaskKey(request.TaskId, request.DstPid))
	if !ok {
		return nil, false
	}

	result := &SubscribeResponse{
		Storage:          ptc.storage,
		PieceInfoChannel: ptc.broker.Subscribe(),
		Success:          ptc.successCh,
		Fail:             ptc.failCh,
		FailReason:       ptc.getFailedError,
	}
	return result, true
}
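
// A hedged consumption sketch for SubscribeResponse; the channel semantics
// are inferred from the struct fields above, and the details live in the
// peerTaskConductor implementation:
//
//	sub, ok := ptm.Subscribe(req)
//	if !ok {
//		return // no running task for this taskID/peerID
//	}
//	for {
//		select {
//		case info := <-sub.PieceInfoChannel:
//			_ = info // a newly available piece
//		case <-sub.Success:
//			return // the task completed
//		case <-sub.Fail:
//			err := sub.FailReason()
//			_ = err
//			return
//		}
//	}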

func (ptm *peerTaskManager) Stop(ctx context.Context) error {
	// TODO
	if ptm.trafficShaper != nil {
		ptm.trafficShaper.Stop()
	}
	return nil
}

func (ptm *peerTaskManager) PeerTaskDone(taskID, peerID string) {
	key := ptm.getRunningTaskKey(taskID, peerID)
	logger.Debugf("delete done task %s in running tasks", key)
	ptm.runningPeerTasks.Delete(key)
	if ptm.trafficShaper != nil {
		ptm.trafficShaper.RemoveTask(key)
	}
}

func (ptm *peerTaskManager) IsPeerTaskRunning(taskID, peerID string) (Task, bool) {
	ptc, ok := ptm.runningPeerTasks.Load(ptm.getRunningTaskKey(taskID, peerID))
	if ok {
		return ptc.(*peerTaskConductor), ok
	}
	return nil, ok
}

func (ptm *peerTaskManager) StatTask(ctx context.Context, taskID string) (*schedulerv1.Task, error) {
	req := &schedulerv1.StatTaskRequest{
		TaskId: taskID,
	}

	return ptm.SchedulerClient.StatTask(ctx, req)
}

func (ptm *peerTaskManager) GetPieceManager() PieceManager {
	return ptm.PieceManager
}

func (ptm *peerTaskManager) AnnouncePeerTask(ctx context.Context, meta storage.PeerTaskMetadata, url string, taskType commonv1.TaskType, urlMeta *commonv1.UrlMeta) error {
	// Check if the given task is completed in local StorageManager.
	if ptm.StorageManager.FindCompletedTask(meta.TaskID) == nil {
		return errors.New("task not found in local storage")
	}

	// Prepare AnnounceTaskRequest.
	totalPieces, err := ptm.StorageManager.GetTotalPieces(ctx, &meta)
	if err != nil {
		return err
	}

	piecePacket, err := ptm.StorageManager.GetPieces(ctx, &commonv1.PieceTaskRequest{
		TaskId:   meta.TaskID,
		DstPid:   meta.PeerID,
		StartNum: 0,
		Limit:    uint32(totalPieces),
	})
	if err != nil {
		return err
	}
	piecePacket.DstAddr = fmt.Sprintf("%s:%d", ptm.PeerHost.Ip, ptm.PeerHost.DownPort)

	// Announce peer task to scheduler.
	if err := ptm.SchedulerClient.AnnounceTask(ctx, &schedulerv1.AnnounceTaskRequest{
		TaskId:      meta.TaskID,
		TaskType:    taskType,
		Url:         url,
		UrlMeta:     urlMeta,
		PeerHost:    ptm.PeerHost,
		PiecePacket: piecePacket,
	}); err != nil {
		return err
	}

	return nil
}