/*
 * Copyright 2020 The Dragonfly Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

//go:generate mockgen -destination mocks/storage_manager_mock.go -source storage_manager.go -package mocks

package storage

import (
	"context"
	"encoding/base64"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"io/fs"
	"os"
	"path"
	"path/filepath"
	"sort"
	"strings"
	"sync"
	"syscall"
	"time"

	"github.com/docker/go-units"
	"github.com/shirou/gopsutil/v3/disk"
	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/trace"

	commonv1 "d7y.io/api/v2/pkg/apis/common/v1"

	"d7y.io/dragonfly/v2/client/config"
	"d7y.io/dragonfly/v2/client/daemon/gc"
	"d7y.io/dragonfly/v2/client/util"
	logger "d7y.io/dragonfly/v2/internal/dflog"
	nethttp "d7y.io/dragonfly/v2/pkg/net/http"
)

type TaskStorageDriver interface {
	// WritePiece puts a piece of a task to storage
	WritePiece(ctx context.Context, req *WritePieceRequest) (int64, error)

	// ReadPiece gets a piece data reader of a task from storage.
	// It returns a Reader and a Closer for the task data, already sought to the
	// piece offset; the caller should read the bytes and close it.
	// If req.Num is equal to -1, the range has a fixed value.
	ReadPiece(ctx context.Context, req *ReadPieceRequest) (io.Reader, io.Closer, error)

	ReadAllPieces(ctx context.Context, req *ReadAllPiecesRequest) (io.ReadCloser, error)

	GetPieces(ctx context.Context, req *commonv1.PieceTaskRequest) (*commonv1.PiecePacket, error)

	GetTotalPieces(ctx context.Context, req *PeerTaskMetadata) (int32, error)

	GetExtendAttribute(ctx context.Context, req *PeerTaskMetadata) (*commonv1.ExtendAttribute, error)

	UpdateTask(ctx context.Context, req *UpdateTaskRequest) error

	// Store stores task data to the target path
	Store(ctx context.Context, req *StoreRequest) error

	ValidateDigest(req *PeerTaskMetadata) error

	IsInvalid(req *PeerTaskMetadata) (bool, error)
}
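
// A minimal, hedged sketch of driving a TaskStorageDriver by hand. The request
// literals are elided because the concrete request types are defined in this
// package's metadata files; treat the flow, not the field names, as the point:
//
//	n, err := driver.WritePiece(ctx, &WritePieceRequest{ /* task, peer and piece metadata */ })
//	if err != nil {
//		return err
//	}
//	r, c, err := driver.ReadPiece(ctx, &ReadPieceRequest{ /* same task/peer, piece number */ })
//	if err != nil {
//		return err
//	}
//	defer c.Close()
//	_, err = io.Copy(io.Discard, r) // consume the piece bytes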

// Reclaimer stands for a storage reclaimer
type Reclaimer interface {
	// CanReclaim indicates whether the storage can be reclaimed
	CanReclaim() bool

	// MarkReclaim marks the storage which will be reclaimed
	MarkReclaim()

	// Reclaim reclaims the storage
	Reclaim() error
}

type Manager interface {
	TaskStorageDriver
	// KeepAlive tests whether the storage was used within a given time duration
	util.KeepAlive
	// RegisterTask registers a task in the storage driver
	RegisterTask(ctx context.Context, req *RegisterTaskRequest) (TaskStorageDriver, error)
	// RegisterSubTask registers a subtask in the storage driver
	RegisterSubTask(ctx context.Context, req *RegisterSubTaskRequest) (TaskStorageDriver, error)
	// UnregisterTask unregisters a task from the storage driver
	UnregisterTask(ctx context.Context, req CommonTaskRequest) error
	// FindCompletedTask tries to find a completed task for the fast path
	FindCompletedTask(taskID string) *ReusePeerTask
	// FindCompletedSubTask tries to find a completed subtask for the fast path
	FindCompletedSubTask(taskID string) *ReusePeerTask
	// FindPartialCompletedTask tries to find a partially completed task for the fast path
	FindPartialCompletedTask(taskID string, rg *nethttp.Range) *ReusePeerTask
	// CleanUp cleans all storage data
	CleanUp()
}
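
// A typical peer task lifecycle against Manager, sketched under the assumption
// that storageOption, gcCallback and ctx come from the daemon setup (only the
// request fields visible in this file are shown):
//
//	mgr, _ := NewStorageManager(config.SimpleLocalTaskStoreStrategy, storageOption, gcCallback, fs.FileMode(0700))
//	driver, _ := mgr.RegisterTask(ctx, &RegisterTaskRequest{ /* task id, peer id, ... */ })
//	// ... write pieces through driver ...
//	if reuse := mgr.FindCompletedTask(taskID); reuse != nil {
//		// fast path: a completed local copy of the task can be reused
//	}
//	_ = mgr.UnregisterTask(ctx, CommonTaskRequest{TaskID: taskID, PeerID: peerID})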

var (
	ErrTaskNotFound     = errors.New("task not found")
	ErrPieceNotFound    = errors.New("piece not found")
	ErrPieceCountNotSet = errors.New("total piece count not set")
	ErrDigestNotSet     = errors.New("digest not set")
	ErrInvalidDigest    = errors.New("invalid digest")
	ErrBadRequest       = errors.New("bad request")
)

const (
	GCName = "StorageManager"
)

var tracer trace.Tracer

func init() {
	tracer = otel.Tracer("dfget-daemon-gc")
}

type storageManager struct {
	sync.Mutex
	util.KeepAlive
	storeStrategy      config.StoreStrategy
	storeOption        *config.StorageOption
	tasks              sync.Map
	markedReclaimTasks []PeerTaskMetadata
	dataPathStat       *syscall.Stat_t
	gcCallback         func(CommonTaskRequest)
	gcInterval         time.Duration
	dataDirMode        fs.FileMode

	indexRWMutex       sync.RWMutex
	indexTask2PeerTask map[string][]*localTaskStore // key: task id, value: slice of localTaskStore

	subIndexRWMutex       sync.RWMutex
	subIndexTask2PeerTask map[string][]*localSubTaskStore // key: task id, value: slice of localSubTaskStore
}

var _ gc.GC = (*storageManager)(nil)
var _ Manager = (*storageManager)(nil)

type GCCallback func(request CommonTaskRequest)

func NewStorageManager(storeStrategy config.StoreStrategy, opt *config.StorageOption, gcCallback GCCallback, dirMode fs.FileMode, moreOpts ...func(*storageManager) error) (Manager, error) {
	dataDirMode := defaultDirectoryMode
	// If dirMode is set in config, use it instead of the default
	if dirMode != os.FileMode(0) {
		dataDirMode = dirMode
	}
	if !path.IsAbs(opt.DataPath) {
		abs, err := filepath.Abs(opt.DataPath)
		if err != nil {
			return nil, err
		}
		opt.DataPath = abs
	}
	stat, err := os.Stat(opt.DataPath)
	if os.IsNotExist(err) {
		if err := os.MkdirAll(opt.DataPath, dataDirMode); err != nil {
			return nil, err
		}
		stat, err = os.Stat(opt.DataPath)
	}
	if err != nil {
		return nil, err
	}
	switch storeStrategy {
	case config.SimpleLocalTaskStoreStrategy, config.AdvanceLocalTaskStoreStrategy:
	case config.StoreStrategy(""):
		storeStrategy = config.SimpleLocalTaskStoreStrategy
	default:
		return nil, fmt.Errorf("unsupported store strategy: %s", storeStrategy)
	}

	s := &storageManager{
		KeepAlive:             util.NewKeepAlive("storage manager"),
		storeStrategy:         storeStrategy,
		storeOption:           opt,
		dataPathStat:          stat.Sys().(*syscall.Stat_t),
		gcCallback:            gcCallback,
		gcInterval:            time.Minute,
		dataDirMode:           dataDirMode,
		indexTask2PeerTask:    map[string][]*localTaskStore{},
		subIndexTask2PeerTask: map[string][]*localSubTaskStore{},
	}

	for _, o := range moreOpts {
		if err := o(s); err != nil {
			return nil, err
		}
	}

	if err := s.ReloadPersistentTask(gcCallback); err != nil {
		logger.Warnf("reload tasks error: %s", err)
	}

	gc.Register(GCName, s)
	return s, nil
}

func WithStorageOption(opt *config.StorageOption) func(*storageManager) error {
	return func(manager *storageManager) error {
		manager.storeOption = opt
		return nil
	}
}

func WithGCInterval(gcInterval time.Duration) func(*storageManager) error {
	return func(manager *storageManager) error {
		manager.gcInterval = gcInterval
		return nil
	}
}
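
// NewStorageManager uses the functional options pattern: optional settings
// arrive as func(*storageManager) error values and run after the struct is
// built, so WithGCInterval below overrides the one-minute default set in the
// constructor. A hedged construction sketch:
//
//	mgr, err := NewStorageManager(
//		config.SimpleLocalTaskStoreStrategy,
//		storageOption,     // *config.StorageOption from the daemon config
//		gcCallback,        // invoked when a task is marked for reclaim
//		fs.FileMode(0700), // data directory mode; 0 keeps the default
//		WithGCInterval(5*time.Minute),
//	)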

func (s *storageManager) RegisterTask(ctx context.Context, req *RegisterTaskRequest) (TaskStorageDriver, error) {
	ts, ok := s.LoadTask(
		PeerTaskMetadata{
			PeerID: req.PeerID,
			TaskID: req.TaskID,
		})
	if ok {
		return ts, nil
	}
	// double check under the lock whether the task store exists;
	// if it does, just unlock and return it
	s.Lock()
	defer s.Unlock()
	if ts, ok = s.LoadTask(
		PeerTaskMetadata{
			PeerID: req.PeerID,
			TaskID: req.TaskID,
		}); ok {
		return ts, nil
	}
	// still does not exist, create a new task store
	return s.CreateTask(req)
}
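
// RegisterTask above is the double-checked locking idiom: a lock-free LoadTask
// first (the common hit path), then the same lookup again under s.Lock(), so
// two concurrent registrations of the same task/peer pair cannot both reach
// CreateTask and race on the same data directory.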

func (s *storageManager) RegisterSubTask(ctx context.Context, req *RegisterSubTaskRequest) (TaskStorageDriver, error) {
	t, ok := s.LoadTask(
		PeerTaskMetadata{
			PeerID: req.Parent.PeerID,
			TaskID: req.Parent.TaskID,
		})
	if !ok {
		return nil, fmt.Errorf("task %s not found", req.Parent.TaskID)
	}

	subtask := t.(*localTaskStore).SubTask(req)
	s.subIndexRWMutex.Lock()
	if ts, ok := s.subIndexTask2PeerTask[req.SubTask.TaskID]; ok {
		ts = append(ts, subtask)
		s.subIndexTask2PeerTask[req.SubTask.TaskID] = ts
	} else {
		s.subIndexTask2PeerTask[req.SubTask.TaskID] = []*localSubTaskStore{subtask}
	}
	s.subIndexRWMutex.Unlock()

	s.Lock()
	s.tasks.Store(
		PeerTaskMetadata{
			PeerID: req.SubTask.PeerID,
			TaskID: req.SubTask.TaskID,
		}, subtask)
	s.Unlock()
	return subtask, nil
}

func (s *storageManager) WritePiece(ctx context.Context, req *WritePieceRequest) (int64, error) {
	t, ok := s.LoadTask(
		PeerTaskMetadata{
			PeerID: req.PeerID,
			TaskID: req.TaskID,
		})
	if !ok {
		return 0, ErrTaskNotFound
	}
	return t.WritePiece(ctx, req)
}

func (s *storageManager) ReadPiece(ctx context.Context, req *ReadPieceRequest) (io.Reader, io.Closer, error) {
	t, ok := s.LoadTask(
		PeerTaskMetadata{
			PeerID: req.PeerID,
			TaskID: req.TaskID,
		})
	if !ok {
		// TODO recover for local task persistentMetadata data
		return nil, nil, ErrTaskNotFound
	}
	return t.ReadPiece(ctx, req)
}

func (s *storageManager) ReadAllPieces(ctx context.Context, req *ReadAllPiecesRequest) (io.ReadCloser, error) {
	t, ok := s.LoadTask(
		PeerTaskMetadata{
			PeerID: req.PeerID,
			TaskID: req.TaskID,
		})
	if !ok {
		// TODO recover for local task persistentMetadata data
		return nil, ErrTaskNotFound
	}
	return t.ReadAllPieces(ctx, req)
}

func (s *storageManager) Store(ctx context.Context, req *StoreRequest) error {
	t, ok := s.LoadTask(
		PeerTaskMetadata{
			PeerID: req.PeerID,
			TaskID: req.TaskID,
		})
	if !ok {
		// TODO recover for local task persistentMetadata data
		return ErrTaskNotFound
	}
	return t.Store(ctx, req)
}

func (s *storageManager) GetPieces(ctx context.Context, req *commonv1.PieceTaskRequest) (*commonv1.PiecePacket, error) {
	t, ok := s.LoadTask(
		PeerTaskMetadata{
			TaskID: req.TaskId,
			PeerID: req.DstPid,
		})
	if !ok {
		return nil, ErrTaskNotFound
	}
	return t.GetPieces(ctx, req)
}

func (s *storageManager) GetTotalPieces(ctx context.Context, req *PeerTaskMetadata) (int32, error) {
	t, ok := s.LoadTask(
		PeerTaskMetadata{
			TaskID: req.TaskID,
			PeerID: req.PeerID,
		})
	if !ok {
		return -1, ErrTaskNotFound
	}
	return t.GetTotalPieces(ctx, req)
}

func (s *storageManager) GetExtendAttribute(ctx context.Context, req *PeerTaskMetadata) (*commonv1.ExtendAttribute, error) {
	t, ok := s.LoadTask(
		PeerTaskMetadata{
			TaskID: req.TaskID,
			PeerID: req.PeerID,
		})
	if !ok {
		return nil, ErrTaskNotFound
	}
	return t.GetExtendAttribute(ctx, req)
}

func (s *storageManager) LoadTask(meta PeerTaskMetadata) (TaskStorageDriver, bool) {
	s.Keep()
	d, ok := s.tasks.Load(meta)
	if !ok {
		return nil, false
	}
	return d.(TaskStorageDriver), ok
}

func (s *storageManager) LoadAndDeleteTask(meta PeerTaskMetadata) (TaskStorageDriver, bool) {
	s.Keep()
	d, ok := s.tasks.LoadAndDelete(meta)
	if !ok {
		return nil, false
	}
	return d.(TaskStorageDriver), ok
}

func (s *storageManager) UpdateTask(ctx context.Context, req *UpdateTaskRequest) error {
	t, ok := s.LoadTask(
		PeerTaskMetadata{
			TaskID: req.TaskID,
			PeerID: req.PeerID,
		})
	if !ok {
		return ErrTaskNotFound
	}
	return t.UpdateTask(ctx, req)
}

func (s *storageManager) CreateTask(req *RegisterTaskRequest) (TaskStorageDriver, error) {
	s.Keep()
	logger.Debugf("init local task storage, peer id: %s, task id: %s", req.PeerID, req.TaskID)

	dataDir := path.Join(s.storeOption.DataPath, req.TaskID, req.PeerID)
	t := &localTaskStore{
		persistentMetadata: persistentMetadata{
			StoreStrategy: string(s.storeStrategy),
			TaskID:        req.TaskID,
			TaskMeta:      map[string]string{},
			ContentLength: req.ContentLength,
			TotalPieces:   req.TotalPieces,
			PieceMd5Sign:  req.PieceMd5Sign,
			PeerID:        req.PeerID,
			Pieces:        map[int32]PieceMetadata{},
		},
		gcCallback:       s.gcCallback,
		dataDir:          dataDir,
		metadataFilePath: path.Join(dataDir, taskMetadata),
		expireTime:       s.storeOption.TaskExpireTime.Duration,
		subtasks:         map[PeerTaskMetadata]*localSubTaskStore{},

		SugaredLoggerOnWith: logger.With("task", req.TaskID, "peer", req.PeerID, "component", "localTaskStore"),
	}

	dataDirMode := defaultDirectoryMode
	// If dirMode is set in config, use it instead of the default
	if s.dataDirMode != os.FileMode(0) {
		dataDirMode = s.dataDirMode
	}

	if err := os.MkdirAll(t.dataDir, dataDirMode); err != nil && !os.IsExist(err) {
		return nil, err
	}
	t.touch()

	// fall back to the simple strategy for proxy requests
	if req.DesiredLocation == "" {
		t.StoreStrategy = string(config.SimpleLocalTaskStoreStrategy)
	}
	data := path.Join(dataDir, taskData)
	switch t.StoreStrategy {
	case string(config.SimpleLocalTaskStoreStrategy):
		t.DataFilePath = data
		f, err := os.OpenFile(t.DataFilePath, os.O_CREATE|os.O_RDWR, defaultFileMode)
		if err != nil {
			return nil, err
		}
		f.Close()
	case string(config.AdvanceLocalTaskStoreStrategy):
		dir, file := path.Split(req.DesiredLocation)
		dirStat, err := os.Stat(dir)
		if err != nil {
			return nil, err
		}

		t.DataFilePath = path.Join(dir, fmt.Sprintf(".%s.dfget.cache.%s", file, req.PeerID))
		f, err := os.OpenFile(t.DataFilePath, os.O_CREATE|os.O_RDWR, defaultFileMode)
		if err != nil {
			return nil, err
		}
		f.Close()

		stat := dirStat.Sys().(*syscall.Stat_t)
		// same device, can hard link
		if stat.Dev == s.dataPathStat.Dev {
			logger.Debugf("same device, try to hard link")
			if err := os.Link(t.DataFilePath, data); err != nil {
				logger.Warnf("hard link failed on the same device: %s, falling back to symlink", err)
				// fall back to a symlink
				if err := os.Symlink(t.DataFilePath, data); err != nil {
					logger.Errorf("symlink failed: %s", err)
					return nil, err
				}
			}
		} else {
			logger.Debugf("different devices, try to symlink")
			// create a symlink so a failed reload can still be garbage collected
			if err := os.Symlink(t.DataFilePath, data); err != nil {
				logger.Errorf("symlink failed: %s", err)
				return nil, err
			}
		}
	}
	s.tasks.Store(
		PeerTaskMetadata{
			PeerID: req.PeerID,
			TaskID: req.TaskID,
		}, t)

	s.indexRWMutex.Lock()
	if ts, ok := s.indexTask2PeerTask[req.TaskID]; ok {
		ts = append(ts, t)
		s.indexTask2PeerTask[req.TaskID] = ts
	} else {
		s.indexTask2PeerTask[req.TaskID] = []*localTaskStore{t}
	}
	s.indexRWMutex.Unlock()
	return t, nil
}
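
// Why hard link first, then symlink: with the advance strategy the real data
// file sits next to the desired output location and the entry under DataPath
// is only an alias to it. A hard link keeps the data reachable even if one
// path is removed, but it requires both paths to be on the same device, which
// the code checks by comparing syscall.Stat_t.Dev values; across devices only
// a symlink is possible. The same check in isolation (illustrative only):
//
//	a, _ := os.Stat("/path/on/devA")
//	b, _ := os.Stat("/path/on/devB")
//	sameDev := a.Sys().(*syscall.Stat_t).Dev == b.Sys().(*syscall.Stat_t).Dev
//	// sameDev == true  -> os.Link can alias the file
//	// sameDev == false -> fall back to os.Symlink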

func (s *storageManager) FindCompletedTask(taskID string) *ReusePeerTask {
	s.indexRWMutex.RLock()
	defer s.indexRWMutex.RUnlock()
	ts, ok := s.indexTask2PeerTask[taskID]
	if !ok {
		return nil
	}
	for _, t := range ts {
		if t.invalid.Load() {
			continue
		}
		// touch it before checking the reclaim mark
		t.touch()
		// already marked, skip
		if t.reclaimMarked.Load() {
			continue
		}

		if t.Done {
			return &ReusePeerTask{
				Storage: t,
				PeerTaskMetadata: PeerTaskMetadata{
					PeerID: t.PeerID,
					TaskID: taskID,
				},
				ContentLength: t.ContentLength,
				TotalPieces:   t.TotalPieces,
				Header:        t.Header,
			}
		}
	}
	return nil
}

func (s *storageManager) FindPartialCompletedTask(taskID string, rg *nethttp.Range) *ReusePeerTask {
	s.indexRWMutex.RLock()
	defer s.indexRWMutex.RUnlock()
	ts, ok := s.indexTask2PeerTask[taskID]
	if !ok {
		return nil
	}
	for _, t := range ts {
		if t.invalid.Load() {
			continue
		}
		// touch it before checking the reclaim mark
		t.touch()
		// already marked, skip
		if t.reclaimMarked.Load() {
			continue
		}

		if t.Done || t.partialCompleted(rg) {
			return &ReusePeerTask{
				Storage: t,
				PeerTaskMetadata: PeerTaskMetadata{
					PeerID: t.PeerID,
					TaskID: taskID,
				},
				ContentLength: t.ContentLength,
				TotalPieces:   t.TotalPieces,
				Header:        t.Header,
			}
		}
	}
	return nil
}

func (s *storageManager) FindCompletedSubTask(taskID string) *ReusePeerTask {
	s.subIndexRWMutex.RLock()
	defer s.subIndexRWMutex.RUnlock()
	ts, ok := s.subIndexTask2PeerTask[taskID]
	if !ok {
		return nil
	}
	for _, t := range ts {
		if t.invalid.Load() {
			continue
		}
		// touch it before checking the reclaim mark
		t.parent.touch()
		// already marked, skip
		if t.parent.reclaimMarked.Load() {
			continue
		}

		if !t.Done {
			continue
		}
		return &ReusePeerTask{
			PeerTaskMetadata: PeerTaskMetadata{
				PeerID: t.PeerID,
				TaskID: taskID,
			},
			ContentLength: t.ContentLength,
			TotalPieces:   t.TotalPieces,
		}
	}
	return nil
}

func (s *storageManager) cleanIndex(taskID, peerID string) {
	s.indexRWMutex.Lock()
	defer s.indexRWMutex.Unlock()
	ts, ok := s.indexTask2PeerTask[taskID]
	if !ok {
		return
	}
	var remain []*localTaskStore
	// FIXME switch instead copy
	for _, t := range ts {
		if t.PeerID == peerID {
			logger.Debugf("clean index for %s/%s", taskID, peerID)
			continue
		}
		remain = append(remain, t)
	}
	s.indexTask2PeerTask[taskID] = remain
}

func (s *storageManager) cleanSubIndex(taskID, peerID string) {
	s.subIndexRWMutex.Lock()
	defer s.subIndexRWMutex.Unlock()
	ts, ok := s.subIndexTask2PeerTask[taskID]
	if !ok {
		return
	}
	var remain []*localSubTaskStore
	// FIXME switch instead copy
	for _, t := range ts {
		if t.PeerID == peerID {
			logger.Debugf("clean subindex for %s/%s", taskID, peerID)
			continue
		}
		remain = append(remain, t)
	}
	s.subIndexTask2PeerTask[taskID] = remain
}

func (s *storageManager) ValidateDigest(req *PeerTaskMetadata) error {
	t, ok := s.LoadTask(
		PeerTaskMetadata{
			TaskID: req.TaskID,
			PeerID: req.PeerID,
		})
	if !ok {
		return ErrTaskNotFound
	}
	return t.ValidateDigest(req)
}

func (s *storageManager) IsInvalid(req *PeerTaskMetadata) (bool, error) {
	t, ok := s.LoadTask(
		PeerTaskMetadata{
			TaskID: req.TaskID,
			PeerID: req.PeerID,
		})
	if !ok {
		return false, ErrTaskNotFound
	}
	return t.IsInvalid(req)
}

func (s *storageManager) ReloadPersistentTask(gcCallback GCCallback) error {
	dirs, err := os.ReadDir(s.storeOption.DataPath)
	if os.IsNotExist(err) {
		return nil
	}
	if err != nil {
		return err
	}
	var (
		loadErrs    []error
		loadErrDirs []string
	)
	for _, dir := range dirs {
		taskID := dir.Name()
		taskDir := path.Join(s.storeOption.DataPath, taskID)
		peerDirs, err := os.ReadDir(taskDir)
		if err != nil {
			continue
		}
		// remove empty task dir
		if len(peerDirs) == 0 {
			// skip dot files or directories
			if strings.HasPrefix(taskID, ".") {
				continue
			}
			if err := os.Remove(taskDir); err != nil {
				logger.Errorf("remove empty task dir %s failed: %s", taskDir, err)
			} else {
				logger.Infof("remove empty task dir %s", taskDir)
			}
			continue
		}
		for _, peerDir := range peerDirs {
			peerID := peerDir.Name()
			dataDir := path.Join(s.storeOption.DataPath, taskID, peerID)
			t := &localTaskStore{
				dataDir:             dataDir,
				metadataFilePath:    path.Join(dataDir, taskMetadata),
				expireTime:          s.storeOption.TaskExpireTime.Duration,
				gcCallback:          gcCallback,
				SugaredLoggerOnWith: logger.With("task", taskID, "peer", peerID, "component", s.storeStrategy),
			}
			t.touch()

			metadataFile, err := os.Open(t.metadataFilePath)
			if err != nil {
				loadErrs = append(loadErrs, err)
				loadErrDirs = append(loadErrDirs, dataDir)
				logger.With("action", "reload", "stage", "read metadata", "taskID", taskID, "peerID", peerID).
					Warnf("open task metadata error: %s", err)
				continue
			}
			bytes, err0 := io.ReadAll(metadataFile)
			if err0 != nil {
				metadataFile.Close()
				loadErrs = append(loadErrs, err0)
				loadErrDirs = append(loadErrDirs, dataDir)
				logger.With("action", "reload", "stage", "read metadata", "taskID", taskID, "peerID", peerID).
					Warnf("load task from disk error: %s", err0)
				continue
			}
			metadataFile.Close()

			if err0 = json.Unmarshal(bytes, &t.persistentMetadata); err0 != nil {
				loadErrs = append(loadErrs, err0)
				loadErrDirs = append(loadErrDirs, dataDir)
				logger.With("action", "reload", "stage", "parse metadata", "taskID", taskID, "peerID", peerID).
					Warnf("load task from disk error: %s, data base64 encode: %s", err0, base64.StdEncoding.EncodeToString(bytes))
				continue
			}
			logger.Debugf("load task %s/%s from disk, metadata %s, last access: %v, expire time: %s",
				t.persistentMetadata.TaskID, t.persistentMetadata.PeerID, t.metadataFilePath, time.Unix(0, t.lastAccess.Load()), t.expireTime)
			s.tasks.Store(PeerTaskMetadata{
				PeerID: peerID,
				TaskID: taskID,
			}, t)

			// update index
			if ts, ok := s.indexTask2PeerTask[taskID]; ok {
				ts = append(ts, t)
				s.indexTask2PeerTask[taskID] = ts
			} else {
				s.indexTask2PeerTask[taskID] = []*localTaskStore{t}
			}
		}
	}
	// remove peer tasks that failed to load
	for _, dir := range loadErrDirs {
		// remove metadata
		if err = os.Remove(path.Join(dir, taskMetadata)); err != nil {
			logger.Warnf("remove load error file %s error: %s", path.Join(dir, taskMetadata), err)
		} else {
			logger.Warnf("remove load error file %s ok", path.Join(dir, taskMetadata))
		}

		// remove data
		data := path.Join(dir, taskData)
		stat, err := os.Lstat(data)
		if err == nil {
			// if the data entry is a symlink, remove its target first
			if stat.Mode()&os.ModeSymlink == os.ModeSymlink {
				dest, err0 := os.Readlink(data)
				if err0 == nil {
					if err = os.Remove(dest); err != nil {
						logger.Warnf("remove load error file %s error: %s", data, err)
					}
				}
			}
			if err = os.Remove(data); err != nil {
				logger.Warnf("remove load error file %s error: %s", data, err)
			} else {
				logger.Warnf("remove load error file %s ok", data)
			}
		}

		if err = os.Remove(dir); err != nil {
			logger.Warnf("remove load error directory %s error: %s", dir, err)
			continue
		}
		logger.Warnf("remove load error directory %s ok", dir)
	}
	if len(loadErrs) > 0 {
		var sb strings.Builder
		for _, err := range loadErrs {
			sb.WriteString(err.Error())
		}
		return fmt.Errorf("load tasks from disk error: %q", sb.String())
	}
	return nil
}
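
// On-disk layout assumed by ReloadPersistentTask, reconstructed from the path
// joins above (<taskMetadata> and <taskData> stand for the file-name constants
// defined elsewhere in this package):
//
//	<DataPath>/<taskID>/<peerID>/<taskMetadata>   JSON-encoded persistentMetadata
//	<DataPath>/<taskID>/<peerID>/<taskData>       task payload: regular file, hard link, or symlink
//
// Any peer directory whose metadata cannot be opened or parsed is collected in
// loadErrDirs and removed wholesale, following the data symlink to its target
// first so the real payload is reclaimed too.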

func (s *storageManager) TryGC() (bool, error) {
	// FIXME gc subtask
	var markedTasks []PeerTaskMetadata
	var totalNotMarkedSize int64
	s.tasks.Range(func(key, task any) bool {
		if task.(Reclaimer).CanReclaim() {
			task.(Reclaimer).MarkReclaim()
			markedTasks = append(markedTasks, key.(PeerTaskMetadata))
		} else {
			lts, ok := task.(*localTaskStore)
			if ok {
				// only count tasks that were not reclaimed
				totalNotMarkedSize += lts.ContentLength
				logger.Debugf("task %s/%s did not reach gc time",
					key.(PeerTaskMetadata).TaskID, key.(PeerTaskMetadata).PeerID)
			}
		}
		return true
	})

	quotaBytesExceed := totalNotMarkedSize - int64(s.storeOption.DiskGCThreshold)
	quotaExceed := s.storeOption.DiskGCThreshold > 0 && quotaBytesExceed > 0
	usageExceed, usageBytesExceed := s.diskUsageExceed()

	if quotaExceed || usageExceed {
		var bytesExceed int64
		// only use quotaBytesExceed when s.storeOption.DiskGCThreshold > 0
		if s.storeOption.DiskGCThreshold > 0 && quotaBytesExceed > usageBytesExceed {
			bytesExceed = quotaBytesExceed
		} else {
			bytesExceed = usageBytesExceed
		}
		logger.Infof("quota threshold reached, start gc oldest tasks, size: %d bytes", bytesExceed)
		var tasks []*localTaskStore
		s.tasks.Range(func(key, val any) bool {
			task, ok := val.(*localTaskStore)
			if !ok { // skip subtasks
				return true
			}
			// skip tasks already marked for reclaim
			if task.reclaimMarked.Load() {
				return true
			}
			// the task is not done and was active within s.gcInterval,
			// the next gc loop will check it again
			if !task.Done && time.Since(time.Unix(0, task.lastAccess.Load())) < s.gcInterval {
				return true
			}
			tasks = append(tasks, task)
			return true
		})
		// sort by access time
		sort.SliceStable(tasks, func(i, j int) bool {
			return tasks[i].lastAccess.Load() < tasks[j].lastAccess.Load()
		})
		for _, task := range tasks {
			task.MarkReclaim()
			markedTasks = append(markedTasks, PeerTaskMetadata{task.PeerID, task.TaskID})
			logger.Infof("quota threshold reached, mark task %s/%s reclaimed, last access: %s, size: %s",
				task.TaskID, task.PeerID, time.Unix(0, task.lastAccess.Load()).Format(time.RFC3339Nano),
				units.BytesSize(float64(task.ContentLength)))
			bytesExceed -= task.ContentLength
			if bytesExceed <= 0 {
				break
			}
		}
		if bytesExceed > 0 {
			logger.Warnf("not enough tasks to gc, %d bytes remaining over threshold", bytesExceed)
		}
	}

	for _, key := range s.markedReclaimTasks {
		t, ok := s.tasks.Load(key)
		if !ok {
			continue
		}
		_, span := tracer.Start(context.Background(), config.SpanPeerGC)
		s.tasks.Delete(key)

		if lts, ok := t.(*localTaskStore); ok {
			span.SetAttributes(config.AttributePeerID.String(lts.PeerID))
			span.SetAttributes(config.AttributeTaskID.String(lts.TaskID))
			s.cleanIndex(lts.TaskID, lts.PeerID)
		} else {
			task := t.(*localSubTaskStore)
			span.SetAttributes(config.AttributePeerID.String(task.PeerID))
			span.SetAttributes(config.AttributeTaskID.String(task.TaskID))
			s.cleanSubIndex(task.TaskID, task.PeerID)
		}

		if err := t.(Reclaimer).Reclaim(); err != nil {
			// FIXME: retry later or push to queue
			logger.Errorf("gc task %s/%s error: %s", key.TaskID, key.PeerID, err)
			span.RecordError(err)
			span.End()
			continue
		}
		logger.Infof("task %s/%s reclaimed", key.TaskID, key.PeerID)
		// remove the reclaimed task from markedTasks
		for i, k := range markedTasks {
			if k.TaskID == key.TaskID && k.PeerID == key.PeerID {
				markedTasks = append(markedTasks[:i], markedTasks[i+1:]...)
				break
			}
		}
		span.End()
	}
	logger.Infof("marked %d task(s), reclaimed %d task(s)", len(markedTasks), len(s.markedReclaimTasks))
	s.markedReclaimTasks = markedTasks
	return true, nil
}
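
// A worked example of the two GC triggers above, with illustrative numbers:
// DiskGCThreshold = 10 GiB and 12 GiB of unmarked task data gives
// quotaBytesExceed = 2 GiB; if diskUsageExceed simultaneously reports 3 GiB
// over DiskGCThresholdPercent, the larger figure (3 GiB) is used, and tasks
// are marked for reclaim oldest-first by lastAccess until at least that many
// bytes are covered.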

// deleteTask deletes the given task from local storage and unregisters it
// from the scheduler.
func (s *storageManager) deleteTask(meta PeerTaskMetadata) error {
	task, ok := s.LoadAndDeleteTask(meta)
	if !ok {
		logger.Infof("deleteTask: task meta not found: %v", meta)
		return nil
	}

	logger.Debugf("deleteTask: deleting task: %v", meta)
	if _, ok := task.(*localTaskStore); ok {
		s.cleanIndex(meta.TaskID, meta.PeerID)
	} else {
		s.cleanSubIndex(meta.TaskID, meta.PeerID)
	}
	// MarkReclaim() will call gcCallback, which will unregister the task from the scheduler
	task.(Reclaimer).MarkReclaim()
	return task.(Reclaimer).Reclaim()
}

func (s *storageManager) UnregisterTask(ctx context.Context, req CommonTaskRequest) error {
	return s.deleteTask(PeerTaskMetadata{
		TaskID: req.TaskID,
		PeerID: req.PeerID,
	})
}

func (s *storageManager) CleanUp() {
	_, _ = s.forceGC()
}

func (s *storageManager) forceGC() (bool, error) {
	s.tasks.Range(func(key, task any) bool {
		meta := key.(PeerTaskMetadata)
		err := s.deleteTask(meta)
		if err != nil {
			logger.Errorf("gc task store %s error: %s", key, err)
		}
		return true
	})
	return true, nil
}

func (s *storageManager) diskUsageExceed() (exceed bool, bytes int64) {
	if s.storeOption.DiskGCThresholdPercent <= 0 {
		return false, 0
	}
	usage, err := disk.Usage(s.storeOption.DataPath)
	if err != nil {
		logger.Warnf("get %s disk usage error: %s", s.storeOption.DataPath, err)
		return false, 0
	}
	logger.Debugf("disk usage: %+v", usage)
	if usage.UsedPercent < s.storeOption.DiskGCThresholdPercent {
		return false, 0
	}

	bs := (usage.UsedPercent - s.storeOption.DiskGCThresholdPercent) * float64(usage.Total) / 100.0
	logger.Infof("disk used percent %f, exceed threshold percent %f, %d bytes to reclaim",
		usage.UsedPercent, s.storeOption.DiskGCThresholdPercent, int64(bs))
	return true, int64(bs)
}
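
// The reclaim figure above is the slice of the disk between current usage and
// the threshold. With illustrative numbers: a 1 TiB disk, UsedPercent = 85 and
// DiskGCThresholdPercent = 80 yield
//
//	bs = (85 - 80) * float64(usage.Total) / 100 ≈ 0.05 * 1 TiB ≈ 51.2 GiB
//
// so roughly 51.2 GiB must be reclaimed to drop back under the threshold.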