Dragonfly2
467 строк · 12.9 Кб
/*
 *     Copyright 2020 The Dragonfly Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17package scheduler18
19import (20"context"21"crypto/tls"22"errors"23"fmt"24"net"25"net/http"26"path/filepath"27"time"28
29"github.com/go-redis/redis/v8"30"github.com/johanbrandhorst/certify"31"google.golang.org/grpc"32"google.golang.org/grpc/credentials"33"google.golang.org/grpc/credentials/insecure"34zapadapter "logur.dev/adapter/zap"35
36logger "d7y.io/dragonfly/v2/internal/dflog"37"d7y.io/dragonfly/v2/internal/dynconfig"38"d7y.io/dragonfly/v2/pkg/cache"39"d7y.io/dragonfly/v2/pkg/dfpath"40"d7y.io/dragonfly/v2/pkg/gc"41"d7y.io/dragonfly/v2/pkg/issuer"42"d7y.io/dragonfly/v2/pkg/net/ip"43pkgredis "d7y.io/dragonfly/v2/pkg/redis"44"d7y.io/dragonfly/v2/pkg/rpc"45managerclient "d7y.io/dragonfly/v2/pkg/rpc/manager/client"46securityclient "d7y.io/dragonfly/v2/pkg/rpc/security/client"47trainerclient "d7y.io/dragonfly/v2/pkg/rpc/trainer/client"48"d7y.io/dragonfly/v2/pkg/types"49"d7y.io/dragonfly/v2/scheduler/announcer"50"d7y.io/dragonfly/v2/scheduler/config"51"d7y.io/dragonfly/v2/scheduler/job"52"d7y.io/dragonfly/v2/scheduler/metrics"53"d7y.io/dragonfly/v2/scheduler/networktopology"54"d7y.io/dragonfly/v2/scheduler/resource"55"d7y.io/dragonfly/v2/scheduler/rpcserver"56"d7y.io/dragonfly/v2/scheduler/scheduling"57"d7y.io/dragonfly/v2/scheduler/scheduling/evaluator"58"d7y.io/dragonfly/v2/scheduler/storage"59)
60
// gracefulStopTimeout is the longest Stop waits for the grpc server to finish
// a graceful shutdown before the server is stopped forcibly.
const gracefulStopTimeout = 10 * time.Minute
66
67// Server is the scheduler server.
68type Server struct {69// Server configuration.70config *config.Config71
72// GRPC server.73grpcServer *grpc.Server74
75// Metrics server.76metricsServer *http.Server77
78// Manager client.79managerClient managerclient.V280
81// Security client.82securityClient securityclient.V183
84// Trainer client.85trainerClient trainerclient.V186
87// Resource interface.88resource resource.Resource89
90// Dynamic config.91dynconfig config.DynconfigInterface92
93// Async job.94job job.Job95
96// Storage interface.97storage storage.Storage98
99// Announcer interface.100announcer announcer.Announcer101
102// Network topology interface.103networkTopology networktopology.NetworkTopology104
105// GC service.106gc gc.GC107}
108
109// New creates a new scheduler server.
110func New(ctx context.Context, cfg *config.Config, d dfpath.Dfpath) (*Server, error) {111s := &Server{config: cfg}112
113// Initialize Storage.114storage, err := storage.New(115d.DataDir(),116cfg.Storage.MaxSize,117cfg.Storage.MaxBackups,118cfg.Storage.BufferSize,119)120if err != nil {121return nil, err122}123s.storage = storage124
125// Initialize dial options of manager grpc client.126managerDialOptions := []grpc.DialOption{}127if cfg.Security.AutoIssueCert {128clientTransportCredentials, err := rpc.NewClientCredentials(cfg.Security.TLSPolicy, nil, []byte(cfg.Security.CACert))129if err != nil {130return nil, err131}132
133managerDialOptions = append(managerDialOptions, grpc.WithTransportCredentials(clientTransportCredentials))134} else {135managerDialOptions = append(managerDialOptions, grpc.WithTransportCredentials(insecure.NewCredentials()))136}137
138// Initialize manager client.139managerClient, err := managerclient.GetV2ByAddr(ctx, cfg.Manager.Addr, managerDialOptions...)140if err != nil {141return nil, err142}143s.managerClient = managerClient144
145// Initialize dial options of trainer grpc client.146if cfg.Trainer.Enable {147trainerDialOptions := []grpc.DialOption{}148if cfg.Security.AutoIssueCert {149clientTransportCredentials, err := rpc.NewClientCredentials(cfg.Security.TLSPolicy, nil, []byte(cfg.Security.CACert))150if err != nil {151return nil, err152}153
154trainerDialOptions = append(trainerDialOptions, grpc.WithTransportCredentials(clientTransportCredentials))155} else {156trainerDialOptions = append(trainerDialOptions, grpc.WithTransportCredentials(insecure.NewCredentials()))157}158
159// Initialize trainer client.160trainerClient, err := trainerclient.GetV1ByAddr(ctx, cfg.Trainer.Addr, trainerDialOptions...)161if err != nil {162return nil, err163}164s.trainerClient = trainerClient165}166
167// Initialize dial options of announcer.168announcerOptions := []announcer.Option{}169if s.trainerClient != nil {170announcerOptions = append(announcerOptions, announcer.WithTrainerClient(s.trainerClient))171}172
173// Initialize announcer.174announcer, err := announcer.New(cfg, s.managerClient, storage, announcerOptions...)175if err != nil {176return nil, err177}178s.announcer = announcer179
180// Initialize certify client.181var (182certifyClient *certify.Certify183clientTransportCredentials credentials.TransportCredentials184)185if cfg.Security.AutoIssueCert {186// Initialize security client.187securityClient, err := securityclient.GetV1(ctx, cfg.Manager.Addr, managerDialOptions...)188if err != nil {189return nil, err190}191s.securityClient = securityClient192
193certifyClient = &certify.Certify{194CommonName: types.SchedulerName,195Issuer: issuer.NewDragonflyIssuer(s.securityClient, issuer.WithValidityPeriod(cfg.Security.CertSpec.ValidityPeriod)),196RenewBefore: time.Hour,197CertConfig: &certify.CertConfig{198SubjectAlternativeNames: cfg.Security.CertSpec.DNSNames,199IPSubjectAlternativeNames: append(cfg.Security.CertSpec.IPAddresses, cfg.Server.AdvertiseIP),200},201IssueTimeout: 0,202Logger: zapadapter.New(logger.CoreLogger.Desugar()),203Cache: cache.NewCertifyMutliCache(204certify.NewMemCache(),205certify.DirCache(filepath.Join(d.CacheDir(), cache.CertifyCacheDirName, types.SchedulerName))),206}207
208clientTransportCredentials, err = rpc.NewClientCredentialsByCertify(cfg.Security.TLSPolicy, []byte(cfg.Security.CACert), certifyClient)209if err != nil {210return nil, err211}212
213// Issue a certificate to reduce first time delay.214if _, err := certifyClient.GetCertificate(&tls.ClientHelloInfo{215ServerName: cfg.Server.AdvertiseIP.String(),216}); err != nil {217return nil, err218}219}220
221// Initialize dynconfig client.222dynconfig, err := config.NewDynconfig(s.managerClient, filepath.Join(d.CacheDir(), dynconfig.CacheDirName), cfg, config.WithTransportCredentials(clientTransportCredentials))223if err != nil {224return nil, err225}226s.dynconfig = dynconfig227
228// Initialize GC.229s.gc = gc.New(gc.WithLogger(logger.GCLogger))230
231// Initialize resource.232resource, err := resource.New(cfg, s.gc, dynconfig, resource.WithTransportCredentials(clientTransportCredentials))233if err != nil {234return nil, err235}236s.resource = resource237
238// Initialize redis client.239var rdb redis.UniversalClient240if pkgredis.IsEnabled(cfg.Database.Redis.Addrs) {241rdb, err = pkgredis.NewRedis(&redis.UniversalOptions{242Addrs: cfg.Database.Redis.Addrs,243MasterName: cfg.Database.Redis.MasterName,244DB: cfg.Database.Redis.NetworkTopologyDB,245Username: cfg.Database.Redis.Username,246Password: cfg.Database.Redis.Password,247})248if err != nil {249return nil, err250}251}252
253// Initialize job service.254if cfg.Job.Enable && pkgredis.IsEnabled(cfg.Database.Redis.Addrs) {255s.job, err = job.New(cfg, resource)256if err != nil {257return nil, err258}259}260
261// Initialize options of evaluator.262evaluatorNetworkTopologyOptions := []evaluator.NetworkTopologyOption{}263// Initialize network topology service.264if cfg.Scheduler.Algorithm == evaluator.NetworkTopologyAlgorithm {265cache := cache.New(cfg.Scheduler.NetworkTopology.Cache.TTL, cfg.Scheduler.NetworkTopology.Cache.Interval)266s.networkTopology, err = networktopology.NewNetworkTopology(cfg.Scheduler.NetworkTopology, rdb, cache, resource, s.storage)267if err != nil {268return nil, err269}270
271evaluatorNetworkTopologyOptions = append(evaluatorNetworkTopologyOptions, evaluator.WithNetworkTopology(s.networkTopology))272}273
274// Initialize scheduling.275scheduling := scheduling.New(&cfg.Scheduler, dynconfig, d.PluginDir(), evaluatorNetworkTopologyOptions...)276
277// Initialize server options of scheduler grpc server.278schedulerServerOptions := []grpc.ServerOption{}279if certifyClient != nil {280serverTransportCredentials, err := rpc.NewServerCredentialsByCertify(cfg.Security.TLSPolicy, cfg.Security.TLSVerify, []byte(cfg.Security.CACert), certifyClient)281if err != nil {282return nil, err283}284
285schedulerServerOptions = append(schedulerServerOptions, grpc.Creds(serverTransportCredentials))286} else {287schedulerServerOptions = append(schedulerServerOptions, grpc.Creds(insecure.NewCredentials()))288}289
290svr := rpcserver.New(cfg, resource, scheduling, dynconfig, s.storage, s.networkTopology, schedulerServerOptions...)291s.grpcServer = svr292
293// Initialize metrics.294if cfg.Metrics.Enable {295s.metricsServer = metrics.New(&cfg.Metrics, s.grpcServer)296}297
298return s, nil299}
300
301// Serve starts the scheduler server.
302func (s *Server) Serve() error {303// Serve dynconfig.304go func() {305if err := s.dynconfig.Serve(); err != nil {306logger.Fatalf("dynconfig start failed %s", err.Error())307}308
309logger.Info("dynconfig start successfully")310}()311
312// Serve GC.313s.gc.Start()314logger.Info("gc start successfully")315
316// Serve Job.317if s.job != nil {318s.job.Serve()319logger.Info("job start successfully")320}321
322// Started metrics server.323if s.metricsServer != nil {324go func() {325logger.Infof("started metrics server at %s", s.metricsServer.Addr)326if err := s.metricsServer.ListenAndServe(); err != nil {327if err == http.ErrServerClosed {328return329}330
331logger.Fatalf("metrics server closed unexpect: %s", err.Error())332}333}()334}335
336// Serve announcer.337go func() {338s.announcer.Serve()339logger.Info("announcer start successfully")340}()341
342// Serve network topology.343if s.networkTopology != nil {344go func() {345s.networkTopology.Serve()346logger.Info("network topology start successfully")347}()348}349
350// Generate GRPC limit listener.351ip, ok := ip.FormatIP(s.config.Server.ListenIP.String())352if !ok {353return errors.New("format ip failed")354}355
356listener, err := net.Listen("tcp", fmt.Sprintf("%s:%d", ip, s.config.Server.Port))357if err != nil {358logger.Fatalf("net listener failed to start: %s", err.Error())359}360defer listener.Close()361
362// Started GRPC server.363logger.Infof("started grpc server at %s://%s", listener.Addr().Network(), listener.Addr().String())364if err := s.grpcServer.Serve(listener); err != nil {365logger.Errorf("stoped grpc server: %s", err.Error())366return err367}368
369return nil370}
371
372// Stop stops the scheduler server.
373func (s *Server) Stop() {374// Stop dynconfig.375if err := s.dynconfig.Stop(); err != nil {376logger.Errorf("stop dynconfig failed %s", err.Error())377} else {378logger.Info("stop dynconfig closed")379}380
381// Stop resource.382if err := s.resource.Stop(); err != nil {383logger.Errorf("stop resource failed %s", err.Error())384} else {385logger.Info("stop resource closed")386}387
388// Clean download storage.389if err := s.storage.ClearDownload(); err != nil {390logger.Errorf("clean download storage failed %s", err.Error())391} else {392logger.Info("clean download storage completed")393}394
395// Clean network topology storage.396if err := s.storage.ClearNetworkTopology(); err != nil {397logger.Errorf("clean network topology storage failed %s", err.Error())398} else {399logger.Info("clean network topology storage completed")400}401
402// Stop GC.403s.gc.Stop()404logger.Info("gc closed")405
406// Stop metrics server.407if s.metricsServer != nil {408if err := s.metricsServer.Shutdown(context.Background()); err != nil {409logger.Errorf("metrics server failed to stop: %s", err.Error())410} else {411logger.Info("metrics server closed under request")412}413}414
415// Stop announcer.416s.announcer.Stop()417logger.Info("stop announcer closed")418
419// Stop manager client.420if s.managerClient != nil {421if err := s.managerClient.Close(); err != nil {422logger.Errorf("manager client failed to stop: %s", err.Error())423} else {424logger.Info("manager client closed")425}426}427
428// Stop trainer client.429if s.trainerClient != nil {430if err := s.trainerClient.Close(); err != nil {431logger.Errorf("trainer client failed to stop: %s", err.Error())432} else {433logger.Info("trainer client closed")434}435}436
437// Stop security client.438if s.securityClient != nil {439if err := s.securityClient.Close(); err != nil {440logger.Errorf("security client failed to stop: %s", err.Error())441} else {442logger.Info("security client closed")443}444}445
446// Stop network topology.447if s.networkTopology != nil {448s.networkTopology.Stop()449logger.Info("network topology closed")450}451
452// Stop GRPC server.453stopped := make(chan struct{})454go func() {455s.grpcServer.GracefulStop()456logger.Info("grpc server closed under request")457close(stopped)458}()459
460t := time.NewTimer(gracefulStopTimeout)461select {462case <-t.C:463s.grpcServer.Stop()464case <-stopped:465t.Stop()466}467}
468