cubefs

Форк
0
429 строк · 11.3 Кб
1
// Copyright 2022 The CubeFS Authors.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12
// implied. See the License for the specific language governing
13
// permissions and limitations under the License.
14

15
package scheduler
16

17
import (
18
	"fmt"
19
	"math/rand"
20
	"net/http"
21

22
	api "github.com/cubefs/cubefs/blobstore/api/scheduler"
23
	errcode "github.com/cubefs/cubefs/blobstore/common/errors"
24
	"github.com/cubefs/cubefs/blobstore/common/proto"
25
	"github.com/cubefs/cubefs/blobstore/common/rpc"
26
	"github.com/cubefs/cubefs/blobstore/common/trace"
27
	"github.com/cubefs/cubefs/blobstore/scheduler/base"
28
	"github.com/cubefs/cubefs/blobstore/scheduler/client"
29
	"github.com/cubefs/cubefs/blobstore/util/task"
30
)
31

32
// errIllegalTaskType is the HTTP 400 error (code "illegal_type") returned when a
// request names a task type that mgrByType / diskMgrByType cannot route.
var (
	errIllegalTaskType = rpc.NewError(http.StatusBadRequest, "illegal_type", errcode.ErrIllegalTaskType)
)
33

34
// Service rpc service
35
type Service struct {
36
	ClusterID     proto.ClusterID
37
	leader        bool
38
	leaderHost    string
39
	followerHosts []string
40

41
	balanceMgr    Migrator
42
	diskDropMgr   IDisKMigrator
43
	diskRepairMgr IDisKMigrator
44
	manualMigMgr  IManualMigrator
45
	inspectMgr    IVolumeInspector
46

47
	shardRepairMgr  ITaskRunner
48
	blobDeleteMgr   ITaskRunner
49
	clusterTopology IClusterTopology
50
	volumeUpdater   client.IVolumeUpdater
51
	kafkaMonitors   []*base.KafkaTopicMonitor
52

53
	clusterMgrCli client.ClusterMgrAPI
54
}
55

56
func (svr *Service) mgrByType(typ proto.TaskType) (Migrator, error) {
57
	switch typ {
58
	case proto.TaskTypeDiskRepair:
59
		return svr.diskRepairMgr, nil
60
	case proto.TaskTypeBalance:
61
		return svr.balanceMgr, nil
62
	case proto.TaskTypeDiskDrop:
63
		return svr.diskDropMgr, nil
64
	case proto.TaskTypeManualMigrate:
65
		return svr.manualMigMgr, nil
66
	default:
67
		return nil, errIllegalTaskType
68
	}
69
}
70

71
func (svr *Service) diskMgrByType(typ proto.TaskType) (IDisKMigrator, error) {
72
	switch typ {
73
	case proto.TaskTypeDiskDrop:
74
		return svr.diskDropMgr, nil
75
	case proto.TaskTypeDiskRepair:
76
		return svr.diskRepairMgr, nil
77
	default:
78
		return nil, errIllegalTaskType
79
	}
80
}
81

82
// HTTPTaskAcquire hands out one migrate task for the requested IDC.
//
// The disk repair manager is always polled first. The manual migrate, disk
// drop and balance managers follow in an order re-shuffled on every request,
// so there is no fixed priority among them. The first manager whose
// AcquireTask succeeds answers the request; if none does, ErrNothingTodo is
// returned.
func (svr *Service) HTTPTaskAcquire(c *rpc.Context) {
	req := new(api.AcquireArgs)
	if err := c.ParseArgs(req); err != nil {
		c.RespondError(err)
		return
	}
	ctx := c.Request.Context()

	rest := []Migrator{svr.manualMigMgr, svr.diskDropMgr, svr.balanceMgr}
	rand.Shuffle(len(rest), func(i, j int) { rest[i], rest[j] = rest[j], rest[i] })
	order := append([]Migrator{svr.diskRepairMgr}, rest...)

	for _, mgr := range order {
		t, err := mgr.AcquireTask(ctx, req.IDC)
		if err != nil {
			// any error: fall through to the next manager
			continue
		}
		c.RespondJSON(t)
		return
	}
	c.RespondError(errcode.ErrNothingTodo)
}
105

106
// HTTPTaskReclaim reclaim task
107
func (svr *Service) HTTPTaskReclaim(c *rpc.Context) {
108
	args := new(api.OperateTaskArgs)
109
	if err := c.ParseArgs(args); err != nil {
110
		c.RespondError(err)
111
		return
112
	}
113
	if !client.ValidMigrateTask(args.TaskType, args.TaskID) {
114
		c.RespondError(errcode.ErrIllegalArguments)
115
		return
116
	}
117
	ctx := c.Request.Context()
118
	reclaimer, err := svr.mgrByType(args.TaskType)
119
	if err != nil {
120
		c.RespondError(err)
121
		return
122
	}
123

124
	newDst, err := base.AllocVunitSafe(ctx, svr.clusterMgrCli, args.Dest.Vuid, args.Src)
125
	if err != nil {
126
		c.RespondError(err)
127
		return
128
	}
129
	c.RespondError(reclaimer.ReclaimTask(ctx, args.IDC, args.TaskID, args.Src, args.Dest, newDst))
130
}
131

132
// HTTPTaskCancel cancel task
133
func (svr *Service) HTTPTaskCancel(c *rpc.Context) {
134
	args := new(api.OperateTaskArgs)
135
	if err := c.ParseArgs(args); err != nil {
136
		c.RespondError(err)
137
		return
138
	}
139
	if !client.ValidMigrateTask(args.TaskType, args.TaskID) {
140
		c.RespondError(errcode.ErrIllegalArguments)
141
		return
142
	}
143
	ctx := c.Request.Context()
144
	canceler, err := svr.mgrByType(args.TaskType)
145
	if err != nil {
146
		c.RespondError(err)
147
		return
148
	}
149
	c.RespondError(canceler.CancelTask(ctx, args))
150
}
151

152
// HTTPTaskComplete complete task
153
func (svr *Service) HTTPTaskComplete(c *rpc.Context) {
154
	args := new(api.OperateTaskArgs)
155
	if err := c.ParseArgs(args); err != nil {
156
		c.RespondError(err)
157
		return
158
	}
159
	if !client.ValidMigrateTask(args.TaskType, args.TaskID) {
160
		c.RespondError(errcode.ErrIllegalArguments)
161
		return
162
	}
163
	ctx := c.Request.Context()
164
	completer, err := svr.mgrByType(args.TaskType)
165
	if err != nil {
166
		c.RespondError(err)
167
		return
168
	}
169
	c.RespondError(completer.CompleteTask(ctx, args))
170
}
171

172
// HTTPInspectAcquire acquire inspect task
173
func (svr *Service) HTTPInspectAcquire(c *rpc.Context) {
174
	ctx := c.Request.Context()
175

176
	task, _ := svr.inspectMgr.AcquireInspect(ctx)
177
	if task != nil {
178
		c.RespondJSON(task)
179
		return
180
	}
181
	c.RespondError(errcode.ErrNothingTodo)
182
}
183

184
// HTTPInspectComplete complete inspect task
185
func (svr *Service) HTTPInspectComplete(c *rpc.Context) {
186
	args := new(proto.VolumeInspectRet)
187
	if err := c.ParseArgs(args); err != nil {
188
		c.RespondError(err)
189
		return
190
	}
191

192
	ctx := c.Request.Context()
193
	svr.inspectMgr.CompleteInspect(ctx, args)
194
	c.Respond()
195
}
196

197
// HTTPTaskRenewal renewal task
198
func (svr *Service) HTTPTaskRenewal(c *rpc.Context) {
199
	args := new(api.TaskRenewalArgs)
200
	if err := c.ParseArgs(args); err != nil {
201
		c.RespondError(err)
202
		return
203
	}
204

205
	ctx := c.Request.Context()
206
	typeErrors := make(map[proto.TaskType]map[string]string)
207
	for typ, ids := range args.IDs {
208
		renewaler, err := svr.mgrByType(typ)
209
		if err != nil {
210
			c.RespondError(err)
211
			return
212
		}
213

214
		errors := make(map[string]string)
215
		for _, id := range ids {
216
			if !client.ValidMigrateTask(typ, id) {
217
				errors[id] = errcode.ErrIllegalArguments.Error()
218
				continue
219
			}
220
			if err := renewaler.RenewalTask(ctx, args.IDC, id); err != nil {
221
				errors[id] = err.Error()
222
			}
223
		}
224

225
		if len(errors) > 0 {
226
			typeErrors[typ] = errors
227
		}
228
	}
229
	c.RespondJSON(api.TaskRenewalRet{Errors: typeErrors})
230
}
231

232
// HTTPTaskReport reports task stats
233
func (svr *Service) HTTPTaskReport(c *rpc.Context) {
234
	args := new(api.TaskReportArgs)
235
	if err := c.ParseArgs(args); err != nil {
236
		c.RespondError(err)
237
		return
238
	}
239
	if !client.ValidMigrateTask(args.TaskType, args.TaskID) {
240
		c.RespondError(errcode.ErrIllegalArguments)
241
		return
242
	}
243
	reporter, err := svr.mgrByType(args.TaskType)
244
	if err != nil {
245
		c.RespondError(err)
246
		return
247
	}
248
	reporter.ReportWorkerTaskStats(args)
249
	c.Respond()
250
}
251

252
// HTTPMigrateTaskDetail returns the detail of one migrate task. A missing or
// invalid id is rejected with ErrIllegalArguments. Any QueryTask failure is
// reported as HTTP 404 with code "NotFound".
func (svr *Service) HTTPMigrateTaskDetail(c *rpc.Context) {
	req := new(api.MigrateTaskDetailArgs)
	if err := c.ParseArgs(req); err != nil {
		c.RespondError(err)
		return
	}
	valid := req.ID != "" && client.ValidMigrateTask(req.Type, req.ID)
	if !valid {
		c.RespondError(errcode.ErrIllegalArguments)
		return
	}

	mgr, err := svr.mgrByType(req.Type)
	if err != nil {
		c.RespondError(err)
		return
	}

	if detail, qerr := mgr.QueryTask(c.Request.Context(), req.ID); qerr != nil {
		c.RespondError(rpc.NewError(http.StatusNotFound, "NotFound", qerr))
	} else {
		c.RespondJSON(detail)
	}
}
276

277
// HTTPDiskMigratingStats returns disk migrating stats
278
func (svr *Service) HTTPDiskMigratingStats(c *rpc.Context) {
279
	args := new(api.DiskMigratingStatsArgs)
280
	if err := c.ParseArgs(args); err != nil {
281
		c.RespondError(err)
282
		return
283
	}
284
	querier, err := svr.diskMgrByType(args.TaskType)
285
	if err != nil {
286
		c.RespondError(err)
287
		return
288
	}
289
	stats, err := querier.DiskProgress(c.Request.Context(), args.DiskID)
290
	if err != nil {
291
		c.RespondError(err)
292
		return
293
	}
294
	c.RespondJSON(stats)
295
}
296

297
// HTTPStats returns service stats
298
func (svr *Service) HTTPStats(c *rpc.Context) {
299
	ctx := c.Request.Context()
300
	taskStats := api.TasksStat{}
301

302
	// delete stats
303
	deleteSuccessCounter, deleteFailedCounter := svr.blobDeleteMgr.GetTaskStats()
304
	delErrStats, delTotalErrCnt := svr.blobDeleteMgr.GetErrorStats()
305
	taskStats.BlobDelete = &api.RunnerStat{
306
		Enable:        svr.blobDeleteMgr.Enabled(),
307
		SuccessPerMin: fmt.Sprint(deleteSuccessCounter),
308
		FailedPerMin:  fmt.Sprint(deleteFailedCounter),
309
		TotalErrCnt:   delTotalErrCnt,
310
		ErrStats:      delErrStats,
311
	}
312

313
	// stats shard repair tasks
314
	repairSuccessCounter, repairFailedCounter := svr.shardRepairMgr.GetTaskStats()
315
	repairErrStats, repairTotalErrCnt := svr.shardRepairMgr.GetErrorStats()
316
	taskStats.ShardRepair = &api.RunnerStat{
317
		Enable:        svr.shardRepairMgr.Enabled(),
318
		SuccessPerMin: fmt.Sprint(repairSuccessCounter),
319
		FailedPerMin:  fmt.Sprint(repairFailedCounter),
320
		TotalErrCnt:   repairTotalErrCnt,
321
		ErrStats:      repairErrStats,
322
	}
323

324
	if !svr.leader {
325
		c.RespondJSON(taskStats)
326
		return
327
	}
328

329
	// stats repair tasks
330
	repairDisks, totalTasksCnt, repairedTasksCnt := svr.diskRepairMgr.Progress(ctx)
331
	taskStats.DiskRepair = &api.DiskRepairTasksStat{
332
		Enable:           svr.diskRepairMgr.Enabled(),
333
		RepairingDisks:   repairDisks,
334
		TotalTasksCnt:    totalTasksCnt,
335
		RepairedTasksCnt: repairedTasksCnt,
336
		MigrateTasksStat: svr.diskRepairMgr.Stats(),
337
	}
338

339
	// stats drop tasks
340
	dropDisks, totalTasksCnt, droppedTasksCnt := svr.diskDropMgr.Progress(ctx)
341
	taskStats.DiskDrop = &api.DiskDropTasksStat{
342
		Enable:           svr.diskDropMgr.Enabled(),
343
		DroppingDisks:    dropDisks,
344
		TotalTasksCnt:    totalTasksCnt,
345
		DroppedTasksCnt:  droppedTasksCnt,
346
		MigrateTasksStat: svr.diskDropMgr.Stats(),
347
	}
348

349
	// stats balance tasks
350
	taskStats.Balance = &api.BalanceTasksStat{
351
		Enable:           svr.balanceMgr.Enabled(),
352
		MigrateTasksStat: svr.balanceMgr.Stats(),
353
	}
354

355
	// stats manual migrate tasks
356
	taskStats.ManualMigrate = &api.ManualMigrateTasksStat{
357
		MigrateTasksStat: svr.manualMigMgr.Stats(),
358
	}
359

360
	// stats inspect tasks
361
	finished, timeout := svr.inspectMgr.GetTaskStats()
362
	taskStats.VolumeInspect = &api.VolumeInspectTasksStat{
363
		Enable:         svr.inspectMgr.Enabled(),
364
		FinishedPerMin: fmt.Sprint(finished),
365
		TimeOutPerMin:  fmt.Sprint(timeout),
366
	}
367

368
	c.RespondJSON(taskStats)
369
}
370

371
// HTTPManualMigrateTaskAdd adds manual migrate task
372
func (svr *Service) HTTPManualMigrateTaskAdd(c *rpc.Context) {
373
	ctx := c.Request.Context()
374

375
	args := new(api.AddManualMigrateArgs)
376
	if err := c.ParseArgs(args); err != nil {
377
		c.RespondError(err)
378
		return
379
	}
380

381
	if !args.Valid() {
382
		c.RespondError(errcode.ErrIllegalArguments)
383
		return
384
	}
385

386
	err := svr.manualMigMgr.AddManualTask(ctx, args.Vuid, !args.DirectDownload)
387
	c.RespondError(rpc.Error2HTTPError(err))
388
}
389

390
// HTTPUpdateVolume updates volume cache
391
func (svr *Service) HTTPUpdateVolume(c *rpc.Context) {
392
	args := new(api.UpdateVolumeArgs)
393
	if err := c.ParseArgs(args); err != nil {
394
		c.RespondError(err)
395
		return
396
	}
397

398
	ctx := c.Request.Context()
399
	span := trace.SpanFromContextSafe(ctx)
400

401
	// update local cache of volume
402
	_, err := svr.clusterTopology.UpdateVolume(args.Vid)
403
	if err != nil {
404
		span.Errorf("local volume cache update failed: vid[%d], err[%+v]", args.Vid, err)
405
		c.RespondError(err)
406
		return
407
	}
408

409
	if !svr.leader {
410
		c.Respond()
411
		return
412
	}
413

414
	tasks := make([]func() error, 0, len(svr.followerHosts))
415
	for _, host := range svr.followerHosts {
416
		host := host
417
		tasks = append(tasks, func() error {
418
			return svr.volumeUpdater.UpdateFollowerVolumeCache(ctx, host, args.Vid)
419
		})
420
	}
421

422
	span.Debug("to update follower volume cache")
423
	if err := task.Run(ctx, tasks...); err != nil {
424
		span.Errorf("notify follower to update cache err[%+v]", err)
425
		c.RespondError(err)
426
		return
427
	}
428
	c.Respond()
429
}
430

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.