cubefs

Форк
0
274 строки · 8.0 Кб
1
// Copyright 2022 The CubeFS Authors.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12
// implied. See the License for the specific language governing
13
// permissions and limitations under the License.
14

15
package scheduler
16

17
import (
18
	"context"
19
	"fmt"
20
	"net/url"
21

22
	errcode "github.com/cubefs/cubefs/blobstore/common/errors"
23
	"github.com/cubefs/cubefs/blobstore/common/proto"
24
)
25

26
// AcquireArgs is the argument of AcquireTask. Its IDC value is sent as the
// "idc" query parameter of the acquire request.
type AcquireArgs struct {
	IDC string `json:"idc"`
}
29

30
// AcquireTask issues a GET to PathTaskAcquire on the hosts chosen by
// c.request and decodes the response into ret.
//
// args must be non-nil. args.IDC is sent as the "idc" query parameter and is
// query-escaped, so a value containing characters such as '&', '#' or spaces
// cannot corrupt the request URL.
//
// The returned error is whatever c.request reports.
func (c *client) AcquireTask(ctx context.Context, args *AcquireArgs) (ret *proto.MigrateTask, err error) {
	// Build the query once; it is the same for every host that is tried.
	query := "?idc=" + url.QueryEscape(args.IDC)
	err = c.request(func(host string) error {
		return c.GetWith(ctx, host+PathTaskAcquire+query, &ret)
	})
	return ret, err
}
36

37
type TaskRenewalArgs struct {
38
	IDC string                      `json:"idc"`
39
	IDs map[proto.TaskType][]string `json:"ids"`
40
}
41

42
type TaskRenewalRet struct {
43
	Errors map[proto.TaskType]map[string]string `json:"errors,omitempty"`
44
}
45

46
// RenewalTask posts args to PathTaskRenewal through c.request and decodes the
// response into ret. The returned error is whatever c.request reports.
func (c *client) RenewalTask(ctx context.Context, args *TaskRenewalArgs) (ret *TaskRenewalRet, err error) {
	renew := func(host string) error {
		return c.PostWith(ctx, host+PathTaskRenewal, &ret, args)
	}
	err = c.request(renew)
	return ret, err
}
52

53
type TaskReportArgs struct {
54
	TaskType proto.TaskType `json:"task_type"`
55
	TaskID   string         `json:"task_id"`
56

57
	TaskStats            proto.TaskStatistics `json:"task_stats"`
58
	IncreaseDataSizeByte int                  `json:"increase_data_size_byte"`
59
	IncreaseShardCnt     int                  `json:"increase_shard_cnt"`
60
}
61

62
// ReportTask posts args to PathTaskReport through c.request. No response body
// is decoded; the returned error is whatever c.request reports.
func (c *client) ReportTask(ctx context.Context, args *TaskReportArgs) (err error) {
	report := func(host string) error {
		return c.PostWith(ctx, host+PathTaskReport, nil, args)
	}
	err = c.request(report)
	return err
}
67

68
// OperateTaskArgs for task action.
69
type OperateTaskArgs struct {
70
	IDC      string                `json:"idc"`
71
	TaskID   string                `json:"task_id"`
72
	TaskType proto.TaskType        `json:"task_type"`
73
	Src      []proto.VunitLocation `json:"src"`
74
	Dest     proto.VunitLocation   `json:"dest"`
75
	Reason   string                `json:"reason"`
76
}
77

78
// ReclaimTask posts args to PathTaskReclaim through c.request. No response
// body is decoded; the returned error is whatever c.request reports.
func (c *client) ReclaimTask(ctx context.Context, args *OperateTaskArgs) (err error) {
	reclaim := func(host string) error {
		return c.PostWith(ctx, host+PathTaskReclaim, nil, args)
	}
	err = c.request(reclaim)
	return err
}
83

84
// CancelTask posts args to PathTaskCancel through c.request. No response body
// is decoded; the returned error is whatever c.request reports.
func (c *client) CancelTask(ctx context.Context, args *OperateTaskArgs) (err error) {
	cancel := func(host string) error {
		return c.PostWith(ctx, host+PathTaskCancel, nil, args)
	}
	err = c.request(cancel)
	return err
}
89

90
// CompleteTask posts args to PathTaskComplete through c.request. No response
// body is decoded; the returned error is whatever c.request reports.
func (c *client) CompleteTask(ctx context.Context, args *OperateTaskArgs) (err error) {
	complete := func(host string) error {
		return c.PostWith(ctx, host+PathTaskComplete, nil, args)
	}
	err = c.request(complete)
	return err
}
95

96
// AcquireInspectTask issues a GET to PathInspectAcquire through c.request and
// decodes the response into ret. The returned error is whatever c.request
// reports.
func (c *client) AcquireInspectTask(ctx context.Context) (ret *proto.VolumeInspectTask, err error) {
	acquire := func(host string) error {
		return c.GetWith(ctx, host+PathInspectAcquire, &ret)
	}
	err = c.request(acquire)
	return ret, err
}
102

103
// CompleteInspectTask posts args to PathInspectComplete through c.request. No
// response body is decoded; the returned error is whatever c.request reports.
func (c *client) CompleteInspectTask(ctx context.Context, args *proto.VolumeInspectRet) (err error) {
	complete := func(host string) error {
		return c.PostWith(ctx, host+PathInspectComplete, nil, args)
	}
	err = c.request(complete)
	return err
}
108

109
type AddManualMigrateArgs struct {
110
	Vuid           proto.Vuid `json:"vuid"`
111
	DirectDownload bool       `json:"direct_download"`
112
}
113

114
// Valid reports whether args carries a valid Vuid, as decided by
// args.Vuid.IsValid. A nil receiver is reported as invalid instead of
// panicking on the field access.
func (args *AddManualMigrateArgs) Valid() bool {
	if args == nil {
		return false
	}
	return args.Vuid.IsValid()
}
117

118
// AddManualMigrateTask posts args to PathManualMigrateTaskAdd through
// c.request. No response body is decoded; the returned error is whatever
// c.request reports.
func (c *client) AddManualMigrateTask(ctx context.Context, args *AddManualMigrateArgs) (err error) {
	add := func(host string) error {
		return c.PostWith(ctx, host+PathManualMigrateTaskAdd, nil, args)
	}
	err = c.request(add)
	return err
}
123

124
// MigrateTaskDetailArgs migrate task detail args.
125
type MigrateTaskDetailArgs struct {
126
	Type proto.TaskType `json:"type"`
127
	ID   string         `json:"id"`
128
}
129

130
// MigrateTaskDetail migrate task detail.
131
type MigrateTaskDetail struct {
132
	Task proto.MigrateTask    `json:"task"`
133
	Stat proto.TaskStatistics `json:"stat"`
134
}
135

136
// PerMinStats holds three per-minute statistics (going by the type name), each
// carried as a string: finished count, shard count and data amount in bytes.
type PerMinStats struct {
	FinishedCnt    string `json:"finished_cnt"`
	ShardCnt       string `json:"shard_cnt"`
	DataAmountByte string `json:"data_amount_byte"`
}
141

142
type DiskRepairTasksStat struct {
143
	Enable           bool           `json:"enable"`
144
	RepairingDisks   []proto.DiskID `json:"repairing_disks"`
145
	TotalTasksCnt    int            `json:"total_tasks_cnt"`
146
	RepairedTasksCnt int            `json:"repaired_tasks_cnt"`
147
	MigrateTasksStat
148
}
149

150
type MigrateTasksStat struct {
151
	PreparingCnt   int         `json:"preparing_cnt"`
152
	WorkerDoingCnt int         `json:"worker_doing_cnt"`
153
	FinishingCnt   int         `json:"finishing_cnt"`
154
	StatsPerMin    PerMinStats `json:"stats_per_min"`
155
}
156

157
type DiskDropTasksStat struct {
158
	Enable          bool           `json:"enable"`
159
	DroppingDisks   []proto.DiskID `json:"dropping_disks"`
160
	TotalTasksCnt   int            `json:"total_tasks_cnt"`
161
	DroppedTasksCnt int            `json:"dropped_tasks_cnt"`
162
	MigrateTasksStat
163
}
164

165
type BalanceTasksStat struct {
166
	Enable bool `json:"enable"`
167
	MigrateTasksStat
168
}
169

170
type ManualMigrateTasksStat struct {
171
	MigrateTasksStat
172
}
173

174
// VolumeInspectTasksStat is the volume-inspect section of TasksStat: an
// enable flag and two per-minute values carried as strings.
type VolumeInspectTasksStat struct {
	Enable         bool   `json:"enable"`
	FinishedPerMin string `json:"finished_per_min"`
	TimeOutPerMin  string `json:"time_out_per_min"`
}
179

180
// RunnerStat is the stat shape used by TasksStat for both shard repair and
// blob delete: an enable flag, success/failure per-minute values carried as
// strings, a total error count and a list of error stat strings.
type RunnerStat struct {
	Enable        bool     `json:"enable"`
	SuccessPerMin string   `json:"success_per_min"`
	FailedPerMin  string   `json:"failed_per_min"`
	TotalErrCnt   uint64   `json:"total_err_cnt"`
	ErrStats      []string `json:"err_stats"`
}
188

189
type TasksStat struct {
190
	DiskRepair    *DiskRepairTasksStat    `json:"disk_repair,omitempty"`
191
	DiskDrop      *DiskDropTasksStat      `json:"disk_drop,omitempty"`
192
	Balance       *BalanceTasksStat       `json:"balance,omitempty"`
193
	ManualMigrate *ManualMigrateTasksStat `json:"manual_migrate,omitempty"`
194
	VolumeInspect *VolumeInspectTasksStat `json:"volume_inspect,omitempty"`
195
	ShardRepair   *RunnerStat             `json:"shard_repair"`
196
	BlobDelete    *RunnerStat             `json:"blob_delete"`
197
}
198

199
// DetailMigrateTask fetches the detail of one migrate task with a GET to
// <host><PathTaskDetail>/<type>/<id> through c.request and decodes the
// response into detail.
//
// It returns errcode.ErrIllegalArguments without issuing a request when args
// is nil or args.Type is not valid. Otherwise the returned error is whatever
// c.request reports.
func (c *client) DetailMigrateTask(ctx context.Context, args *MigrateTaskDetailArgs) (detail MigrateTaskDetail, err error) {
	if args == nil || !args.Type.Valid() {
		err = errcode.ErrIllegalArguments
		return detail, err
	}
	// The ID is caller-supplied free text. Escape it so that characters such
	// as '/', '?' or '#' stay inside the final path segment instead of
	// changing the route or starting a query string.
	suffix := fmt.Sprintf("%s/%s/%s", PathTaskDetail, args.Type, url.PathEscape(args.ID))
	err = c.request(func(host string) error {
		return c.GetWith(ctx, host+suffix, &detail)
	})
	return detail, err
}
210

211
// Stats issues a GET to PathStats on the given host and decodes the response
// into ret. The host is first passed through hostWithScheme. Unlike the other
// calls in this file, it does not go through c.request.
func (c *client) Stats(ctx context.Context, host string) (ret TasksStat, err error) {
	target := hostWithScheme(host) + PathStats
	err = c.GetWith(ctx, target, &ret)
	return ret, err
}
215

216
// LeaderStats issues a GET to PathStatsLeader through c.request and decodes
// the response into ret. The returned error is whatever c.request reports.
func (c *client) LeaderStats(ctx context.Context) (ret TasksStat, err error) {
	fetch := func(host string) error {
		return c.GetWith(ctx, host+PathStatsLeader, &ret)
	}
	err = c.request(fetch)
	return ret, err
}
222

223
type DiskMigratingStatsArgs struct {
224
	TaskType proto.TaskType `json:"task_type"`
225
	DiskID   proto.DiskID   `json:"disk_id"`
226
}
227

228
// DiskMigratingStats is the result of the DiskMigratingStats call: a total
// task count and a migrated task count.
type DiskMigratingStats struct {
	TotalTasksCnt    int `json:"total_tasks_cnt"`
	MigratedTasksCnt int `json:"migrated_tasks_cnt"`
}
232

233
// DiskMigratingStats issues a GET to PathStatsDiskMigrating through c.request,
// passing the task type and disk ID as query parameters, and decodes the
// response into ret.
//
// It returns errcode.ErrIllegalArguments without issuing a request when args
// is nil or args.TaskType is not valid. Otherwise the returned error is
// whatever c.request reports.
func (c *client) DiskMigratingStats(ctx context.Context, args *DiskMigratingStatsArgs) (ret *DiskMigratingStats, err error) {
	if args == nil || !args.TaskType.Valid() {
		return nil, errcode.ErrIllegalArguments
	}
	// The query does not depend on the host, so it is rendered once up front.
	query := fmt.Sprintf("?task_type=%s&disk_id=%d", args.TaskType, args.DiskID)
	err = c.request(func(host string) error {
		return c.GetWith(ctx, host+PathStatsDiskMigrating+query, &ret)
	})
	return ret, err
}
244

245
// selectHost returns the hosts produced by c.selector.GetRandomN(c.hostRetry).
// If that call yields no hosts, it returns errNoServiceAvailable.
func (c *client) selectHost() ([]string, error) {
	if picked := c.selector.GetRandomN(c.hostRetry); len(picked) > 0 {
		return picked, nil
	}
	return nil, errNoServiceAvailable
}
252

253
// request runs req against the hosts returned by selectHost, in order, and
// stops at the first host for which req returns nil.
//
// If selectHost fails, its error is returned and req is never called. If every
// host fails, the error from the last attempt is returned.
func (c *client) request(req func(host string) error) (err error) {
	candidates, selErr := c.selectHost()
	if selErr != nil {
		return selErr
	}

	for i := range candidates {
		err = req(candidates[i])
		if err == nil {
			break
		}
	}
	return err
}
267

268
// hostWithScheme returns host unchanged when url.Parse accepts it and finds
// both a scheme and a host component. In every other case, including a parse
// error, it returns host prefixed with "http://".
func hostWithScheme(host string) string {
	parsed, err := url.Parse(host)
	// The err check comes first so parsed is never dereferenced when nil.
	if err != nil || parsed.Scheme == "" || parsed.Host == "" {
		return "http://" + host
	}
	return host
}
275

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.