Dragonfly2
330 строк · 8.3 Кб
1/*
2* Copyright 2020 The Dragonfly Authors
3*
4* Licensed under the Apache License, Version 2.0 (the "License");
5* you may not use this file except in compliance with the License.
6* You may obtain a copy of the License at
7*
8* http://www.apache.org/licenses/LICENSE-2.0
9*
10* Unless required by applicable law or agreed to in writing, software
11* distributed under the License is distributed on an "AS IS" BASIS,
12* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13* See the License for the specific language governing permissions and
14* limitations under the License.
15*/
16
17package manager
18
import (
	"context"
	"crypto/tls"
	"embed"
	"errors"
	"io/fs"
	"net/http"
	"path"
	"time"

	"github.com/gin-contrib/static"
	"github.com/johanbrandhorst/certify"
	"google.golang.org/grpc"
	zapadapter "logur.dev/adapter/zap"

	logger "d7y.io/dragonfly/v2/internal/dflog"
	"d7y.io/dragonfly/v2/manager/cache"
	"d7y.io/dragonfly/v2/manager/config"
	"d7y.io/dragonfly/v2/manager/database"
	"d7y.io/dragonfly/v2/manager/job"
	"d7y.io/dragonfly/v2/manager/metrics"
	"d7y.io/dragonfly/v2/manager/permission/rbac"
	"d7y.io/dragonfly/v2/manager/router"
	"d7y.io/dragonfly/v2/manager/rpcserver"
	"d7y.io/dragonfly/v2/manager/searcher"
	"d7y.io/dragonfly/v2/manager/service"
	pkgcache "d7y.io/dragonfly/v2/pkg/cache"
	"d7y.io/dragonfly/v2/pkg/dfpath"
	"d7y.io/dragonfly/v2/pkg/issuer"
	"d7y.io/dragonfly/v2/pkg/objectstorage"
	"d7y.io/dragonfly/v2/pkg/rpc"
	"d7y.io/dragonfly/v2/pkg/types"
)
51
const (
	// assetsTargetPath names the directory inside the embedded assets
	// that EmbedFolder is asked to expose as the static file root.
	assetsTargetPath = "dist"

	// gracefulStopTimeout bounds how long Stop waits for the grpc
	// server to finish a graceful shutdown before stopping it forcibly.
	gracefulStopTimeout = time.Minute * 10
)
60
// assets holds the files matched by the dist/* embed pattern; New passes
// it to EmbedFolder to build the static file system given to the router.
//
//go:embed dist/*
var assets embed.FS
63
// embedFileSystem wraps an http.FileSystem and adds an Exists method on
// top of it. EmbedFolder returns it as a static.ServeFileSystem.
type embedFileSystem struct {
	http.FileSystem
}

// Exists reports whether path can be opened in the wrapped file system.
//
// The prefix argument is not consulted; it is presumably kept to match the
// method signature expected of a static.ServeFileSystem.
func (e embedFileSystem) Exists(prefix string, path string) bool {
	f, err := e.Open(path)
	if err != nil {
		return false
	}

	// Only existence matters here, so release the handle right away instead
	// of leaking one open file per successful lookup. A Close error cannot
	// change the answer and is deliberately ignored.
	_ = f.Close()
	return true
}
75
76func EmbedFolder(fsEmbed embed.FS, targetPath string) static.ServeFileSystem {
77fsys, err := fs.Sub(fsEmbed, targetPath)
78if err != nil {
79panic(err)
80}
81
82return embedFileSystem{
83FileSystem: http.FS(fsys),
84}
85}
86
// Server is the manager server. It bundles the configuration with the
// job, GRPC, REST and metrics servers that Serve starts and Stop stops.
type Server struct {
	// Server configuration.
	config *config.Config

	// Job server.
	job *job.Job

	// GRPC server.
	grpcServer *grpc.Server

	// REST server.
	restServer *http.Server

	// Metrics server. New leaves it nil unless metrics are enabled in the
	// configuration, so Serve and Stop check it before use.
	metricsServer *http.Server
}
104
105// New creates a new manager server.
106func New(cfg *config.Config, d dfpath.Dfpath) (*Server, error) {
107s := &Server{config: cfg}
108
109// Initialize database.
110db, err := database.New(cfg)
111if err != nil {
112return nil, err
113}
114
115// Initialize enforcer.
116enforcer, err := rbac.NewEnforcer(db.DB)
117if err != nil {
118return nil, err
119}
120
121// Initialize cache.
122cache, err := cache.New(cfg)
123if err != nil {
124return nil, err
125}
126
127// Initialize searcher.
128searcher := searcher.New(d.PluginDir())
129
130// Initialize job.
131job, err := job.New(cfg, db.DB)
132if err != nil {
133return nil, err
134}
135s.job = job
136
137// Initialize object storage.
138var objectStorage objectstorage.ObjectStorage
139if cfg.ObjectStorage.Enable {
140objectStorage, err = objectstorage.New(
141cfg.ObjectStorage.Name,
142cfg.ObjectStorage.Region,
143cfg.ObjectStorage.Endpoint,
144cfg.ObjectStorage.AccessKey,
145cfg.ObjectStorage.SecretKey,
146objectstorage.WithS3ForcePathStyle(cfg.ObjectStorage.S3ForcePathStyle),
147)
148if err != nil {
149return nil, err
150}
151}
152
153// Initialize REST server.
154restService := service.New(cfg, db, cache, job, enforcer, objectStorage)
155router, err := router.Init(cfg, d.LogDir(), restService, db, enforcer, EmbedFolder(assets, assetsTargetPath))
156if err != nil {
157return nil, err
158}
159s.restServer = &http.Server{
160Addr: cfg.Server.REST.Addr,
161Handler: router,
162}
163
164// Initialize roles and check roles.
165err = rbac.InitRBAC(enforcer, router, db.DB)
166if err != nil {
167return nil, err
168}
169
170// Initialize signing certificate and tls credentials of grpc server.
171var options []rpcserver.Option
172if cfg.Security.AutoIssueCert {
173cert, err := tls.X509KeyPair([]byte(cfg.Security.CACert), []byte(cfg.Security.CAKey))
174if err != nil {
175return nil, err
176}
177
178certifyClient := &certify.Certify{
179CommonName: types.ManagerName,
180Issuer: issuer.NewDragonflyManagerIssuer(
181&cert,
182issuer.WithManagerValidityPeriod(cfg.Security.CertSpec.ValidityPeriod),
183),
184RenewBefore: time.Hour,
185CertConfig: &certify.CertConfig{
186SubjectAlternativeNames: cfg.Security.CertSpec.DNSNames,
187IPSubjectAlternativeNames: append(cfg.Security.CertSpec.IPAddresses, cfg.Server.GRPC.AdvertiseIP),
188},
189IssueTimeout: 0,
190Logger: zapadapter.New(logger.CoreLogger.Desugar()),
191Cache: pkgcache.NewCertifyMutliCache(
192certify.NewMemCache(),
193certify.DirCache(path.Join(d.CacheDir(), pkgcache.CertifyCacheDirName, types.ManagerName))),
194}
195
196// Issue a certificate to reduce first time delay.
197if _, err := certifyClient.GetCertificate(&tls.ClientHelloInfo{
198ServerName: cfg.Server.GRPC.AdvertiseIP.String(),
199}); err != nil {
200logger.Errorf("issue certificate error: %s", err.Error())
201return nil, err
202}
203
204// Manager GRPC server's tls varify must be false. If ClientCAs are required for client verification,
205// the client cannot call the IssueCertificate api.
206transportCredentials, err := rpc.NewServerCredentialsByCertify(cfg.Security.TLSPolicy, false, []byte(cfg.Security.CACert), certifyClient)
207if err != nil {
208return nil, err
209}
210
211options = append(
212options,
213// Set ca certificate for issuing certificate.
214rpcserver.WithSelfSignedCert(&cert),
215// Set tls credentials for grpc server.
216rpcserver.WithGRPCServerOptions([]grpc.ServerOption{grpc.Creds(transportCredentials)}),
217)
218}
219
220// Initialize GRPC server.
221_, grpcServer, err := rpcserver.New(cfg, db, cache, searcher, objectStorage, options...)
222if err != nil {
223return nil, err
224}
225
226s.grpcServer = grpcServer
227
228// Initialize prometheus.
229if cfg.Metrics.Enable {
230s.metricsServer = metrics.New(&cfg.Metrics, grpcServer)
231}
232
233return s, nil
234}
235
236// Serve starts the manager server.
237func (s *Server) Serve() error {
238// Started REST server.
239go func() {
240logger.Infof("started rest server at %s", s.restServer.Addr)
241if s.config.Server.REST.TLS != nil {
242if err := s.restServer.ListenAndServeTLS(s.config.Server.REST.TLS.Cert, s.config.Server.REST.TLS.Key); err != nil {
243if err == http.ErrServerClosed {
244return
245}
246logger.Fatalf("rest server closed unexpect: %v", err)
247}
248} else {
249if err := s.restServer.ListenAndServe(); err != nil {
250if err == http.ErrServerClosed {
251return
252}
253logger.Fatalf("rest server closed unexpect: %v", err)
254}
255}
256}()
257
258// Started metrics server.
259if s.metricsServer != nil {
260go func() {
261logger.Infof("started metrics server at %s", s.metricsServer.Addr)
262if err := s.metricsServer.ListenAndServe(); err != nil {
263if err == http.ErrServerClosed {
264return
265}
266logger.Fatalf("metrics server closed unexpect: %v", err)
267}
268}()
269}
270
271// Started job server.
272go func() {
273logger.Info("started job server")
274s.job.Serve()
275}()
276
277// Generate GRPC listener.
278lis, _, err := rpc.ListenWithPortRange(s.config.Server.GRPC.ListenIP.String(), s.config.Server.GRPC.PortRange.Start, s.config.Server.GRPC.PortRange.End)
279if err != nil {
280logger.Fatalf("net listener failed to start: %v", err)
281}
282defer lis.Close()
283
284// Started GRPC server.
285logger.Infof("started grpc server at %s://%s", lis.Addr().Network(), lis.Addr().String())
286if err := s.grpcServer.Serve(lis); err != nil {
287logger.Errorf("stoped grpc server: %+v", err)
288return err
289}
290
291return nil
292}
293
294// Stop stops the manager server.
295func (s *Server) Stop() {
296// Stop REST server.
297if err := s.restServer.Shutdown(context.Background()); err != nil {
298logger.Errorf("rest server failed to stop: %+v", err)
299} else {
300logger.Info("rest server closed under request")
301}
302
303// Stop metrics server.
304if s.metricsServer != nil {
305if err := s.metricsServer.Shutdown(context.Background()); err != nil {
306logger.Errorf("metrics server failed to stop: %+v", err)
307} else {
308logger.Info("metrics server closed under request")
309}
310}
311
312// Stop job server.
313s.job.Stop()
314
315// Stop GRPC server.
316stopped := make(chan struct{})
317go func() {
318s.grpcServer.GracefulStop()
319logger.Info("grpc server closed under request")
320close(stopped)
321}()
322
323t := time.NewTimer(gracefulStopTimeout)
324select {
325case <-t.C:
326s.grpcServer.Stop()
327case <-stopped:
328t.Stop()
329}
330}
331