Dragonfly2
1387 строк · 53.1 Кб
/*
 *     Copyright 2023 The Dragonfly Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17package service18
19import (20"context"21"fmt"22"io"23"time"24
25"google.golang.org/grpc/codes"26"google.golang.org/grpc/status"27"google.golang.org/protobuf/proto"28"google.golang.org/protobuf/types/known/durationpb"29"google.golang.org/protobuf/types/known/timestamppb"30
31commonv2 "d7y.io/api/v2/pkg/apis/common/v2"32dfdaemonv2 "d7y.io/api/v2/pkg/apis/dfdaemon/v2"33schedulerv2 "d7y.io/api/v2/pkg/apis/scheduler/v2"34
35logger "d7y.io/dragonfly/v2/internal/dflog"36"d7y.io/dragonfly/v2/pkg/container/set"37"d7y.io/dragonfly/v2/pkg/digest"38"d7y.io/dragonfly/v2/pkg/net/http"39"d7y.io/dragonfly/v2/pkg/types"40"d7y.io/dragonfly/v2/scheduler/config"41"d7y.io/dragonfly/v2/scheduler/metrics"42"d7y.io/dragonfly/v2/scheduler/networktopology"43"d7y.io/dragonfly/v2/scheduler/resource"44"d7y.io/dragonfly/v2/scheduler/scheduling"45"d7y.io/dragonfly/v2/scheduler/storage"46)
47
48// V2 is the interface for v2 version of the service.
49type V2 struct {50// Resource interface.51resource resource.Resource52
53// Scheduling interface.54scheduling scheduling.Scheduling55
56// Scheduler service config.57config *config.Config58
59// Dynamic config.60dynconfig config.DynconfigInterface61
62// Storage interface.63storage storage.Storage64
65// Network topology interface.66networkTopology networktopology.NetworkTopology67}
68
69// New v2 version of service instance.
70func NewV2(71cfg *config.Config,72resource resource.Resource,73scheduling scheduling.Scheduling,74dynconfig config.DynconfigInterface,75storage storage.Storage,76networkTopology networktopology.NetworkTopology,77) *V2 {78return &V2{79resource: resource,80scheduling: scheduling,81config: cfg,82dynconfig: dynconfig,83storage: storage,84networkTopology: networkTopology,85}86}
87
88// AnnouncePeer announces peer to scheduler.
89func (v *V2) AnnouncePeer(stream schedulerv2.Scheduler_AnnouncePeerServer) error {90ctx, cancel := context.WithCancel(stream.Context())91defer cancel()92
93for {94select {95case <-ctx.Done():96logger.Info("context was done")97return ctx.Err()98default:99}100
101req, err := stream.Recv()102if err != nil {103if err == io.EOF {104return nil105}106
107logger.Errorf("receive error: %s", err.Error())108return err109}110
111log := logger.WithPeer(req.GetHostId(), req.GetTaskId(), req.GetPeerId())112switch announcePeerRequest := req.GetRequest().(type) {113case *schedulerv2.AnnouncePeerRequest_RegisterPeerRequest:114registerPeerRequest := announcePeerRequest.RegisterPeerRequest115log.Infof("receive RegisterPeerRequest, url: %s, range: %#v, header: %#v, need back-to-source: %t",116registerPeerRequest.Download.GetUrl(), registerPeerRequest.Download.GetRange(), registerPeerRequest.Download.GetRequestHeader(), registerPeerRequest.Download.GetNeedBackToSource())117if err := v.handleRegisterPeerRequest(ctx, stream, req.GetHostId(), req.GetTaskId(), req.GetPeerId(), registerPeerRequest); err != nil {118log.Error(err)119return err120}121case *schedulerv2.AnnouncePeerRequest_DownloadPeerStartedRequest:122log.Info("receive DownloadPeerStartedRequest")123if err := v.handleDownloadPeerStartedRequest(ctx, req.GetPeerId()); err != nil {124log.Error(err)125return err126}127case *schedulerv2.AnnouncePeerRequest_DownloadPeerBackToSourceStartedRequest:128log.Info("receive DownloadPeerBackToSourceStartedRequest")129if err := v.handleDownloadPeerBackToSourceStartedRequest(ctx, req.GetPeerId()); err != nil {130log.Error(err)131return err132}133case *schedulerv2.AnnouncePeerRequest_RescheduleRequest:134rescheduleRequest := announcePeerRequest.RescheduleRequest135
136log.Infof("receive RescheduleRequest description: %s", rescheduleRequest.GetDescription())137if err := v.handleRescheduleRequest(ctx, req.GetPeerId(), rescheduleRequest.GetCandidateParents()); err != nil {138log.Error(err)139return err140}141case *schedulerv2.AnnouncePeerRequest_DownloadPeerFinishedRequest:142downloadPeerFinishedRequest := announcePeerRequest.DownloadPeerFinishedRequest143log.Infof("receive DownloadPeerFinishedRequest, content length: %d, piece count: %d", downloadPeerFinishedRequest.GetContentLength(), downloadPeerFinishedRequest.GetPieceCount())144if err := v.handleDownloadPeerFinishedRequest(ctx, req.GetPeerId()); err != nil {145log.Error(err)146return err147}148case *schedulerv2.AnnouncePeerRequest_DownloadPeerBackToSourceFinishedRequest:149downloadPeerBackToSourceFinishedRequest := announcePeerRequest.DownloadPeerBackToSourceFinishedRequest150log.Infof("receive DownloadPeerBackToSourceFinishedRequest, content length: %d, piece count: %d", downloadPeerBackToSourceFinishedRequest.GetContentLength(), downloadPeerBackToSourceFinishedRequest.GetPieceCount())151if err := v.handleDownloadPeerBackToSourceFinishedRequest(ctx, req.GetPeerId(), downloadPeerBackToSourceFinishedRequest); err != nil {152log.Error(err)153return err154}155case *schedulerv2.AnnouncePeerRequest_DownloadPeerFailedRequest:156log.Infof("receive DownloadPeerFailedRequest, description: %s", announcePeerRequest.DownloadPeerFailedRequest.GetDescription())157if err := v.handleDownloadPeerFailedRequest(ctx, req.GetPeerId()); err != nil {158log.Error(err)159return err160}161case *schedulerv2.AnnouncePeerRequest_DownloadPeerBackToSourceFailedRequest:162log.Infof("receive DownloadPeerBackToSourceFailedRequest, description: %s", announcePeerRequest.DownloadPeerBackToSourceFailedRequest.GetDescription())163if err := v.handleDownloadPeerBackToSourceFailedRequest(ctx, req.GetPeerId()); err != nil {164log.Error(err)165return err166}167case 
*schedulerv2.AnnouncePeerRequest_DownloadPieceFinishedRequest:168piece := announcePeerRequest.DownloadPieceFinishedRequest.Piece169log.Infof("receive DownloadPieceFinishedRequest, piece number: %d, piece length: %d, traffic type: %s, cost: %s, parent id: %s", piece.GetNumber(), piece.GetLength(), piece.GetTrafficType(), piece.GetCost().AsDuration().String(), piece.GetParentId())170if err := v.handleDownloadPieceFinishedRequest(ctx, req.GetPeerId(), announcePeerRequest.DownloadPieceFinishedRequest); err != nil {171log.Error(err)172return err173}174case *schedulerv2.AnnouncePeerRequest_DownloadPieceBackToSourceFinishedRequest:175piece := announcePeerRequest.DownloadPieceBackToSourceFinishedRequest.Piece176log.Infof("receive DownloadPieceBackToSourceFinishedRequest, piece number: %d, piece length: %d, traffic type: %s, cost: %s, parent id: %s", piece.GetNumber(), piece.GetLength(), piece.GetTrafficType(), piece.GetCost().AsDuration().String(), piece.GetParentId())177if err := v.handleDownloadPieceBackToSourceFinishedRequest(ctx, req.GetPeerId(), announcePeerRequest.DownloadPieceBackToSourceFinishedRequest); err != nil {178log.Error(err)179return err180}181case *schedulerv2.AnnouncePeerRequest_DownloadPieceFailedRequest:182downloadPieceFailedRequest := announcePeerRequest.DownloadPieceFailedRequest183log.Infof("receive DownloadPieceFailedRequest, piece number: %d, temporary: %t, parent id: %s", downloadPieceFailedRequest.GetPieceNumber(), downloadPieceFailedRequest.GetTemporary(), downloadPieceFailedRequest.GetParentId())184if err := v.handleDownloadPieceFailedRequest(ctx, req.GetPeerId(), downloadPieceFailedRequest); err != nil {185log.Error(err)186return err187}188case *schedulerv2.AnnouncePeerRequest_DownloadPieceBackToSourceFailedRequest:189downloadPieceBackToSourceFailedRequest := announcePeerRequest.DownloadPieceBackToSourceFailedRequest190log.Infof("receive DownloadPieceBackToSourceFailedRequest, piece number: %d", 
downloadPieceBackToSourceFailedRequest.GetPieceNumber())191if err := v.handleDownloadPieceBackToSourceFailedRequest(ctx, req.GetPeerId(), downloadPieceBackToSourceFailedRequest); err != nil {192log.Error(err)193return err194}195default:196msg := fmt.Sprintf("receive unknow request: %#v", announcePeerRequest)197log.Error(msg)198return status.Error(codes.FailedPrecondition, msg)199}200}201}
202
203// StatPeer checks information of peer.
204func (v *V2) StatPeer(ctx context.Context, req *schedulerv2.StatPeerRequest) (*commonv2.Peer, error) {205logger.WithTaskID(req.GetTaskId()).Infof("stat peer request: %#v", req)206
207peer, loaded := v.resource.PeerManager().Load(req.GetPeerId())208if !loaded {209return nil, status.Errorf(codes.NotFound, "peer %s not found", req.GetPeerId())210}211
212resp := &commonv2.Peer{213Id: peer.ID,214Priority: peer.Priority,215Cost: durationpb.New(peer.Cost.Load()),216State: peer.FSM.Current(),217NeedBackToSource: peer.NeedBackToSource.Load(),218CreatedAt: timestamppb.New(peer.CreatedAt.Load()),219UpdatedAt: timestamppb.New(peer.UpdatedAt.Load()),220}221
222// Set range to response.223if peer.Range != nil {224resp.Range = &commonv2.Range{225Start: uint64(peer.Range.Start),226Length: uint64(peer.Range.Length),227}228}229
230// Set pieces to response.231peer.Pieces.Range(func(key, value any) bool {232piece, ok := value.(*resource.Piece)233if !ok {234peer.Log.Errorf("invalid piece %s %#v", key, value)235return true236}237
238respPiece := &commonv2.Piece{239Number: uint32(piece.Number),240ParentId: &piece.ParentID,241Offset: piece.Offset,242Length: piece.Length,243TrafficType: &piece.TrafficType,244Cost: durationpb.New(piece.Cost),245CreatedAt: timestamppb.New(piece.CreatedAt),246}247
248if piece.Digest != nil {249respPiece.Digest = piece.Digest.String()250}251
252resp.Pieces = append(resp.Pieces, respPiece)253return true254})255
256// Set task to response.257resp.Task = &commonv2.Task{258Id: peer.Task.ID,259Type: peer.Task.Type,260Url: peer.Task.URL,261Tag: &peer.Task.Tag,262Application: &peer.Task.Application,263FilteredQueryParams: peer.Task.FilteredQueryParams,264RequestHeader: peer.Task.Header,265PieceLength: uint32(peer.Task.PieceLength),266ContentLength: uint64(peer.Task.ContentLength.Load()),267PieceCount: uint32(peer.Task.TotalPieceCount.Load()),268SizeScope: peer.Task.SizeScope(),269State: peer.Task.FSM.Current(),270PeerCount: uint32(peer.Task.PeerCount()),271CreatedAt: timestamppb.New(peer.Task.CreatedAt.Load()),272UpdatedAt: timestamppb.New(peer.Task.UpdatedAt.Load()),273}274
275// Set digest to task response.276if peer.Task.Digest != nil {277dgst := peer.Task.Digest.String()278resp.Task.Digest = &dgst279}280
281// Set pieces to task response.282peer.Task.Pieces.Range(func(key, value any) bool {283piece, ok := value.(*resource.Piece)284if !ok {285peer.Task.Log.Errorf("invalid piece %s %#v", key, value)286return true287}288
289respPiece := &commonv2.Piece{290Number: uint32(piece.Number),291ParentId: &piece.ParentID,292Offset: piece.Offset,293Length: piece.Length,294TrafficType: &piece.TrafficType,295Cost: durationpb.New(piece.Cost),296CreatedAt: timestamppb.New(piece.CreatedAt),297}298
299if piece.Digest != nil {300respPiece.Digest = piece.Digest.String()301}302
303resp.Task.Pieces = append(resp.Task.Pieces, respPiece)304return true305})306
307// Set host to response.308resp.Host = &commonv2.Host{309Id: peer.Host.ID,310Type: uint32(peer.Host.Type),311Hostname: peer.Host.Hostname,312Ip: peer.Host.IP,313Port: peer.Host.Port,314DownloadPort: peer.Host.DownloadPort,315Os: peer.Host.OS,316Platform: peer.Host.Platform,317PlatformFamily: peer.Host.PlatformFamily,318PlatformVersion: peer.Host.PlatformVersion,319KernelVersion: peer.Host.KernelVersion,320Cpu: &commonv2.CPU{321LogicalCount: peer.Host.CPU.LogicalCount,322PhysicalCount: peer.Host.CPU.PhysicalCount,323Percent: peer.Host.CPU.Percent,324ProcessPercent: peer.Host.CPU.ProcessPercent,325Times: &commonv2.CPUTimes{326User: peer.Host.CPU.Times.User,327System: peer.Host.CPU.Times.System,328Idle: peer.Host.CPU.Times.Idle,329Nice: peer.Host.CPU.Times.Nice,330Iowait: peer.Host.CPU.Times.Iowait,331Irq: peer.Host.CPU.Times.Irq,332Softirq: peer.Host.CPU.Times.Softirq,333Steal: peer.Host.CPU.Times.Steal,334Guest: peer.Host.CPU.Times.Guest,335GuestNice: peer.Host.CPU.Times.GuestNice,336},337},338Memory: &commonv2.Memory{339Total: peer.Host.Memory.Total,340Available: peer.Host.Memory.Available,341Used: peer.Host.Memory.Used,342UsedPercent: peer.Host.Memory.UsedPercent,343ProcessUsedPercent: peer.Host.Memory.ProcessUsedPercent,344Free: peer.Host.Memory.Free,345},346Network: &commonv2.Network{347TcpConnectionCount: peer.Host.Network.TCPConnectionCount,348UploadTcpConnectionCount: peer.Host.Network.UploadTCPConnectionCount,349Location: &peer.Host.Network.Location,350Idc: &peer.Host.Network.IDC,351},352Disk: &commonv2.Disk{353Total: peer.Host.Disk.Total,354Free: peer.Host.Disk.Free,355Used: peer.Host.Disk.Used,356UsedPercent: peer.Host.Disk.UsedPercent,357InodesTotal: peer.Host.Disk.InodesTotal,358InodesUsed: peer.Host.Disk.InodesUsed,359InodesFree: peer.Host.Disk.InodesFree,360InodesUsedPercent: peer.Host.Disk.InodesUsedPercent,361},362Build: &commonv2.Build{363GitVersion: peer.Host.Build.GitVersion,364GitCommit: &peer.Host.Build.GitCommit,365GoVersion: 
&peer.Host.Build.GoVersion,366Platform: &peer.Host.Build.Platform,367},368}369
370return resp, nil371}
372
373// LeavePeer releases peer in scheduler.
374func (v *V2) LeavePeer(ctx context.Context, req *schedulerv2.LeavePeerRequest) error {375log := logger.WithTaskAndPeerID(req.GetTaskId(), req.GetPeerId())376log.Infof("leave peer request: %#v", req)377
378peer, loaded := v.resource.PeerManager().Load(req.GetPeerId())379if !loaded {380msg := fmt.Sprintf("peer %s not found", req.GetPeerId())381log.Error(msg)382return status.Error(codes.NotFound, msg)383}384
385if err := peer.FSM.Event(ctx, resource.PeerEventLeave); err != nil {386msg := fmt.Sprintf("peer fsm event failed: %s", err.Error())387peer.Log.Error(msg)388return status.Error(codes.FailedPrecondition, msg)389}390
391return nil392}
393
394// TODO Implement function.
395// ExchangePeer exchanges peer information.
396func (v *V2) ExchangePeer(ctx context.Context, req *schedulerv2.ExchangePeerRequest) (*schedulerv2.ExchangePeerResponse, error) {397return nil, nil398}
399
400// StatTask checks information of task.
401func (v *V2) StatTask(ctx context.Context, req *schedulerv2.StatTaskRequest) (*commonv2.Task, error) {402log := logger.WithTaskID(req.GetId())403log.Infof("stat task request: %#v", req)404
405task, loaded := v.resource.TaskManager().Load(req.GetId())406if !loaded {407msg := fmt.Sprintf("task %s not found", req.GetId())408log.Error(msg)409return nil, status.Error(codes.NotFound, msg)410}411
412resp := &commonv2.Task{413Id: task.ID,414Type: task.Type,415Url: task.URL,416Tag: &task.Tag,417Application: &task.Application,418FilteredQueryParams: task.FilteredQueryParams,419RequestHeader: task.Header,420PieceLength: uint32(task.PieceLength),421ContentLength: uint64(task.ContentLength.Load()),422PieceCount: uint32(task.TotalPieceCount.Load()),423SizeScope: task.SizeScope(),424State: task.FSM.Current(),425PeerCount: uint32(task.PeerCount()),426CreatedAt: timestamppb.New(task.CreatedAt.Load()),427UpdatedAt: timestamppb.New(task.UpdatedAt.Load()),428}429
430// Set digest to response.431if task.Digest != nil {432dgst := task.Digest.String()433resp.Digest = &dgst434}435
436// Set pieces to response.437task.Pieces.Range(func(key, value any) bool {438piece, ok := value.(*resource.Piece)439if !ok {440task.Log.Errorf("invalid piece %s %#v", key, value)441return true442}443
444respPiece := &commonv2.Piece{445Number: uint32(piece.Number),446ParentId: &piece.ParentID,447Offset: piece.Offset,448Length: piece.Length,449TrafficType: &piece.TrafficType,450Cost: durationpb.New(piece.Cost),451CreatedAt: timestamppb.New(piece.CreatedAt),452}453
454if piece.Digest != nil {455respPiece.Digest = piece.Digest.String()456}457
458resp.Pieces = append(resp.Pieces, respPiece)459return true460})461
462return resp, nil463}
464
465// AnnounceHost announces host to scheduler.
466func (v *V2) AnnounceHost(ctx context.Context, req *schedulerv2.AnnounceHostRequest) error {467logger.WithHostID(req.Host.GetId()).Infof("announce host request: %#v", req.GetHost())468
469// Get scheduler cluster client config by manager.470var concurrentUploadLimit int32471if clientConfig, err := v.dynconfig.GetSchedulerClusterClientConfig(); err == nil {472concurrentUploadLimit = int32(clientConfig.LoadLimit)473}474
475host, loaded := v.resource.HostManager().Load(req.Host.GetId())476if !loaded {477options := []resource.HostOption{478resource.WithOS(req.Host.GetOs()),479resource.WithPlatform(req.Host.GetPlatform()),480resource.WithPlatformFamily(req.Host.GetPlatformFamily()),481resource.WithPlatformVersion(req.Host.GetPlatformVersion()),482resource.WithKernelVersion(req.Host.GetKernelVersion()),483}484
485if concurrentUploadLimit > 0 {486options = append(options, resource.WithConcurrentUploadLimit(concurrentUploadLimit))487}488
489if req.Host.GetCpu() != nil {490options = append(options, resource.WithCPU(resource.CPU{491LogicalCount: req.Host.Cpu.GetLogicalCount(),492PhysicalCount: req.Host.Cpu.GetPhysicalCount(),493Percent: req.Host.Cpu.GetPercent(),494ProcessPercent: req.Host.Cpu.GetProcessPercent(),495Times: resource.CPUTimes{496User: req.Host.Cpu.Times.GetUser(),497System: req.Host.Cpu.Times.GetSystem(),498Idle: req.Host.Cpu.Times.GetIdle(),499Nice: req.Host.Cpu.Times.GetNice(),500Iowait: req.Host.Cpu.Times.GetIowait(),501Irq: req.Host.Cpu.Times.GetIrq(),502Softirq: req.Host.Cpu.Times.GetSoftirq(),503Steal: req.Host.Cpu.Times.GetSteal(),504Guest: req.Host.Cpu.Times.GetGuest(),505GuestNice: req.Host.Cpu.Times.GetGuest(),506},507}))508}509
510if req.Host.GetMemory() != nil {511options = append(options, resource.WithMemory(resource.Memory{512Total: req.Host.Memory.GetTotal(),513Available: req.Host.Memory.GetAvailable(),514Used: req.Host.Memory.GetUsed(),515UsedPercent: req.Host.Memory.GetUsedPercent(),516ProcessUsedPercent: req.Host.Memory.GetProcessUsedPercent(),517Free: req.Host.Memory.GetFree(),518}))519}520
521if req.Host.GetNetwork() != nil {522options = append(options, resource.WithNetwork(resource.Network{523TCPConnectionCount: req.Host.Network.GetTcpConnectionCount(),524UploadTCPConnectionCount: req.Host.Network.GetUploadTcpConnectionCount(),525Location: req.Host.Network.GetLocation(),526IDC: req.Host.Network.GetIdc(),527}))528}529
530if req.Host.GetDisk() != nil {531options = append(options, resource.WithDisk(resource.Disk{532Total: req.Host.Disk.GetTotal(),533Free: req.Host.Disk.GetFree(),534Used: req.Host.Disk.GetUsed(),535UsedPercent: req.Host.Disk.GetUsedPercent(),536InodesTotal: req.Host.Disk.GetInodesTotal(),537InodesUsed: req.Host.Disk.GetInodesUsed(),538InodesFree: req.Host.Disk.GetInodesFree(),539InodesUsedPercent: req.Host.Disk.GetInodesUsedPercent(),540}))541}542
543if req.Host.GetBuild() != nil {544options = append(options, resource.WithBuild(resource.Build{545GitVersion: req.Host.Build.GetGitVersion(),546GitCommit: req.Host.Build.GetGitCommit(),547GoVersion: req.Host.Build.GetGoVersion(),548Platform: req.Host.Build.GetPlatform(),549}))550}551
552if req.Host.GetSchedulerClusterId() != 0 {553options = append(options, resource.WithSchedulerClusterID(uint64(v.config.Manager.SchedulerClusterID)))554}555
556host = resource.NewHost(557req.Host.GetId(), req.Host.GetIp(), req.Host.GetHostname(),558req.Host.GetPort(), req.Host.GetDownloadPort(), types.HostType(req.Host.GetType()),559options...,560)561
562v.resource.HostManager().Store(host)563host.Log.Infof("announce new host: %#v", req)564return nil565}566
567// Host already exists and updates properties.568host.Port = req.Host.GetPort()569host.DownloadPort = req.Host.GetDownloadPort()570host.Type = types.HostType(req.Host.GetType())571host.OS = req.Host.GetOs()572host.Platform = req.Host.GetPlatform()573host.PlatformFamily = req.Host.GetPlatformFamily()574host.PlatformVersion = req.Host.GetPlatformVersion()575host.KernelVersion = req.Host.GetKernelVersion()576host.UpdatedAt.Store(time.Now())577
578if concurrentUploadLimit > 0 {579host.ConcurrentUploadLimit.Store(concurrentUploadLimit)580}581
582if req.Host.GetCpu() != nil {583host.CPU = resource.CPU{584LogicalCount: req.Host.Cpu.GetLogicalCount(),585PhysicalCount: req.Host.Cpu.GetPhysicalCount(),586Percent: req.Host.Cpu.GetPercent(),587ProcessPercent: req.Host.Cpu.GetProcessPercent(),588Times: resource.CPUTimes{589User: req.Host.Cpu.Times.GetUser(),590System: req.Host.Cpu.Times.GetSystem(),591Idle: req.Host.Cpu.Times.GetIdle(),592Nice: req.Host.Cpu.Times.GetNice(),593Iowait: req.Host.Cpu.Times.GetIowait(),594Irq: req.Host.Cpu.Times.GetIrq(),595Softirq: req.Host.Cpu.Times.GetSoftirq(),596Steal: req.Host.Cpu.Times.GetSteal(),597Guest: req.Host.Cpu.Times.GetGuest(),598GuestNice: req.Host.Cpu.Times.GetGuestNice(),599},600}601}602
603if req.Host.GetMemory() != nil {604host.Memory = resource.Memory{605Total: req.Host.Memory.GetTotal(),606Available: req.Host.Memory.GetAvailable(),607Used: req.Host.Memory.GetUsed(),608UsedPercent: req.Host.Memory.GetUsedPercent(),609ProcessUsedPercent: req.Host.Memory.GetProcessUsedPercent(),610Free: req.Host.Memory.GetFree(),611}612}613
614if req.Host.GetNetwork() != nil {615host.Network = resource.Network{616TCPConnectionCount: req.Host.Network.GetTcpConnectionCount(),617UploadTCPConnectionCount: req.Host.Network.GetUploadTcpConnectionCount(),618Location: req.Host.Network.GetLocation(),619IDC: req.Host.Network.GetIdc(),620}621}622
623if req.Host.GetDisk() != nil {624host.Disk = resource.Disk{625Total: req.Host.Disk.GetTotal(),626Free: req.Host.Disk.GetFree(),627Used: req.Host.Disk.GetUsed(),628UsedPercent: req.Host.Disk.GetUsedPercent(),629InodesTotal: req.Host.Disk.GetInodesTotal(),630InodesUsed: req.Host.Disk.GetInodesUsed(),631InodesFree: req.Host.Disk.GetInodesFree(),632InodesUsedPercent: req.Host.Disk.GetInodesUsedPercent(),633}634}635
636if req.Host.GetBuild() != nil {637host.Build = resource.Build{638GitVersion: req.Host.Build.GetGitVersion(),639GitCommit: req.Host.Build.GetGitCommit(),640GoVersion: req.Host.Build.GetGoVersion(),641Platform: req.Host.Build.GetPlatform(),642}643}644
645return nil646}
647
648// LeaveHost releases host in scheduler.
649func (v *V2) LeaveHost(ctx context.Context, req *schedulerv2.LeaveHostRequest) error {650log := logger.WithHostID(req.GetId())651log.Infof("leave host request: %#v", req)652
653host, loaded := v.resource.HostManager().Load(req.GetId())654if !loaded {655msg := fmt.Sprintf("host %s not found", req.GetId())656log.Error(msg)657return status.Error(codes.NotFound, msg)658}659
660// Leave peers in host.661host.LeavePeers()662
663// Delete host from network topology.664if v.networkTopology != nil {665if err := v.networkTopology.DeleteHost(host.ID); err != nil {666log.Errorf("delete network topology host error: %s", err.Error())667return err668}669}670
671return nil672}
673
674// SyncProbes sync probes of the host.
675func (v *V2) SyncProbes(stream schedulerv2.Scheduler_SyncProbesServer) error {676if v.networkTopology == nil {677return status.Errorf(codes.Unimplemented, "network topology is not enabled")678}679
680for {681req, err := stream.Recv()682if err != nil {683if err == io.EOF {684return nil685}686
687logger.Errorf("receive error: %s", err.Error())688return err689}690
691log := logger.WithHost(req.Host.GetId(), req.Host.GetHostname(), req.Host.GetIp())692switch syncProbesRequest := req.GetRequest().(type) {693case *schedulerv2.SyncProbesRequest_ProbeStartedRequest:694// Find probed hosts in network topology. Based on the source host information,695// the most candidate hosts will be evaluated.696log.Info("receive SyncProbesRequest_ProbeStartedRequest")697hosts, err := v.networkTopology.FindProbedHosts(req.Host.GetId())698if err != nil {699log.Error(err)700return status.Error(codes.FailedPrecondition, err.Error())701}702
703var probedHosts []*commonv2.Host704for _, host := range hosts {705probedHosts = append(probedHosts, &commonv2.Host{706Id: host.ID,707Type: uint32(host.Type),708Hostname: host.Hostname,709Ip: host.IP,710Port: host.Port,711DownloadPort: host.DownloadPort,712Os: host.OS,713Platform: host.Platform,714PlatformFamily: host.PlatformFamily,715PlatformVersion: host.PlatformVersion,716KernelVersion: host.KernelVersion,717Cpu: &commonv2.CPU{718LogicalCount: host.CPU.LogicalCount,719PhysicalCount: host.CPU.PhysicalCount,720Percent: host.CPU.Percent,721ProcessPercent: host.CPU.ProcessPercent,722Times: &commonv2.CPUTimes{723User: host.CPU.Times.User,724System: host.CPU.Times.System,725Idle: host.CPU.Times.Idle,726Nice: host.CPU.Times.Nice,727Iowait: host.CPU.Times.Iowait,728Irq: host.CPU.Times.Irq,729Softirq: host.CPU.Times.Softirq,730Steal: host.CPU.Times.Steal,731Guest: host.CPU.Times.Guest,732GuestNice: host.CPU.Times.GuestNice,733},734},735Memory: &commonv2.Memory{736Total: host.Memory.Total,737Available: host.Memory.Available,738Used: host.Memory.Used,739UsedPercent: host.Memory.UsedPercent,740ProcessUsedPercent: host.Memory.ProcessUsedPercent,741Free: host.Memory.Free,742},743Network: &commonv2.Network{744TcpConnectionCount: host.Network.TCPConnectionCount,745UploadTcpConnectionCount: host.Network.UploadTCPConnectionCount,746Location: &host.Network.Location,747Idc: &host.Network.IDC,748},749Disk: &commonv2.Disk{750Total: host.Disk.Total,751Free: host.Disk.Free,752Used: host.Disk.Used,753UsedPercent: host.Disk.UsedPercent,754InodesTotal: host.Disk.InodesTotal,755InodesUsed: host.Disk.InodesUsed,756InodesFree: host.Disk.InodesFree,757InodesUsedPercent: host.Disk.InodesUsedPercent,758},759Build: &commonv2.Build{760GitVersion: host.Build.GitVersion,761GitCommit: &host.Build.GitCommit,762GoVersion: &host.Build.GoVersion,763Platform: &host.Build.Platform,764},765})766}767
768log.Infof("probe started: %#v", probedHosts)769if err := stream.Send(&schedulerv2.SyncProbesResponse{770Hosts: probedHosts,771}); err != nil {772log.Error(err)773return err774}775case *schedulerv2.SyncProbesRequest_ProbeFinishedRequest:776// Store probes in network topology. First create the association between777// source host and destination host, and then store the value of probe.778log.Info("receive SyncProbesRequest_ProbeFinishedRequest")779for _, probe := range syncProbesRequest.ProbeFinishedRequest.Probes {780probedHost, loaded := v.resource.HostManager().Load(probe.Host.Id)781if !loaded {782log.Errorf("host %s not found", probe.Host.Id)783continue784}785
786if err := v.networkTopology.Store(req.Host.GetId(), probedHost.ID); err != nil {787log.Errorf("store failed: %s", err.Error())788continue789}790
791if err := v.networkTopology.Probes(req.Host.GetId(), probe.Host.Id).Enqueue(&networktopology.Probe{792Host: probedHost,793RTT: probe.Rtt.AsDuration(),794CreatedAt: probe.CreatedAt.AsTime(),795}); err != nil {796log.Errorf("enqueue failed: %s", err.Error())797continue798}799
800log.Infof("probe finished: %#v", probe)801}802case *schedulerv2.SyncProbesRequest_ProbeFailedRequest:803// Log failed probes.804log.Info("receive SyncProbesRequest_ProbeFailedRequest")805var failedProbedHostIDs []string806for _, failedProbe := range syncProbesRequest.ProbeFailedRequest.Probes {807failedProbedHostIDs = append(failedProbedHostIDs, failedProbe.Host.Id)808}809
810log.Warnf("probe failed: %#v", failedProbedHostIDs)811default:812msg := fmt.Sprintf("receive unknow request: %#v", syncProbesRequest)813log.Error(msg)814return status.Error(codes.FailedPrecondition, msg)815}816}817}
818
819// handleRegisterPeerRequest handles RegisterPeerRequest of AnnouncePeerRequest.
820func (v *V2) handleRegisterPeerRequest(ctx context.Context, stream schedulerv2.Scheduler_AnnouncePeerServer, hostID, taskID, peerID string, req *schedulerv2.RegisterPeerRequest) error {821// Handle resource included host, task, and peer.822_, task, peer, err := v.handleResource(ctx, stream, hostID, taskID, peerID, req.GetDownload())823if err != nil {824return err825}826
827// Collect RegisterPeerCount metrics.828priority := peer.CalculatePriority(v.dynconfig)829metrics.RegisterPeerCount.WithLabelValues(priority.String(), peer.Task.Type.String(),830peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc()831
832blocklist := set.NewSafeSet[string]()833blocklist.Add(peer.ID)834download := proto.Clone(req.Download).(*commonv2.Download)835switch {836// If scheduler trigger seed peer download back-to-source,837// the needBackToSource flag should be true.838case download.GetNeedBackToSource():839peer.Log.Infof("peer need back to source")840peer.NeedBackToSource.Store(true)841// If task is pending, failed, leave, or succeeded and has no available peer,842// scheduler trigger seed peer download back-to-source.843case task.FSM.Is(resource.TaskStatePending) ||844task.FSM.Is(resource.TaskStateFailed) ||845task.FSM.Is(resource.TaskStateLeave) ||846task.FSM.Is(resource.TaskStateSucceeded) &&847!task.HasAvailablePeer(blocklist):848// If trigger the seed peer download back-to-source,849// the need back-to-source flag should be true.850download.NeedBackToSource = true851
852// Output path should be empty, prevent the seed peer853// copy file to output path.854download.OutputPath = nil855if err := v.downloadTaskBySeedPeer(ctx, taskID, download, peer); err != nil {856// Collect RegisterPeerFailureCount metrics.857metrics.RegisterPeerFailureCount.WithLabelValues(priority.String(), peer.Task.Type.String(),858peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc()859return err860}861}862
863// Handle task with peer register request.864if !peer.Task.FSM.Is(resource.TaskStateRunning) {865if err := peer.Task.FSM.Event(ctx, resource.TaskEventDownload); err != nil {866// Collect RegisterPeerFailureCount metrics.867metrics.RegisterPeerFailureCount.WithLabelValues(priority.String(), peer.Task.Type.String(),868peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc()869return status.Error(codes.Internal, err.Error())870}871} else {872peer.Task.UpdatedAt.Store(time.Now())873}874
875// FSM event state transition by size scope.876sizeScope := peer.Task.SizeScope()877switch sizeScope {878case commonv2.SizeScope_EMPTY:879// Return an EmptyTaskResponse directly.880peer.Log.Info("scheduling as SizeScope_EMPTY")881stream, loaded := peer.LoadAnnouncePeerStream()882if !loaded {883return status.Error(codes.NotFound, "AnnouncePeerStream not found")884}885
886if err := peer.FSM.Event(ctx, resource.PeerEventRegisterEmpty); err != nil {887return status.Errorf(codes.Internal, err.Error())888}889
890if err := stream.Send(&schedulerv2.AnnouncePeerResponse{891Response: &schedulerv2.AnnouncePeerResponse_EmptyTaskResponse{892EmptyTaskResponse: &schedulerv2.EmptyTaskResponse{},893},894}); err != nil {895peer.Log.Error(err)896return status.Error(codes.Internal, err.Error())897}898
899return nil900case commonv2.SizeScope_NORMAL, commonv2.SizeScope_TINY, commonv2.SizeScope_SMALL, commonv2.SizeScope_UNKNOW:901peer.Log.Info("scheduling as SizeScope_NORMAL")902if err := peer.FSM.Event(ctx, resource.PeerEventRegisterNormal); err != nil {903return status.Error(codes.Internal, err.Error())904}905
906// Scheduling parent for the peer.907peer.BlockParents.Add(peer.ID)908if err := v.scheduling.ScheduleCandidateParents(ctx, peer, peer.BlockParents); err != nil {909// Collect RegisterPeerFailureCount metrics.910metrics.RegisterPeerFailureCount.WithLabelValues(priority.String(), peer.Task.Type.String(),911peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc()912return status.Error(codes.FailedPrecondition, err.Error())913}914
915return nil916default:917return status.Errorf(codes.FailedPrecondition, "invalid size cope %#v", sizeScope)918}919}
920
921// handleDownloadPeerStartedRequest handles DownloadPeerStartedRequest of AnnouncePeerRequest.
922func (v *V2) handleDownloadPeerStartedRequest(ctx context.Context, peerID string) error {923peer, loaded := v.resource.PeerManager().Load(peerID)924if !loaded {925return status.Errorf(codes.NotFound, "peer %s not found", peerID)926}927
928// Collect DownloadPeerStartedCount metrics.929priority := peer.CalculatePriority(v.dynconfig)930metrics.DownloadPeerStartedCount.WithLabelValues(priority.String(), peer.Task.Type.String(),931peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc()932
933// Handle peer with peer started request.934if !peer.FSM.Is(resource.PeerStateRunning) {935if err := peer.FSM.Event(ctx, resource.PeerEventDownload); err != nil {936// Collect DownloadPeerStartedFailureCount metrics.937metrics.DownloadPeerStartedFailureCount.WithLabelValues(priority.String(), peer.Task.Type.String(),938peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc()939return status.Error(codes.Internal, err.Error())940}941}942
943return nil944}
945
946// handleDownloadPeerBackToSourceStartedRequest handles DownloadPeerBackToSourceStartedRequest of AnnouncePeerRequest.
947func (v *V2) handleDownloadPeerBackToSourceStartedRequest(ctx context.Context, peerID string) error {948peer, loaded := v.resource.PeerManager().Load(peerID)949if !loaded {950return status.Errorf(codes.NotFound, "peer %s not found", peerID)951}952
953// Collect DownloadPeerBackToSourceStartedCount metrics.954priority := peer.CalculatePriority(v.dynconfig)955metrics.DownloadPeerBackToSourceStartedCount.WithLabelValues(priority.String(), peer.Task.Type.String(),956peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc()957
958// Handle peer with peer back-to-source started request.959if !peer.FSM.Is(resource.PeerStateRunning) {960if err := peer.FSM.Event(ctx, resource.PeerEventDownloadBackToSource); err != nil {961// Collect DownloadPeerBackToSourceStartedFailureCount metrics.962metrics.DownloadPeerBackToSourceStartedFailureCount.WithLabelValues(priority.String(), peer.Task.Type.String(),963peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc()964return status.Error(codes.Internal, err.Error())965}966}967
968return nil969}
970
971// handleRescheduleRequest handles RescheduleRequest of AnnouncePeerRequest.
972func (v *V2) handleRescheduleRequest(ctx context.Context, peerID string, candidateParents []*commonv2.Peer) error {973peer, loaded := v.resource.PeerManager().Load(peerID)974if !loaded {975return status.Errorf(codes.NotFound, "peer %s not found", peerID)976}977
978// Add candidate parent ids to block parents.979for _, candidateParent := range candidateParents {980peer.BlockParents.Add(candidateParent.GetId())981}982
983if err := v.scheduling.ScheduleCandidateParents(ctx, peer, peer.BlockParents); err != nil {984return status.Error(codes.FailedPrecondition, err.Error())985}986
987return nil988}
989
990// handleDownloadPeerFinishedRequest handles DownloadPeerFinishedRequest of AnnouncePeerRequest.
991func (v *V2) handleDownloadPeerFinishedRequest(ctx context.Context, peerID string) error {992peer, loaded := v.resource.PeerManager().Load(peerID)993if !loaded {994return status.Errorf(codes.NotFound, "peer %s not found", peerID)995}996
997// Handle peer with peer finished request.998peer.Cost.Store(time.Since(peer.CreatedAt.Load()))999if err := peer.FSM.Event(ctx, resource.PeerEventDownloadSucceeded); err != nil {1000return status.Error(codes.Internal, err.Error())1001}1002
1003// Collect DownloadPeerCount and DownloadPeerDuration metrics.1004priority := peer.CalculatePriority(v.dynconfig)1005metrics.DownloadPeerCount.WithLabelValues(priority.String(), peer.Task.Type.String(),1006peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc()1007// TODO to be determined which traffic type to use, temporarily use TrafficType_REMOTE_PEER instead1008metrics.DownloadPeerDuration.WithLabelValues(metrics.CalculateSizeLevel(peer.Task.ContentLength.Load()).String()).Observe(float64(peer.Cost.Load()))1009
1010return nil1011}
1012
1013// handleDownloadPeerBackToSourceFinishedRequest handles DownloadPeerBackToSourceFinishedRequest of AnnouncePeerRequest.
1014func (v *V2) handleDownloadPeerBackToSourceFinishedRequest(ctx context.Context, peerID string, req *schedulerv2.DownloadPeerBackToSourceFinishedRequest) error {1015peer, loaded := v.resource.PeerManager().Load(peerID)1016if !loaded {1017return status.Errorf(codes.NotFound, "peer %s not found", peerID)1018}1019
1020// Handle peer with peer back-to-source finished request.1021peer.Cost.Store(time.Since(peer.CreatedAt.Load()))1022if err := peer.FSM.Event(ctx, resource.PeerEventDownloadSucceeded); err != nil {1023return status.Error(codes.Internal, err.Error())1024}1025
1026// Handle task with peer back-to-source finished request, peer can only represent1027// a successful task after downloading the complete task.1028if peer.Range == nil && !peer.Task.FSM.Is(resource.TaskStateSucceeded) {1029peer.Task.ContentLength.Store(int64(req.GetContentLength()))1030peer.Task.TotalPieceCount.Store(int32(req.GetPieceCount()))1031if err := peer.Task.FSM.Event(ctx, resource.TaskEventDownloadSucceeded); err != nil {1032return status.Error(codes.Internal, err.Error())1033}1034}1035
1036// Collect DownloadPeerCount and DownloadPeerDuration metrics.1037priority := peer.CalculatePriority(v.dynconfig)1038metrics.DownloadPeerCount.WithLabelValues(priority.String(), peer.Task.Type.String(),1039peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc()1040// TODO to be determined which traffic type to use, temporarily use TrafficType_REMOTE_PEER instead1041metrics.DownloadPeerDuration.WithLabelValues(metrics.CalculateSizeLevel(peer.Task.ContentLength.Load()).String()).Observe(float64(peer.Cost.Load()))1042
1043return nil1044}
1045
1046// handleDownloadPeerFailedRequest handles DownloadPeerFailedRequest of AnnouncePeerRequest.
1047func (v *V2) handleDownloadPeerFailedRequest(ctx context.Context, peerID string) error {1048peer, loaded := v.resource.PeerManager().Load(peerID)1049if !loaded {1050return status.Errorf(codes.NotFound, "peer %s not found", peerID)1051}1052
1053// Handle peer with peer failed request.1054if err := peer.FSM.Event(ctx, resource.PeerEventDownloadFailed); err != nil {1055return status.Error(codes.Internal, err.Error())1056}1057
1058// Handle task with peer failed request.1059peer.Task.UpdatedAt.Store(time.Now())1060
1061// Collect DownloadPeerCount and DownloadPeerFailureCount metrics.1062priority := peer.CalculatePriority(v.dynconfig)1063metrics.DownloadPeerCount.WithLabelValues(priority.String(), peer.Task.Type.String(),1064peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc()1065metrics.DownloadPeerFailureCount.WithLabelValues(priority.String(), peer.Task.Type.String(),1066peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc()1067
1068return nil1069}
1070
1071// handleDownloadPeerBackToSourceFailedRequest handles DownloadPeerBackToSourceFailedRequest of AnnouncePeerRequest.
1072func (v *V2) handleDownloadPeerBackToSourceFailedRequest(ctx context.Context, peerID string) error {1073peer, loaded := v.resource.PeerManager().Load(peerID)1074if !loaded {1075return status.Errorf(codes.NotFound, "peer %s not found", peerID)1076}1077
1078// Handle peer with peer back-to-source failed request.1079if err := peer.FSM.Event(ctx, resource.PeerEventDownloadFailed); err != nil {1080return status.Error(codes.Internal, err.Error())1081}1082
1083// Handle task with peer back-to-source failed request.1084peer.Task.ContentLength.Store(-1)1085peer.Task.TotalPieceCount.Store(0)1086peer.Task.DirectPiece = []byte{}1087if err := peer.Task.FSM.Event(ctx, resource.TaskEventDownloadFailed); err != nil {1088return status.Error(codes.Internal, err.Error())1089}1090
1091// Collect DownloadPeerCount and DownloadPeerBackToSourceFailureCount metrics.1092priority := peer.CalculatePriority(v.dynconfig)1093metrics.DownloadPeerCount.WithLabelValues(priority.String(), peer.Task.Type.String(),1094peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc()1095metrics.DownloadPeerBackToSourceFailureCount.WithLabelValues(priority.String(), peer.Task.Type.String(),1096peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc()1097
1098return nil1099}
1100
1101// handleDownloadPieceFinishedRequest handles DownloadPieceFinishedRequest of AnnouncePeerRequest.
1102func (v *V2) handleDownloadPieceFinishedRequest(ctx context.Context, peerID string, req *schedulerv2.DownloadPieceFinishedRequest) error {1103// Construct piece.1104piece := &resource.Piece{1105Number: int32(req.Piece.GetNumber()),1106ParentID: req.Piece.GetParentId(),1107Offset: req.Piece.GetOffset(),1108Length: req.Piece.GetLength(),1109TrafficType: req.Piece.GetTrafficType(),1110Cost: req.Piece.GetCost().AsDuration(),1111CreatedAt: req.Piece.GetCreatedAt().AsTime(),1112}1113
1114if len(req.Piece.GetDigest()) > 0 {1115d, err := digest.Parse(req.Piece.GetDigest())1116if err != nil {1117return status.Errorf(codes.InvalidArgument, err.Error())1118}1119
1120piece.Digest = d1121}1122
1123peer, loaded := v.resource.PeerManager().Load(peerID)1124if !loaded {1125return status.Errorf(codes.NotFound, "peer %s not found", peerID)1126}1127
1128// Handle peer with piece finished request. When the piece is downloaded successfully, peer.UpdatedAt needs1129// to be updated to prevent the peer from being GC during the download process.1130peer.StorePiece(piece)1131peer.FinishedPieces.Set(uint(piece.Number))1132peer.AppendPieceCost(piece.Cost)1133peer.PieceUpdatedAt.Store(time.Now())1134peer.UpdatedAt.Store(time.Now())1135
1136// When the piece is downloaded successfully, parent.UpdatedAt needs to be updated1137// to prevent the parent from being GC during the download process.1138parent, loadedParent := v.resource.PeerManager().Load(piece.ParentID)1139if loadedParent {1140parent.UpdatedAt.Store(time.Now())1141parent.Host.UpdatedAt.Store(time.Now())1142}1143
1144// Handle task with piece finished request.1145peer.Task.UpdatedAt.Store(time.Now())1146
1147// Collect piece and traffic metrics.1148metrics.DownloadPieceCount.WithLabelValues(piece.TrafficType.String(), peer.Task.Type.String(),1149peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc()1150metrics.Traffic.WithLabelValues(piece.TrafficType.String(), peer.Task.Type.String(),1151peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Add(float64(piece.Length))1152if v.config.Metrics.EnableHost {1153metrics.HostTraffic.WithLabelValues(metrics.HostTrafficDownloadType, peer.Task.Type.String(), peer.Task.Tag, peer.Task.Application,1154peer.Host.Type.Name(), peer.Host.ID, peer.Host.IP, peer.Host.Hostname).Add(float64(piece.Length))1155if loadedParent {1156metrics.HostTraffic.WithLabelValues(metrics.HostTrafficUploadType, peer.Task.Type.String(), peer.Task.Tag, peer.Task.Application,1157parent.Host.Type.Name(), parent.Host.ID, parent.Host.IP, parent.Host.Hostname).Add(float64(piece.Length))1158}1159}1160
1161return nil1162}
1163
1164// handleDownloadPieceBackToSourceFinishedRequest handles DownloadPieceBackToSourceFinishedRequest of AnnouncePeerRequest.
1165func (v *V2) handleDownloadPieceBackToSourceFinishedRequest(ctx context.Context, peerID string, req *schedulerv2.DownloadPieceBackToSourceFinishedRequest) error {1166// Construct piece.1167piece := &resource.Piece{1168Number: int32(req.Piece.GetNumber()),1169ParentID: req.Piece.GetParentId(),1170Offset: req.Piece.GetOffset(),1171Length: req.Piece.GetLength(),1172TrafficType: req.Piece.GetTrafficType(),1173Cost: req.Piece.GetCost().AsDuration(),1174CreatedAt: req.Piece.GetCreatedAt().AsTime(),1175}1176
1177if len(req.Piece.GetDigest()) > 0 {1178d, err := digest.Parse(req.Piece.GetDigest())1179if err != nil {1180return status.Errorf(codes.InvalidArgument, err.Error())1181}1182
1183piece.Digest = d1184}1185
1186peer, loaded := v.resource.PeerManager().Load(peerID)1187if !loaded {1188return status.Errorf(codes.NotFound, "peer %s not found", peerID)1189}1190
1191// Handle peer with piece back-to-source finished request. When the piece is downloaded successfully, peer.UpdatedAt1192// needs to be updated to prevent the peer from being GC during the download process.1193peer.StorePiece(piece)1194peer.FinishedPieces.Set(uint(piece.Number))1195peer.AppendPieceCost(piece.Cost)1196peer.PieceUpdatedAt.Store(time.Now())1197peer.UpdatedAt.Store(time.Now())1198
1199// Handle task with piece back-to-source finished request.1200peer.Task.StorePiece(piece)1201peer.Task.UpdatedAt.Store(time.Now())1202
1203// Collect piece and traffic metrics.1204metrics.DownloadPieceCount.WithLabelValues(piece.TrafficType.String(), peer.Task.Type.String(),1205peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc()1206metrics.Traffic.WithLabelValues(piece.TrafficType.String(), peer.Task.Type.String(),1207peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Add(float64(piece.Length))1208if v.config.Metrics.EnableHost {1209metrics.HostTraffic.WithLabelValues(metrics.HostTrafficDownloadType, peer.Task.Type.String(), peer.Task.Tag, peer.Task.Application,1210peer.Host.Type.Name(), peer.Host.ID, peer.Host.IP, peer.Host.Hostname).Add(float64(piece.Length))1211}1212
1213return nil1214}
1215
1216// handleDownloadPieceFailedRequest handles DownloadPieceFailedRequest of AnnouncePeerRequest.
1217func (v *V2) handleDownloadPieceFailedRequest(ctx context.Context, peerID string, req *schedulerv2.DownloadPieceFailedRequest) error {1218peer, loaded := v.resource.PeerManager().Load(peerID)1219if !loaded {1220return status.Errorf(codes.NotFound, "peer %s not found", peerID)1221}1222
1223// Collect DownloadPieceCount and DownloadPieceFailureCount metrics.1224metrics.DownloadPieceCount.WithLabelValues(commonv2.TrafficType_REMOTE_PEER.String(), peer.Task.Type.String(),1225peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc()1226metrics.DownloadPieceFailureCount.WithLabelValues(commonv2.TrafficType_REMOTE_PEER.String(), peer.Task.Type.String(),1227peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc()1228
1229if req.Temporary {1230// Handle peer with piece temporary failed request.1231peer.UpdatedAt.Store(time.Now())1232peer.BlockParents.Add(req.GetParentId())1233if parent, loaded := v.resource.PeerManager().Load(req.GetParentId()); loaded {1234parent.Host.UploadFailedCount.Inc()1235}1236
1237// Handle task with piece temporary failed request.1238peer.Task.UpdatedAt.Store(time.Now())1239return nil1240}1241
1242return status.Error(codes.FailedPrecondition, "download piece failed")1243}
1244
1245// handleDownloadPieceBackToSourceFailedRequest handles DownloadPieceBackToSourceFailedRequest of AnnouncePeerRequest.
1246func (v *V2) handleDownloadPieceBackToSourceFailedRequest(ctx context.Context, peerID string, req *schedulerv2.DownloadPieceBackToSourceFailedRequest) error {1247peer, loaded := v.resource.PeerManager().Load(peerID)1248if !loaded {1249return status.Errorf(codes.NotFound, "peer %s not found", peerID)1250}1251
1252// Handle peer with piece back-to-source failed request.1253peer.UpdatedAt.Store(time.Now())1254
1255// Handle task with piece back-to-source failed request.1256peer.Task.UpdatedAt.Store(time.Now())1257
1258// Collect DownloadPieceCount and DownloadPieceFailureCount metrics.1259metrics.DownloadPieceCount.WithLabelValues(commonv2.TrafficType_BACK_TO_SOURCE.String(), peer.Task.Type.String(),1260peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc()1261metrics.DownloadPieceFailureCount.WithLabelValues(commonv2.TrafficType_BACK_TO_SOURCE.String(), peer.Task.Type.String(),1262peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc()1263
1264return status.Error(codes.Internal, "download piece from source failed")1265}
1266
1267// handleResource handles resource included host, task, and peer.
1268func (v *V2) handleResource(ctx context.Context, stream schedulerv2.Scheduler_AnnouncePeerServer, hostID, taskID, peerID string, download *commonv2.Download) (*resource.Host, *resource.Task, *resource.Peer, error) {1269// If the host does not exist and the host address cannot be found,1270// it may cause an exception.1271host, loaded := v.resource.HostManager().Load(hostID)1272if !loaded {1273return nil, nil, nil, status.Errorf(codes.NotFound, "host %s not found", hostID)1274}1275
1276// Store new task or update task.1277task, loaded := v.resource.TaskManager().Load(taskID)1278if !loaded {1279options := []resource.TaskOption{resource.WithPieceLength(int32(download.GetPieceLength()))}1280if download.GetDigest() != "" {1281d, err := digest.Parse(download.GetDigest())1282if err != nil {1283return nil, nil, nil, status.Error(codes.InvalidArgument, err.Error())1284}1285
1286// If request has invalid digest, then new task with the nil digest.1287options = append(options, resource.WithDigest(d))1288}1289
1290task = resource.NewTask(taskID, download.GetUrl(), download.GetTag(), download.GetApplication(), download.GetType(),1291download.GetFilteredQueryParams(), download.GetRequestHeader(), int32(v.config.Scheduler.BackToSourceCount), options...)1292v.resource.TaskManager().Store(task)1293} else {1294task.URL = download.GetUrl()1295task.FilteredQueryParams = download.GetFilteredQueryParams()1296task.Header = download.GetRequestHeader()1297}1298
1299// Store new peer or load peer.1300peer, loaded := v.resource.PeerManager().Load(peerID)1301if !loaded {1302options := []resource.PeerOption{resource.WithPriority(download.GetPriority()), resource.WithAnnouncePeerStream(stream)}1303if download.GetRange() != nil {1304options = append(options, resource.WithRange(http.Range{Start: int64(download.Range.GetStart()), Length: int64(download.Range.GetLength())}))1305}1306
1307peer = resource.NewPeer(peerID, &v.config.Resource, task, host, options...)1308v.resource.PeerManager().Store(peer)1309}1310
1311return host, task, peer, nil1312}
1313
1314// downloadTaskBySeedPeer downloads task by seed peer.
1315func (v *V2) downloadTaskBySeedPeer(ctx context.Context, taskID string, download *commonv2.Download, peer *resource.Peer) error {1316// Trigger the first download task based on different priority levels,1317// refer to https://github.com/dragonflyoss/api/blob/main/pkg/apis/common/v2/common.proto#L74.1318priority := peer.CalculatePriority(v.dynconfig)1319peer.Log.Infof("peer priority is %s", priority.String())1320switch priority {1321case commonv2.Priority_LEVEL6, commonv2.Priority_LEVEL0:1322// Super peer is first triggered to download back-to-source.1323if v.config.SeedPeer.Enable && !peer.Task.IsSeedPeerFailed() {1324go func(ctx context.Context, taskID string, download *commonv2.Download, hostType types.HostType) {1325peer.Log.Infof("%s seed peer triggers download task", hostType.Name())1326if err := v.resource.SeedPeer().TriggerDownloadTask(context.Background(), taskID, &dfdaemonv2.TriggerDownloadTaskRequest{Download: download}); err != nil {1327peer.Log.Errorf("%s seed peer triggers download task failed %s", hostType.Name(), err.Error())1328return1329}1330
1331peer.Log.Infof("%s seed peer triggers download task success", hostType.Name())1332}(ctx, taskID, download, types.HostTypeSuperSeed)1333
1334break1335}1336
1337fallthrough1338case commonv2.Priority_LEVEL5:1339// Strong peer is first triggered to download back-to-source.1340if v.config.SeedPeer.Enable && !peer.Task.IsSeedPeerFailed() {1341go func(ctx context.Context, taskID string, download *commonv2.Download, hostType types.HostType) {1342peer.Log.Infof("%s seed peer triggers download task", hostType.Name())1343if err := v.resource.SeedPeer().TriggerDownloadTask(context.Background(), taskID, &dfdaemonv2.TriggerDownloadTaskRequest{Download: download}); err != nil {1344peer.Log.Errorf("%s seed peer triggers download task failed %s", hostType.Name(), err.Error())1345return1346}1347
1348peer.Log.Infof("%s seed peer triggers download task success", hostType.Name())1349}(ctx, taskID, download, types.HostTypeSuperSeed)1350
1351break1352}1353
1354fallthrough1355case commonv2.Priority_LEVEL4:1356// Weak peer is first triggered to download back-to-source.1357if v.config.SeedPeer.Enable && !peer.Task.IsSeedPeerFailed() {1358go func(ctx context.Context, taskID string, download *commonv2.Download, hostType types.HostType) {1359peer.Log.Infof("%s seed peer triggers download task", hostType.Name())1360if err := v.resource.SeedPeer().TriggerDownloadTask(context.Background(), taskID, &dfdaemonv2.TriggerDownloadTaskRequest{Download: download}); err != nil {1361peer.Log.Errorf("%s seed peer triggers download task failed %s", hostType.Name(), err.Error())1362return1363}1364
1365peer.Log.Infof("%s seed peer triggers download task success", hostType.Name())1366}(ctx, taskID, download, types.HostTypeSuperSeed)1367
1368break1369}1370
1371fallthrough1372case commonv2.Priority_LEVEL3:1373// When the task has no available peer,1374// the peer is first to download back-to-source.1375peer.NeedBackToSource.Store(true)1376case commonv2.Priority_LEVEL2:1377// Peer is first to download back-to-source.1378return status.Errorf(codes.NotFound, "%s peer not found candidate peers", commonv2.Priority_LEVEL2.String())1379case commonv2.Priority_LEVEL1:1380// Download task is forbidden.1381return status.Errorf(codes.FailedPrecondition, "%s peer is forbidden", commonv2.Priority_LEVEL1.String())1382default:1383return status.Errorf(codes.InvalidArgument, "invalid priority %#v", priority)1384}1385
1386return nil1387}
1388