Dragonfly2

Форк
0
/
manager.go 
330 строк · 8.3 Кб
1
/*
2
 *     Copyright 2020 The Dragonfly Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *      http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package manager
18

19
import (
20
	"context"
21
	"crypto/tls"
22
	"embed"
23
	"io/fs"
24
	"net/http"
25
	"path"
26
	"time"
27

28
	"github.com/gin-contrib/static"
29
	"github.com/johanbrandhorst/certify"
30
	"google.golang.org/grpc"
31
	zapadapter "logur.dev/adapter/zap"
32

33
	logger "d7y.io/dragonfly/v2/internal/dflog"
34
	"d7y.io/dragonfly/v2/manager/cache"
35
	"d7y.io/dragonfly/v2/manager/config"
36
	"d7y.io/dragonfly/v2/manager/database"
37
	"d7y.io/dragonfly/v2/manager/job"
38
	"d7y.io/dragonfly/v2/manager/metrics"
39
	"d7y.io/dragonfly/v2/manager/permission/rbac"
40
	"d7y.io/dragonfly/v2/manager/router"
41
	"d7y.io/dragonfly/v2/manager/rpcserver"
42
	"d7y.io/dragonfly/v2/manager/searcher"
43
	"d7y.io/dragonfly/v2/manager/service"
44
	pkgcache "d7y.io/dragonfly/v2/pkg/cache"
45
	"d7y.io/dragonfly/v2/pkg/dfpath"
46
	"d7y.io/dragonfly/v2/pkg/issuer"
47
	"d7y.io/dragonfly/v2/pkg/objectstorage"
48
	"d7y.io/dragonfly/v2/pkg/rpc"
49
	"d7y.io/dragonfly/v2/pkg/types"
50
)
51

52
const (
	// gracefulStopTimeout is the longest Stop waits for the grpc server
	// to finish a graceful shutdown before stopping it forcibly.
	gracefulStopTimeout = 10 * time.Minute

	// assetsTargetPath is the directory inside the embedded assets that
	// is used as the root of the served static files.
	assetsTargetPath = "dist"
)
60

61
//go:embed dist/*
62
var assets embed.FS
63

64
// embedFileSystem wraps an http.FileSystem and adds the Exists method,
// which lets EmbedFolder return it as a static.ServeFileSystem.
type embedFileSystem struct {
	http.FileSystem
}

// Exists reports whether path can be opened in the wrapped file system.
// The prefix argument is not used.
func (e embedFileSystem) Exists(prefix string, path string) bool {
	f, err := e.Open(path)
	if err != nil {
		return false
	}

	// Only existence matters here, so release the handle immediately
	// instead of leaking it. A close error does not change the answer.
	_ = f.Close()
	return true
}
75

76
func EmbedFolder(fsEmbed embed.FS, targetPath string) static.ServeFileSystem {
77
	fsys, err := fs.Sub(fsEmbed, targetPath)
78
	if err != nil {
79
		panic(err)
80
	}
81

82
	return embedFileSystem{
83
		FileSystem: http.FS(fsys),
84
	}
85
}
86

87
// Server is the manager server.
88
type Server struct {
89
	// Server configuration.
90
	config *config.Config
91

92
	// Job server.
93
	job *job.Job
94

95
	// GRPC server.
96
	grpcServer *grpc.Server
97

98
	// REST server.
99
	restServer *http.Server
100

101
	// Metrics server.
102
	metricsServer *http.Server
103
}
104

105
// New creates a new manager server.
106
func New(cfg *config.Config, d dfpath.Dfpath) (*Server, error) {
107
	s := &Server{config: cfg}
108

109
	// Initialize database.
110
	db, err := database.New(cfg)
111
	if err != nil {
112
		return nil, err
113
	}
114

115
	// Initialize enforcer.
116
	enforcer, err := rbac.NewEnforcer(db.DB)
117
	if err != nil {
118
		return nil, err
119
	}
120

121
	// Initialize cache.
122
	cache, err := cache.New(cfg)
123
	if err != nil {
124
		return nil, err
125
	}
126

127
	// Initialize searcher.
128
	searcher := searcher.New(d.PluginDir())
129

130
	// Initialize job.
131
	job, err := job.New(cfg, db.DB)
132
	if err != nil {
133
		return nil, err
134
	}
135
	s.job = job
136

137
	// Initialize object storage.
138
	var objectStorage objectstorage.ObjectStorage
139
	if cfg.ObjectStorage.Enable {
140
		objectStorage, err = objectstorage.New(
141
			cfg.ObjectStorage.Name,
142
			cfg.ObjectStorage.Region,
143
			cfg.ObjectStorage.Endpoint,
144
			cfg.ObjectStorage.AccessKey,
145
			cfg.ObjectStorage.SecretKey,
146
			objectstorage.WithS3ForcePathStyle(cfg.ObjectStorage.S3ForcePathStyle),
147
		)
148
		if err != nil {
149
			return nil, err
150
		}
151
	}
152

153
	// Initialize REST server.
154
	restService := service.New(cfg, db, cache, job, enforcer, objectStorage)
155
	router, err := router.Init(cfg, d.LogDir(), restService, db, enforcer, EmbedFolder(assets, assetsTargetPath))
156
	if err != nil {
157
		return nil, err
158
	}
159
	s.restServer = &http.Server{
160
		Addr:    cfg.Server.REST.Addr,
161
		Handler: router,
162
	}
163

164
	// Initialize roles and check roles.
165
	err = rbac.InitRBAC(enforcer, router, db.DB)
166
	if err != nil {
167
		return nil, err
168
	}
169

170
	// Initialize signing certificate and tls credentials of grpc server.
171
	var options []rpcserver.Option
172
	if cfg.Security.AutoIssueCert {
173
		cert, err := tls.X509KeyPair([]byte(cfg.Security.CACert), []byte(cfg.Security.CAKey))
174
		if err != nil {
175
			return nil, err
176
		}
177

178
		certifyClient := &certify.Certify{
179
			CommonName: types.ManagerName,
180
			Issuer: issuer.NewDragonflyManagerIssuer(
181
				&cert,
182
				issuer.WithManagerValidityPeriod(cfg.Security.CertSpec.ValidityPeriod),
183
			),
184
			RenewBefore: time.Hour,
185
			CertConfig: &certify.CertConfig{
186
				SubjectAlternativeNames:   cfg.Security.CertSpec.DNSNames,
187
				IPSubjectAlternativeNames: append(cfg.Security.CertSpec.IPAddresses, cfg.Server.GRPC.AdvertiseIP),
188
			},
189
			IssueTimeout: 0,
190
			Logger:       zapadapter.New(logger.CoreLogger.Desugar()),
191
			Cache: pkgcache.NewCertifyMutliCache(
192
				certify.NewMemCache(),
193
				certify.DirCache(path.Join(d.CacheDir(), pkgcache.CertifyCacheDirName, types.ManagerName))),
194
		}
195

196
		// Issue a certificate to reduce first time delay.
197
		if _, err := certifyClient.GetCertificate(&tls.ClientHelloInfo{
198
			ServerName: cfg.Server.GRPC.AdvertiseIP.String(),
199
		}); err != nil {
200
			logger.Errorf("issue certificate error: %s", err.Error())
201
			return nil, err
202
		}
203

204
		// Manager GRPC server's tls varify must be false. If ClientCAs are required for client verification,
205
		// the client cannot call the IssueCertificate api.
206
		transportCredentials, err := rpc.NewServerCredentialsByCertify(cfg.Security.TLSPolicy, false, []byte(cfg.Security.CACert), certifyClient)
207
		if err != nil {
208
			return nil, err
209
		}
210

211
		options = append(
212
			options,
213
			// Set ca certificate for issuing certificate.
214
			rpcserver.WithSelfSignedCert(&cert),
215
			// Set tls credentials for grpc server.
216
			rpcserver.WithGRPCServerOptions([]grpc.ServerOption{grpc.Creds(transportCredentials)}),
217
		)
218
	}
219

220
	// Initialize GRPC server.
221
	_, grpcServer, err := rpcserver.New(cfg, db, cache, searcher, objectStorage, options...)
222
	if err != nil {
223
		return nil, err
224
	}
225

226
	s.grpcServer = grpcServer
227

228
	// Initialize prometheus.
229
	if cfg.Metrics.Enable {
230
		s.metricsServer = metrics.New(&cfg.Metrics, grpcServer)
231
	}
232

233
	return s, nil
234
}
235

236
// Serve starts the manager server.
237
func (s *Server) Serve() error {
238
	// Started REST server.
239
	go func() {
240
		logger.Infof("started rest server at %s", s.restServer.Addr)
241
		if s.config.Server.REST.TLS != nil {
242
			if err := s.restServer.ListenAndServeTLS(s.config.Server.REST.TLS.Cert, s.config.Server.REST.TLS.Key); err != nil {
243
				if err == http.ErrServerClosed {
244
					return
245
				}
246
				logger.Fatalf("rest server closed unexpect: %v", err)
247
			}
248
		} else {
249
			if err := s.restServer.ListenAndServe(); err != nil {
250
				if err == http.ErrServerClosed {
251
					return
252
				}
253
				logger.Fatalf("rest server closed unexpect: %v", err)
254
			}
255
		}
256
	}()
257

258
	// Started metrics server.
259
	if s.metricsServer != nil {
260
		go func() {
261
			logger.Infof("started metrics server at %s", s.metricsServer.Addr)
262
			if err := s.metricsServer.ListenAndServe(); err != nil {
263
				if err == http.ErrServerClosed {
264
					return
265
				}
266
				logger.Fatalf("metrics server closed unexpect: %v", err)
267
			}
268
		}()
269
	}
270

271
	// Started job server.
272
	go func() {
273
		logger.Info("started job server")
274
		s.job.Serve()
275
	}()
276

277
	// Generate GRPC listener.
278
	lis, _, err := rpc.ListenWithPortRange(s.config.Server.GRPC.ListenIP.String(), s.config.Server.GRPC.PortRange.Start, s.config.Server.GRPC.PortRange.End)
279
	if err != nil {
280
		logger.Fatalf("net listener failed to start: %v", err)
281
	}
282
	defer lis.Close()
283

284
	// Started GRPC server.
285
	logger.Infof("started grpc server at %s://%s", lis.Addr().Network(), lis.Addr().String())
286
	if err := s.grpcServer.Serve(lis); err != nil {
287
		logger.Errorf("stoped grpc server: %+v", err)
288
		return err
289
	}
290

291
	return nil
292
}
293

294
// Stop stops the manager server.
295
func (s *Server) Stop() {
296
	// Stop REST server.
297
	if err := s.restServer.Shutdown(context.Background()); err != nil {
298
		logger.Errorf("rest server failed to stop: %+v", err)
299
	} else {
300
		logger.Info("rest server closed under request")
301
	}
302

303
	// Stop metrics server.
304
	if s.metricsServer != nil {
305
		if err := s.metricsServer.Shutdown(context.Background()); err != nil {
306
			logger.Errorf("metrics server failed to stop: %+v", err)
307
		} else {
308
			logger.Info("metrics server closed under request")
309
		}
310
	}
311

312
	// Stop job server.
313
	s.job.Stop()
314

315
	// Stop GRPC server.
316
	stopped := make(chan struct{})
317
	go func() {
318
		s.grpcServer.GracefulStop()
319
		logger.Info("grpc server closed under request")
320
		close(stopped)
321
	}()
322

323
	t := time.NewTimer(gracefulStopTimeout)
324
	select {
325
	case <-t.C:
326
		s.grpcServer.Stop()
327
	case <-stopped:
328
		t.Stop()
329
	}
330
}
331

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.