Dragonfly2
173 строки · 6.0 Кб
1/*
2* Copyright 2023 The Dragonfly Authors
3*
4* Licensed under the Apache License, Version 2.0 (the "License");
5* you may not use this file except in compliance with the License.
6* You may obtain a copy of the License at
7*
8* http://www.apache.org/licenses/LICENSE-2.0
9*
10* Unless required by applicable law or agreed to in writing, software
11* distributed under the License is distributed on an "AS IS" BASIS,
12* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13* See the License for the specific language governing permissions and
14* limitations under the License.
15*/
16
17package rpcserver
18
19import (
20"context"
21
22"google.golang.org/protobuf/types/known/emptypb"
23
24commonv1 "d7y.io/api/v2/pkg/apis/common/v1"
25schedulerv1 "d7y.io/api/v2/pkg/apis/scheduler/v1"
26
27"d7y.io/dragonfly/v2/pkg/idgen"
28"d7y.io/dragonfly/v2/pkg/types"
29"d7y.io/dragonfly/v2/scheduler/config"
30"d7y.io/dragonfly/v2/scheduler/metrics"
31"d7y.io/dragonfly/v2/scheduler/networktopology"
32"d7y.io/dragonfly/v2/scheduler/resource"
33"d7y.io/dragonfly/v2/scheduler/scheduling"
34"d7y.io/dragonfly/v2/scheduler/service"
35"d7y.io/dragonfly/v2/scheduler/storage"
36)
37
38// schedulerServerV1 is v1 version of the scheduler grpc server.
39type schedulerServerV1 struct {
40// Service interface.
41service *service.V1
42}
43
44// newSchedulerServerV1 returns v1 version of the scheduler server.
45func newSchedulerServerV1(
46cfg *config.Config,
47resource resource.Resource,
48scheduling scheduling.Scheduling,
49dynconfig config.DynconfigInterface,
50storage storage.Storage,
51networkTopology networktopology.NetworkTopology,
52) schedulerv1.SchedulerServer {
53return &schedulerServerV1{service.NewV1(cfg, resource, scheduling, dynconfig, storage, networkTopology)}
54}
55
56// RegisterPeerTask registers peer and triggers seed peer download task.
57func (s *schedulerServerV1) RegisterPeerTask(ctx context.Context, req *schedulerv1.PeerTaskRequest) (*schedulerv1.RegisterResult, error) {
58// FIXME Scheudler will not generate task id.
59if req.TaskId == "" {
60req.TaskId = idgen.TaskIDV1(req.Url, req.UrlMeta)
61}
62
63tag := req.UrlMeta.Tag
64application := req.UrlMeta.Application
65priority := req.UrlMeta.Priority
66
67// Collect RegisterPeerCount metrics.
68metrics.RegisterPeerCount.WithLabelValues(priority.String(), commonv1.TaskType_Normal.String(),
69tag, application, types.HostTypeNormalName).Inc()
70resp, err := s.service.RegisterPeerTask(ctx, req)
71if err != nil {
72// Collect RegisterPeerFailureCount metrics.
73metrics.RegisterPeerFailureCount.WithLabelValues(priority.String(), commonv1.TaskType_Normal.String(),
74tag, application, types.HostTypeNormalName).Inc()
75}
76
77return resp, err
78}
79
80// ReportPieceResult handles the piece information reported by dfdaemon.
81func (s *schedulerServerV1) ReportPieceResult(stream schedulerv1.Scheduler_ReportPieceResultServer) error {
82// Collect ConcurrentScheduleGauge metrics.
83metrics.ConcurrentScheduleGauge.Inc()
84defer metrics.ConcurrentScheduleGauge.Dec()
85
86return s.service.ReportPieceResult(stream)
87}
88
89// ReportPeerResult handles peer result reported by dfdaemon.
90func (s *schedulerServerV1) ReportPeerResult(ctx context.Context, req *schedulerv1.PeerResult) (*emptypb.Empty, error) {
91return new(emptypb.Empty), s.service.ReportPeerResult(ctx, req)
92}
93
94// AnnounceTask informs scheduler a peer has completed task.
95func (s *schedulerServerV1) AnnounceTask(ctx context.Context, req *schedulerv1.AnnounceTaskRequest) (*emptypb.Empty, error) {
96// Collect AnnouncePeerCount metrics.
97metrics.AnnouncePeerCount.Inc()
98if err := s.service.AnnounceTask(ctx, req); err != nil {
99// Collect AnnouncePeerFailureCount metrics.
100metrics.AnnouncePeerFailureCount.Inc()
101return nil, err
102}
103
104return new(emptypb.Empty), nil
105}
106
107// StatTask checks if the given task exists.
108func (s *schedulerServerV1) StatTask(ctx context.Context, req *schedulerv1.StatTaskRequest) (*schedulerv1.Task, error) {
109// Collect StatTaskCount metrics.
110metrics.StatTaskCount.Inc()
111resp, err := s.service.StatTask(ctx, req)
112if err != nil {
113// Collect StatTaskFailureCount metrics.
114metrics.StatTaskFailureCount.Inc()
115return nil, err
116}
117
118return resp, nil
119}
120
121// LeaveTask makes the peer unschedulable.
122func (s *schedulerServerV1) LeaveTask(ctx context.Context, req *schedulerv1.PeerTarget) (*emptypb.Empty, error) {
123// Collect LeavePeerCount metrics.
124metrics.LeavePeerCount.Inc()
125if err := s.service.LeaveTask(ctx, req); err != nil {
126// Collect LeavePeerFailureCount metrics.
127metrics.LeavePeerFailureCount.Inc()
128return nil, err
129}
130
131return new(emptypb.Empty), nil
132}
133
134// AnnounceHost announces host to scheduler.
135func (s *schedulerServerV1) AnnounceHost(ctx context.Context, req *schedulerv1.AnnounceHostRequest) (*emptypb.Empty, error) {
136// Collect AnnounceHostCount metrics.
137metrics.AnnounceHostCount.WithLabelValues(req.Os, req.Platform, req.PlatformFamily, req.PlatformVersion,
138req.KernelVersion, req.Build.GitVersion, req.Build.GitCommit, req.Build.GoVersion, req.Build.Platform).Inc()
139if err := s.service.AnnounceHost(ctx, req); err != nil {
140// Collect AnnounceHostFailureCount metrics.
141metrics.AnnounceHostFailureCount.WithLabelValues(req.Os, req.Platform, req.PlatformFamily, req.PlatformVersion,
142req.KernelVersion, req.Build.GitVersion, req.Build.GitCommit, req.Build.GoVersion, req.Build.Platform).Inc()
143return nil, err
144}
145
146return new(emptypb.Empty), nil
147}
148
149// LeaveHost releases host in scheduler.
150func (s *schedulerServerV1) LeaveHost(ctx context.Context, req *schedulerv1.LeaveHostRequest) (*emptypb.Empty, error) {
151// Collect LeaveHostCount metrics.
152metrics.LeaveHostCount.Inc()
153if err := s.service.LeaveHost(ctx, req); err != nil {
154// Collect LeaveHostFailureCount metrics.
155metrics.LeaveHostFailureCount.Inc()
156return nil, err
157}
158
159return new(emptypb.Empty), nil
160}
161
162// SyncProbes sync probes of the host.
163func (s *schedulerServerV1) SyncProbes(stream schedulerv1.Scheduler_SyncProbesServer) error {
164// Collect SyncProbesCount metrics.
165metrics.SyncProbesCount.Inc()
166if err := s.service.SyncProbes(stream); err != nil {
167// Collect SyncProbesFailureCount metrics.
168metrics.SyncProbesFailureCount.Inc()
169return err
170}
171
172return nil
173}
174