Dragonfly2

storage_manager.go · 962 lines · 26.5 KB
/*
 *     Copyright 2020 The Dragonfly Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

//go:generate mockgen -destination mocks/storage_manager_mock.go -source storage_manager.go -package mocks

package storage

import (
	"context"
	"encoding/base64"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"io/fs"
	"os"
	"path"
	"path/filepath"
	"sort"
	"strings"
	"sync"
	"syscall"
	"time"

	"github.com/docker/go-units"
	"github.com/shirou/gopsutil/v3/disk"
	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/trace"

	commonv1 "d7y.io/api/v2/pkg/apis/common/v1"

	"d7y.io/dragonfly/v2/client/config"
	"d7y.io/dragonfly/v2/client/daemon/gc"
	"d7y.io/dragonfly/v2/client/util"
	logger "d7y.io/dragonfly/v2/internal/dflog"
	nethttp "d7y.io/dragonfly/v2/pkg/net/http"
)

type TaskStorageDriver interface {
	// WritePiece puts a piece of a task into storage
	WritePiece(ctx context.Context, req *WritePieceRequest) (int64, error)

	// ReadPiece gets a piece data reader of a task from storage.
	// It returns a Reader and a Closer over the task data, positioned at the requested range; the caller should read the bytes and then close it.
	// If req.Num is equal to -1, the range has a fixed value.
	ReadPiece(ctx context.Context, req *ReadPieceRequest) (io.Reader, io.Closer, error)

	ReadAllPieces(ctx context.Context, req *ReadAllPiecesRequest) (io.ReadCloser, error)

	GetPieces(ctx context.Context, req *commonv1.PieceTaskRequest) (*commonv1.PiecePacket, error)

	GetTotalPieces(ctx context.Context, req *PeerTaskMetadata) (int32, error)

	GetExtendAttribute(ctx context.Context, req *PeerTaskMetadata) (*commonv1.ExtendAttribute, error)

	UpdateTask(ctx context.Context, req *UpdateTaskRequest) error

	// Store stores task data to the target path
	Store(ctx context.Context, req *StoreRequest) error

	ValidateDigest(req *PeerTaskMetadata) error

	IsInvalid(req *PeerTaskMetadata) (bool, error)
}

// Reclaimer represents a storage reclaimer
type Reclaimer interface {
	// CanReclaim indicates whether the storage can be reclaimed
	CanReclaim() bool

	// MarkReclaim marks the storage to be reclaimed
	MarkReclaim()

	// Reclaim reclaims the storage
	Reclaim() error
}
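
// Editorial note: reclamation in this file is two-phase. TryGC first calls
// CanReclaim and MarkReclaim on eligible tasks, and tasks marked in a previous
// round are then removed via Reclaim on the next run.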

type Manager interface {
	TaskStorageDriver
	// KeepAlive tests whether the storage has been used within the given time duration
	util.KeepAlive
	// RegisterTask registers a task in the storage driver
	RegisterTask(ctx context.Context, req *RegisterTaskRequest) (TaskStorageDriver, error)
	// RegisterSubTask registers a subtask in the storage driver
	RegisterSubTask(ctx context.Context, req *RegisterSubTaskRequest) (TaskStorageDriver, error)
	// UnregisterTask unregisters a task from the storage driver
	UnregisterTask(ctx context.Context, req CommonTaskRequest) error
	// FindCompletedTask tries to find a completed task for the fast path
	FindCompletedTask(taskID string) *ReusePeerTask
	// FindCompletedSubTask tries to find a completed subtask for the fast path
	FindCompletedSubTask(taskID string) *ReusePeerTask
	// FindPartialCompletedTask tries to find a partially completed task for the fast path
	FindPartialCompletedTask(taskID string, rg *nethttp.Range) *ReusePeerTask
	// CleanUp cleans all storage data
	CleanUp()
}
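
// Usage sketch (editorial, not part of the original file): callers typically
// try the completed-task fast path first and fall back to registering a new
// peer task; "req" stands for a *RegisterTaskRequest built by the caller.
//
//	if reuse := manager.FindCompletedTask(taskID); reuse != nil {
//		// serve directly from the completed local data
//	} else if driver, err := manager.RegisterTask(ctx, req); err == nil {
//		// write pieces via driver.WritePiece, then persist with driver.Store
//		_ = driver
//	}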

var (
	ErrTaskNotFound     = errors.New("task not found")
	ErrPieceNotFound    = errors.New("piece not found")
	ErrPieceCountNotSet = errors.New("total piece count not set")
	ErrDigestNotSet     = errors.New("digest not set")
	ErrInvalidDigest    = errors.New("invalid digest")
	ErrBadRequest       = errors.New("bad request")
)

const (
	GCName = "StorageManager"
)

var tracer trace.Tracer

func init() {
	tracer = otel.Tracer("dfget-daemon-gc")
}

type storageManager struct {
	sync.Mutex
	util.KeepAlive
	storeStrategy      config.StoreStrategy
	storeOption        *config.StorageOption
	tasks              sync.Map
	markedReclaimTasks []PeerTaskMetadata
	dataPathStat       *syscall.Stat_t
	gcCallback         func(CommonTaskRequest)
	gcInterval         time.Duration
	dataDirMode        fs.FileMode

	indexRWMutex       sync.RWMutex
	indexTask2PeerTask map[string][]*localTaskStore // key: task id, value: slice of localTaskStore

	subIndexRWMutex       sync.RWMutex
	subIndexTask2PeerTask map[string][]*localSubTaskStore // key: task id, value: slice of localSubTaskStore
}

var _ gc.GC = (*storageManager)(nil)
var _ Manager = (*storageManager)(nil)

type GCCallback func(request CommonTaskRequest)

func NewStorageManager(storeStrategy config.StoreStrategy, opt *config.StorageOption, gcCallback GCCallback, dirMode fs.FileMode, moreOpts ...func(*storageManager) error) (Manager, error) {
	dataDirMode := defaultDirectoryMode
	// If dirMode isn't set in config, use the default
	if dirMode != os.FileMode(0) {
		dataDirMode = dirMode
	}
	if !path.IsAbs(opt.DataPath) {
		abs, err := filepath.Abs(opt.DataPath)
		if err != nil {
			return nil, err
		}
		opt.DataPath = abs
	}
	stat, err := os.Stat(opt.DataPath)
	if os.IsNotExist(err) {
		if err := os.MkdirAll(opt.DataPath, dataDirMode); err != nil {
			return nil, err
		}
		stat, err = os.Stat(opt.DataPath)
	}
	if err != nil {
		return nil, err
	}
	switch storeStrategy {
	case config.SimpleLocalTaskStoreStrategy, config.AdvanceLocalTaskStoreStrategy:
	case config.StoreStrategy(""):
		storeStrategy = config.SimpleLocalTaskStoreStrategy
	default:
		return nil, fmt.Errorf("unsupported store strategy: %s", storeStrategy)
	}

	s := &storageManager{
		KeepAlive:             util.NewKeepAlive("storage manager"),
		storeStrategy:         storeStrategy,
		storeOption:           opt,
		dataPathStat:          stat.Sys().(*syscall.Stat_t),
		gcCallback:            gcCallback,
		gcInterval:            time.Minute,
		dataDirMode:           dataDirMode,
		indexTask2PeerTask:    map[string][]*localTaskStore{},
		subIndexTask2PeerTask: map[string][]*localSubTaskStore{},
	}

	for _, o := range moreOpts {
		if err := o(s); err != nil {
			return nil, err
		}
	}

	if err := s.ReloadPersistentTask(gcCallback); err != nil {
		logger.Warnf("reload tasks error: %s", err)
	}

	gc.Register(GCName, s)
	return s, nil
}
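
// Construction sketch (editorial, not part of the original file): wiring the
// manager with the functional options defined below; the data path, directory
// mode and GC interval values are hypothetical.
//
//	opt := &config.StorageOption{}      // populated from the daemon configuration in practice
//	opt.DataPath = "/var/lib/dragonfly" // hypothetical data directory
//	manager, err := NewStorageManager(
//		config.SimpleLocalTaskStoreStrategy,
//		opt,
//		func(req CommonTaskRequest) { /* ask the scheduler to forget this peer task */ },
//		os.FileMode(0700),
//		WithGCInterval(5*time.Minute),
//	)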

func WithStorageOption(opt *config.StorageOption) func(*storageManager) error {
	return func(manager *storageManager) error {
		manager.storeOption = opt
		return nil
	}
}

func WithGCInterval(gcInterval time.Duration) func(*storageManager) error {
	return func(manager *storageManager) error {
		manager.gcInterval = gcInterval
		return nil
	}
}

func (s *storageManager) RegisterTask(ctx context.Context, req *RegisterTaskRequest) (TaskStorageDriver, error) {
	ts, ok := s.LoadTask(
		PeerTaskMetadata{
			PeerID: req.PeerID,
			TaskID: req.TaskID,
		})
	if ok {
		return ts, nil
	}
	// double check if the task store exists
	// if ok, just unlock and return
	s.Lock()
	defer s.Unlock()
	if ts, ok = s.LoadTask(
		PeerTaskMetadata{
			PeerID: req.PeerID,
			TaskID: req.TaskID,
		}); ok {
		return ts, nil
	}
	// still does not exist, create a new task store
	return s.CreateTask(req)
}

func (s *storageManager) RegisterSubTask(ctx context.Context, req *RegisterSubTaskRequest) (TaskStorageDriver, error) {
	t, ok := s.LoadTask(
		PeerTaskMetadata{
			PeerID: req.Parent.PeerID,
			TaskID: req.Parent.TaskID,
		})
	if !ok {
		return nil, fmt.Errorf("task %s not found", req.Parent.TaskID)
	}

	subtask := t.(*localTaskStore).SubTask(req)
	s.subIndexRWMutex.Lock()
	if ts, ok := s.subIndexTask2PeerTask[req.SubTask.TaskID]; ok {
		ts = append(ts, subtask)
		s.subIndexTask2PeerTask[req.SubTask.TaskID] = ts
	} else {
		s.subIndexTask2PeerTask[req.SubTask.TaskID] = []*localSubTaskStore{subtask}
	}
	s.subIndexRWMutex.Unlock()

	s.Lock()
	s.tasks.Store(
		PeerTaskMetadata{
			PeerID: req.SubTask.PeerID,
			TaskID: req.SubTask.TaskID,
		}, subtask)
	s.Unlock()
	return subtask, nil
}

func (s *storageManager) WritePiece(ctx context.Context, req *WritePieceRequest) (int64, error) {
	t, ok := s.LoadTask(
		PeerTaskMetadata{
			PeerID: req.PeerID,
			TaskID: req.TaskID,
		})
	if !ok {
		return 0, ErrTaskNotFound
	}
	return t.WritePiece(ctx, req)
}

func (s *storageManager) ReadPiece(ctx context.Context, req *ReadPieceRequest) (io.Reader, io.Closer, error) {
	t, ok := s.LoadTask(
		PeerTaskMetadata{
			PeerID: req.PeerID,
			TaskID: req.TaskID,
		})
	if !ok {
		// TODO recover persistentMetadata for the local task
		return nil, nil, ErrTaskNotFound
	}
	return t.ReadPiece(ctx, req)
}

func (s *storageManager) ReadAllPieces(ctx context.Context, req *ReadAllPiecesRequest) (io.ReadCloser, error) {
	t, ok := s.LoadTask(
		PeerTaskMetadata{
			PeerID: req.PeerID,
			TaskID: req.TaskID,
		})
	if !ok {
		// TODO recover persistentMetadata for the local task
		return nil, ErrTaskNotFound
	}
	return t.ReadAllPieces(ctx, req)
}

func (s *storageManager) Store(ctx context.Context, req *StoreRequest) error {
	t, ok := s.LoadTask(
		PeerTaskMetadata{
			PeerID: req.PeerID,
			TaskID: req.TaskID,
		})
	if !ok {
		// TODO recover persistentMetadata for the local task
		return ErrTaskNotFound
	}
	return t.Store(ctx, req)
}

func (s *storageManager) GetPieces(ctx context.Context, req *commonv1.PieceTaskRequest) (*commonv1.PiecePacket, error) {
	t, ok := s.LoadTask(
		PeerTaskMetadata{
			TaskID: req.TaskId,
			PeerID: req.DstPid,
		})
	if !ok {
		return nil, ErrTaskNotFound
	}
	return t.GetPieces(ctx, req)
}

func (s *storageManager) GetTotalPieces(ctx context.Context, req *PeerTaskMetadata) (int32, error) {
	t, ok := s.LoadTask(
		PeerTaskMetadata{
			TaskID: req.TaskID,
			PeerID: req.PeerID,
		})
	if !ok {
		return -1, ErrTaskNotFound
	}
	return t.(TaskStorageDriver).GetTotalPieces(ctx, req)
}

func (s *storageManager) GetExtendAttribute(ctx context.Context, req *PeerTaskMetadata) (*commonv1.ExtendAttribute, error) {
	t, ok := s.LoadTask(
		PeerTaskMetadata{
			TaskID: req.TaskID,
			PeerID: req.PeerID,
		})
	if !ok {
		return nil, ErrTaskNotFound
	}
	return t.(TaskStorageDriver).GetExtendAttribute(ctx, req)
}

func (s *storageManager) LoadTask(meta PeerTaskMetadata) (TaskStorageDriver, bool) {
	s.Keep()
	d, ok := s.tasks.Load(meta)
	if !ok {
		return nil, false
	}
	return d.(TaskStorageDriver), ok
}

func (s *storageManager) LoadAndDeleteTask(meta PeerTaskMetadata) (TaskStorageDriver, bool) {
	s.Keep()
	d, ok := s.tasks.LoadAndDelete(meta)
	if !ok {
		return nil, false
	}
	return d.(TaskStorageDriver), ok
}

func (s *storageManager) UpdateTask(ctx context.Context, req *UpdateTaskRequest) error {
	t, ok := s.LoadTask(
		PeerTaskMetadata{
			TaskID: req.TaskID,
			PeerID: req.PeerID,
		})
	if !ok {
		return ErrTaskNotFound
	}
	return t.UpdateTask(ctx, req)
}

func (s *storageManager) CreateTask(req *RegisterTaskRequest) (TaskStorageDriver, error) {
	s.Keep()
	logger.Debugf("init local task storage, peer id: %s, task id: %s", req.PeerID, req.TaskID)

	dataDir := path.Join(s.storeOption.DataPath, req.TaskID, req.PeerID)
	t := &localTaskStore{
		persistentMetadata: persistentMetadata{
			StoreStrategy: string(s.storeStrategy),
			TaskID:        req.TaskID,
			TaskMeta:      map[string]string{},
			ContentLength: req.ContentLength,
			TotalPieces:   req.TotalPieces,
			PieceMd5Sign:  req.PieceMd5Sign,
			PeerID:        req.PeerID,
			Pieces:        map[int32]PieceMetadata{},
		},
		gcCallback:       s.gcCallback,
		dataDir:          dataDir,
		metadataFilePath: path.Join(dataDir, taskMetadata),
		expireTime:       s.storeOption.TaskExpireTime.Duration,
		subtasks:         map[PeerTaskMetadata]*localSubTaskStore{},

		SugaredLoggerOnWith: logger.With("task", req.TaskID, "peer", req.PeerID, "component", "localTaskStore"),
	}

	dataDirMode := defaultDirectoryMode
	// If dirMode isn't set in config, use the default
	if s.dataDirMode != os.FileMode(0) {
		dataDirMode = s.dataDirMode
	}

	if err := os.MkdirAll(t.dataDir, dataDirMode); err != nil && !os.IsExist(err) {
		return nil, err
	}
	t.touch()

	// fall back to the simple strategy for proxy
	if req.DesiredLocation == "" {
		t.StoreStrategy = string(config.SimpleLocalTaskStoreStrategy)
	}
	data := path.Join(dataDir, taskData)
	switch t.StoreStrategy {
	case string(config.SimpleLocalTaskStoreStrategy):
		t.DataFilePath = data
		f, err := os.OpenFile(t.DataFilePath, os.O_CREATE|os.O_RDWR, defaultFileMode)
		if err != nil {
			return nil, err
		}
		f.Close()
	case string(config.AdvanceLocalTaskStoreStrategy):
		dir, file := path.Split(req.DesiredLocation)
		dirStat, err := os.Stat(dir)
		if err != nil {
			return nil, err
		}

		t.DataFilePath = path.Join(dir, fmt.Sprintf(".%s.dfget.cache.%s", file, req.PeerID))
		f, err := os.OpenFile(t.DataFilePath, os.O_CREATE|os.O_RDWR, defaultFileMode)
		if err != nil {
			return nil, err
		}
		f.Close()

		stat := dirStat.Sys().(*syscall.Stat_t)
		// same device, can hard link
		if stat.Dev == s.dataPathStat.Dev {
			logger.Debugf("same device, try to hard link")
			if err := os.Link(t.DataFilePath, data); err != nil {
				logger.Warnf("hard link failed on same device: %s, fall back to symbolic link", err)
				// fall back to a symbolic link
				if err := os.Symlink(t.DataFilePath, data); err != nil {
					logger.Errorf("symbolic link failed: %s", err)
					return nil, err
				}
			}
		} else {
			logger.Debugf("different devices, try to make a symbolic link")
			// make a symbolic link for reload error gc
			if err := os.Symlink(t.DataFilePath, data); err != nil {
				logger.Errorf("symbolic link failed: %s", err)
				return nil, err
			}
		}
	}
	s.tasks.Store(
		PeerTaskMetadata{
			PeerID: req.PeerID,
			TaskID: req.TaskID,
		}, t)

	s.indexRWMutex.Lock()
	if ts, ok := s.indexTask2PeerTask[req.TaskID]; ok {
		ts = append(ts, t)
		s.indexTask2PeerTask[req.TaskID] = ts
	} else {
		s.indexTask2PeerTask[req.TaskID] = []*localTaskStore{t}
	}
	s.indexRWMutex.Unlock()
	return t, nil
}
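
// Layout note (editorial, not part of the original file): with the advance
// strategy and a hypothetical DesiredLocation of "/data/foo.tar" and peer id
// "peer-1", the payload is written to "/data/.foo.tar.dfget.cache.peer-1",
// and the task data path under dataDir is created as a hard link to it on the
// same device, or as a symbolic link across devices (or when hard linking fails).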

func (s *storageManager) FindCompletedTask(taskID string) *ReusePeerTask {
	s.indexRWMutex.RLock()
	defer s.indexRWMutex.RUnlock()
	ts, ok := s.indexTask2PeerTask[taskID]
	if !ok {
		return nil
	}
	for _, t := range ts {
		if t.invalid.Load() {
			continue
		}
		// touch it before marking reclaim
		t.touch()
		// already marked, skip
		if t.reclaimMarked.Load() {
			continue
		}

		if t.Done {
			return &ReusePeerTask{
				Storage: t,
				PeerTaskMetadata: PeerTaskMetadata{
					PeerID: t.PeerID,
					TaskID: taskID,
				},
				ContentLength: t.ContentLength,
				TotalPieces:   t.TotalPieces,
				Header:        t.Header,
			}
		}
	}
	return nil
}

func (s *storageManager) FindPartialCompletedTask(taskID string, rg *nethttp.Range) *ReusePeerTask {
	s.indexRWMutex.RLock()
	defer s.indexRWMutex.RUnlock()
	ts, ok := s.indexTask2PeerTask[taskID]
	if !ok {
		return nil
	}
	for _, t := range ts {
		if t.invalid.Load() {
			continue
		}
		// touch it before marking reclaim
		t.touch()
		// already marked, skip
		if t.reclaimMarked.Load() {
			continue
		}

		if t.Done || t.partialCompleted(rg) {
			return &ReusePeerTask{
				Storage: t,
				PeerTaskMetadata: PeerTaskMetadata{
					PeerID: t.PeerID,
					TaskID: taskID,
				},
				ContentLength: t.ContentLength,
				TotalPieces:   t.TotalPieces,
				Header:        t.Header,
			}
		}
	}
	return nil
}

func (s *storageManager) FindCompletedSubTask(taskID string) *ReusePeerTask {
	s.subIndexRWMutex.RLock()
	defer s.subIndexRWMutex.RUnlock()
	ts, ok := s.subIndexTask2PeerTask[taskID]
	if !ok {
		return nil
	}
	for _, t := range ts {
		if t.invalid.Load() {
			continue
		}
		// touch it before marking reclaim
		t.parent.touch()
		// already marked, skip
		if t.parent.reclaimMarked.Load() {
			continue
		}

		if !t.Done {
			continue
		}
		return &ReusePeerTask{
			PeerTaskMetadata: PeerTaskMetadata{
				PeerID: t.PeerID,
				TaskID: taskID,
			},
			ContentLength: t.ContentLength,
			TotalPieces:   t.TotalPieces,
		}
	}
	return nil
}

func (s *storageManager) cleanIndex(taskID, peerID string) {
	s.indexRWMutex.Lock()
	defer s.indexRWMutex.Unlock()
	ts, ok := s.indexTask2PeerTask[taskID]
	if !ok {
		return
	}
	var remain []*localTaskStore
	// FIXME switch instead copy
	for _, t := range ts {
		if t.PeerID == peerID {
			logger.Debugf("clean index for %s/%s", taskID, peerID)
			continue
		}
		remain = append(remain, t)
	}
	s.indexTask2PeerTask[taskID] = remain
}

func (s *storageManager) cleanSubIndex(taskID, peerID string) {
	s.subIndexRWMutex.Lock()
	defer s.subIndexRWMutex.Unlock()
	ts, ok := s.subIndexTask2PeerTask[taskID]
	if !ok {
		return
	}
	var remain []*localSubTaskStore
	// FIXME switch instead copy
	for _, t := range ts {
		if t.PeerID == peerID {
			logger.Debugf("clean sub index for %s/%s", taskID, peerID)
			continue
		}
		remain = append(remain, t)
	}
	s.subIndexTask2PeerTask[taskID] = remain
}

func (s *storageManager) ValidateDigest(req *PeerTaskMetadata) error {
	t, ok := s.LoadTask(
		PeerTaskMetadata{
			TaskID: req.TaskID,
			PeerID: req.PeerID,
		})
	if !ok {
		return ErrTaskNotFound
	}
	return t.ValidateDigest(req)
}

func (s *storageManager) IsInvalid(req *PeerTaskMetadata) (bool, error) {
	t, ok := s.LoadTask(
		PeerTaskMetadata{
			TaskID: req.TaskID,
			PeerID: req.PeerID,
		})
	if !ok {
		return false, ErrTaskNotFound
	}
	return t.IsInvalid(req)
}

func (s *storageManager) ReloadPersistentTask(gcCallback GCCallback) error {
	dirs, err := os.ReadDir(s.storeOption.DataPath)
	if os.IsNotExist(err) {
		return nil
	}
	if err != nil {
		return err
	}
	var (
		loadErrs    []error
		loadErrDirs []string
	)
	for _, dir := range dirs {
		taskID := dir.Name()
		taskDir := path.Join(s.storeOption.DataPath, taskID)
		peerDirs, err := os.ReadDir(taskDir)
		if err != nil {
			continue
		}
		// remove empty task dir
		if len(peerDirs) == 0 {
			// skip dot files or directories
			if strings.HasPrefix(taskID, ".") {
				continue
			}
			if err := os.Remove(taskDir); err != nil {
				logger.Errorf("remove empty task dir %s failed: %s", taskDir, err)
			} else {
				logger.Infof("remove empty task dir %s", taskDir)
			}
			continue
		}
		for _, peerDir := range peerDirs {
			peerID := peerDir.Name()
			dataDir := path.Join(s.storeOption.DataPath, taskID, peerID)
			t := &localTaskStore{
				dataDir:             dataDir,
				metadataFilePath:    path.Join(dataDir, taskMetadata),
				expireTime:          s.storeOption.TaskExpireTime.Duration,
				gcCallback:          gcCallback,
				SugaredLoggerOnWith: logger.With("task", taskID, "peer", peerID, "component", s.storeStrategy),
			}
			t.touch()

			metadataFile, err := os.Open(t.metadataFilePath)
			if err != nil {
				loadErrs = append(loadErrs, err)
				loadErrDirs = append(loadErrDirs, dataDir)
				logger.With("action", "reload", "stage", "read metadata", "taskID", taskID, "peerID", peerID).
					Warnf("open task metadata error: %s", err)
				continue
			}
			bytes, err0 := io.ReadAll(metadataFile)
			if err0 != nil {
				metadataFile.Close()
				loadErrs = append(loadErrs, err0)
				loadErrDirs = append(loadErrDirs, dataDir)
				logger.With("action", "reload", "stage", "read metadata", "taskID", taskID, "peerID", peerID).
					Warnf("load task from disk error: %s", err0)
				continue
			}
			metadataFile.Close()

			if err0 = json.Unmarshal(bytes, &t.persistentMetadata); err0 != nil {
				loadErrs = append(loadErrs, err0)
				loadErrDirs = append(loadErrDirs, dataDir)
				logger.With("action", "reload", "stage", "parse metadata", "taskID", taskID, "peerID", peerID).
					Warnf("load task from disk error: %s, data base64 encode: %s", err0, base64.StdEncoding.EncodeToString(bytes))
				continue
			}
			logger.Debugf("load task %s/%s from disk, metadata %s, last access: %v, expire time: %s",
				t.persistentMetadata.TaskID, t.persistentMetadata.PeerID, t.metadataFilePath, time.Unix(0, t.lastAccess.Load()), t.expireTime)
			s.tasks.Store(PeerTaskMetadata{
				PeerID: peerID,
				TaskID: taskID,
			}, t)

			// update index
			if ts, ok := s.indexTask2PeerTask[taskID]; ok {
				ts = append(ts, t)
				s.indexTask2PeerTask[taskID] = ts
			} else {
				s.indexTask2PeerTask[taskID] = []*localTaskStore{t}
			}
		}
	}
	// remove peer tasks that failed to load
	for _, dir := range loadErrDirs {
		// remove metadata
		if err = os.Remove(path.Join(dir, taskMetadata)); err != nil {
			logger.Warnf("remove load error file %s error: %s", path.Join(dir, taskMetadata), err)
		} else {
			logger.Warnf("remove load error file %s ok", path.Join(dir, taskMetadata))
		}

		// remove data
		data := path.Join(dir, taskData)
		stat, err := os.Lstat(data)
		if err == nil {
			// remove the symlink target file first
			if stat.Mode()&os.ModeSymlink == os.ModeSymlink {
				dest, err0 := os.Readlink(data)
				if err0 == nil {
					if err = os.Remove(dest); err != nil {
						logger.Warnf("remove load error file %s error: %s", data, err)
					}
				}
			}
			if err = os.Remove(data); err != nil {
				logger.Warnf("remove load error file %s error: %s", data, err)
			} else {
				logger.Warnf("remove load error file %s ok", data)
			}
		}

		if err = os.Remove(dir); err != nil {
			logger.Warnf("remove load error directory %s error: %s", dir, err)
		} else {
			logger.Warnf("remove load error directory %s ok", dir)
		}
	}
	if len(loadErrs) > 0 {
		var sb strings.Builder
		for _, err := range loadErrs {
			sb.WriteString(err.Error())
		}
		return fmt.Errorf("load tasks from disk error: %q", sb.String())
	}
	return nil
}

// TryGC marks reclaimable tasks in this round and reclaims the tasks that were
// marked in the previous round. When the disk quota or usage threshold is
// exceeded, it additionally marks the least recently accessed tasks until
// enough bytes are covered.
func (s *storageManager) TryGC() (bool, error) {
	// FIXME gc subtask
	var markedTasks []PeerTaskMetadata
	var totalNotMarkedSize int64
	s.tasks.Range(func(key, task any) bool {
		if task.(Reclaimer).CanReclaim() {
			task.(Reclaimer).MarkReclaim()
			markedTasks = append(markedTasks, key.(PeerTaskMetadata))
		} else {
			lts, ok := task.(*localTaskStore)
			if ok {
				// only count tasks that are not reclaimed
				totalNotMarkedSize += lts.ContentLength
				logger.Debugf("task %s/%s has not reached gc time",
					key.(PeerTaskMetadata).TaskID, key.(PeerTaskMetadata).PeerID)
			}
		}
		return true
	})

	quotaBytesExceed := totalNotMarkedSize - int64(s.storeOption.DiskGCThreshold)
	quotaExceed := s.storeOption.DiskGCThreshold > 0 && quotaBytesExceed > 0
	usageExceed, usageBytesExceed := s.diskUsageExceed()

	if quotaExceed || usageExceed {
		var bytesExceed int64
		// only use quotaBytesExceed when s.storeOption.DiskGCThreshold > 0
		if s.storeOption.DiskGCThreshold > 0 && quotaBytesExceed > usageBytesExceed {
			bytesExceed = quotaBytesExceed
		} else {
			bytesExceed = usageBytesExceed
		}
		logger.Infof("quota threshold reached, start gc for the oldest tasks, size: %d bytes", bytesExceed)
		var tasks []*localTaskStore
		s.tasks.Range(func(key, val any) bool {
			// skip reclaimed task
			task, ok := val.(*localTaskStore)
			if !ok { // skip subtask
				return true
			}
			if task.reclaimMarked.Load() {
				return true
			}
			// task is not done and was active within s.gcInterval,
			// the next gc loop will check it again
			if !task.Done && time.Since(time.Unix(0, task.lastAccess.Load())) < s.gcInterval {
				return true
			}
			tasks = append(tasks, task)
			return true
		})
		// sort by access time
		sort.SliceStable(tasks, func(i, j int) bool {
			return tasks[i].lastAccess.Load() < tasks[j].lastAccess.Load()
		})
		for _, task := range tasks {
			task.MarkReclaim()
			markedTasks = append(markedTasks, PeerTaskMetadata{task.PeerID, task.TaskID})
			logger.Infof("quota threshold reached, mark task %s/%s reclaimed, last access: %s, size: %s",
				task.TaskID, task.PeerID, time.Unix(0, task.lastAccess.Load()).Format(time.RFC3339Nano),
				units.BytesSize(float64(task.ContentLength)))
			bytesExceed -= task.ContentLength
			if bytesExceed <= 0 {
				break
			}
		}
		if bytesExceed > 0 {
			logger.Warnf("not enough tasks to gc, %d bytes remaining", bytesExceed)
		}
	}

	for _, key := range s.markedReclaimTasks {
		t, ok := s.tasks.Load(key)
		if !ok {
			continue
		}
		_, span := tracer.Start(context.Background(), config.SpanPeerGC)
		s.tasks.Delete(key)

		if lts, ok := t.(*localTaskStore); ok {
			span.SetAttributes(config.AttributePeerID.String(lts.PeerID))
			span.SetAttributes(config.AttributeTaskID.String(lts.TaskID))
			s.cleanIndex(lts.TaskID, lts.PeerID)
		} else {
			task := t.(*localSubTaskStore)
			span.SetAttributes(config.AttributePeerID.String(task.PeerID))
			span.SetAttributes(config.AttributeTaskID.String(task.TaskID))
			s.cleanSubIndex(task.TaskID, task.PeerID)
		}

		if err := t.(Reclaimer).Reclaim(); err != nil {
			// FIXME: retry later or push to queue
			logger.Errorf("gc task %s/%s error: %s", key.TaskID, key.PeerID, err)
			span.RecordError(err)
			span.End()
			continue
		}
		logger.Infof("task %s/%s reclaimed", key.TaskID, key.PeerID)
		// remove the reclaimed task from markedTasks
		for i, k := range markedTasks {
			if k.TaskID == key.TaskID && k.PeerID == key.PeerID {
				markedTasks = append(markedTasks[:i], markedTasks[i+1:]...)
				break
			}
		}
		span.End()
	}
	logger.Infof("marked %d task(s), reclaimed %d task(s)", len(markedTasks), len(s.markedReclaimTasks))
	s.markedReclaimTasks = markedTasks
	return true, nil
}

// deleteTask deletes the given task from local storage and unregisters it from the scheduler.
func (s *storageManager) deleteTask(meta PeerTaskMetadata) error {
	task, ok := s.LoadAndDeleteTask(meta)
	if !ok {
		logger.Infof("deleteTask: task meta not found: %v", meta)
		return nil
	}

	logger.Debugf("deleteTask: deleting task: %v", meta)
	if _, ok := task.(*localTaskStore); ok {
		s.cleanIndex(meta.TaskID, meta.PeerID)
	} else {
		s.cleanSubIndex(meta.TaskID, meta.PeerID)
	}
	// MarkReclaim() will call gcCallback, which will unregister the task from the scheduler
	task.(Reclaimer).MarkReclaim()
	return task.(Reclaimer).Reclaim()
}

func (s *storageManager) UnregisterTask(ctx context.Context, req CommonTaskRequest) error {
	return s.deleteTask(PeerTaskMetadata{
		TaskID: req.TaskID,
		PeerID: req.PeerID,
	})
}

func (s *storageManager) CleanUp() {
	_, _ = s.forceGC()
}

func (s *storageManager) forceGC() (bool, error) {
	s.tasks.Range(func(key, task any) bool {
		meta := key.(PeerTaskMetadata)
		err := s.deleteTask(meta)
		if err != nil {
			logger.Errorf("gc task store %s error: %s", key, err)
		}
		return true
	})
	return true, nil
}

func (s *storageManager) diskUsageExceed() (exceed bool, bytes int64) {
	if s.storeOption.DiskGCThresholdPercent <= 0 {
		return false, 0
	}
	usage, err := disk.Usage(s.storeOption.DataPath)
	if err != nil {
		logger.Warnf("get %s disk usage error: %s", s.storeOption.DataPath, err)
		return false, 0
	}
	logger.Debugf("disk usage: %+v", usage)
	if usage.UsedPercent < s.storeOption.DiskGCThresholdPercent {
		return false, 0
	}

	bs := (usage.UsedPercent - s.storeOption.DiskGCThresholdPercent) * float64(usage.Total) / 100.0
	logger.Infof("disk used percent %f, exceed threshold percent %f, %d bytes to reclaim",
		usage.UsedPercent, s.storeOption.DiskGCThresholdPercent, int64(bs))
	return true, int64(bs)
}
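
// Worked example (editorial, not part of the original file): on a hypothetical
// disk with Total = 1 TB, UsedPercent = 95 and DiskGCThresholdPercent = 90,
// the reclaim target is (95 - 90) * 1 TB / 100 = 50 GB, which TryGC then tries
// to free by marking the least recently accessed tasks.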