cubefs

Форк
0
/
scheduler.go 
261 строка · 6.7 Кб
1
// Copyright 2022 The CubeFS Authors.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12
// implied. See the License for the specific language governing
13
// permissions and limitations under the License.
14

15
package proto
16

17
import (
18
	"sync"
19

20
	"github.com/cubefs/cubefs/blobstore/common/codemode"
21
	"github.com/cubefs/cubefs/blobstore/util/errors"
22
)
23

24
var (
	// ErrTaskPaused is returned when the requested task has been paused.
	ErrTaskPaused = errors.New("task has paused")
	// ErrTaskEmpty is returned when there is no task available to run.
	ErrTaskEmpty = errors.New("no task to run")
)
28

29
const (
	// Lease timing invariant: TaskRenewalPeriodS + RenewalTimeoutS < TaskLeaseExpiredS,
	// so a worker renews (or times out) before the scheduler expires the task lease.
	TaskRenewalPeriodS = 5  // period (seconds) at which a worker renews its alive tasks
	RenewalTimeoutS    = 1  // timeout (seconds) of a worker's task renewal request
	TaskLeaseExpiredS  = 10 // task lease duration (seconds) in the scheduler
)
35

36
// TaskType identifies the category of a background task handled by the scheduler.
type TaskType string

// All known task types.
const (
	TaskTypeDiskRepair    TaskType = "disk_repair"
	TaskTypeBalance       TaskType = "balance"
	TaskTypeDiskDrop      TaskType = "disk_drop"
	TaskTypeManualMigrate TaskType = "manual_migrate"
	TaskTypeVolumeInspect TaskType = "volume_inspect"
	TaskTypeShardRepair   TaskType = "shard_repair"
	TaskTypeBlobDelete    TaskType = "blob_delete"
)

// Valid reports whether t is one of the known task types.
func (t TaskType) Valid() bool {
	for _, known := range []TaskType{
		TaskTypeDiskRepair, TaskTypeBalance, TaskTypeDiskDrop, TaskTypeManualMigrate,
		TaskTypeVolumeInspect, TaskTypeShardRepair, TaskTypeBlobDelete,
	} {
		if t == known {
			return true
		}
	}
	return false
}

// String returns the task type as a plain string, implementing fmt.Stringer.
func (t TaskType) String() string {
	return string(t)
}
61

62
// VunitLocation describes where a volume unit replica lives: the unit id,
// the host address serving it, and the disk it resides on.
type VunitLocation struct {
	Vuid   Vuid   `json:"vuid" bson:"vuid"`
	Host   string `json:"host" bson:"host"`
	DiskID DiskID `json:"disk_id" bson:"disk_id"`
}
67

68
// for task check
69
func CheckVunitLocations(locations []VunitLocation) bool {
70
	if len(locations) == 0 {
71
		return false
72
	}
73

74
	for _, l := range locations {
75
		if l.Vuid == InvalidVuid || l.Host == "" || l.DiskID == InvalidDiskID {
76
			return false
77
		}
78
	}
79
	return true
80
}
81

82
// MigrateState is the lifecycle state of a migrate task.
type MigrateState uint8

// Migrate task states. Prepared and WorkCompleted are the states in which
// a worker is considered to be running the task (see MigrateTask.Running).
const (
	MigrateStateInited MigrateState = iota + 1
	MigrateStatePrepared
	MigrateStateWorkCompleted
	MigrateStateFinished
	MigrateStateFinishedInAdvance
)
91

92
// MigrateTask describes one volume-unit migration from a source disk to a
// destination volume unit (used by disk repair, balance, disk drop and
// manual migrate task types).
type MigrateTask struct {
	TaskID   string       `json:"task_id"`   // unique task id
	TaskType TaskType     `json:"task_type"` // task type
	State    MigrateState `json:"state"`     // current task state

	SourceIDC    string `json:"source_idc"`     // source idc
	SourceDiskID DiskID `json:"source_disk_id"` // source disk id
	SourceVuid   Vuid   `json:"source_vuid"`    // source volume unit id

	Sources  []VunitLocation   `json:"sources"`   // source volume units location
	CodeMode codemode.CodeMode `json:"code_mode"` // code mode of the volume

	Destination VunitLocation `json:"destination"` // destination volume unit location

	Ctime string `json:"ctime"` // create time
	MTime string `json:"mtime"` // modify time

	FinishAdvanceReason string `json:"finish_advance_reason"` // why the task finished in advance, if it did

	// The task migrates chunks via direct download first; if that fails the
	// chunk is recovered by EC repair. When true, direct download is forbidden.
	ForbiddenDirectDownload bool `json:"forbidden_direct_download"`

	WorkerRedoCnt uint8 `json:"worker_redo_cnt"` // number of times a worker redid this task
}
115

116
// Vid returns the volume id derived from the source vuid.
func (t *MigrateTask) Vid() Vid {
	return t.SourceVuid.Vid()
}
119

120
// GetSources returns the source volume unit locations.
// NOTE(review): the slice is returned without copying; callers must not mutate it.
func (t *MigrateTask) GetSources() []VunitLocation {
	return t.Sources
}
123

124
// GetDestination returns the destination volume unit location.
func (t *MigrateTask) GetDestination() VunitLocation {
	return t.Destination
}
127

128
// SetDestination sets the destination volume unit location.
func (t *MigrateTask) SetDestination(dest VunitLocation) {
	t.Destination = dest
}
131

132
// DestinationDiskID returns the disk id of the destination volume unit.
func (t *MigrateTask) DestinationDiskID() DiskID {
	return t.Destination.DiskID
}
135

136
// GetSourceDiskID returns the source disk id.
func (t *MigrateTask) GetSourceDiskID() DiskID {
	return t.SourceDiskID
}
139

140
// Running reports whether the task is currently being executed by a worker,
// i.e. it has been prepared or its data movement has completed but the task
// is not yet finished.
func (t *MigrateTask) Running() bool {
	switch t.State {
	case MigrateStatePrepared, MigrateStateWorkCompleted:
		return true
	default:
		return false
	}
}
143

144
// Copy returns a shallow copy of the task with its Sources slice cloned,
// so that mutating the copy's Sources does not affect the original.
// (make+copy is kept deliberately: an empty Sources stays non-nil.)
func (t *MigrateTask) Copy() *MigrateTask {
	clone := *t
	sources := make([]VunitLocation, len(t.Sources))
	copy(sources, t.Sources)
	clone.Sources = sources
	return &clone
}
152

153
// IsValid reports whether the task has a known task type, a valid code mode,
// well-formed source locations, and a well-formed destination location.
func (t *MigrateTask) IsValid() bool {
	if !t.TaskType.Valid() || !t.CodeMode.IsValid() {
		return false
	}
	if !CheckVunitLocations(t.Sources) {
		return false
	}
	return CheckVunitLocations([]VunitLocation{t.Destination})
}
158

159
// VolumeInspectCheckPoint records where the next volume inspection batch starts.
type VolumeInspectCheckPoint struct {
	StartVid Vid    `json:"start_vid"` // min vid in current batch volumes
	Ctime    string `json:"ctime"`     // create time of the checkpoint
}
163

164
// VolumeInspectTask is a task to inspect the replicas of one volume.
type VolumeInspectTask struct {
	TaskID   string            `json:"task_id"`  // unique task id
	Mode     codemode.CodeMode `json:"mode"`     // code mode of the volume
	Replicas []VunitLocation   `json:"replicas"` // replica locations to inspect
}
169

170
// IsValid reports whether the task has a valid code mode and
// well-formed replica locations.
func (t *VolumeInspectTask) IsValid() bool {
	if !t.Mode.IsValid() {
		return false
	}
	return CheckVunitLocations(t.Replicas)
}
173

174
// MissedShard identifies a shard found missing during volume inspection.
type MissedShard struct {
	Vuid Vuid   `json:"vuid"` // volume unit the shard belongs to
	Bid  BlobID `json:"bid"`  // blob id of the missing shard
}
178

179
// VolumeInspectRet is the result of one volume inspect task.
type VolumeInspectRet struct {
	TaskID        string         `json:"task_id"`
	InspectErrStr string         `json:"inspect_err_str"` // empty when the inspection ran successfully
	MissedShards  []*MissedShard `json:"missed_shards"`
}
184

185
// Err converts the recorded inspect error string back into an error.
// It returns nil when the inspection succeeded (empty error string).
func (inspect *VolumeInspectRet) Err() error {
	if inspect.InspectErrStr == "" {
		return nil
	}
	return errors.New(inspect.InspectErrStr)
}
191

192
// ShardRepairTask is a task to repair the bad shards of one blob.
type ShardRepairTask struct {
	Bid      BlobID            `json:"bid"`       // blob to repair
	CodeMode codemode.CodeMode `json:"code_mode"` // code mode of the volume
	Sources  []VunitLocation   `json:"sources"`   // replica locations used for the repair
	BadIdxs  []uint8           `json:"bad_idxs"`  // indexes of the bad shards // TODO: BadIdxes
	Reason   string            `json:"reason"`    // why the repair was triggered
}
199

200
// IsValid reports whether the task has a valid code mode and
// well-formed source locations.
func (task *ShardRepairTask) IsValid() bool {
	if !task.CodeMode.IsValid() {
		return false
	}
	return CheckVunitLocations(task.Sources)
}
203

204
// TaskStatistics is a snapshot of task progress counters.
// The struct itself is not synchronized; use TaskProgress for
// thread-safe accumulation.
type TaskStatistics struct {
	DoneSize   uint64 `json:"done_size"`
	DoneCount  uint64 `json:"done_count"`
	TotalSize  uint64 `json:"total_size"`
	TotalCount uint64 `json:"total_count"`
	Progress   uint64 `json:"progress"` // completion percentage, 0-100, by size
}

// TaskProgress migrate task running progress.
type TaskProgress interface {
	Total(size, count uint64) // reset total size and count.
	Do(size, count uint64)    // update progress.
	Done() TaskStatistics     // returns newest statistics.
}

// NewTaskProgress returns a thread-safe TaskProgress implementation.
func NewTaskProgress() TaskProgress {
	return &taskProgress{}
}

// taskProgress is the mutex-guarded implementation of TaskProgress.
type taskProgress struct {
	mu sync.Mutex
	st TaskStatistics
}

// Total resets the expected total size and count and recomputes the percentage.
func (p *taskProgress) Total(size, count uint64) {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.st.TotalSize = size
	p.st.TotalCount = count
	p.recalcProgress()
}

// Do adds finished size and count and recomputes the percentage.
func (p *taskProgress) Do(size, count uint64) {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.st.DoneSize += size
	p.st.DoneCount += count
	p.recalcProgress()
}

// Done returns a copy of the newest statistics.
func (p *taskProgress) Done() TaskStatistics {
	p.mu.Lock()
	defer p.mu.Unlock()
	return p.st
}

// recalcProgress updates the percentage from done/total sizes; a zero total
// is treated as fully complete. Callers must hold p.mu.
func (p *taskProgress) recalcProgress() {
	if p.st.TotalSize == 0 {
		p.st.Progress = 100
	} else {
		p.st.Progress = (p.st.DoneSize * 100) / p.st.TotalSize
	}
}
262

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.