kubelatte-ce
Форк от sbertech/kubelatte-ce
318 строк · 10.1 Кб
1package webhook2
3import (4"context"5"crypto/tls"6"crypto/x509"7"encoding/json"8"fmt"9"gitverse.ru/synapse/kubelatte/pkg/modules"10"gitverse.ru/synapse/kubelatte/pkg/webhook/router"11"io"12"net/http"13"os"14"strings"15"sync"16
17"gitverse.ru/synapse/kubelatte/pkg/api/common"18"gitverse.ru/synapse/kubelatte/pkg/observability/logger"19admissionregistrationv1beta1 "k8s.io/api/admissionregistration/v1beta1"20
21"github.com/pkg/errors"22"gitverse.ru/synapse/kubelatte/pkg/util"23"gitverse.ru/synapse/kubelatte/pkg/webhook/config"24"k8s.io/api/admission/v1beta1"25corev1 "k8s.io/api/core/v1"26"k8s.io/apimachinery/pkg/runtime"27"k8s.io/apimachinery/pkg/runtime/serializer"28)
29
30var (31runtimeScheme = runtime.NewScheme()32codecs = serializer.NewCodecFactory(runtimeScheme)33deserializer = codecs.UniversalDeserializer()34)
35
// serverState enumerates the conditions the webhook server can report.
type serverState int

// Values of serverState. They are numbered by iota in declaration order, so
// ServerReady (0) is also the zero value of the type.
const (
	// ServerReady is the first value and therefore the default state.
	ServerReady serverState = iota
	// ServerConfigsError is described as "trigger or mutating config problem".
	ServerConfigsError
	// ServerCertificatesError is described as "server certificates problem".
	ServerCertificatesError
	// ServerPending is described as "server busy".
	ServerPending
	// ServerNotReady is described as "server is not ready".
	ServerNotReady
)
45
46var ServerStateText = map[serverState]string{47ServerReady: "OK",48ServerConfigsError: "trigger or mutating config problem",49ServerCertificatesError: "server certificates problem",50ServerPending: "server busy",51ServerNotReady: "server is not ready",52}
53
54func init() {55_ = admissionregistrationv1beta1.AddToScheme(runtimeScheme)56_ = corev1.AddToScheme(runtimeScheme)57}
58
59type Server struct {60act modules.ActorsInjector61tlsServer *http.Server62httpServer *http.Server63config *config.WebhookConfig64certificateReloader util.CertificateReloader65state serverState
66mux *sync.Mutex67}
68
69func NewWebhookServer(cfg *config.WebhookConfig, certificateReloader util.CertificateReloader, mux *sync.Mutex, actors modules.ActorsInjector) *Server {70srv := &Server{71mux: mux,72config: cfg,73act: actors,74certificateReloader: certificateReloader,75}76return srv77}
78
79func getIncomingRequestBody(ctx context.Context, w http.ResponseWriter, r *http.Request, api string) ([]byte, error) {80log := logger.FromContext(ctx)81contentType := r.Header.Get("Content-Type")82if contentType != "application/json" {83log.Errorf("api=%s, message=invalid Content-Type expect `application/json`, contentType=%s ", api, contentType)84http.Error(w, "invalid Content-Type expect `application/json`", http.StatusUnsupportedMediaType)85return nil, errors.New(fmt.Sprintf("invalid Content-Type %s", contentType))86}87
88var body []byte89if r.Body != nil {90if data, err := io.ReadAll(r.Body); err == nil {91body = data92}93}94
95if len(body) == 0 {96log.Errorf("api=%s, message=empty body received, contentType=%s, headers=%s", api, contentType, r.Header)97http.Error(w, "empty body", http.StatusBadRequest)98return nil, errors.New("empty body received")99}100
101prettyS := strings.ReplaceAll(strings.ReplaceAll(strings.ReplaceAll(string(body), "\"", "'"), " ", ""), "\n", " ")102log.Infof("api=%s, message=incoming request, body=%s", api, prettyS)103return body, nil104}
105
106func createAdmissionResponse(ctx context.Context, w http.ResponseWriter, ar *v1beta1.AdmissionReview, admissionResponse *v1beta1.AdmissionResponse) {107log := logger.FromContext(ctx)108admissionReview := v1beta1.AdmissionReview{}109
110if admissionResponse != nil {111admissionReview.Response = admissionResponse112if ar.Request != nil {113admissionReview.Response.UID = ar.Request.UID114}115}116
117resp, err := json.Marshal(admissionReview)118if err != nil {119log.Errorf("reason=json.Marshal, message=could not encode response, err=%s", err.Error())120http.Error(w, fmt.Sprintf("could not encode response: %v", err), http.StatusInternalServerError)121return122}123
124w.Header().Set("Strict-Transport-Security", "max-age=31536000; includeSubDomains")125if _, err := w.Write(resp); err != nil {126log.Errorf("reason=w.Write, message=could not write response, err=%s", err.Error())127http.Error(w, fmt.Sprintf("could not write response: %v", err), http.StatusInternalServerError)128return129}130log.Info(string(resp))131}
132
133func parseRequest(ctx context.Context, request *v1beta1.AdmissionRequest, body []byte) (*common.ARFields, map[string]interface{}, error) {134log := logger.FromContext(ctx)135
136var obj map[string]interface{}137if request.Operation != v1beta1.Delete && request.Object.Raw != nil {138err := json.Unmarshal(request.Object.Raw, &obj)139if err != nil {140return nil, nil, err141}142}143
144var oldObj map[string]interface{}145if request.Operation != v1beta1.Create && request.OldObject.Raw != nil {146err := json.Unmarshal(request.OldObject.Raw, &oldObj)147if err != nil {148return nil, nil, err149}150}151var original map[string]interface{}152err := json.Unmarshal(body, &original)153if err != nil {154return nil, nil, err155}156
157origlRequest, ok := original["request"]158if !ok {159log.Debug("there is no \"request\" field in body")160}161
162originalRequest := origlRequest.(map[string]interface{})163
164return &common.ARFields{165Kind: request.Kind,166Namespace: request.Namespace,167UserInfo: common.ARUserInfo{Username: request.UserInfo.Username},168Operation: request.Operation,169Object: obj,170OldObject: oldObj,171}, originalRequest, nil172}
173
174func (whsvr *Server) Set(s serverState) {175log := logger.FromContext(context.Background())176log.Infof("Change server status to: %s ", ServerStateText[s])177whsvr.state = s178}
179
180// Start starts webhook server
181func (whsvr *Server) Start() (doneListeningTLSChannel, doneListeningHTTPChannel chan bool, err error) {182log := logger.FromContext(context.Background())183wconfig := whsvr.config184
185var tlsConfig *tls.Config186var startTLS bool187
188if wconfig.TLSPort != 0 {189crt := wconfig.CertFilePath190key := wconfig.KeyFilePath191if crt == "" {192return nil, nil, errors.New("api=Start, reason=wconfig.CertFilePath, err=since the TLS port is set, you need to specify the file containing the x509 Certificate for HTTPS --cert-file-path or SERVER_KEY env")193}194if key == "" {195return nil, nil, errors.New("api=Start, reason=wconfig.KeyFilePath, err=since the TLS port is set, you need to specify the file containing the x509 private key --key-file-path or SERVER_CERT env")196}197if _, err := os.OpenFile(crt, os.O_RDONLY, 0644); errors.Is(err, os.ErrNotExist) {198return nil, nil, errors.Errorf("api=Start, reason=wconfig.CertFilePath, err=certificate file %s dont exist", crt)199}200if _, err := os.OpenFile(key, os.O_RDONLY, 0600); errors.Is(err, os.ErrNotExist) {201return nil, nil, errors.Errorf("api=Start, reason=wconfig.KeyFilePath, err=key file %s dont exist", key)202}203
204startTLS = true205}206
207isTLS := whsvr.certificateReloader != nil208
209if startTLS {210if isTLS {211err := whsvr.certificateReloader.Start()212if err != nil {213log.Errorf("api=Start, reason=certReloader.Start, certFilePath=%q, keyFilePath=%q, caFilePath=%q, err=%s", wconfig.CertFilePath, wconfig.KeyFilePath, wconfig.CaFilePath, err.Error())214return nil, nil, errors.Errorf("api=Start, reason=certReloader.Start, certFilePath=%q, keyFilePath=%q, caFilePath=%q, err=%s", wconfig.CertFilePath, wconfig.KeyFilePath, wconfig.CaFilePath, err.Error())215}216
217tlsConfig = &tls.Config{218GetCertificate: func(*tls.ClientHelloInfo) (*tls.Certificate, error) {219return whsvr.certificateReloader.GetCertificate()220},221}222if !wconfig.AllowDeprecatedTLSConfig {223tlsConfig.MinVersion = tls.VersionTLS12224tlsConfig.CipherSuites = []uint16{225tls.TLS_AES_256_GCM_SHA384,226tls.TLS_AES_128_GCM_SHA256,227tls.TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384,228tls.TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384,229tls.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256,230tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,231}232}233
234if wconfig.CaFilePath != "" {235caCert, err := os.ReadFile(wconfig.CaFilePath)236if err != nil {237log.Errorf("api=Start, reason=ioutil.ReadFile, caFilePath=%q, err=%s", wconfig.CaFilePath, err.Error())238return nil, nil, errors.Errorf("api=Start, reason=ioutil.ReadFile, caFilePath=%q, err=%s", wconfig.CaFilePath, err.Error())239}240caCertPool := x509.NewCertPool()241caCertPool.AppendCertsFromPEM(caCert)242tlsConfig.RootCAs = caCertPool243}244}245}246
247handlers := router.HandlerMap{248router.Mutate: whsvr.MutateHandler,249router.Validate: whsvr.ValidateHandler,250router.Immutable: whsvr.ImmutableHandler,251router.Healthz: whsvr.Healthz,252router.Liveness: whsvr.Liveness,253router.Readyness: whsvr.Readyness,254}255
256whsvr.act.SetHandlers(handlers)257router := whsvr.act.RegisterHandlers(fmt.Sprintf(":%v", wconfig.HTTPPort))258
259if startTLS {260// We create two servers: one that serves https requests, and another that serves http requests only.261whsvr.tlsServer = &http.Server{262Addr: fmt.Sprintf(":%v", wconfig.TLSPort),263TLSConfig: tlsConfig,264}265whsvr.tlsServer.Handler = router266
267// Channel to indicate when the server stopped listening for some reason268doneListeningTLSChannel = make(chan bool)269
270// start webhook server in new routine271go func() {272if isTLS {273if err := whsvr.tlsServer.ListenAndServeTLS("", ""); err != nil {274log.Errorf("api=Start, message=failed to listen and serve webhook server: %s", err.Error())275}276doneListeningTLSChannel <- true277}278}()279
280}281
282whsvr.httpServer = &http.Server{283Addr: fmt.Sprintf(":%v", wconfig.HTTPPort),284}285
286whsvr.httpServer.Handler = router287
288// Channel to indicate when the server stopped listening for some reason289doneListeningHTTPChannel = make(chan bool)290
291log.Info("api=Start, message=start http webhook server")292
293go func() {294if err := whsvr.httpServer.ListenAndServe(); err != nil {295log.Errorf("api=Start, reason='failed to listen and serve webhook HTTP server: %s'", err.Error())296}297doneListeningHTTPChannel <- true298}()299
300return doneListeningTLSChannel, doneListeningHTTPChannel, nil301}
302
303// Stop stops webhook server
304func (whsvr *Server) Stop() {305log := logger.FromContext(context.Background())306log.Info("api=Stop, reason='shutting down webhook server gracefully...'")307if whsvr.tlsServer != nil {308_ = whsvr.tlsServer.Shutdown(context.Background())309}310if whsvr.httpServer != nil {311_ = whsvr.httpServer.Shutdown(context.Background())312}313if whsvr.certificateReloader != nil {314if whsvr.certificateReloader.IsRunning() {315whsvr.certificateReloader.Stop()316}317}318}
319