Dragonfly2

Форк
0
/
task_manager.go 
134 строки · 3.2 Кб
1
/*
2
 *     Copyright 2020 The Dragonfly Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *      http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
//go:generate mockgen -destination task_manager_mock.go -source task_manager.go -package resource
18

19
package resource
20

21
import (
22
	"sync"
23

24
	pkggc "d7y.io/dragonfly/v2/pkg/gc"
25
	"d7y.io/dragonfly/v2/scheduler/config"
26
)
27

28
const (
29
	// GC task id.
30
	GCTaskID = "task"
31
)
32

33
// TaskManager is the interface used for task manager.
34
type TaskManager interface {
35
	// Load returns task for a key.
36
	Load(string) (*Task, bool)
37

38
	// Store sets task.
39
	Store(*Task)
40

41
	// LoadOrStore returns task the key if present.
42
	// Otherwise, it stores and returns the given task.
43
	// The loaded result is true if the task was loaded, false if stored.
44
	LoadOrStore(*Task) (*Task, bool)
45

46
	// Delete deletes task for a key.
47
	Delete(string)
48

49
	// Range calls f sequentially for each key and value present in the map.
50
	// If f returns false, range stops the iteration.
51
	Range(f func(any, any) bool)
52

53
	// Try to reclaim task.
54
	RunGC() error
55
}
56

57
// taskManager contains content for task manager.
58
type taskManager struct {
59
	// Task sync map.
60
	*sync.Map
61
}
62

63
// New task manager interface.
64
func newTaskManager(cfg *config.GCConfig, gc pkggc.GC) (TaskManager, error) {
65
	t := &taskManager{
66
		Map: &sync.Map{},
67
	}
68

69
	if err := gc.Add(pkggc.Task{
70
		ID:       GCTaskID,
71
		Interval: cfg.TaskGCInterval,
72
		Timeout:  cfg.TaskGCInterval,
73
		Runner:   t,
74
	}); err != nil {
75
		return nil, err
76
	}
77

78
	return t, nil
79
}
80

81
// Load returns task for a key.
82
func (t *taskManager) Load(key string) (*Task, bool) {
83
	rawTask, loaded := t.Map.Load(key)
84
	if !loaded {
85
		return nil, false
86
	}
87

88
	return rawTask.(*Task), loaded
89
}
90

91
// Store sets task.
92
func (t *taskManager) Store(task *Task) {
93
	t.Map.Store(task.ID, task)
94
}
95

96
// LoadOrStore returns task the key if present.
97
// Otherwise, it stores and returns the given task.
98
// The loaded result is true if the task was loaded, false if stored.
99
func (t *taskManager) LoadOrStore(task *Task) (*Task, bool) {
100
	rawTask, loaded := t.Map.LoadOrStore(task.ID, task)
101
	return rawTask.(*Task), loaded
102
}
103

104
// Delete deletes task for a key.
105
func (t *taskManager) Delete(key string) {
106
	t.Map.Delete(key)
107
}
108

109
// Range calls f sequentially for each key and value present in the map.
110
// If f returns false, range stops the iteration.
111
func (t *taskManager) Range(f func(key, value any) bool) {
112
	t.Map.Range(f)
113
}
114

115
// Try to reclaim task.
116
func (t *taskManager) RunGC() error {
117
	t.Map.Range(func(_, value any) bool {
118
		task, ok := value.(*Task)
119
		if !ok {
120
			task.Log.Error("invalid task")
121
			return true
122
		}
123

124
		// If there is no peer then task will be reclaimed.
125
		if task.PeerCount() == 0 {
126
			task.Log.Info("task has been reclaimed")
127
			t.Delete(task.ID)
128
		}
129

130
		return true
131
	})
132

133
	return nil
134
}
135

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.