Dragonfly2
134 строки · 3.2 Кб
1/*
2* Copyright 2020 The Dragonfly Authors
3*
4* Licensed under the Apache License, Version 2.0 (the "License");
5* you may not use this file except in compliance with the License.
6* You may obtain a copy of the License at
7*
8* http://www.apache.org/licenses/LICENSE-2.0
9*
10* Unless required by applicable law or agreed to in writing, software
11* distributed under the License is distributed on an "AS IS" BASIS,
12* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13* See the License for the specific language governing permissions and
14* limitations under the License.
15*/
16
17//go:generate mockgen -destination task_manager_mock.go -source task_manager.go -package resource
18
19package resource
20
21import (
22"sync"
23
24pkggc "d7y.io/dragonfly/v2/pkg/gc"
25"d7y.io/dragonfly/v2/scheduler/config"
26)
27
28const (
29// GC task id.
30GCTaskID = "task"
31)
32
33// TaskManager is the interface used for task manager.
34type TaskManager interface {
35// Load returns task for a key.
36Load(string) (*Task, bool)
37
38// Store sets task.
39Store(*Task)
40
41// LoadOrStore returns task the key if present.
42// Otherwise, it stores and returns the given task.
43// The loaded result is true if the task was loaded, false if stored.
44LoadOrStore(*Task) (*Task, bool)
45
46// Delete deletes task for a key.
47Delete(string)
48
49// Range calls f sequentially for each key and value present in the map.
50// If f returns false, range stops the iteration.
51Range(f func(any, any) bool)
52
53// Try to reclaim task.
54RunGC() error
55}
56
57// taskManager contains content for task manager.
58type taskManager struct {
59// Task sync map.
60*sync.Map
61}
62
63// New task manager interface.
64func newTaskManager(cfg *config.GCConfig, gc pkggc.GC) (TaskManager, error) {
65t := &taskManager{
66Map: &sync.Map{},
67}
68
69if err := gc.Add(pkggc.Task{
70ID: GCTaskID,
71Interval: cfg.TaskGCInterval,
72Timeout: cfg.TaskGCInterval,
73Runner: t,
74}); err != nil {
75return nil, err
76}
77
78return t, nil
79}
80
81// Load returns task for a key.
82func (t *taskManager) Load(key string) (*Task, bool) {
83rawTask, loaded := t.Map.Load(key)
84if !loaded {
85return nil, false
86}
87
88return rawTask.(*Task), loaded
89}
90
91// Store sets task.
92func (t *taskManager) Store(task *Task) {
93t.Map.Store(task.ID, task)
94}
95
96// LoadOrStore returns task the key if present.
97// Otherwise, it stores and returns the given task.
98// The loaded result is true if the task was loaded, false if stored.
99func (t *taskManager) LoadOrStore(task *Task) (*Task, bool) {
100rawTask, loaded := t.Map.LoadOrStore(task.ID, task)
101return rawTask.(*Task), loaded
102}
103
104// Delete deletes task for a key.
105func (t *taskManager) Delete(key string) {
106t.Map.Delete(key)
107}
108
109// Range calls f sequentially for each key and value present in the map.
110// If f returns false, range stops the iteration.
111func (t *taskManager) Range(f func(key, value any) bool) {
112t.Map.Range(f)
113}
114
115// Try to reclaim task.
116func (t *taskManager) RunGC() error {
117t.Map.Range(func(_, value any) bool {
118task, ok := value.(*Task)
119if !ok {
120task.Log.Error("invalid task")
121return true
122}
123
124// If there is no peer then task will be reclaimed.
125if task.PeerCount() == 0 {
126task.Log.Info("task has been reclaimed")
127t.Delete(task.ID)
128}
129
130return true
131})
132
133return nil
134}
135