kuma

Форк
0
/
server.go 
147 строк · 4.1 Кб
1
package server
2

3
import (
4
	"context"
5
	"crypto/tls"
6
	"fmt"
7
	"net/http"
8
	"strings"
9
	"time"
10

11
	"github.com/bakito/go-log-logr-adapter/adapter"
12
	http_prometheus "github.com/slok/go-http-metrics/metrics/prometheus"
13
	"github.com/slok/go-http-metrics/middleware"
14
	"github.com/slok/go-http-metrics/middleware/std"
15
	"google.golang.org/grpc"
16
	"google.golang.org/grpc/keepalive"
17

18
	dp_server "github.com/kumahq/kuma/pkg/config/dp-server"
19
	config_types "github.com/kumahq/kuma/pkg/config/types"
20
	"github.com/kumahq/kuma/pkg/core"
21
	"github.com/kumahq/kuma/pkg/core/runtime/component"
22
	"github.com/kumahq/kuma/pkg/metrics"
23
)
24

25
var log = core.Log.WithName("dp-server")
26

27
const (
28
	grpcMaxConcurrentStreams = 1000000
29
	grpcKeepAliveTime        = 15 * time.Second
30
)
31

32
type Filter func(writer http.ResponseWriter, request *http.Request) bool
33

34
type DpServer struct {
35
	config         dp_server.DpServerConfig
36
	httpMux        *http.ServeMux
37
	grpcServer     *grpc.Server
38
	filter         Filter
39
	promMiddleware middleware.Middleware
40
}
41

42
var _ component.Component = &DpServer{}
43

44
func NewDpServer(config dp_server.DpServerConfig, metrics metrics.Metrics, filter Filter) *DpServer {
45
	grpcOptions := []grpc.ServerOption{
46
		grpc.MaxConcurrentStreams(grpcMaxConcurrentStreams),
47
		grpc.KeepaliveParams(keepalive.ServerParameters{
48
			Time:    grpcKeepAliveTime,
49
			Timeout: grpcKeepAliveTime,
50
		}),
51
		grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
52
			MinTime:             grpcKeepAliveTime,
53
			PermitWithoutStream: true,
54
		}),
55
	}
56
	grpcOptions = append(grpcOptions, metrics.GRPCServerInterceptors()...)
57
	grpcServer := grpc.NewServer(grpcOptions...)
58

59
	promMiddleware := middleware.New(middleware.Config{
60
		Recorder: http_prometheus.NewRecorder(http_prometheus.Config{
61
			Registry: metrics,
62
			Prefix:   "dp_server",
63
		}),
64
	})
65

66
	return &DpServer{
67
		config:         config,
68
		httpMux:        http.NewServeMux(),
69
		grpcServer:     grpcServer,
70
		filter:         filter,
71
		promMiddleware: promMiddleware,
72
	}
73
}
74

75
func (d *DpServer) Start(stop <-chan struct{}) error {
76
	var err error
77
	tlsConfig := &tls.Config{MinVersion: tls.VersionTLS12} // To make gosec pass this is always set after
78
	if tlsConfig.MinVersion, err = config_types.TLSVersion(d.config.TlsMinVersion); err != nil {
79
		return err
80
	}
81
	if tlsConfig.MaxVersion, err = config_types.TLSVersion(d.config.TlsMaxVersion); err != nil {
82
		return err
83
	}
84
	if tlsConfig.CipherSuites, err = config_types.TLSCiphers(d.config.TlsCipherSuites); err != nil {
85
		return err
86
	}
87
	server := &http.Server{
88
		ReadHeaderTimeout: d.config.ReadHeaderTimeout.Duration,
89
		Addr:              fmt.Sprintf(":%d", d.config.Port),
90
		Handler:           http.HandlerFunc(d.handle),
91
		TLSConfig:         tlsConfig,
92
		ErrorLog:          adapter.ToStd(log),
93
	}
94

95
	errChan := make(chan error)
96

97
	go func() {
98
		defer close(errChan)
99
		if err := server.ListenAndServeTLS(d.config.TlsCertFile, d.config.TlsKeyFile); err != nil {
100
			if err != http.ErrServerClosed {
101
				log.Error(err, "terminated with an error")
102
				errChan <- err
103
				return
104
			}
105
		}
106
		log.Info("terminated normally")
107
	}()
108
	log.Info("starting", "interface", "0.0.0.0", "port", d.config.Port, "tls", true)
109

110
	select {
111
	case <-stop:
112
		log.Info("stopping")
113
		return server.Shutdown(context.Background())
114
	case err := <-errChan:
115
		return err
116
	}
117
}
118

119
func (d *DpServer) NeedLeaderElection() bool {
120
	return false
121
}
122

123
func (d *DpServer) handle(writer http.ResponseWriter, request *http.Request) {
124
	if !d.filter(writer, request) {
125
		return
126
	}
127
	// add filter function that will be in runtime, and we will implement it in kong-mesh
128
	if request.ProtoMajor == 2 && strings.Contains(request.Header.Get("Content-Type"), "application/grpc") {
129
		d.grpcServer.ServeHTTP(writer, request)
130
	} else {
131
		// we only want to measure HTTP not GRPC requests because they can mess up metrics
132
		// for example ADS bi-directional stream counts as one really long request
133
		std.Handler("", d.promMiddleware, d.httpMux).ServeHTTP(writer, request)
134
	}
135
}
136

137
func (d *DpServer) HTTPMux() *http.ServeMux {
138
	return d.httpMux
139
}
140

141
func (d *DpServer) GrpcServer() *grpc.Server {
142
	return d.grpcServer
143
}
144

145
func (d *DpServer) SetFilter(filter Filter) {
146
	d.filter = filter
147
}
148

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.