1
// Copyright 2022 The CubeFS Authors.
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
7
// http://www.apache.org/licenses/LICENSE-2.0
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12
// implied. See the License for the specific language governing
13
// permissions and limitations under the License.
22
errcode "github.com/cubefs/cubefs/blobstore/common/errors"
23
"github.com/cubefs/cubefs/blobstore/common/proto"
26
type AcquireArgs struct {
27
IDC string `json:"idc"`
30
func (c *client) AcquireTask(ctx context.Context, args *AcquireArgs) (ret *proto.MigrateTask, err error) {
31
err = c.request(func(host string) error {
32
return c.GetWith(ctx, host+PathTaskAcquire+"?idc="+args.IDC, &ret)
37
type TaskRenewalArgs struct {
38
IDC string `json:"idc"`
39
IDs map[proto.TaskType][]string `json:"ids"`
42
type TaskRenewalRet struct {
43
Errors map[proto.TaskType]map[string]string `json:"errors,omitempty"`
46
func (c *client) RenewalTask(ctx context.Context, args *TaskRenewalArgs) (ret *TaskRenewalRet, err error) {
47
err = c.request(func(host string) error {
48
return c.PostWith(ctx, host+PathTaskRenewal, &ret, args)
53
type TaskReportArgs struct {
54
TaskType proto.TaskType `json:"task_type"`
55
TaskID string `json:"task_id"`
57
TaskStats proto.TaskStatistics `json:"task_stats"`
58
IncreaseDataSizeByte int `json:"increase_data_size_byte"`
59
IncreaseShardCnt int `json:"increase_shard_cnt"`
62
func (c *client) ReportTask(ctx context.Context, args *TaskReportArgs) (err error) {
63
return c.request(func(host string) error {
64
return c.PostWith(ctx, host+PathTaskReport, nil, args)
68
// OperateTaskArgs for task action.
69
type OperateTaskArgs struct {
70
IDC string `json:"idc"`
71
TaskID string `json:"task_id"`
72
TaskType proto.TaskType `json:"task_type"`
73
Src []proto.VunitLocation `json:"src"`
74
Dest proto.VunitLocation `json:"dest"`
75
Reason string `json:"reason"`
78
func (c *client) ReclaimTask(ctx context.Context, args *OperateTaskArgs) (err error) {
79
return c.request(func(host string) error {
80
return c.PostWith(ctx, host+PathTaskReclaim, nil, args)
84
func (c *client) CancelTask(ctx context.Context, args *OperateTaskArgs) (err error) {
85
return c.request(func(host string) error {
86
return c.PostWith(ctx, host+PathTaskCancel, nil, args)
90
func (c *client) CompleteTask(ctx context.Context, args *OperateTaskArgs) (err error) {
91
return c.request(func(host string) error {
92
return c.PostWith(ctx, host+PathTaskComplete, nil, args)
96
func (c *client) AcquireInspectTask(ctx context.Context) (ret *proto.VolumeInspectTask, err error) {
97
err = c.request(func(host string) error {
98
return c.GetWith(ctx, host+PathInspectAcquire, &ret)
103
func (c *client) CompleteInspectTask(ctx context.Context, args *proto.VolumeInspectRet) (err error) {
104
return c.request(func(host string) error {
105
return c.PostWith(ctx, host+PathInspectComplete, nil, args)
109
type AddManualMigrateArgs struct {
110
Vuid proto.Vuid `json:"vuid"`
111
DirectDownload bool `json:"direct_download"`
114
func (args *AddManualMigrateArgs) Valid() bool {
115
return args.Vuid.IsValid()
118
func (c *client) AddManualMigrateTask(ctx context.Context, args *AddManualMigrateArgs) (err error) {
119
return c.request(func(host string) error {
120
return c.PostWith(ctx, host+PathManualMigrateTaskAdd, nil, args)
124
// MigrateTaskDetailArgs migrate task detail args.
125
type MigrateTaskDetailArgs struct {
126
Type proto.TaskType `json:"type"`
127
ID string `json:"id"`
130
// MigrateTaskDetail migrate task detail.
131
type MigrateTaskDetail struct {
132
Task proto.MigrateTask `json:"task"`
133
Stat proto.TaskStatistics `json:"stat"`
136
type PerMinStats struct {
137
FinishedCnt string `json:"finished_cnt"`
138
ShardCnt string `json:"shard_cnt"`
139
DataAmountByte string `json:"data_amount_byte"`
142
type DiskRepairTasksStat struct {
143
Enable bool `json:"enable"`
144
RepairingDisks []proto.DiskID `json:"repairing_disks"`
145
TotalTasksCnt int `json:"total_tasks_cnt"`
146
RepairedTasksCnt int `json:"repaired_tasks_cnt"`
150
type MigrateTasksStat struct {
151
PreparingCnt int `json:"preparing_cnt"`
152
WorkerDoingCnt int `json:"worker_doing_cnt"`
153
FinishingCnt int `json:"finishing_cnt"`
154
StatsPerMin PerMinStats `json:"stats_per_min"`
157
type DiskDropTasksStat struct {
158
Enable bool `json:"enable"`
159
DroppingDisks []proto.DiskID `json:"dropping_disks"`
160
TotalTasksCnt int `json:"total_tasks_cnt"`
161
DroppedTasksCnt int `json:"dropped_tasks_cnt"`
165
type BalanceTasksStat struct {
166
Enable bool `json:"enable"`
170
type ManualMigrateTasksStat struct {
174
type VolumeInspectTasksStat struct {
175
Enable bool `json:"enable"`
176
FinishedPerMin string `json:"finished_per_min"`
177
TimeOutPerMin string `json:"time_out_per_min"`
180
// RunnerStat shard repair and blob delete stat
181
type RunnerStat struct {
182
Enable bool `json:"enable"`
183
SuccessPerMin string `json:"success_per_min"`
184
FailedPerMin string `json:"failed_per_min"`
185
TotalErrCnt uint64 `json:"total_err_cnt"`
186
ErrStats []string `json:"err_stats"`
189
type TasksStat struct {
190
DiskRepair *DiskRepairTasksStat `json:"disk_repair,omitempty"`
191
DiskDrop *DiskDropTasksStat `json:"disk_drop,omitempty"`
192
Balance *BalanceTasksStat `json:"balance,omitempty"`
193
ManualMigrate *ManualMigrateTasksStat `json:"manual_migrate,omitempty"`
194
VolumeInspect *VolumeInspectTasksStat `json:"volume_inspect,omitempty"`
195
ShardRepair *RunnerStat `json:"shard_repair"`
196
BlobDelete *RunnerStat `json:"blob_delete"`
199
func (c *client) DetailMigrateTask(ctx context.Context, args *MigrateTaskDetailArgs) (detail MigrateTaskDetail, err error) {
200
if args == nil || !args.Type.Valid() {
201
err = errcode.ErrIllegalArguments
204
err = c.request(func(host string) error {
205
path := fmt.Sprintf("%s%s/%s/%s", host, PathTaskDetail, args.Type, args.ID)
206
return c.GetWith(ctx, path, &detail)
211
func (c *client) Stats(ctx context.Context, host string) (ret TasksStat, err error) {
212
err = c.GetWith(ctx, hostWithScheme(host)+PathStats, &ret)
216
func (c *client) LeaderStats(ctx context.Context) (ret TasksStat, err error) {
217
err = c.request(func(host string) error {
218
return c.GetWith(ctx, host+PathStatsLeader, &ret)
223
type DiskMigratingStatsArgs struct {
224
TaskType proto.TaskType `json:"task_type"`
225
DiskID proto.DiskID `json:"disk_id"`
228
type DiskMigratingStats struct {
229
TotalTasksCnt int `json:"total_tasks_cnt"`
230
MigratedTasksCnt int `json:"migrated_tasks_cnt"`
233
func (c *client) DiskMigratingStats(ctx context.Context, args *DiskMigratingStatsArgs) (ret *DiskMigratingStats, err error) {
234
if args == nil || !args.TaskType.Valid() {
235
err = errcode.ErrIllegalArguments
238
err = c.request(func(host string) error {
239
path := host + PathStatsDiskMigrating + fmt.Sprintf("?task_type=%s&disk_id=%d", args.TaskType, args.DiskID)
240
return c.GetWith(ctx, path, &ret)
245
func (c *client) selectHost() ([]string, error) {
246
hosts := c.selector.GetRandomN(c.hostRetry)
248
return nil, errNoServiceAvailable
253
func (c *client) request(req func(host string) error) (err error) {
255
hosts, err = c.selectHost()
260
for _, host := range hosts {
261
if err = req(host); err == nil {
268
func hostWithScheme(host string) string {
269
u, err := url.Parse(host)
270
if err == nil && u.Scheme != "" && u.Host != "" {
273
return "http://" + host