Dragonfly2

Форк
0
/
scheduler_server_v1.go 
173 строки · 6.0 Кб
1
/*
2
 *     Copyright 2023 The Dragonfly Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *      http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package rpcserver
18

19
import (
20
	"context"
21

22
	"google.golang.org/protobuf/types/known/emptypb"
23

24
	commonv1 "d7y.io/api/v2/pkg/apis/common/v1"
25
	schedulerv1 "d7y.io/api/v2/pkg/apis/scheduler/v1"
26

27
	"d7y.io/dragonfly/v2/pkg/idgen"
28
	"d7y.io/dragonfly/v2/pkg/types"
29
	"d7y.io/dragonfly/v2/scheduler/config"
30
	"d7y.io/dragonfly/v2/scheduler/metrics"
31
	"d7y.io/dragonfly/v2/scheduler/networktopology"
32
	"d7y.io/dragonfly/v2/scheduler/resource"
33
	"d7y.io/dragonfly/v2/scheduler/scheduling"
34
	"d7y.io/dragonfly/v2/scheduler/service"
35
	"d7y.io/dragonfly/v2/scheduler/storage"
36
)
37

38
// schedulerServerV1 is v1 version of the scheduler grpc server.
39
type schedulerServerV1 struct {
40
	// Service interface.
41
	service *service.V1
42
}
43

44
// newSchedulerServerV1 returns v1 version of the scheduler server.
45
func newSchedulerServerV1(
46
	cfg *config.Config,
47
	resource resource.Resource,
48
	scheduling scheduling.Scheduling,
49
	dynconfig config.DynconfigInterface,
50
	storage storage.Storage,
51
	networkTopology networktopology.NetworkTopology,
52
) schedulerv1.SchedulerServer {
53
	return &schedulerServerV1{service.NewV1(cfg, resource, scheduling, dynconfig, storage, networkTopology)}
54
}
55

56
// RegisterPeerTask registers peer and triggers seed peer download task.
57
func (s *schedulerServerV1) RegisterPeerTask(ctx context.Context, req *schedulerv1.PeerTaskRequest) (*schedulerv1.RegisterResult, error) {
58
	// FIXME Scheudler will not generate task id.
59
	if req.TaskId == "" {
60
		req.TaskId = idgen.TaskIDV1(req.Url, req.UrlMeta)
61
	}
62

63
	tag := req.UrlMeta.Tag
64
	application := req.UrlMeta.Application
65
	priority := req.UrlMeta.Priority
66

67
	// Collect RegisterPeerCount metrics.
68
	metrics.RegisterPeerCount.WithLabelValues(priority.String(), commonv1.TaskType_Normal.String(),
69
		tag, application, types.HostTypeNormalName).Inc()
70
	resp, err := s.service.RegisterPeerTask(ctx, req)
71
	if err != nil {
72
		// Collect RegisterPeerFailureCount metrics.
73
		metrics.RegisterPeerFailureCount.WithLabelValues(priority.String(), commonv1.TaskType_Normal.String(),
74
			tag, application, types.HostTypeNormalName).Inc()
75
	}
76

77
	return resp, err
78
}
79

80
// ReportPieceResult handles the piece information reported by dfdaemon.
81
func (s *schedulerServerV1) ReportPieceResult(stream schedulerv1.Scheduler_ReportPieceResultServer) error {
82
	// Collect ConcurrentScheduleGauge metrics.
83
	metrics.ConcurrentScheduleGauge.Inc()
84
	defer metrics.ConcurrentScheduleGauge.Dec()
85

86
	return s.service.ReportPieceResult(stream)
87
}
88

89
// ReportPeerResult handles peer result reported by dfdaemon.
90
func (s *schedulerServerV1) ReportPeerResult(ctx context.Context, req *schedulerv1.PeerResult) (*emptypb.Empty, error) {
91
	return new(emptypb.Empty), s.service.ReportPeerResult(ctx, req)
92
}
93

94
// AnnounceTask informs scheduler a peer has completed task.
95
func (s *schedulerServerV1) AnnounceTask(ctx context.Context, req *schedulerv1.AnnounceTaskRequest) (*emptypb.Empty, error) {
96
	// Collect AnnouncePeerCount metrics.
97
	metrics.AnnouncePeerCount.Inc()
98
	if err := s.service.AnnounceTask(ctx, req); err != nil {
99
		// Collect AnnouncePeerFailureCount metrics.
100
		metrics.AnnouncePeerFailureCount.Inc()
101
		return nil, err
102
	}
103

104
	return new(emptypb.Empty), nil
105
}
106

107
// StatTask checks if the given task exists.
108
func (s *schedulerServerV1) StatTask(ctx context.Context, req *schedulerv1.StatTaskRequest) (*schedulerv1.Task, error) {
109
	// Collect StatTaskCount metrics.
110
	metrics.StatTaskCount.Inc()
111
	resp, err := s.service.StatTask(ctx, req)
112
	if err != nil {
113
		// Collect StatTaskFailureCount metrics.
114
		metrics.StatTaskFailureCount.Inc()
115
		return nil, err
116
	}
117

118
	return resp, nil
119
}
120

121
// LeaveTask makes the peer unschedulable.
122
func (s *schedulerServerV1) LeaveTask(ctx context.Context, req *schedulerv1.PeerTarget) (*emptypb.Empty, error) {
123
	// Collect LeavePeerCount metrics.
124
	metrics.LeavePeerCount.Inc()
125
	if err := s.service.LeaveTask(ctx, req); err != nil {
126
		// Collect LeavePeerFailureCount metrics.
127
		metrics.LeavePeerFailureCount.Inc()
128
		return nil, err
129
	}
130

131
	return new(emptypb.Empty), nil
132
}
133

134
// AnnounceHost announces host to scheduler.
135
func (s *schedulerServerV1) AnnounceHost(ctx context.Context, req *schedulerv1.AnnounceHostRequest) (*emptypb.Empty, error) {
136
	// Collect AnnounceHostCount metrics.
137
	metrics.AnnounceHostCount.WithLabelValues(req.Os, req.Platform, req.PlatformFamily, req.PlatformVersion,
138
		req.KernelVersion, req.Build.GitVersion, req.Build.GitCommit, req.Build.GoVersion, req.Build.Platform).Inc()
139
	if err := s.service.AnnounceHost(ctx, req); err != nil {
140
		// Collect AnnounceHostFailureCount metrics.
141
		metrics.AnnounceHostFailureCount.WithLabelValues(req.Os, req.Platform, req.PlatformFamily, req.PlatformVersion,
142
			req.KernelVersion, req.Build.GitVersion, req.Build.GitCommit, req.Build.GoVersion, req.Build.Platform).Inc()
143
		return nil, err
144
	}
145

146
	return new(emptypb.Empty), nil
147
}
148

149
// LeaveHost releases host in scheduler.
150
func (s *schedulerServerV1) LeaveHost(ctx context.Context, req *schedulerv1.LeaveHostRequest) (*emptypb.Empty, error) {
151
	// Collect LeaveHostCount metrics.
152
	metrics.LeaveHostCount.Inc()
153
	if err := s.service.LeaveHost(ctx, req); err != nil {
154
		// Collect LeaveHostFailureCount metrics.
155
		metrics.LeaveHostFailureCount.Inc()
156
		return nil, err
157
	}
158

159
	return new(emptypb.Empty), nil
160
}
161

162
// SyncProbes sync probes of the host.
163
func (s *schedulerServerV1) SyncProbes(stream schedulerv1.Scheduler_SyncProbesServer) error {
164
	// Collect SyncProbesCount metrics.
165
	metrics.SyncProbesCount.Inc()
166
	if err := s.service.SyncProbes(stream); err != nil {
167
		// Collect SyncProbesFailureCount metrics.
168
		metrics.SyncProbesFailureCount.Inc()
169
		return err
170
	}
171

172
	return nil
173
}
174

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.