kubelatte-ce
Форк от sbertech/kubelatte-ce
318 строк · 10.1 Кб
1package webhook
2
3import (
4"context"
5"crypto/tls"
6"crypto/x509"
7"encoding/json"
8"fmt"
9"gitverse.ru/synapse/kubelatte/pkg/modules"
10"gitverse.ru/synapse/kubelatte/pkg/webhook/router"
11"io"
12"net/http"
13"os"
14"strings"
15"sync"
16
17"gitverse.ru/synapse/kubelatte/pkg/api/common"
18"gitverse.ru/synapse/kubelatte/pkg/observability/logger"
19admissionregistrationv1beta1 "k8s.io/api/admissionregistration/v1beta1"
20
21"github.com/pkg/errors"
22"gitverse.ru/synapse/kubelatte/pkg/util"
23"gitverse.ru/synapse/kubelatte/pkg/webhook/config"
24"k8s.io/api/admission/v1beta1"
25corev1 "k8s.io/api/core/v1"
26"k8s.io/apimachinery/pkg/runtime"
27"k8s.io/apimachinery/pkg/runtime/serializer"
28)
29
30var (
31runtimeScheme = runtime.NewScheme()
32codecs = serializer.NewCodecFactory(runtimeScheme)
33deserializer = codecs.UniversalDeserializer()
34)
35
// serverState identifies the condition the webhook server currently reports.
type serverState int

// Known server states. The numbering starts at zero with ServerReady, so a
// zero-valued serverState equals ServerReady.
const (
	// ServerReady is reported as "OK".
	ServerReady serverState = iota
	// ServerConfigsError signals a trigger or mutating config problem.
	ServerConfigsError
	// ServerCertificatesError signals a problem with the server certificates.
	ServerCertificatesError
	// ServerPending signals that the server is busy.
	ServerPending
	// ServerNotReady signals that the server is not ready.
	ServerNotReady
)

// ServerStateText maps every serverState to its human-readable description.
var ServerStateText = map[serverState]string{
	ServerReady:             "OK",
	ServerConfigsError:      "trigger or mutating config problem",
	ServerCertificatesError: "server certificates problem",
	ServerPending:           "server busy",
	ServerNotReady:          "server is not ready",
}
53
54func init() {
55_ = admissionregistrationv1beta1.AddToScheme(runtimeScheme)
56_ = corev1.AddToScheme(runtimeScheme)
57}
58
59type Server struct {
60act modules.ActorsInjector
61tlsServer *http.Server
62httpServer *http.Server
63config *config.WebhookConfig
64certificateReloader util.CertificateReloader
65state serverState
66mux *sync.Mutex
67}
68
69func NewWebhookServer(cfg *config.WebhookConfig, certificateReloader util.CertificateReloader, mux *sync.Mutex, actors modules.ActorsInjector) *Server {
70srv := &Server{
71mux: mux,
72config: cfg,
73act: actors,
74certificateReloader: certificateReloader,
75}
76return srv
77}
78
79func getIncomingRequestBody(ctx context.Context, w http.ResponseWriter, r *http.Request, api string) ([]byte, error) {
80log := logger.FromContext(ctx)
81contentType := r.Header.Get("Content-Type")
82if contentType != "application/json" {
83log.Errorf("api=%s, message=invalid Content-Type expect `application/json`, contentType=%s ", api, contentType)
84http.Error(w, "invalid Content-Type expect `application/json`", http.StatusUnsupportedMediaType)
85return nil, errors.New(fmt.Sprintf("invalid Content-Type %s", contentType))
86}
87
88var body []byte
89if r.Body != nil {
90if data, err := io.ReadAll(r.Body); err == nil {
91body = data
92}
93}
94
95if len(body) == 0 {
96log.Errorf("api=%s, message=empty body received, contentType=%s, headers=%s", api, contentType, r.Header)
97http.Error(w, "empty body", http.StatusBadRequest)
98return nil, errors.New("empty body received")
99}
100
101prettyS := strings.ReplaceAll(strings.ReplaceAll(strings.ReplaceAll(string(body), "\"", "'"), " ", ""), "\n", " ")
102log.Infof("api=%s, message=incoming request, body=%s", api, prettyS)
103return body, nil
104}
105
106func createAdmissionResponse(ctx context.Context, w http.ResponseWriter, ar *v1beta1.AdmissionReview, admissionResponse *v1beta1.AdmissionResponse) {
107log := logger.FromContext(ctx)
108admissionReview := v1beta1.AdmissionReview{}
109
110if admissionResponse != nil {
111admissionReview.Response = admissionResponse
112if ar.Request != nil {
113admissionReview.Response.UID = ar.Request.UID
114}
115}
116
117resp, err := json.Marshal(admissionReview)
118if err != nil {
119log.Errorf("reason=json.Marshal, message=could not encode response, err=%s", err.Error())
120http.Error(w, fmt.Sprintf("could not encode response: %v", err), http.StatusInternalServerError)
121return
122}
123
124w.Header().Set("Strict-Transport-Security", "max-age=31536000; includeSubDomains")
125if _, err := w.Write(resp); err != nil {
126log.Errorf("reason=w.Write, message=could not write response, err=%s", err.Error())
127http.Error(w, fmt.Sprintf("could not write response: %v", err), http.StatusInternalServerError)
128return
129}
130log.Info(string(resp))
131}
132
133func parseRequest(ctx context.Context, request *v1beta1.AdmissionRequest, body []byte) (*common.ARFields, map[string]interface{}, error) {
134log := logger.FromContext(ctx)
135
136var obj map[string]interface{}
137if request.Operation != v1beta1.Delete && request.Object.Raw != nil {
138err := json.Unmarshal(request.Object.Raw, &obj)
139if err != nil {
140return nil, nil, err
141}
142}
143
144var oldObj map[string]interface{}
145if request.Operation != v1beta1.Create && request.OldObject.Raw != nil {
146err := json.Unmarshal(request.OldObject.Raw, &oldObj)
147if err != nil {
148return nil, nil, err
149}
150}
151var original map[string]interface{}
152err := json.Unmarshal(body, &original)
153if err != nil {
154return nil, nil, err
155}
156
157origlRequest, ok := original["request"]
158if !ok {
159log.Debug("there is no \"request\" field in body")
160}
161
162originalRequest := origlRequest.(map[string]interface{})
163
164return &common.ARFields{
165Kind: request.Kind,
166Namespace: request.Namespace,
167UserInfo: common.ARUserInfo{Username: request.UserInfo.Username},
168Operation: request.Operation,
169Object: obj,
170OldObject: oldObj,
171}, originalRequest, nil
172}
173
174func (whsvr *Server) Set(s serverState) {
175log := logger.FromContext(context.Background())
176log.Infof("Change server status to: %s ", ServerStateText[s])
177whsvr.state = s
178}
179
180// Start starts webhook server
181func (whsvr *Server) Start() (doneListeningTLSChannel, doneListeningHTTPChannel chan bool, err error) {
182log := logger.FromContext(context.Background())
183wconfig := whsvr.config
184
185var tlsConfig *tls.Config
186var startTLS bool
187
188if wconfig.TLSPort != 0 {
189crt := wconfig.CertFilePath
190key := wconfig.KeyFilePath
191if crt == "" {
192return nil, nil, errors.New("api=Start, reason=wconfig.CertFilePath, err=since the TLS port is set, you need to specify the file containing the x509 Certificate for HTTPS --cert-file-path or SERVER_KEY env")
193}
194if key == "" {
195return nil, nil, errors.New("api=Start, reason=wconfig.KeyFilePath, err=since the TLS port is set, you need to specify the file containing the x509 private key --key-file-path or SERVER_CERT env")
196}
197if _, err := os.OpenFile(crt, os.O_RDONLY, 0644); errors.Is(err, os.ErrNotExist) {
198return nil, nil, errors.Errorf("api=Start, reason=wconfig.CertFilePath, err=certificate file %s dont exist", crt)
199}
200if _, err := os.OpenFile(key, os.O_RDONLY, 0600); errors.Is(err, os.ErrNotExist) {
201return nil, nil, errors.Errorf("api=Start, reason=wconfig.KeyFilePath, err=key file %s dont exist", key)
202}
203
204startTLS = true
205}
206
207isTLS := whsvr.certificateReloader != nil
208
209if startTLS {
210if isTLS {
211err := whsvr.certificateReloader.Start()
212if err != nil {
213log.Errorf("api=Start, reason=certReloader.Start, certFilePath=%q, keyFilePath=%q, caFilePath=%q, err=%s", wconfig.CertFilePath, wconfig.KeyFilePath, wconfig.CaFilePath, err.Error())
214return nil, nil, errors.Errorf("api=Start, reason=certReloader.Start, certFilePath=%q, keyFilePath=%q, caFilePath=%q, err=%s", wconfig.CertFilePath, wconfig.KeyFilePath, wconfig.CaFilePath, err.Error())
215}
216
217tlsConfig = &tls.Config{
218GetCertificate: func(*tls.ClientHelloInfo) (*tls.Certificate, error) {
219return whsvr.certificateReloader.GetCertificate()
220},
221}
222if !wconfig.AllowDeprecatedTLSConfig {
223tlsConfig.MinVersion = tls.VersionTLS12
224tlsConfig.CipherSuites = []uint16{
225tls.TLS_AES_256_GCM_SHA384,
226tls.TLS_AES_128_GCM_SHA256,
227tls.TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384,
228tls.TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384,
229tls.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256,
230tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,
231}
232}
233
234if wconfig.CaFilePath != "" {
235caCert, err := os.ReadFile(wconfig.CaFilePath)
236if err != nil {
237log.Errorf("api=Start, reason=ioutil.ReadFile, caFilePath=%q, err=%s", wconfig.CaFilePath, err.Error())
238return nil, nil, errors.Errorf("api=Start, reason=ioutil.ReadFile, caFilePath=%q, err=%s", wconfig.CaFilePath, err.Error())
239}
240caCertPool := x509.NewCertPool()
241caCertPool.AppendCertsFromPEM(caCert)
242tlsConfig.RootCAs = caCertPool
243}
244}
245}
246
247handlers := router.HandlerMap{
248router.Mutate: whsvr.MutateHandler,
249router.Validate: whsvr.ValidateHandler,
250router.Immutable: whsvr.ImmutableHandler,
251router.Healthz: whsvr.Healthz,
252router.Liveness: whsvr.Liveness,
253router.Readyness: whsvr.Readyness,
254}
255
256whsvr.act.SetHandlers(handlers)
257router := whsvr.act.RegisterHandlers(fmt.Sprintf(":%v", wconfig.HTTPPort))
258
259if startTLS {
260// We create two servers: one that serves https requests, and another that serves http requests only.
261whsvr.tlsServer = &http.Server{
262Addr: fmt.Sprintf(":%v", wconfig.TLSPort),
263TLSConfig: tlsConfig,
264}
265whsvr.tlsServer.Handler = router
266
267// Channel to indicate when the server stopped listening for some reason
268doneListeningTLSChannel = make(chan bool)
269
270// start webhook server in new routine
271go func() {
272if isTLS {
273if err := whsvr.tlsServer.ListenAndServeTLS("", ""); err != nil {
274log.Errorf("api=Start, message=failed to listen and serve webhook server: %s", err.Error())
275}
276doneListeningTLSChannel <- true
277}
278}()
279
280}
281
282whsvr.httpServer = &http.Server{
283Addr: fmt.Sprintf(":%v", wconfig.HTTPPort),
284}
285
286whsvr.httpServer.Handler = router
287
288// Channel to indicate when the server stopped listening for some reason
289doneListeningHTTPChannel = make(chan bool)
290
291log.Info("api=Start, message=start http webhook server")
292
293go func() {
294if err := whsvr.httpServer.ListenAndServe(); err != nil {
295log.Errorf("api=Start, reason='failed to listen and serve webhook HTTP server: %s'", err.Error())
296}
297doneListeningHTTPChannel <- true
298}()
299
300return doneListeningTLSChannel, doneListeningHTTPChannel, nil
301}
302
303// Stop stops webhook server
304func (whsvr *Server) Stop() {
305log := logger.FromContext(context.Background())
306log.Info("api=Stop, reason='shutting down webhook server gracefully...'")
307if whsvr.tlsServer != nil {
308_ = whsvr.tlsServer.Shutdown(context.Background())
309}
310if whsvr.httpServer != nil {
311_ = whsvr.httpServer.Shutdown(context.Background())
312}
313if whsvr.certificateReloader != nil {
314if whsvr.certificateReloader.IsRunning() {
315whsvr.certificateReloader.Stop()
316}
317}
318}
319