11
"github.com/bakito/go-log-logr-adapter/adapter"
12
http_prometheus "github.com/slok/go-http-metrics/metrics/prometheus"
13
"github.com/slok/go-http-metrics/middleware"
14
"github.com/slok/go-http-metrics/middleware/std"
15
"google.golang.org/grpc"
16
"google.golang.org/grpc/keepalive"
18
dp_server "github.com/kumahq/kuma/pkg/config/dp-server"
19
config_types "github.com/kumahq/kuma/pkg/config/types"
20
"github.com/kumahq/kuma/pkg/core"
21
"github.com/kumahq/kuma/pkg/core/runtime/component"
22
"github.com/kumahq/kuma/pkg/metrics"
25
var log = core.Log.WithName("dp-server")
28
grpcMaxConcurrentStreams = 1000000
29
grpcKeepAliveTime = 15 * time.Second
32
type Filter func(writer http.ResponseWriter, request *http.Request) bool
35
config dp_server.DpServerConfig
36
httpMux *http.ServeMux
37
grpcServer *grpc.Server
39
promMiddleware middleware.Middleware
42
var _ component.Component = &DpServer{}
44
func NewDpServer(config dp_server.DpServerConfig, metrics metrics.Metrics, filter Filter) *DpServer {
45
grpcOptions := []grpc.ServerOption{
46
grpc.MaxConcurrentStreams(grpcMaxConcurrentStreams),
47
grpc.KeepaliveParams(keepalive.ServerParameters{
48
Time: grpcKeepAliveTime,
49
Timeout: grpcKeepAliveTime,
51
grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
52
MinTime: grpcKeepAliveTime,
53
PermitWithoutStream: true,
56
grpcOptions = append(grpcOptions, metrics.GRPCServerInterceptors()...)
57
grpcServer := grpc.NewServer(grpcOptions...)
59
promMiddleware := middleware.New(middleware.Config{
60
Recorder: http_prometheus.NewRecorder(http_prometheus.Config{
68
httpMux: http.NewServeMux(),
69
grpcServer: grpcServer,
71
promMiddleware: promMiddleware,
75
func (d *DpServer) Start(stop <-chan struct{}) error {
77
tlsConfig := &tls.Config{MinVersion: tls.VersionTLS12}
78
if tlsConfig.MinVersion, err = config_types.TLSVersion(d.config.TlsMinVersion); err != nil {
81
if tlsConfig.MaxVersion, err = config_types.TLSVersion(d.config.TlsMaxVersion); err != nil {
84
if tlsConfig.CipherSuites, err = config_types.TLSCiphers(d.config.TlsCipherSuites); err != nil {
87
server := &http.Server{
88
ReadHeaderTimeout: d.config.ReadHeaderTimeout.Duration,
89
Addr: fmt.Sprintf(":%d", d.config.Port),
90
Handler: http.HandlerFunc(d.handle),
92
ErrorLog: adapter.ToStd(log),
95
errChan := make(chan error)
99
if err := server.ListenAndServeTLS(d.config.TlsCertFile, d.config.TlsKeyFile); err != nil {
100
if err != http.ErrServerClosed {
101
log.Error(err, "terminated with an error")
106
log.Info("terminated normally")
108
log.Info("starting", "interface", "0.0.0.0", "port", d.config.Port, "tls", true)
113
return server.Shutdown(context.Background())
114
case err := <-errChan:
119
func (d *DpServer) NeedLeaderElection() bool {
123
func (d *DpServer) handle(writer http.ResponseWriter, request *http.Request) {
124
if !d.filter(writer, request) {
128
if request.ProtoMajor == 2 && strings.Contains(request.Header.Get("Content-Type"), "application/grpc") {
129
d.grpcServer.ServeHTTP(writer, request)
133
std.Handler("", d.promMiddleware, d.httpMux).ServeHTTP(writer, request)
137
func (d *DpServer) HTTPMux() *http.ServeMux {
141
func (d *DpServer) GrpcServer() *grpc.Server {
145
func (d *DpServer) SetFilter(filter Filter) {