Dragonfly2 · service_v2.go · 1387 lines · 53.1 KB
/*
 *     Copyright 2023 The Dragonfly Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package service

import (
	"context"
	"fmt"
	"io"
	"time"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/types/known/durationpb"
	"google.golang.org/protobuf/types/known/timestamppb"

	commonv2 "d7y.io/api/v2/pkg/apis/common/v2"
	dfdaemonv2 "d7y.io/api/v2/pkg/apis/dfdaemon/v2"
	schedulerv2 "d7y.io/api/v2/pkg/apis/scheduler/v2"

	logger "d7y.io/dragonfly/v2/internal/dflog"
	"d7y.io/dragonfly/v2/pkg/container/set"
	"d7y.io/dragonfly/v2/pkg/digest"
	"d7y.io/dragonfly/v2/pkg/net/http"
	"d7y.io/dragonfly/v2/pkg/types"
	"d7y.io/dragonfly/v2/scheduler/config"
	"d7y.io/dragonfly/v2/scheduler/metrics"
	"d7y.io/dragonfly/v2/scheduler/networktopology"
	"d7y.io/dragonfly/v2/scheduler/resource"
	"d7y.io/dragonfly/v2/scheduler/scheduling"
	"d7y.io/dragonfly/v2/scheduler/storage"
)

// V2 is the interface for v2 version of the service.
type V2 struct {
	// Resource interface.
	resource resource.Resource

	// Scheduling interface.
	scheduling scheduling.Scheduling

	// Scheduler service config.
	config *config.Config

	// Dynamic config.
	dynconfig config.DynconfigInterface

	// Storage interface.
	storage storage.Storage

	// Network topology interface.
	networkTopology networktopology.NetworkTopology
}

// New v2 version of service instance.
func NewV2(
	cfg *config.Config,
	resource resource.Resource,
	scheduling scheduling.Scheduling,
	dynconfig config.DynconfigInterface,
	storage storage.Storage,
	networkTopology networktopology.NetworkTopology,
) *V2 {
	return &V2{
		resource:        resource,
		scheduling:      scheduling,
		config:          cfg,
		dynconfig:       dynconfig,
		storage:         storage,
		networkTopology: networkTopology,
	}
}
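
// Example (illustrative sketch, not part of the upstream file): constructing the
// v2 service. The concrete wiring lives in the scheduler's rpc server setup, which
// is assumed here and not shown; names such as cfg, res, sched, dyn, store and
// topo are placeholders for values built elsewhere.
//
//	svc := service.NewV2(cfg, res, sched, dyn, store, topo)
//	// The scheduler's gRPC layer delegates schedulerv2.SchedulerServer calls
//	// (AnnouncePeer, StatPeer, AnnounceHost, ...) to this instance.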

// AnnouncePeer announces peer to scheduler.
func (v *V2) AnnouncePeer(stream schedulerv2.Scheduler_AnnouncePeerServer) error {
	ctx, cancel := context.WithCancel(stream.Context())
	defer cancel()

	for {
		select {
		case <-ctx.Done():
			logger.Info("context was done")
			return ctx.Err()
		default:
		}

		req, err := stream.Recv()
		if err != nil {
			if err == io.EOF {
				return nil
			}

			logger.Errorf("receive error: %s", err.Error())
			return err
		}

		log := logger.WithPeer(req.GetHostId(), req.GetTaskId(), req.GetPeerId())
		switch announcePeerRequest := req.GetRequest().(type) {
		case *schedulerv2.AnnouncePeerRequest_RegisterPeerRequest:
			registerPeerRequest := announcePeerRequest.RegisterPeerRequest
			log.Infof("receive RegisterPeerRequest, url: %s, range: %#v, header: %#v, need back-to-source: %t",
				registerPeerRequest.Download.GetUrl(), registerPeerRequest.Download.GetRange(), registerPeerRequest.Download.GetRequestHeader(), registerPeerRequest.Download.GetNeedBackToSource())
			if err := v.handleRegisterPeerRequest(ctx, stream, req.GetHostId(), req.GetTaskId(), req.GetPeerId(), registerPeerRequest); err != nil {
				log.Error(err)
				return err
			}
		case *schedulerv2.AnnouncePeerRequest_DownloadPeerStartedRequest:
			log.Info("receive DownloadPeerStartedRequest")
			if err := v.handleDownloadPeerStartedRequest(ctx, req.GetPeerId()); err != nil {
				log.Error(err)
				return err
			}
		case *schedulerv2.AnnouncePeerRequest_DownloadPeerBackToSourceStartedRequest:
			log.Info("receive DownloadPeerBackToSourceStartedRequest")
			if err := v.handleDownloadPeerBackToSourceStartedRequest(ctx, req.GetPeerId()); err != nil {
				log.Error(err)
				return err
			}
		case *schedulerv2.AnnouncePeerRequest_RescheduleRequest:
			rescheduleRequest := announcePeerRequest.RescheduleRequest

			log.Infof("receive RescheduleRequest description: %s", rescheduleRequest.GetDescription())
			if err := v.handleRescheduleRequest(ctx, req.GetPeerId(), rescheduleRequest.GetCandidateParents()); err != nil {
				log.Error(err)
				return err
			}
		case *schedulerv2.AnnouncePeerRequest_DownloadPeerFinishedRequest:
			downloadPeerFinishedRequest := announcePeerRequest.DownloadPeerFinishedRequest
			log.Infof("receive DownloadPeerFinishedRequest, content length: %d, piece count: %d", downloadPeerFinishedRequest.GetContentLength(), downloadPeerFinishedRequest.GetPieceCount())
			if err := v.handleDownloadPeerFinishedRequest(ctx, req.GetPeerId()); err != nil {
				log.Error(err)
				return err
			}
		case *schedulerv2.AnnouncePeerRequest_DownloadPeerBackToSourceFinishedRequest:
			downloadPeerBackToSourceFinishedRequest := announcePeerRequest.DownloadPeerBackToSourceFinishedRequest
			log.Infof("receive DownloadPeerBackToSourceFinishedRequest, content length: %d, piece count: %d", downloadPeerBackToSourceFinishedRequest.GetContentLength(), downloadPeerBackToSourceFinishedRequest.GetPieceCount())
			if err := v.handleDownloadPeerBackToSourceFinishedRequest(ctx, req.GetPeerId(), downloadPeerBackToSourceFinishedRequest); err != nil {
				log.Error(err)
				return err
			}
		case *schedulerv2.AnnouncePeerRequest_DownloadPeerFailedRequest:
			log.Infof("receive DownloadPeerFailedRequest, description: %s", announcePeerRequest.DownloadPeerFailedRequest.GetDescription())
			if err := v.handleDownloadPeerFailedRequest(ctx, req.GetPeerId()); err != nil {
				log.Error(err)
				return err
			}
		case *schedulerv2.AnnouncePeerRequest_DownloadPeerBackToSourceFailedRequest:
			log.Infof("receive DownloadPeerBackToSourceFailedRequest, description: %s", announcePeerRequest.DownloadPeerBackToSourceFailedRequest.GetDescription())
			if err := v.handleDownloadPeerBackToSourceFailedRequest(ctx, req.GetPeerId()); err != nil {
				log.Error(err)
				return err
			}
		case *schedulerv2.AnnouncePeerRequest_DownloadPieceFinishedRequest:
			piece := announcePeerRequest.DownloadPieceFinishedRequest.Piece
			log.Infof("receive DownloadPieceFinishedRequest, piece number: %d, piece length: %d, traffic type: %s, cost: %s, parent id: %s", piece.GetNumber(), piece.GetLength(), piece.GetTrafficType(), piece.GetCost().AsDuration().String(), piece.GetParentId())
			if err := v.handleDownloadPieceFinishedRequest(ctx, req.GetPeerId(), announcePeerRequest.DownloadPieceFinishedRequest); err != nil {
				log.Error(err)
				return err
			}
		case *schedulerv2.AnnouncePeerRequest_DownloadPieceBackToSourceFinishedRequest:
			piece := announcePeerRequest.DownloadPieceBackToSourceFinishedRequest.Piece
			log.Infof("receive DownloadPieceBackToSourceFinishedRequest, piece number: %d, piece length: %d, traffic type: %s, cost: %s, parent id: %s", piece.GetNumber(), piece.GetLength(), piece.GetTrafficType(), piece.GetCost().AsDuration().String(), piece.GetParentId())
			if err := v.handleDownloadPieceBackToSourceFinishedRequest(ctx, req.GetPeerId(), announcePeerRequest.DownloadPieceBackToSourceFinishedRequest); err != nil {
				log.Error(err)
				return err
			}
		case *schedulerv2.AnnouncePeerRequest_DownloadPieceFailedRequest:
			downloadPieceFailedRequest := announcePeerRequest.DownloadPieceFailedRequest
			log.Infof("receive DownloadPieceFailedRequest, piece number: %d, temporary: %t, parent id: %s", downloadPieceFailedRequest.GetPieceNumber(), downloadPieceFailedRequest.GetTemporary(), downloadPieceFailedRequest.GetParentId())
			if err := v.handleDownloadPieceFailedRequest(ctx, req.GetPeerId(), downloadPieceFailedRequest); err != nil {
				log.Error(err)
				return err
			}
		case *schedulerv2.AnnouncePeerRequest_DownloadPieceBackToSourceFailedRequest:
			downloadPieceBackToSourceFailedRequest := announcePeerRequest.DownloadPieceBackToSourceFailedRequest
			log.Infof("receive DownloadPieceBackToSourceFailedRequest, piece number: %d", downloadPieceBackToSourceFailedRequest.GetPieceNumber())
			if err := v.handleDownloadPieceBackToSourceFailedRequest(ctx, req.GetPeerId(), downloadPieceBackToSourceFailedRequest); err != nil {
				log.Error(err)
				return err
			}
		default:
			msg := fmt.Sprintf("receive unknown request: %#v", announcePeerRequest)
			log.Error(msg)
			return status.Error(codes.FailedPrecondition, msg)
		}
	}
}
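
// Example (illustrative sketch, not part of the upstream file): how a dfdaemon-side
// client might drive the AnnouncePeer stream handled above. The generated
// schedulerv2 client and the commonv2.Download value are assumed to be prepared by
// the caller; hostID, taskID and peerID are placeholders.
//
//	stream, err := client.AnnouncePeer(ctx)
//	if err != nil {
//		return err
//	}
//	if err := stream.Send(&schedulerv2.AnnouncePeerRequest{
//		HostId: hostID,
//		TaskId: taskID,
//		PeerId: peerID,
//		Request: &schedulerv2.AnnouncePeerRequest_RegisterPeerRequest{
//			RegisterPeerRequest: &schedulerv2.RegisterPeerRequest{Download: download},
//		},
//	}); err != nil {
//		return err
//	}
//	// Subsequent DownloadPiece*Request messages are sent on the same stream,
//	// and scheduling decisions are read back with stream.Recv().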

// StatPeer checks information of peer.
func (v *V2) StatPeer(ctx context.Context, req *schedulerv2.StatPeerRequest) (*commonv2.Peer, error) {
	logger.WithTaskID(req.GetTaskId()).Infof("stat peer request: %#v", req)

	peer, loaded := v.resource.PeerManager().Load(req.GetPeerId())
	if !loaded {
		return nil, status.Errorf(codes.NotFound, "peer %s not found", req.GetPeerId())
	}

	resp := &commonv2.Peer{
		Id:               peer.ID,
		Priority:         peer.Priority,
		Cost:             durationpb.New(peer.Cost.Load()),
		State:            peer.FSM.Current(),
		NeedBackToSource: peer.NeedBackToSource.Load(),
		CreatedAt:        timestamppb.New(peer.CreatedAt.Load()),
		UpdatedAt:        timestamppb.New(peer.UpdatedAt.Load()),
	}

	// Set range to response.
	if peer.Range != nil {
		resp.Range = &commonv2.Range{
			Start:  uint64(peer.Range.Start),
			Length: uint64(peer.Range.Length),
		}
	}

	// Set pieces to response.
	peer.Pieces.Range(func(key, value any) bool {
		piece, ok := value.(*resource.Piece)
		if !ok {
			peer.Log.Errorf("invalid piece %s %#v", key, value)
			return true
		}

		respPiece := &commonv2.Piece{
			Number:      uint32(piece.Number),
			ParentId:    &piece.ParentID,
			Offset:      piece.Offset,
			Length:      piece.Length,
			TrafficType: &piece.TrafficType,
			Cost:        durationpb.New(piece.Cost),
			CreatedAt:   timestamppb.New(piece.CreatedAt),
		}

		if piece.Digest != nil {
			respPiece.Digest = piece.Digest.String()
		}

		resp.Pieces = append(resp.Pieces, respPiece)
		return true
	})

	// Set task to response.
	resp.Task = &commonv2.Task{
		Id:                  peer.Task.ID,
		Type:                peer.Task.Type,
		Url:                 peer.Task.URL,
		Tag:                 &peer.Task.Tag,
		Application:         &peer.Task.Application,
		FilteredQueryParams: peer.Task.FilteredQueryParams,
		RequestHeader:       peer.Task.Header,
		PieceLength:         uint32(peer.Task.PieceLength),
		ContentLength:       uint64(peer.Task.ContentLength.Load()),
		PieceCount:          uint32(peer.Task.TotalPieceCount.Load()),
		SizeScope:           peer.Task.SizeScope(),
		State:               peer.Task.FSM.Current(),
		PeerCount:           uint32(peer.Task.PeerCount()),
		CreatedAt:           timestamppb.New(peer.Task.CreatedAt.Load()),
		UpdatedAt:           timestamppb.New(peer.Task.UpdatedAt.Load()),
	}

	// Set digest to task response.
	if peer.Task.Digest != nil {
		dgst := peer.Task.Digest.String()
		resp.Task.Digest = &dgst
	}

	// Set pieces to task response.
	peer.Task.Pieces.Range(func(key, value any) bool {
		piece, ok := value.(*resource.Piece)
		if !ok {
			peer.Task.Log.Errorf("invalid piece %s %#v", key, value)
			return true
		}

		respPiece := &commonv2.Piece{
			Number:      uint32(piece.Number),
			ParentId:    &piece.ParentID,
			Offset:      piece.Offset,
			Length:      piece.Length,
			TrafficType: &piece.TrafficType,
			Cost:        durationpb.New(piece.Cost),
			CreatedAt:   timestamppb.New(piece.CreatedAt),
		}

		if piece.Digest != nil {
			respPiece.Digest = piece.Digest.String()
		}

		resp.Task.Pieces = append(resp.Task.Pieces, respPiece)
		return true
	})

	// Set host to response.
	resp.Host = &commonv2.Host{
		Id:              peer.Host.ID,
		Type:            uint32(peer.Host.Type),
		Hostname:        peer.Host.Hostname,
		Ip:              peer.Host.IP,
		Port:            peer.Host.Port,
		DownloadPort:    peer.Host.DownloadPort,
		Os:              peer.Host.OS,
		Platform:        peer.Host.Platform,
		PlatformFamily:  peer.Host.PlatformFamily,
		PlatformVersion: peer.Host.PlatformVersion,
		KernelVersion:   peer.Host.KernelVersion,
		Cpu: &commonv2.CPU{
			LogicalCount:   peer.Host.CPU.LogicalCount,
			PhysicalCount:  peer.Host.CPU.PhysicalCount,
			Percent:        peer.Host.CPU.Percent,
			ProcessPercent: peer.Host.CPU.ProcessPercent,
			Times: &commonv2.CPUTimes{
				User:      peer.Host.CPU.Times.User,
				System:    peer.Host.CPU.Times.System,
				Idle:      peer.Host.CPU.Times.Idle,
				Nice:      peer.Host.CPU.Times.Nice,
				Iowait:    peer.Host.CPU.Times.Iowait,
				Irq:       peer.Host.CPU.Times.Irq,
				Softirq:   peer.Host.CPU.Times.Softirq,
				Steal:     peer.Host.CPU.Times.Steal,
				Guest:     peer.Host.CPU.Times.Guest,
				GuestNice: peer.Host.CPU.Times.GuestNice,
			},
		},
		Memory: &commonv2.Memory{
			Total:              peer.Host.Memory.Total,
			Available:          peer.Host.Memory.Available,
			Used:               peer.Host.Memory.Used,
			UsedPercent:        peer.Host.Memory.UsedPercent,
			ProcessUsedPercent: peer.Host.Memory.ProcessUsedPercent,
			Free:               peer.Host.Memory.Free,
		},
		Network: &commonv2.Network{
			TcpConnectionCount:       peer.Host.Network.TCPConnectionCount,
			UploadTcpConnectionCount: peer.Host.Network.UploadTCPConnectionCount,
			Location:                 &peer.Host.Network.Location,
			Idc:                      &peer.Host.Network.IDC,
		},
		Disk: &commonv2.Disk{
			Total:             peer.Host.Disk.Total,
			Free:              peer.Host.Disk.Free,
			Used:              peer.Host.Disk.Used,
			UsedPercent:       peer.Host.Disk.UsedPercent,
			InodesTotal:       peer.Host.Disk.InodesTotal,
			InodesUsed:        peer.Host.Disk.InodesUsed,
			InodesFree:        peer.Host.Disk.InodesFree,
			InodesUsedPercent: peer.Host.Disk.InodesUsedPercent,
		},
		Build: &commonv2.Build{
			GitVersion: peer.Host.Build.GitVersion,
			GitCommit:  &peer.Host.Build.GitCommit,
			GoVersion:  &peer.Host.Build.GoVersion,
			Platform:   &peer.Host.Build.Platform,
		},
	}

	return resp, nil
}
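
// Example (illustrative sketch): querying peer state with the generated client.
// The TaskId/PeerId field names follow the getters used above; error handling is
// elided and client is assumed to be a schedulerv2.SchedulerClient.
//
//	peer, err := client.StatPeer(ctx, &schedulerv2.StatPeerRequest{
//		TaskId: taskID,
//		PeerId: peerID,
//	})
//	if err == nil {
//		fmt.Println(peer.GetState(), len(peer.GetPieces()))
//	}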

// LeavePeer releases peer in scheduler.
func (v *V2) LeavePeer(ctx context.Context, req *schedulerv2.LeavePeerRequest) error {
	log := logger.WithTaskAndPeerID(req.GetTaskId(), req.GetPeerId())
	log.Infof("leave peer request: %#v", req)

	peer, loaded := v.resource.PeerManager().Load(req.GetPeerId())
	if !loaded {
		msg := fmt.Sprintf("peer %s not found", req.GetPeerId())
		log.Error(msg)
		return status.Error(codes.NotFound, msg)
	}

	if err := peer.FSM.Event(ctx, resource.PeerEventLeave); err != nil {
		msg := fmt.Sprintf("peer fsm event failed: %s", err.Error())
		peer.Log.Error(msg)
		return status.Error(codes.FailedPrecondition, msg)
	}

	return nil
}

// TODO Implement function.
// ExchangePeer exchanges peer information.
func (v *V2) ExchangePeer(ctx context.Context, req *schedulerv2.ExchangePeerRequest) (*schedulerv2.ExchangePeerResponse, error) {
	return nil, nil
}

// StatTask checks information of task.
func (v *V2) StatTask(ctx context.Context, req *schedulerv2.StatTaskRequest) (*commonv2.Task, error) {
	log := logger.WithTaskID(req.GetId())
	log.Infof("stat task request: %#v", req)

	task, loaded := v.resource.TaskManager().Load(req.GetId())
	if !loaded {
		msg := fmt.Sprintf("task %s not found", req.GetId())
		log.Error(msg)
		return nil, status.Error(codes.NotFound, msg)
	}

	resp := &commonv2.Task{
		Id:                  task.ID,
		Type:                task.Type,
		Url:                 task.URL,
		Tag:                 &task.Tag,
		Application:         &task.Application,
		FilteredQueryParams: task.FilteredQueryParams,
		RequestHeader:       task.Header,
		PieceLength:         uint32(task.PieceLength),
		ContentLength:       uint64(task.ContentLength.Load()),
		PieceCount:          uint32(task.TotalPieceCount.Load()),
		SizeScope:           task.SizeScope(),
		State:               task.FSM.Current(),
		PeerCount:           uint32(task.PeerCount()),
		CreatedAt:           timestamppb.New(task.CreatedAt.Load()),
		UpdatedAt:           timestamppb.New(task.UpdatedAt.Load()),
	}

	// Set digest to response.
	if task.Digest != nil {
		dgst := task.Digest.String()
		resp.Digest = &dgst
	}

	// Set pieces to response.
	task.Pieces.Range(func(key, value any) bool {
		piece, ok := value.(*resource.Piece)
		if !ok {
			task.Log.Errorf("invalid piece %s %#v", key, value)
			return true
		}

		respPiece := &commonv2.Piece{
			Number:      uint32(piece.Number),
			ParentId:    &piece.ParentID,
			Offset:      piece.Offset,
			Length:      piece.Length,
			TrafficType: &piece.TrafficType,
			Cost:        durationpb.New(piece.Cost),
			CreatedAt:   timestamppb.New(piece.CreatedAt),
		}

		if piece.Digest != nil {
			respPiece.Digest = piece.Digest.String()
		}

		resp.Pieces = append(resp.Pieces, respPiece)
		return true
	})

	return resp, nil
}

// AnnounceHost announces host to scheduler.
func (v *V2) AnnounceHost(ctx context.Context, req *schedulerv2.AnnounceHostRequest) error {
	logger.WithHostID(req.Host.GetId()).Infof("announce host request: %#v", req.GetHost())

	// Get scheduler cluster client config by manager.
	var concurrentUploadLimit int32
	if clientConfig, err := v.dynconfig.GetSchedulerClusterClientConfig(); err == nil {
		concurrentUploadLimit = int32(clientConfig.LoadLimit)
	}

	host, loaded := v.resource.HostManager().Load(req.Host.GetId())
	if !loaded {
		options := []resource.HostOption{
			resource.WithOS(req.Host.GetOs()),
			resource.WithPlatform(req.Host.GetPlatform()),
			resource.WithPlatformFamily(req.Host.GetPlatformFamily()),
			resource.WithPlatformVersion(req.Host.GetPlatformVersion()),
			resource.WithKernelVersion(req.Host.GetKernelVersion()),
		}

		if concurrentUploadLimit > 0 {
			options = append(options, resource.WithConcurrentUploadLimit(concurrentUploadLimit))
		}

		if req.Host.GetCpu() != nil {
			options = append(options, resource.WithCPU(resource.CPU{
				LogicalCount:   req.Host.Cpu.GetLogicalCount(),
				PhysicalCount:  req.Host.Cpu.GetPhysicalCount(),
				Percent:        req.Host.Cpu.GetPercent(),
				ProcessPercent: req.Host.Cpu.GetProcessPercent(),
				Times: resource.CPUTimes{
					User:      req.Host.Cpu.Times.GetUser(),
					System:    req.Host.Cpu.Times.GetSystem(),
					Idle:      req.Host.Cpu.Times.GetIdle(),
					Nice:      req.Host.Cpu.Times.GetNice(),
					Iowait:    req.Host.Cpu.Times.GetIowait(),
					Irq:       req.Host.Cpu.Times.GetIrq(),
					Softirq:   req.Host.Cpu.Times.GetSoftirq(),
					Steal:     req.Host.Cpu.Times.GetSteal(),
					Guest:     req.Host.Cpu.Times.GetGuest(),
					GuestNice: req.Host.Cpu.Times.GetGuestNice(),
				},
			}))
		}

		if req.Host.GetMemory() != nil {
			options = append(options, resource.WithMemory(resource.Memory{
				Total:              req.Host.Memory.GetTotal(),
				Available:          req.Host.Memory.GetAvailable(),
				Used:               req.Host.Memory.GetUsed(),
				UsedPercent:        req.Host.Memory.GetUsedPercent(),
				ProcessUsedPercent: req.Host.Memory.GetProcessUsedPercent(),
				Free:               req.Host.Memory.GetFree(),
			}))
		}

		if req.Host.GetNetwork() != nil {
			options = append(options, resource.WithNetwork(resource.Network{
				TCPConnectionCount:       req.Host.Network.GetTcpConnectionCount(),
				UploadTCPConnectionCount: req.Host.Network.GetUploadTcpConnectionCount(),
				Location:                 req.Host.Network.GetLocation(),
				IDC:                      req.Host.Network.GetIdc(),
			}))
		}

		if req.Host.GetDisk() != nil {
			options = append(options, resource.WithDisk(resource.Disk{
				Total:             req.Host.Disk.GetTotal(),
				Free:              req.Host.Disk.GetFree(),
				Used:              req.Host.Disk.GetUsed(),
				UsedPercent:       req.Host.Disk.GetUsedPercent(),
				InodesTotal:       req.Host.Disk.GetInodesTotal(),
				InodesUsed:        req.Host.Disk.GetInodesUsed(),
				InodesFree:        req.Host.Disk.GetInodesFree(),
				InodesUsedPercent: req.Host.Disk.GetInodesUsedPercent(),
			}))
		}

		if req.Host.GetBuild() != nil {
			options = append(options, resource.WithBuild(resource.Build{
				GitVersion: req.Host.Build.GetGitVersion(),
				GitCommit:  req.Host.Build.GetGitCommit(),
				GoVersion:  req.Host.Build.GetGoVersion(),
				Platform:   req.Host.Build.GetPlatform(),
			}))
		}

		if req.Host.GetSchedulerClusterId() != 0 {
			options = append(options, resource.WithSchedulerClusterID(uint64(v.config.Manager.SchedulerClusterID)))
		}

		host = resource.NewHost(
			req.Host.GetId(), req.Host.GetIp(), req.Host.GetHostname(),
			req.Host.GetPort(), req.Host.GetDownloadPort(), types.HostType(req.Host.GetType()),
			options...,
		)

		v.resource.HostManager().Store(host)
		host.Log.Infof("announce new host: %#v", req)
		return nil
	}

	// Host already exists and updates properties.
	host.Port = req.Host.GetPort()
	host.DownloadPort = req.Host.GetDownloadPort()
	host.Type = types.HostType(req.Host.GetType())
	host.OS = req.Host.GetOs()
	host.Platform = req.Host.GetPlatform()
	host.PlatformFamily = req.Host.GetPlatformFamily()
	host.PlatformVersion = req.Host.GetPlatformVersion()
	host.KernelVersion = req.Host.GetKernelVersion()
	host.UpdatedAt.Store(time.Now())

	if concurrentUploadLimit > 0 {
		host.ConcurrentUploadLimit.Store(concurrentUploadLimit)
	}

	if req.Host.GetCpu() != nil {
		host.CPU = resource.CPU{
			LogicalCount:   req.Host.Cpu.GetLogicalCount(),
			PhysicalCount:  req.Host.Cpu.GetPhysicalCount(),
			Percent:        req.Host.Cpu.GetPercent(),
			ProcessPercent: req.Host.Cpu.GetProcessPercent(),
			Times: resource.CPUTimes{
				User:      req.Host.Cpu.Times.GetUser(),
				System:    req.Host.Cpu.Times.GetSystem(),
				Idle:      req.Host.Cpu.Times.GetIdle(),
				Nice:      req.Host.Cpu.Times.GetNice(),
				Iowait:    req.Host.Cpu.Times.GetIowait(),
				Irq:       req.Host.Cpu.Times.GetIrq(),
				Softirq:   req.Host.Cpu.Times.GetSoftirq(),
				Steal:     req.Host.Cpu.Times.GetSteal(),
				Guest:     req.Host.Cpu.Times.GetGuest(),
				GuestNice: req.Host.Cpu.Times.GetGuestNice(),
			},
		}
	}

	if req.Host.GetMemory() != nil {
		host.Memory = resource.Memory{
			Total:              req.Host.Memory.GetTotal(),
			Available:          req.Host.Memory.GetAvailable(),
			Used:               req.Host.Memory.GetUsed(),
			UsedPercent:        req.Host.Memory.GetUsedPercent(),
			ProcessUsedPercent: req.Host.Memory.GetProcessUsedPercent(),
			Free:               req.Host.Memory.GetFree(),
		}
	}

	if req.Host.GetNetwork() != nil {
		host.Network = resource.Network{
			TCPConnectionCount:       req.Host.Network.GetTcpConnectionCount(),
			UploadTCPConnectionCount: req.Host.Network.GetUploadTcpConnectionCount(),
			Location:                 req.Host.Network.GetLocation(),
			IDC:                      req.Host.Network.GetIdc(),
		}
	}

	if req.Host.GetDisk() != nil {
		host.Disk = resource.Disk{
			Total:             req.Host.Disk.GetTotal(),
			Free:              req.Host.Disk.GetFree(),
			Used:              req.Host.Disk.GetUsed(),
			UsedPercent:       req.Host.Disk.GetUsedPercent(),
			InodesTotal:       req.Host.Disk.GetInodesTotal(),
			InodesUsed:        req.Host.Disk.GetInodesUsed(),
			InodesFree:        req.Host.Disk.GetInodesFree(),
			InodesUsedPercent: req.Host.Disk.GetInodesUsedPercent(),
		}
	}

	if req.Host.GetBuild() != nil {
		host.Build = resource.Build{
			GitVersion: req.Host.Build.GetGitVersion(),
			GitCommit:  req.Host.Build.GetGitCommit(),
			GoVersion:  req.Host.Build.GetGoVersion(),
			Platform:   req.Host.Build.GetPlatform(),
		}
	}

	return nil
}
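
// Example (illustrative sketch): the request shape consumed by AnnounceHost.
// dfdaemon sends this periodically so the scheduler can refresh load limits and
// host metrics; only a few fields are shown, the values are placeholders, and
// types.HostTypeNormal is assumed to be the normal-peer host type constant.
//
//	req := &schedulerv2.AnnounceHostRequest{
//		Host: &commonv2.Host{
//			Id:           hostID,
//			Type:         uint32(types.HostTypeNormal),
//			Hostname:     "peer-01",
//			Ip:           "192.0.2.10",
//			Port:         4000,
//			DownloadPort: 4001,
//		},
//	}
//	// req is then sent via the generated SchedulerClient.AnnounceHost.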

// LeaveHost releases host in scheduler.
func (v *V2) LeaveHost(ctx context.Context, req *schedulerv2.LeaveHostRequest) error {
	log := logger.WithHostID(req.GetId())
	log.Infof("leave host request: %#v", req)

	host, loaded := v.resource.HostManager().Load(req.GetId())
	if !loaded {
		msg := fmt.Sprintf("host %s not found", req.GetId())
		log.Error(msg)
		return status.Error(codes.NotFound, msg)
	}

	// Leave peers in host.
	host.LeavePeers()

	// Delete host from network topology.
	if v.networkTopology != nil {
		if err := v.networkTopology.DeleteHost(host.ID); err != nil {
			log.Errorf("delete network topology host error: %s", err.Error())
			return err
		}
	}

	return nil
}

// SyncProbes syncs probes of the host.
func (v *V2) SyncProbes(stream schedulerv2.Scheduler_SyncProbesServer) error {
	if v.networkTopology == nil {
		return status.Errorf(codes.Unimplemented, "network topology is not enabled")
	}

	for {
		req, err := stream.Recv()
		if err != nil {
			if err == io.EOF {
				return nil
			}

			logger.Errorf("receive error: %s", err.Error())
			return err
		}

		log := logger.WithHost(req.Host.GetId(), req.Host.GetHostname(), req.Host.GetIp())
		switch syncProbesRequest := req.GetRequest().(type) {
		case *schedulerv2.SyncProbesRequest_ProbeStartedRequest:
			// Find probed hosts in network topology. Based on the source host information,
			// the most suitable candidate hosts will be evaluated.
			log.Info("receive SyncProbesRequest_ProbeStartedRequest")
			hosts, err := v.networkTopology.FindProbedHosts(req.Host.GetId())
			if err != nil {
				log.Error(err)
				return status.Error(codes.FailedPrecondition, err.Error())
			}

			var probedHosts []*commonv2.Host
			for _, host := range hosts {
				probedHosts = append(probedHosts, &commonv2.Host{
					Id:              host.ID,
					Type:            uint32(host.Type),
					Hostname:        host.Hostname,
					Ip:              host.IP,
					Port:            host.Port,
					DownloadPort:    host.DownloadPort,
					Os:              host.OS,
					Platform:        host.Platform,
					PlatformFamily:  host.PlatformFamily,
					PlatformVersion: host.PlatformVersion,
					KernelVersion:   host.KernelVersion,
					Cpu: &commonv2.CPU{
						LogicalCount:   host.CPU.LogicalCount,
						PhysicalCount:  host.CPU.PhysicalCount,
						Percent:        host.CPU.Percent,
						ProcessPercent: host.CPU.ProcessPercent,
						Times: &commonv2.CPUTimes{
							User:      host.CPU.Times.User,
							System:    host.CPU.Times.System,
							Idle:      host.CPU.Times.Idle,
							Nice:      host.CPU.Times.Nice,
							Iowait:    host.CPU.Times.Iowait,
							Irq:       host.CPU.Times.Irq,
							Softirq:   host.CPU.Times.Softirq,
							Steal:     host.CPU.Times.Steal,
							Guest:     host.CPU.Times.Guest,
							GuestNice: host.CPU.Times.GuestNice,
						},
					},
					Memory: &commonv2.Memory{
						Total:              host.Memory.Total,
						Available:          host.Memory.Available,
						Used:               host.Memory.Used,
						UsedPercent:        host.Memory.UsedPercent,
						ProcessUsedPercent: host.Memory.ProcessUsedPercent,
						Free:               host.Memory.Free,
					},
					Network: &commonv2.Network{
						TcpConnectionCount:       host.Network.TCPConnectionCount,
						UploadTcpConnectionCount: host.Network.UploadTCPConnectionCount,
						Location:                 &host.Network.Location,
						Idc:                      &host.Network.IDC,
					},
					Disk: &commonv2.Disk{
						Total:             host.Disk.Total,
						Free:              host.Disk.Free,
						Used:              host.Disk.Used,
						UsedPercent:       host.Disk.UsedPercent,
						InodesTotal:       host.Disk.InodesTotal,
						InodesUsed:        host.Disk.InodesUsed,
						InodesFree:        host.Disk.InodesFree,
						InodesUsedPercent: host.Disk.InodesUsedPercent,
					},
					Build: &commonv2.Build{
						GitVersion: host.Build.GitVersion,
						GitCommit:  &host.Build.GitCommit,
						GoVersion:  &host.Build.GoVersion,
						Platform:   &host.Build.Platform,
					},
				})
			}

			log.Infof("probe started: %#v", probedHosts)
			if err := stream.Send(&schedulerv2.SyncProbesResponse{
				Hosts: probedHosts,
			}); err != nil {
				log.Error(err)
				return err
			}
		case *schedulerv2.SyncProbesRequest_ProbeFinishedRequest:
			// Store probes in network topology. First create the association between
			// source host and destination host, and then store the value of probe.
			log.Info("receive SyncProbesRequest_ProbeFinishedRequest")
			for _, probe := range syncProbesRequest.ProbeFinishedRequest.Probes {
				probedHost, loaded := v.resource.HostManager().Load(probe.Host.Id)
				if !loaded {
					log.Errorf("host %s not found", probe.Host.Id)
					continue
				}

				if err := v.networkTopology.Store(req.Host.GetId(), probedHost.ID); err != nil {
					log.Errorf("store failed: %s", err.Error())
					continue
				}

				if err := v.networkTopology.Probes(req.Host.GetId(), probe.Host.Id).Enqueue(&networktopology.Probe{
					Host:      probedHost,
					RTT:       probe.Rtt.AsDuration(),
					CreatedAt: probe.CreatedAt.AsTime(),
				}); err != nil {
					log.Errorf("enqueue failed: %s", err.Error())
					continue
				}

				log.Infof("probe finished: %#v", probe)
			}
		case *schedulerv2.SyncProbesRequest_ProbeFailedRequest:
			// Log failed probes.
			log.Info("receive SyncProbesRequest_ProbeFailedRequest")
			var failedProbedHostIDs []string
			for _, failedProbe := range syncProbesRequest.ProbeFailedRequest.Probes {
				failedProbedHostIDs = append(failedProbedHostIDs, failedProbe.Host.Id)
			}

			log.Warnf("probe failed: %#v", failedProbedHostIDs)
		default:
			msg := fmt.Sprintf("receive unknown request: %#v", syncProbesRequest)
			log.Error(msg)
			return status.Error(codes.FailedPrecondition, msg)
		}
	}
}
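
// Example (illustrative sketch): the client half of the SyncProbes exchange.
// The probing itself (measuring the rtt to each returned host) is assumed to be
// done by the caller; pingHost and localHost are placeholders, and error handling
// is elided.
//
//	stream, _ := client.SyncProbes(ctx)
//	_ = stream.Send(&schedulerv2.SyncProbesRequest{
//		Host:    localHost,
//		Request: &schedulerv2.SyncProbesRequest_ProbeStartedRequest{ProbeStartedRequest: &schedulerv2.ProbeStartedRequest{}},
//	})
//	resp, _ := stream.Recv()
//	var probes []*schedulerv2.Probe
//	for _, h := range resp.GetHosts() {
//		rtt := pingHost(h.GetIp())
//		probes = append(probes, &schedulerv2.Probe{Host: h, Rtt: durationpb.New(rtt), CreatedAt: timestamppb.Now()})
//	}
//	_ = stream.Send(&schedulerv2.SyncProbesRequest{
//		Host:    localHost,
//		Request: &schedulerv2.SyncProbesRequest_ProbeFinishedRequest{ProbeFinishedRequest: &schedulerv2.ProbeFinishedRequest{Probes: probes}},
//	})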

// handleRegisterPeerRequest handles RegisterPeerRequest of AnnouncePeerRequest.
func (v *V2) handleRegisterPeerRequest(ctx context.Context, stream schedulerv2.Scheduler_AnnouncePeerServer, hostID, taskID, peerID string, req *schedulerv2.RegisterPeerRequest) error {
	// Handle resources including host, task, and peer.
	_, task, peer, err := v.handleResource(ctx, stream, hostID, taskID, peerID, req.GetDownload())
	if err != nil {
		return err
	}

	// Collect RegisterPeerCount metrics.
	priority := peer.CalculatePriority(v.dynconfig)
	metrics.RegisterPeerCount.WithLabelValues(priority.String(), peer.Task.Type.String(),
		peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc()

	blocklist := set.NewSafeSet[string]()
	blocklist.Add(peer.ID)
	download := proto.Clone(req.Download).(*commonv2.Download)
	switch {
	// If the scheduler triggers the seed peer to download back-to-source,
	// the needBackToSource flag should be true.
	case download.GetNeedBackToSource():
		peer.Log.Infof("peer needs back to source")
		peer.NeedBackToSource.Store(true)
	// If the task is pending, failed, leave, or succeeded and has no available peer,
	// the scheduler triggers the seed peer to download back-to-source.
	case task.FSM.Is(resource.TaskStatePending) ||
		task.FSM.Is(resource.TaskStateFailed) ||
		task.FSM.Is(resource.TaskStateLeave) ||
		task.FSM.Is(resource.TaskStateSucceeded) &&
			!task.HasAvailablePeer(blocklist):
		// If the seed peer download back-to-source is triggered,
		// the need back-to-source flag should be true.
		download.NeedBackToSource = true

		// Output path should be empty to prevent the seed peer
		// from copying the file to the output path.
		download.OutputPath = nil
		if err := v.downloadTaskBySeedPeer(ctx, taskID, download, peer); err != nil {
			// Collect RegisterPeerFailureCount metrics.
			metrics.RegisterPeerFailureCount.WithLabelValues(priority.String(), peer.Task.Type.String(),
				peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc()
			return err
		}
	}

	// Handle task with peer register request.
	if !peer.Task.FSM.Is(resource.TaskStateRunning) {
		if err := peer.Task.FSM.Event(ctx, resource.TaskEventDownload); err != nil {
			// Collect RegisterPeerFailureCount metrics.
			metrics.RegisterPeerFailureCount.WithLabelValues(priority.String(), peer.Task.Type.String(),
				peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc()
			return status.Error(codes.Internal, err.Error())
		}
	} else {
		peer.Task.UpdatedAt.Store(time.Now())
	}

	// FSM event state transition by size scope.
	sizeScope := peer.Task.SizeScope()
	switch sizeScope {
	case commonv2.SizeScope_EMPTY:
		// Return an EmptyTaskResponse directly.
		peer.Log.Info("scheduling as SizeScope_EMPTY")
		stream, loaded := peer.LoadAnnouncePeerStream()
		if !loaded {
			return status.Error(codes.NotFound, "AnnouncePeerStream not found")
		}

		if err := peer.FSM.Event(ctx, resource.PeerEventRegisterEmpty); err != nil {
			return status.Errorf(codes.Internal, err.Error())
		}

		if err := stream.Send(&schedulerv2.AnnouncePeerResponse{
			Response: &schedulerv2.AnnouncePeerResponse_EmptyTaskResponse{
				EmptyTaskResponse: &schedulerv2.EmptyTaskResponse{},
			},
		}); err != nil {
			peer.Log.Error(err)
			return status.Error(codes.Internal, err.Error())
		}

		return nil
	case commonv2.SizeScope_NORMAL, commonv2.SizeScope_TINY, commonv2.SizeScope_SMALL, commonv2.SizeScope_UNKNOW:
		peer.Log.Info("scheduling as SizeScope_NORMAL")
		if err := peer.FSM.Event(ctx, resource.PeerEventRegisterNormal); err != nil {
			return status.Error(codes.Internal, err.Error())
		}

		// Scheduling parent for the peer.
		peer.BlockParents.Add(peer.ID)
		if err := v.scheduling.ScheduleCandidateParents(ctx, peer, peer.BlockParents); err != nil {
			// Collect RegisterPeerFailureCount metrics.
			metrics.RegisterPeerFailureCount.WithLabelValues(priority.String(), peer.Task.Type.String(),
				peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc()
			return status.Error(codes.FailedPrecondition, err.Error())
		}

		return nil
	default:
		return status.Errorf(codes.FailedPrecondition, "invalid size scope %#v", sizeScope)
	}
}
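
// Example (illustrative sketch): what the registering peer sees on the stream as
// a result of the size-scope branch above. Only the EMPTY case is shown; the
// other response variants of the oneof are omitted here.
//
//	resp, err := stream.Recv()
//	if err != nil {
//		return err
//	}
//	switch resp.GetResponse().(type) {
//	case *schedulerv2.AnnouncePeerResponse_EmptyTaskResponse:
//		// Zero-length task: nothing to download, finish immediately.
//	default:
//		// Normal scheduling result: candidate parents to download pieces from.
//	}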

// handleDownloadPeerStartedRequest handles DownloadPeerStartedRequest of AnnouncePeerRequest.
func (v *V2) handleDownloadPeerStartedRequest(ctx context.Context, peerID string) error {
	peer, loaded := v.resource.PeerManager().Load(peerID)
	if !loaded {
		return status.Errorf(codes.NotFound, "peer %s not found", peerID)
	}

	// Collect DownloadPeerStartedCount metrics.
	priority := peer.CalculatePriority(v.dynconfig)
	metrics.DownloadPeerStartedCount.WithLabelValues(priority.String(), peer.Task.Type.String(),
		peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc()

	// Handle peer with peer started request.
	if !peer.FSM.Is(resource.PeerStateRunning) {
		if err := peer.FSM.Event(ctx, resource.PeerEventDownload); err != nil {
			// Collect DownloadPeerStartedFailureCount metrics.
			metrics.DownloadPeerStartedFailureCount.WithLabelValues(priority.String(), peer.Task.Type.String(),
				peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc()
			return status.Error(codes.Internal, err.Error())
		}
	}

	return nil
}

// handleDownloadPeerBackToSourceStartedRequest handles DownloadPeerBackToSourceStartedRequest of AnnouncePeerRequest.
func (v *V2) handleDownloadPeerBackToSourceStartedRequest(ctx context.Context, peerID string) error {
	peer, loaded := v.resource.PeerManager().Load(peerID)
	if !loaded {
		return status.Errorf(codes.NotFound, "peer %s not found", peerID)
	}

	// Collect DownloadPeerBackToSourceStartedCount metrics.
	priority := peer.CalculatePriority(v.dynconfig)
	metrics.DownloadPeerBackToSourceStartedCount.WithLabelValues(priority.String(), peer.Task.Type.String(),
		peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc()

	// Handle peer with peer back-to-source started request.
	if !peer.FSM.Is(resource.PeerStateRunning) {
		if err := peer.FSM.Event(ctx, resource.PeerEventDownloadBackToSource); err != nil {
			// Collect DownloadPeerBackToSourceStartedFailureCount metrics.
			metrics.DownloadPeerBackToSourceStartedFailureCount.WithLabelValues(priority.String(), peer.Task.Type.String(),
				peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc()
			return status.Error(codes.Internal, err.Error())
		}
	}

	return nil
}

// handleRescheduleRequest handles RescheduleRequest of AnnouncePeerRequest.
func (v *V2) handleRescheduleRequest(ctx context.Context, peerID string, candidateParents []*commonv2.Peer) error {
	peer, loaded := v.resource.PeerManager().Load(peerID)
	if !loaded {
		return status.Errorf(codes.NotFound, "peer %s not found", peerID)
	}

	// Add candidate parent ids to block parents.
	for _, candidateParent := range candidateParents {
		peer.BlockParents.Add(candidateParent.GetId())
	}

	if err := v.scheduling.ScheduleCandidateParents(ctx, peer, peer.BlockParents); err != nil {
		return status.Error(codes.FailedPrecondition, err.Error())
	}

	return nil
}

// handleDownloadPeerFinishedRequest handles DownloadPeerFinishedRequest of AnnouncePeerRequest.
func (v *V2) handleDownloadPeerFinishedRequest(ctx context.Context, peerID string) error {
	peer, loaded := v.resource.PeerManager().Load(peerID)
	if !loaded {
		return status.Errorf(codes.NotFound, "peer %s not found", peerID)
	}

	// Handle peer with peer finished request.
	peer.Cost.Store(time.Since(peer.CreatedAt.Load()))
	if err := peer.FSM.Event(ctx, resource.PeerEventDownloadSucceeded); err != nil {
		return status.Error(codes.Internal, err.Error())
	}

	// Collect DownloadPeerCount and DownloadPeerDuration metrics.
	priority := peer.CalculatePriority(v.dynconfig)
	metrics.DownloadPeerCount.WithLabelValues(priority.String(), peer.Task.Type.String(),
		peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc()
	// TODO to be determined which traffic type to use, temporarily use TrafficType_REMOTE_PEER instead
	metrics.DownloadPeerDuration.WithLabelValues(metrics.CalculateSizeLevel(peer.Task.ContentLength.Load()).String()).Observe(float64(peer.Cost.Load()))

	return nil
}

// handleDownloadPeerBackToSourceFinishedRequest handles DownloadPeerBackToSourceFinishedRequest of AnnouncePeerRequest.
func (v *V2) handleDownloadPeerBackToSourceFinishedRequest(ctx context.Context, peerID string, req *schedulerv2.DownloadPeerBackToSourceFinishedRequest) error {
	peer, loaded := v.resource.PeerManager().Load(peerID)
	if !loaded {
		return status.Errorf(codes.NotFound, "peer %s not found", peerID)
	}

	// Handle peer with peer back-to-source finished request.
	peer.Cost.Store(time.Since(peer.CreatedAt.Load()))
	if err := peer.FSM.Event(ctx, resource.PeerEventDownloadSucceeded); err != nil {
		return status.Error(codes.Internal, err.Error())
	}

	// Handle task with peer back-to-source finished request, peer can only represent
	// a successful task after downloading the complete task.
	if peer.Range == nil && !peer.Task.FSM.Is(resource.TaskStateSucceeded) {
		peer.Task.ContentLength.Store(int64(req.GetContentLength()))
		peer.Task.TotalPieceCount.Store(int32(req.GetPieceCount()))
		if err := peer.Task.FSM.Event(ctx, resource.TaskEventDownloadSucceeded); err != nil {
			return status.Error(codes.Internal, err.Error())
		}
	}

	// Collect DownloadPeerCount and DownloadPeerDuration metrics.
	priority := peer.CalculatePriority(v.dynconfig)
	metrics.DownloadPeerCount.WithLabelValues(priority.String(), peer.Task.Type.String(),
		peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc()
	// TODO to be determined which traffic type to use, temporarily use TrafficType_REMOTE_PEER instead
	metrics.DownloadPeerDuration.WithLabelValues(metrics.CalculateSizeLevel(peer.Task.ContentLength.Load()).String()).Observe(float64(peer.Cost.Load()))

	return nil
}

// handleDownloadPeerFailedRequest handles DownloadPeerFailedRequest of AnnouncePeerRequest.
func (v *V2) handleDownloadPeerFailedRequest(ctx context.Context, peerID string) error {
	peer, loaded := v.resource.PeerManager().Load(peerID)
	if !loaded {
		return status.Errorf(codes.NotFound, "peer %s not found", peerID)
	}

	// Handle peer with peer failed request.
	if err := peer.FSM.Event(ctx, resource.PeerEventDownloadFailed); err != nil {
		return status.Error(codes.Internal, err.Error())
	}

	// Handle task with peer failed request.
	peer.Task.UpdatedAt.Store(time.Now())

	// Collect DownloadPeerCount and DownloadPeerFailureCount metrics.
	priority := peer.CalculatePriority(v.dynconfig)
	metrics.DownloadPeerCount.WithLabelValues(priority.String(), peer.Task.Type.String(),
		peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc()
	metrics.DownloadPeerFailureCount.WithLabelValues(priority.String(), peer.Task.Type.String(),
		peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc()

	return nil
}

// handleDownloadPeerBackToSourceFailedRequest handles DownloadPeerBackToSourceFailedRequest of AnnouncePeerRequest.
func (v *V2) handleDownloadPeerBackToSourceFailedRequest(ctx context.Context, peerID string) error {
	peer, loaded := v.resource.PeerManager().Load(peerID)
	if !loaded {
		return status.Errorf(codes.NotFound, "peer %s not found", peerID)
	}

	// Handle peer with peer back-to-source failed request.
	if err := peer.FSM.Event(ctx, resource.PeerEventDownloadFailed); err != nil {
		return status.Error(codes.Internal, err.Error())
	}

	// Handle task with peer back-to-source failed request.
	peer.Task.ContentLength.Store(-1)
	peer.Task.TotalPieceCount.Store(0)
	peer.Task.DirectPiece = []byte{}
	if err := peer.Task.FSM.Event(ctx, resource.TaskEventDownloadFailed); err != nil {
		return status.Error(codes.Internal, err.Error())
	}

	// Collect DownloadPeerCount and DownloadPeerBackToSourceFailureCount metrics.
	priority := peer.CalculatePriority(v.dynconfig)
	metrics.DownloadPeerCount.WithLabelValues(priority.String(), peer.Task.Type.String(),
		peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc()
	metrics.DownloadPeerBackToSourceFailureCount.WithLabelValues(priority.String(), peer.Task.Type.String(),
		peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc()

	return nil
}

// handleDownloadPieceFinishedRequest handles DownloadPieceFinishedRequest of AnnouncePeerRequest.
func (v *V2) handleDownloadPieceFinishedRequest(ctx context.Context, peerID string, req *schedulerv2.DownloadPieceFinishedRequest) error {
	// Construct piece.
	piece := &resource.Piece{
		Number:      int32(req.Piece.GetNumber()),
		ParentID:    req.Piece.GetParentId(),
		Offset:      req.Piece.GetOffset(),
		Length:      req.Piece.GetLength(),
		TrafficType: req.Piece.GetTrafficType(),
		Cost:        req.Piece.GetCost().AsDuration(),
		CreatedAt:   req.Piece.GetCreatedAt().AsTime(),
	}

	if len(req.Piece.GetDigest()) > 0 {
		d, err := digest.Parse(req.Piece.GetDigest())
		if err != nil {
			return status.Errorf(codes.InvalidArgument, err.Error())
		}

		piece.Digest = d
	}

	peer, loaded := v.resource.PeerManager().Load(peerID)
	if !loaded {
		return status.Errorf(codes.NotFound, "peer %s not found", peerID)
	}

	// Handle peer with piece finished request. When the piece is downloaded successfully, peer.UpdatedAt needs
	// to be updated to prevent the peer from being GC during the download process.
	peer.StorePiece(piece)
	peer.FinishedPieces.Set(uint(piece.Number))
	peer.AppendPieceCost(piece.Cost)
	peer.PieceUpdatedAt.Store(time.Now())
	peer.UpdatedAt.Store(time.Now())

	// When the piece is downloaded successfully, parent.UpdatedAt needs to be updated
	// to prevent the parent from being GC during the download process.
	parent, loadedParent := v.resource.PeerManager().Load(piece.ParentID)
	if loadedParent {
		parent.UpdatedAt.Store(time.Now())
		parent.Host.UpdatedAt.Store(time.Now())
	}

	// Handle task with piece finished request.
	peer.Task.UpdatedAt.Store(time.Now())

	// Collect piece and traffic metrics.
	metrics.DownloadPieceCount.WithLabelValues(piece.TrafficType.String(), peer.Task.Type.String(),
		peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc()
	metrics.Traffic.WithLabelValues(piece.TrafficType.String(), peer.Task.Type.String(),
		peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Add(float64(piece.Length))
	if v.config.Metrics.EnableHost {
		metrics.HostTraffic.WithLabelValues(metrics.HostTrafficDownloadType, peer.Task.Type.String(), peer.Task.Tag, peer.Task.Application,
			peer.Host.Type.Name(), peer.Host.ID, peer.Host.IP, peer.Host.Hostname).Add(float64(piece.Length))
		if loadedParent {
			metrics.HostTraffic.WithLabelValues(metrics.HostTrafficUploadType, peer.Task.Type.String(), peer.Task.Tag, peer.Task.Application,
				parent.Host.Type.Name(), parent.Host.ID, parent.Host.IP, parent.Host.Hostname).Add(float64(piece.Length))
		}
	}

	return nil
}

// handleDownloadPieceBackToSourceFinishedRequest handles DownloadPieceBackToSourceFinishedRequest of AnnouncePeerRequest.
func (v *V2) handleDownloadPieceBackToSourceFinishedRequest(ctx context.Context, peerID string, req *schedulerv2.DownloadPieceBackToSourceFinishedRequest) error {
	// Construct piece.
	piece := &resource.Piece{
		Number:      int32(req.Piece.GetNumber()),
		ParentID:    req.Piece.GetParentId(),
		Offset:      req.Piece.GetOffset(),
		Length:      req.Piece.GetLength(),
		TrafficType: req.Piece.GetTrafficType(),
		Cost:        req.Piece.GetCost().AsDuration(),
		CreatedAt:   req.Piece.GetCreatedAt().AsTime(),
	}

	if len(req.Piece.GetDigest()) > 0 {
		d, err := digest.Parse(req.Piece.GetDigest())
		if err != nil {
			return status.Errorf(codes.InvalidArgument, err.Error())
		}

		piece.Digest = d
	}

	peer, loaded := v.resource.PeerManager().Load(peerID)
	if !loaded {
		return status.Errorf(codes.NotFound, "peer %s not found", peerID)
	}

	// Handle peer with piece back-to-source finished request. When the piece is downloaded successfully, peer.UpdatedAt
	// needs to be updated to prevent the peer from being GC during the download process.
	peer.StorePiece(piece)
	peer.FinishedPieces.Set(uint(piece.Number))
	peer.AppendPieceCost(piece.Cost)
	peer.PieceUpdatedAt.Store(time.Now())
	peer.UpdatedAt.Store(time.Now())

	// Handle task with piece back-to-source finished request.
	peer.Task.StorePiece(piece)
	peer.Task.UpdatedAt.Store(time.Now())

	// Collect piece and traffic metrics.
	metrics.DownloadPieceCount.WithLabelValues(piece.TrafficType.String(), peer.Task.Type.String(),
		peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc()
	metrics.Traffic.WithLabelValues(piece.TrafficType.String(), peer.Task.Type.String(),
		peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Add(float64(piece.Length))
	if v.config.Metrics.EnableHost {
		metrics.HostTraffic.WithLabelValues(metrics.HostTrafficDownloadType, peer.Task.Type.String(), peer.Task.Tag, peer.Task.Application,
			peer.Host.Type.Name(), peer.Host.ID, peer.Host.IP, peer.Host.Hostname).Add(float64(piece.Length))
	}

	return nil
}

// handleDownloadPieceFailedRequest handles DownloadPieceFailedRequest of AnnouncePeerRequest.
func (v *V2) handleDownloadPieceFailedRequest(ctx context.Context, peerID string, req *schedulerv2.DownloadPieceFailedRequest) error {
	peer, loaded := v.resource.PeerManager().Load(peerID)
	if !loaded {
		return status.Errorf(codes.NotFound, "peer %s not found", peerID)
	}

	// Collect DownloadPieceCount and DownloadPieceFailureCount metrics.
	metrics.DownloadPieceCount.WithLabelValues(commonv2.TrafficType_REMOTE_PEER.String(), peer.Task.Type.String(),
		peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc()
	metrics.DownloadPieceFailureCount.WithLabelValues(commonv2.TrafficType_REMOTE_PEER.String(), peer.Task.Type.String(),
		peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc()

	if req.Temporary {
		// Handle peer with piece temporary failed request.
		peer.UpdatedAt.Store(time.Now())
		peer.BlockParents.Add(req.GetParentId())
		if parent, loaded := v.resource.PeerManager().Load(req.GetParentId()); loaded {
			parent.Host.UploadFailedCount.Inc()
		}

		// Handle task with piece temporary failed request.
		peer.Task.UpdatedAt.Store(time.Now())
		return nil
	}

	return status.Error(codes.FailedPrecondition, "download piece failed")
}

// handleDownloadPieceBackToSourceFailedRequest handles DownloadPieceBackToSourceFailedRequest of AnnouncePeerRequest.
func (v *V2) handleDownloadPieceBackToSourceFailedRequest(ctx context.Context, peerID string, req *schedulerv2.DownloadPieceBackToSourceFailedRequest) error {
	peer, loaded := v.resource.PeerManager().Load(peerID)
	if !loaded {
		return status.Errorf(codes.NotFound, "peer %s not found", peerID)
	}

	// Handle peer with piece back-to-source failed request.
	peer.UpdatedAt.Store(time.Now())

	// Handle task with piece back-to-source failed request.
	peer.Task.UpdatedAt.Store(time.Now())

	// Collect DownloadPieceCount and DownloadPieceFailureCount metrics.
	metrics.DownloadPieceCount.WithLabelValues(commonv2.TrafficType_BACK_TO_SOURCE.String(), peer.Task.Type.String(),
		peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc()
	metrics.DownloadPieceFailureCount.WithLabelValues(commonv2.TrafficType_BACK_TO_SOURCE.String(), peer.Task.Type.String(),
		peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc()

	return status.Error(codes.Internal, "download piece from source failed")
}

// handleResource handles resources including host, task, and peer.
func (v *V2) handleResource(ctx context.Context, stream schedulerv2.Scheduler_AnnouncePeerServer, hostID, taskID, peerID string, download *commonv2.Download) (*resource.Host, *resource.Task, *resource.Peer, error) {
	// If the host does not exist and the host address cannot be found,
	// it may cause an exception.
	host, loaded := v.resource.HostManager().Load(hostID)
	if !loaded {
		return nil, nil, nil, status.Errorf(codes.NotFound, "host %s not found", hostID)
	}

	// Store new task or update task.
	task, loaded := v.resource.TaskManager().Load(taskID)
	if !loaded {
		options := []resource.TaskOption{resource.WithPieceLength(int32(download.GetPieceLength()))}
		if download.GetDigest() != "" {
			d, err := digest.Parse(download.GetDigest())
			if err != nil {
				return nil, nil, nil, status.Error(codes.InvalidArgument, err.Error())
			}

			// If request has invalid digest, then new task with the nil digest.
			options = append(options, resource.WithDigest(d))
		}

		task = resource.NewTask(taskID, download.GetUrl(), download.GetTag(), download.GetApplication(), download.GetType(),
			download.GetFilteredQueryParams(), download.GetRequestHeader(), int32(v.config.Scheduler.BackToSourceCount), options...)
		v.resource.TaskManager().Store(task)
	} else {
		task.URL = download.GetUrl()
		task.FilteredQueryParams = download.GetFilteredQueryParams()
		task.Header = download.GetRequestHeader()
	}

	// Store new peer or load peer.
	peer, loaded := v.resource.PeerManager().Load(peerID)
	if !loaded {
		options := []resource.PeerOption{resource.WithPriority(download.GetPriority()), resource.WithAnnouncePeerStream(stream)}
		if download.GetRange() != nil {
			options = append(options, resource.WithRange(http.Range{Start: int64(download.Range.GetStart()), Length: int64(download.Range.GetLength())}))
		}

		peer = resource.NewPeer(peerID, &v.config.Resource, task, host, options...)
		v.resource.PeerManager().Store(peer)
	}

	return host, task, peer, nil
}

// downloadTaskBySeedPeer downloads task by seed peer.
func (v *V2) downloadTaskBySeedPeer(ctx context.Context, taskID string, download *commonv2.Download, peer *resource.Peer) error {
	// Trigger the first download task based on different priority levels,
	// refer to https://github.com/dragonflyoss/api/blob/main/pkg/apis/common/v2/common.proto#L74.
	priority := peer.CalculatePriority(v.dynconfig)
	peer.Log.Infof("peer priority is %s", priority.String())
	switch priority {
	case commonv2.Priority_LEVEL6, commonv2.Priority_LEVEL0:
		// Super peer is first triggered to download back-to-source.
		if v.config.SeedPeer.Enable && !peer.Task.IsSeedPeerFailed() {
			go func(ctx context.Context, taskID string, download *commonv2.Download, hostType types.HostType) {
				peer.Log.Infof("%s seed peer triggers download task", hostType.Name())
				if err := v.resource.SeedPeer().TriggerDownloadTask(context.Background(), taskID, &dfdaemonv2.TriggerDownloadTaskRequest{Download: download}); err != nil {
					peer.Log.Errorf("%s seed peer triggers download task failed %s", hostType.Name(), err.Error())
					return
				}

				peer.Log.Infof("%s seed peer triggers download task success", hostType.Name())
			}(ctx, taskID, download, types.HostTypeSuperSeed)

			break
		}

		fallthrough
	case commonv2.Priority_LEVEL5:
		// Strong peer is first triggered to download back-to-source.
		if v.config.SeedPeer.Enable && !peer.Task.IsSeedPeerFailed() {
			go func(ctx context.Context, taskID string, download *commonv2.Download, hostType types.HostType) {
				peer.Log.Infof("%s seed peer triggers download task", hostType.Name())
				if err := v.resource.SeedPeer().TriggerDownloadTask(context.Background(), taskID, &dfdaemonv2.TriggerDownloadTaskRequest{Download: download}); err != nil {
					peer.Log.Errorf("%s seed peer triggers download task failed %s", hostType.Name(), err.Error())
					return
				}

				peer.Log.Infof("%s seed peer triggers download task success", hostType.Name())
			}(ctx, taskID, download, types.HostTypeSuperSeed)

			break
		}

		fallthrough
	case commonv2.Priority_LEVEL4:
		// Weak peer is first triggered to download back-to-source.
		if v.config.SeedPeer.Enable && !peer.Task.IsSeedPeerFailed() {
			go func(ctx context.Context, taskID string, download *commonv2.Download, hostType types.HostType) {
				peer.Log.Infof("%s seed peer triggers download task", hostType.Name())
				if err := v.resource.SeedPeer().TriggerDownloadTask(context.Background(), taskID, &dfdaemonv2.TriggerDownloadTaskRequest{Download: download}); err != nil {
					peer.Log.Errorf("%s seed peer triggers download task failed %s", hostType.Name(), err.Error())
					return
				}

				peer.Log.Infof("%s seed peer triggers download task success", hostType.Name())
			}(ctx, taskID, download, types.HostTypeSuperSeed)

			break
		}

		fallthrough
	case commonv2.Priority_LEVEL3:
		// When the task has no available peer,
		// the peer is first to download back-to-source.
		peer.NeedBackToSource.Store(true)
	case commonv2.Priority_LEVEL2:
		// Peer is first to download back-to-source.
		return status.Errorf(codes.NotFound, "%s peer not found candidate peers", commonv2.Priority_LEVEL2.String())
	case commonv2.Priority_LEVEL1:
		// Download task is forbidden.
		return status.Errorf(codes.FailedPrecondition, "%s peer is forbidden", commonv2.Priority_LEVEL1.String())
	default:
		return status.Errorf(codes.InvalidArgument, "invalid priority %#v", priority)
	}

	return nil
}
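
// Example (illustrative sketch): how a caller influences the branch taken above.
// Registering a download with Priority_LEVEL3 (or explicitly setting the
// need-back-to-source flag) makes the peer fetch from the origin itself instead of
// waiting for a seed peer; the URL is a placeholder.
//
//	download := &commonv2.Download{
//		Url:              "https://example.com/artifact.tar.gz",
//		Priority:         commonv2.Priority_LEVEL3,
//		NeedBackToSource: false,
//	}
//	// Passed through RegisterPeerRequest, this reaches handleRegisterPeerRequest
//	// and then downloadTaskBySeedPeer with the priority computed above.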
