1
// Copyright 2022 The CubeFS Authors.
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
7
// http://www.apache.org/licenses/LICENSE-2.0
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12
// implied. See the License for the specific language governing
13
// permissions and limitations under the License.
20
"github.com/cubefs/cubefs/blobstore/common/codemode"
21
"github.com/cubefs/cubefs/blobstore/util/errors"
25
// ErrTaskPaused is the sentinel error carrying the message "task has paused".
ErrTaskPaused = errors.New("task has paused")
26
// ErrTaskEmpty is the sentinel error carrying the message "no task to run".
ErrTaskEmpty = errors.New("no task to run")
30
// Task lease timing constants.
// NOTE(review): the "S" suffix suggests the unit is seconds — confirm.
// Invariant to keep when tuning these values:
//   TaskRenewalPeriodS + RenewalTimeoutS < TaskLeaseExpiredS
// (with the values below: 5 + 1 < 10).
31
TaskRenewalPeriodS = 5 // worker alive tasks renewal period
32
RenewalTimeoutS = 1 // timeout of worker task renewal
33
TaskLeaseExpiredS = 10 // task lease duration in scheduler
39
// Known TaskType values. Each constant is backed by a snake_case string
// literal; these same seven names appear in the case clause of TaskType.Valid.
TaskTypeDiskRepair TaskType = "disk_repair"
40
TaskTypeBalance TaskType = "balance"
41
TaskTypeDiskDrop TaskType = "disk_drop"
42
TaskTypeManualMigrate TaskType = "manual_migrate"
43
TaskTypeVolumeInspect TaskType = "volume_inspect"
44
TaskTypeShardRepair TaskType = "shard_repair"
45
TaskTypeBlobDelete TaskType = "blob_delete"
48
// Valid checks t against the task types listed in the case clause below.
// NOTE(review): the switch header and the return statements are not visible
// in this view; presumably it returns true for the listed types and false
// otherwise — confirm against the full file.
func (t TaskType) Valid() bool {
50
case TaskTypeDiskRepair, TaskTypeBalance, TaskTypeDiskDrop, TaskTypeManualMigrate,
51
TaskTypeVolumeInspect, TaskTypeShardRepair, TaskTypeBlobDelete:
58
// String returns a string form of the task type.
// NOTE(review): the body is not visible in this view; since the TaskType
// constants are string-backed it presumably returns the underlying value — confirm.
func (t TaskType) String() string {
62
// VunitLocation groups a volume unit id with a host string and a disk id.
// Every field carries both a json and a bson tag with the same key.
type VunitLocation struct {
63
Vuid Vuid `json:"vuid" bson:"vuid"`
64
Host string `json:"host" bson:"host"`
65
DiskID DiskID `json:"disk_id" bson:"disk_id"`
69
// CheckVunitLocations validates a slice of volume unit locations.
// Visible checks: an empty slice is handled as its own case, and each element
// is tested for Vuid == InvalidVuid, an empty Host, or DiskID == InvalidDiskID.
// NOTE(review): the return statements are not visible in this view; presumably
// any of those conditions makes the result false — confirm.
func CheckVunitLocations(locations []VunitLocation) bool {
70
if len(locations) == 0 {
74
for _, l := range locations {
75
// a location is rejected when any one of its three fields is unset/invalid
if l.Vuid == InvalidVuid || l.Host == "" || l.DiskID == InvalidDiskID {
82
// MigrateState is the state of a MigrateTask (see MigrateTask.State),
// represented as a uint8.
type MigrateState uint8
85
// MigrateStateInited starts the sequence, offset by one from iota.
// NOTE(review): further states in this const block are declared on lines not
// visible in this view (MigrateTask.Running references MigrateStatePrepared),
// so the numeric values of the constants below cannot be read off from here.
MigrateStateInited MigrateState = iota + 1
87
MigrateStateWorkCompleted
89
MigrateStateFinishedInAdvance
92
// MigrateTask describes one migrate task: its identity and state, the source
// volume unit (idc, disk id, vuid), the locations of the source volume units,
// the destination location, and bookkeeping such as timestamps and redo count.
// All fields are serialized with the json keys given in their tags.
type MigrateTask struct {
93
TaskID string `json:"task_id"` // task id
94
TaskType TaskType `json:"task_type"` // task type
95
State MigrateState `json:"state"` // task state
97
SourceIDC string `json:"source_idc"` // source idc
98
SourceDiskID DiskID `json:"source_disk_id"` // source disk id
99
SourceVuid Vuid `json:"source_vuid"` // source volume unit id
101
Sources []VunitLocation `json:"sources"` // source volume units location
102
CodeMode codemode.CodeMode `json:"code_mode"` // codemode
104
Destination VunitLocation `json:"destination"` // destination volume unit location
106
Ctime string `json:"ctime"` // create time
107
MTime string `json:"mtime"` // modify time
109
FinishAdvanceReason string `json:"finish_advance_reason"`
110
// A migrate task tries to download the chunk directly first; if that fails,
// the chunk is recovered by EC repair.
// NOTE(review): presumably setting this flag disables the direct-download attempt — confirm.
111
ForbiddenDirectDownload bool `json:"forbidden_direct_download"`
113
WorkerRedoCnt uint8 `json:"worker_redo_cnt"` // worker redo task count
116
// Vid returns the Vid obtained from the task's source volume unit id
// (SourceVuid.Vid()).
func (t *MigrateTask) Vid() Vid {
117
return t.SourceVuid.Vid()
120
// GetSources returns volume unit locations for the task.
// NOTE(review): the body is not visible in this view; presumably it returns t.Sources — confirm.
func (t *MigrateTask) GetSources() []VunitLocation {
124
// GetDestination returns a volume unit location for the task.
// NOTE(review): the body is not visible in this view; presumably it returns t.Destination — confirm.
func (t *MigrateTask) GetDestination() VunitLocation {
128
// SetDestination takes the destination location dest for the task.
// NOTE(review): the body is not visible in this view; presumably it assigns dest to t.Destination — confirm.
func (t *MigrateTask) SetDestination(dest VunitLocation) {
132
// DestinationDiskID returns the DiskID field of the task's Destination.
func (t *MigrateTask) DestinationDiskID() DiskID {
133
return t.Destination.DiskID
136
// GetSourceDiskID returns the task's SourceDiskID.
func (t *MigrateTask) GetSourceDiskID() DiskID {
137
return t.SourceDiskID
140
// Running reports whether the task's State is MigrateStatePrepared or
// MigrateStateWorkCompleted; every other state yields false.
func (t *MigrateTask) Running() bool {
141
return t.State == MigrateStatePrepared || t.State == MigrateStateWorkCompleted
144
// Copy returns a *MigrateTask built from t.
// The visible lines allocate a fresh MigrateTask and a new slice with the same
// length as t.Sources.
// NOTE(review): the remaining statements are not visible in this view;
// presumably the new slice is filled from t.Sources so the copy does not share
// its backing array — confirm.
func (t *MigrateTask) Copy() *MigrateTask {
145
task := &MigrateTask{}
147
dst := make([]VunitLocation, len(t.Sources))
153
// IsValid reports whether all of the following hold: the task type is valid,
// the code mode is valid, Sources passes CheckVunitLocations, and Destination
// (wrapped in a one-element slice) passes CheckVunitLocations.
func (t *MigrateTask) IsValid() bool {
154
return t.TaskType.Valid() && t.CodeMode.IsValid() &&
155
CheckVunitLocations(t.Sources) &&
156
CheckVunitLocations([]VunitLocation{t.Destination})
159
// VolumeInspectCheckPoint records a starting volume id together with a
// timestamp string.
type VolumeInspectCheckPoint struct {
160
StartVid Vid `json:"start_vid"` // min vid in current batch volumes
161
Ctime string `json:"ctime"` // NOTE(review): presumably the create time, as on MigrateTask.Ctime — confirm
164
// VolumeInspectTask is a volume inspect task: a task id, a code mode, and the
// locations of the replicas. VolumeInspectTask.IsValid checks Mode and Replicas.
type VolumeInspectTask struct {
165
TaskID string `json:"task_id"`
166
Mode codemode.CodeMode `json:"mode"`
167
Replicas []VunitLocation `json:"replicas"`
170
// IsValid reports whether Mode is a valid code mode and Replicas passes
// CheckVunitLocations.
func (t *VolumeInspectTask) IsValid() bool {
171
return t.Mode.IsValid() && CheckVunitLocations(t.Replicas)
174
// MissedShard identifies a shard by a volume unit id and a blob id.
// It is the element type of VolumeInspectRet.MissedShards.
type MissedShard struct {
175
Vuid Vuid `json:"vuid"`
176
Bid BlobID `json:"bid"`
179
// VolumeInspectRet is the result of a volume inspect task: the task id, an
// error string for the inspect run, and the list of missed shards.
type VolumeInspectRet struct {
180
TaskID string `json:"task_id"`
181
InspectErrStr string `json:"inspect_err_str"` // error text of the inspect run; Err() special-cases the empty string
182
MissedShards []*MissedShard `json:"missed_shards"`
185
// Err converts InspectErrStr into an error value.
// A non-empty InspectErrStr becomes a new error carrying that text.
// NOTE(review): the branch taken for an empty InspectErrStr is not visible in
// this view; presumably it returns nil — confirm.
func (inspect *VolumeInspectRet) Err() error {
186
if len(inspect.InspectErrStr) == 0 {
189
return errors.New(inspect.InspectErrStr)
192
// ShardRepairTask describes a shard repair: the blob id, the code mode, the
// source volume unit locations, a list of bad indexes, and a reason string.
// ShardRepairTask.IsValid checks CodeMode and Sources.
type ShardRepairTask struct {
193
Bid BlobID `json:"bid"`
194
CodeMode codemode.CodeMode `json:"code_mode"`
195
Sources []VunitLocation `json:"sources"`
196
BadIdxs []uint8 `json:"bad_idxs"` // TODO: BadIdxes
197
Reason string `json:"reason"`
200
// IsValid reports whether CodeMode is a valid code mode and Sources passes
// CheckVunitLocations.
func (task *ShardRepairTask) IsValid() bool {
201
return task.CodeMode.IsValid() && CheckVunitLocations(task.Sources)
204
// TaskStatistics holds the progress counters of a task.
// It is not safe for concurrent use on its own.
205
type TaskStatistics struct {
206
DoneSize uint64 `json:"done_size"`
207
DoneCount uint64 `json:"done_count"`
208
TotalSize uint64 `json:"total_size"`
209
TotalCount uint64 `json:"total_count"`
210
Progress uint64 `json:"progress"` // computed by taskProgress as (DoneSize * 100) / TotalSize
213
// TaskProgress tracks the running progress of a migrate task.
// Use NewTaskProgress to obtain an implementation.
214
type TaskProgress interface {
215
Total(size, count uint64) // reset total size and count.
216
Do(size, count uint64) // update progress.
217
Done() TaskStatistics // returns newest statistics.
220
// NewTaskProgress returns a thread-safe TaskProgress, backed by a zero-valued
// taskProgress.
221
func NewTaskProgress() TaskProgress {
222
return &taskProgress{}
225
// taskProgress is the TaskProgress implementation returned by NewTaskProgress.
// NOTE(review): its fields are not visible in this view.
type taskProgress struct {
230
// Total resets the total size and count, then recomputes Progress as
// (DoneSize * 100) / TotalSize.
// NOTE(review): several statements (including how st is obtained and the
// TotalSize assignment) are not visible in this view.
func (p *taskProgress) Total(size, count uint64) {
234
st.TotalCount = count
235
// TotalSize == 0 is special-cased ahead of the division below.
// NOTE(review): branch body not visible; presumably it avoids dividing by zero — confirm.
if st.TotalSize == 0 {
238
st.Progress = (st.DoneSize * 100) / st.TotalSize
243
// Do updates the progress: it adds count to DoneCount, then recomputes
// Progress as (DoneSize * 100) / TotalSize.
// NOTE(review): several statements (including how st is obtained and how size
// is applied) are not visible in this view.
func (p *taskProgress) Do(size, count uint64) {
247
st.DoneCount += count
248
// TotalSize == 0 is special-cased ahead of the division below.
// NOTE(review): branch body not visible; presumably it avoids dividing by zero — confirm.
if st.TotalSize == 0 {
251
st.Progress = (st.DoneSize * 100) / st.TotalSize
256
// Done returns the newest statistics as a TaskStatistics value.
// NOTE(review): the body is not visible in this view.
func (p *taskProgress) Done() TaskStatistics {