kubelatte-ce

Форк
2
Форк от sbertech/kubelatte-ce
/
webhook.go 
318 строк · 10.1 Кб
1
package webhook
2

3
import (
4
	"context"
5
	"crypto/tls"
6
	"crypto/x509"
7
	"encoding/json"
8
	"fmt"
9
	"gitverse.ru/synapse/kubelatte/pkg/modules"
10
	"gitverse.ru/synapse/kubelatte/pkg/webhook/router"
11
	"io"
12
	"net/http"
13
	"os"
14
	"strings"
15
	"sync"
16

17
	"gitverse.ru/synapse/kubelatte/pkg/api/common"
18
	"gitverse.ru/synapse/kubelatte/pkg/observability/logger"
19
	admissionregistrationv1beta1 "k8s.io/api/admissionregistration/v1beta1"
20

21
	"github.com/pkg/errors"
22
	"gitverse.ru/synapse/kubelatte/pkg/util"
23
	"gitverse.ru/synapse/kubelatte/pkg/webhook/config"
24
	"k8s.io/api/admission/v1beta1"
25
	corev1 "k8s.io/api/core/v1"
26
	"k8s.io/apimachinery/pkg/runtime"
27
	"k8s.io/apimachinery/pkg/runtime/serializer"
28
)
29

30
var (
31
	runtimeScheme = runtime.NewScheme()
32
	codecs        = serializer.NewCodecFactory(runtimeScheme)
33
	deserializer  = codecs.UniversalDeserializer()
34
)
35

36
// serverState enumerates the conditions the webhook server can be in.
type serverState int

// Known server states. The values are assigned by iota in declaration
// order, which makes ServerReady the zero value of serverState.
const (
	// ServerReady: the server is operating normally.
	ServerReady serverState = iota

	// ServerConfigsError: a trigger or mutating config problem was detected.
	ServerConfigsError

	// ServerCertificatesError: the server certificates are problematic.
	ServerCertificatesError

	// ServerPending: the server is busy.
	ServerPending

	// ServerNotReady: the server is not ready.
	ServerNotReady
)
45

46
var ServerStateText = map[serverState]string{
47
	ServerReady:             "OK",
48
	ServerConfigsError:      "trigger or mutating config problem",
49
	ServerCertificatesError: "server certificates problem",
50
	ServerPending:           "server busy",
51
	ServerNotReady:          "server is not ready",
52
}
53

54
func init() {
55
	_ = admissionregistrationv1beta1.AddToScheme(runtimeScheme)
56
	_ = corev1.AddToScheme(runtimeScheme)
57
}
58

59
type Server struct {
60
	act                 modules.ActorsInjector
61
	tlsServer           *http.Server
62
	httpServer          *http.Server
63
	config              *config.WebhookConfig
64
	certificateReloader util.CertificateReloader
65
	state               serverState
66
	mux                 *sync.Mutex
67
}
68

69
func NewWebhookServer(cfg *config.WebhookConfig, certificateReloader util.CertificateReloader, mux *sync.Mutex, actors modules.ActorsInjector) *Server {
70
	srv := &Server{
71
		mux:                 mux,
72
		config:              cfg,
73
		act:                 actors,
74
		certificateReloader: certificateReloader,
75
	}
76
	return srv
77
}
78

79
func getIncomingRequestBody(ctx context.Context, w http.ResponseWriter, r *http.Request, api string) ([]byte, error) {
80
	log := logger.FromContext(ctx)
81
	contentType := r.Header.Get("Content-Type")
82
	if contentType != "application/json" {
83
		log.Errorf("api=%s, message=invalid Content-Type expect `application/json`, contentType=%s ", api, contentType)
84
		http.Error(w, "invalid Content-Type expect `application/json`", http.StatusUnsupportedMediaType)
85
		return nil, errors.New(fmt.Sprintf("invalid Content-Type %s", contentType))
86
	}
87

88
	var body []byte
89
	if r.Body != nil {
90
		if data, err := io.ReadAll(r.Body); err == nil {
91
			body = data
92
		}
93
	}
94

95
	if len(body) == 0 {
96
		log.Errorf("api=%s, message=empty body received, contentType=%s, headers=%s", api, contentType, r.Header)
97
		http.Error(w, "empty body", http.StatusBadRequest)
98
		return nil, errors.New("empty body received")
99
	}
100

101
	prettyS := strings.ReplaceAll(strings.ReplaceAll(strings.ReplaceAll(string(body), "\"", "'"), " ", ""), "\n", " ")
102
	log.Infof("api=%s, message=incoming request, body=%s", api, prettyS)
103
	return body, nil
104
}
105

106
// createAdmissionResponse wraps admissionResponse into an AdmissionReview,
// serializes it as JSON and writes it to w.
//
// When admissionResponse is non-nil and the incoming review ar carries a
// request, the request UID is copied into the response (this mutates
// admissionResponse). A nil ar or a nil ar.Request leaves the UID untouched.
// If the review cannot be encoded, a 500 response is written instead. The
// encoded response is logged at info level after a successful write.
func createAdmissionResponse(ctx context.Context, w http.ResponseWriter, ar *v1beta1.AdmissionReview, admissionResponse *v1beta1.AdmissionResponse) {
	log := logger.FromContext(ctx)
	admissionReview := v1beta1.AdmissionReview{}

	if admissionResponse != nil {
		admissionReview.Response = admissionResponse
		// Guard against a nil review as well as a nil request.
		if ar != nil && ar.Request != nil {
			admissionReview.Response.UID = ar.Request.UID
		}
	}

	resp, err := json.Marshal(admissionReview)
	if err != nil {
		log.Errorf("reason=json.Marshal, message=could not encode response, err=%s", err.Error())
		http.Error(w, fmt.Sprintf("could not encode response: %v", err), http.StatusInternalServerError)
		return
	}

	// Declare the payload type explicitly; otherwise net/http sniffs it.
	w.Header().Set("Content-Type", "application/json")
	w.Header().Set("Strict-Transport-Security", "max-age=31536000; includeSubDomains")
	if _, err := w.Write(resp); err != nil {
		// The status line and headers have already been sent by Write, so an
		// error status can no longer be delivered; just record the failure.
		log.Errorf("reason=w.Write, message=could not write response, err=%s", err.Error())
		return
	}
	log.Info(string(resp))
}
132

133
func parseRequest(ctx context.Context, request *v1beta1.AdmissionRequest, body []byte) (*common.ARFields, map[string]interface{}, error) {
134
	log := logger.FromContext(ctx)
135

136
	var obj map[string]interface{}
137
	if request.Operation != v1beta1.Delete && request.Object.Raw != nil {
138
		err := json.Unmarshal(request.Object.Raw, &obj)
139
		if err != nil {
140
			return nil, nil, err
141
		}
142
	}
143

144
	var oldObj map[string]interface{}
145
	if request.Operation != v1beta1.Create && request.OldObject.Raw != nil {
146
		err := json.Unmarshal(request.OldObject.Raw, &oldObj)
147
		if err != nil {
148
			return nil, nil, err
149
		}
150
	}
151
	var original map[string]interface{}
152
	err := json.Unmarshal(body, &original)
153
	if err != nil {
154
		return nil, nil, err
155
	}
156

157
	origlRequest, ok := original["request"]
158
	if !ok {
159
		log.Debug("there is no \"request\" field in body")
160
	}
161

162
	originalRequest := origlRequest.(map[string]interface{})
163

164
	return &common.ARFields{
165
		Kind:      request.Kind,
166
		Namespace: request.Namespace,
167
		UserInfo:  common.ARUserInfo{Username: request.UserInfo.Username},
168
		Operation: request.Operation,
169
		Object:    obj,
170
		OldObject: oldObj,
171
	}, originalRequest, nil
172
}
173

174
// Set stores s as the server state after logging the textual form of the new
// state, looked up in ServerStateText.
func (whsvr *Server) Set(s serverState) {
	logger.FromContext(context.Background()).Infof("Change server status to: %s ", ServerStateText[s])
	whsvr.state = s
}
179

180
// Start starts webhook server
181
func (whsvr *Server) Start() (doneListeningTLSChannel, doneListeningHTTPChannel chan bool, err error) {
182
	log := logger.FromContext(context.Background())
183
	wconfig := whsvr.config
184

185
	var tlsConfig *tls.Config
186
	var startTLS bool
187

188
	if wconfig.TLSPort != 0 {
189
		crt := wconfig.CertFilePath
190
		key := wconfig.KeyFilePath
191
		if crt == "" {
192
			return nil, nil, errors.New("api=Start, reason=wconfig.CertFilePath, err=since the TLS port is set, you need to specify the file containing the x509 Certificate for HTTPS --cert-file-path or SERVER_KEY env")
193
		}
194
		if key == "" {
195
			return nil, nil, errors.New("api=Start, reason=wconfig.KeyFilePath, err=since the TLS port is set, you need to specify the file containing the x509 private key --key-file-path or SERVER_CERT env")
196
		}
197
		if _, err := os.OpenFile(crt, os.O_RDONLY, 0644); errors.Is(err, os.ErrNotExist) {
198
			return nil, nil, errors.Errorf("api=Start, reason=wconfig.CertFilePath, err=certificate file %s dont exist", crt)
199
		}
200
		if _, err := os.OpenFile(key, os.O_RDONLY, 0600); errors.Is(err, os.ErrNotExist) {
201
			return nil, nil, errors.Errorf("api=Start, reason=wconfig.KeyFilePath, err=key file %s dont exist", key)
202
		}
203

204
		startTLS = true
205
	}
206

207
	isTLS := whsvr.certificateReloader != nil
208

209
	if startTLS {
210
		if isTLS {
211
			err := whsvr.certificateReloader.Start()
212
			if err != nil {
213
				log.Errorf("api=Start, reason=certReloader.Start, certFilePath=%q, keyFilePath=%q, caFilePath=%q, err=%s", wconfig.CertFilePath, wconfig.KeyFilePath, wconfig.CaFilePath, err.Error())
214
				return nil, nil, errors.Errorf("api=Start, reason=certReloader.Start, certFilePath=%q, keyFilePath=%q, caFilePath=%q, err=%s", wconfig.CertFilePath, wconfig.KeyFilePath, wconfig.CaFilePath, err.Error())
215
			}
216

217
			tlsConfig = &tls.Config{
218
				GetCertificate: func(*tls.ClientHelloInfo) (*tls.Certificate, error) {
219
					return whsvr.certificateReloader.GetCertificate()
220
				},
221
			}
222
			if !wconfig.AllowDeprecatedTLSConfig {
223
				tlsConfig.MinVersion = tls.VersionTLS12
224
				tlsConfig.CipherSuites = []uint16{
225
					tls.TLS_AES_256_GCM_SHA384,
226
					tls.TLS_AES_128_GCM_SHA256,
227
					tls.TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384,
228
					tls.TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384,
229
					tls.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256,
230
					tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,
231
				}
232
			}
233

234
			if wconfig.CaFilePath != "" {
235
				caCert, err := os.ReadFile(wconfig.CaFilePath)
236
				if err != nil {
237
					log.Errorf("api=Start, reason=ioutil.ReadFile, caFilePath=%q, err=%s", wconfig.CaFilePath, err.Error())
238
					return nil, nil, errors.Errorf("api=Start, reason=ioutil.ReadFile, caFilePath=%q, err=%s", wconfig.CaFilePath, err.Error())
239
				}
240
				caCertPool := x509.NewCertPool()
241
				caCertPool.AppendCertsFromPEM(caCert)
242
				tlsConfig.RootCAs = caCertPool
243
			}
244
		}
245
	}
246

247
	handlers := router.HandlerMap{
248
		router.Mutate:    whsvr.MutateHandler,
249
		router.Validate:  whsvr.ValidateHandler,
250
		router.Immutable: whsvr.ImmutableHandler,
251
		router.Healthz:   whsvr.Healthz,
252
		router.Liveness:  whsvr.Liveness,
253
		router.Readyness: whsvr.Readyness,
254
	}
255

256
	whsvr.act.SetHandlers(handlers)
257
	router := whsvr.act.RegisterHandlers(fmt.Sprintf(":%v", wconfig.HTTPPort))
258

259
	if startTLS {
260
		// We create two servers: one that serves https requests, and another that serves http requests only.
261
		whsvr.tlsServer = &http.Server{
262
			Addr:      fmt.Sprintf(":%v", wconfig.TLSPort),
263
			TLSConfig: tlsConfig,
264
		}
265
		whsvr.tlsServer.Handler = router
266

267
		// Channel to indicate when the server stopped listening for some reason
268
		doneListeningTLSChannel = make(chan bool)
269

270
		// start webhook server in new routine
271
		go func() {
272
			if isTLS {
273
				if err := whsvr.tlsServer.ListenAndServeTLS("", ""); err != nil {
274
					log.Errorf("api=Start, message=failed to listen and serve webhook server: %s", err.Error())
275
				}
276
				doneListeningTLSChannel <- true
277
			}
278
		}()
279

280
	}
281

282
	whsvr.httpServer = &http.Server{
283
		Addr: fmt.Sprintf(":%v", wconfig.HTTPPort),
284
	}
285

286
	whsvr.httpServer.Handler = router
287

288
	// Channel to indicate when the server stopped listening for some reason
289
	doneListeningHTTPChannel = make(chan bool)
290

291
	log.Info("api=Start, message=start http webhook server")
292

293
	go func() {
294
		if err := whsvr.httpServer.ListenAndServe(); err != nil {
295
			log.Errorf("api=Start, reason='failed to listen and serve webhook HTTP server: %s'", err.Error())
296
		}
297
		doneListeningHTTPChannel <- true
298
	}()
299

300
	return doneListeningTLSChannel, doneListeningHTTPChannel, nil
301
}
302

303
// Stop stops webhook server
304
func (whsvr *Server) Stop() {
305
	log := logger.FromContext(context.Background())
306
	log.Info("api=Stop, reason='shutting down webhook server gracefully...'")
307
	if whsvr.tlsServer != nil {
308
		_ = whsvr.tlsServer.Shutdown(context.Background())
309
	}
310
	if whsvr.httpServer != nil {
311
		_ = whsvr.httpServer.Shutdown(context.Background())
312
	}
313
	if whsvr.certificateReloader != nil {
314
		if whsvr.certificateReloader.IsRunning() {
315
			whsvr.certificateReloader.Stop()
316
		}
317
	}
318
}
319

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.