kubelatte-ce
Форк от sbertech/kubelatte-ce
229 строк · 6.1 Кб
1package main
2
3import (
4"context"
5"crypto/tls"
6"crypto/x509"
7"flag"
8"fmt"
9wait "github.com/asaf-shitrit/go-wait"
10"github.com/spf13/afero"
11"gitverse.ru/synapse/kubelatte/pkg/api/common"
12"gitverse.ru/synapse/kubelatte/pkg/modules"
13"gitverse.ru/synapse/kubelatte/pkg/observability/logger"
14"gitverse.ru/synapse/kubelatte/pkg/observability/logger/lib"
15"gitverse.ru/synapse/kubelatte/pkg/util"
16"gitverse.ru/synapse/kubelatte/pkg/util/env"
17kbltwebhook "gitverse.ru/synapse/kubelatte/pkg/webhook"
18"gitverse.ru/synapse/kubelatte/pkg/webhook/config"
19"io"
20kclientset "k8s.io/client-go/kubernetes"
21"net/http"
22"os"
23"os/signal"
24"sigs.k8s.io/controller-runtime/pkg/webhook"
25"sync"
26"syscall"
27"time"
28
29"gitverse.ru/synapse/kubelatte/pkg/operator/leader"
30
31_ "k8s.io/client-go/plugin/pkg/client/auth"
32ctrl "sigs.k8s.io/controller-runtime"
33//+kubebuilder:scaffold:imports
34)
35
36func waitConnectionToKubeAPI() {
37ctx := context.Background()
38logs := logger.FromContext(ctx)
39
40cfg, err := ctrl.GetConfig()
41if err != nil {
42logs.Error(err.Error())
43os.Exit(1)
44}
45
46t := &http.Transport{}
47
48if !env.WithoutCert {
49var cert tls.Certificate
50if cfg.CertFile != "" && cfg.KeyFile != "" {
51cert, err = tls.LoadX509KeyPair(cfg.CertFile, cfg.KeyFile)
52if err != nil {
53logs.Fatal(fmt.Sprintf("Error creating x509 keypair from client cert file %s and client key file %s",
54cfg.CertFile, cfg.KeyFile))
55os.Exit(1)
56}
57}
58
59caCert, err := os.ReadFile(cfg.CAFile)
60if err != nil {
61logs.Fatal(fmt.Sprintf("Error opening cert file %s, Error: %s", cfg.CAFile, err))
62os.Exit(1)
63}
64caCertPool := x509.NewCertPool()
65caCertPool.AppendCertsFromPEM(caCert)
66
67t = &http.Transport{
68TLSClientConfig: &tls.Config{
69Certificates: []tls.Certificate{cert},
70RootCAs: caCertPool,
71},
72}
73}
74
75client := http.Client{Transport: t, Timeout: 15 * time.Second}
76var bearer = "Bearer " + cfg.BearerToken
77var uri = cfg.Host + "/livez"
78
79checkFunc := func() (bool, error) {
80rq, err := http.NewRequest("GET", uri, nil)
81if err != nil {
82logs.Error(err.Error())
83return false, nil
84}
85
86if cfg.BearerToken != "" {
87rq.Header.Add("Authorization", bearer)
88}
89
90resp, err := client.Do(rq)
91if err != nil {
92logs.Error(err.Error())
93return false, nil
94}
95defer func(Body io.ReadCloser) {
96_ = Body.Close()
97}(resp.Body)
98
99if resp.StatusCode == http.StatusOK {
100logs.Infof("Kubernetes APIServer available. Status request: %d", resp.StatusCode)
101return true, nil
102}
103logs.Error(fmt.Sprintf("Kubernetes APIServer unavailable. Status request: %d. Reconecting...", resp.StatusCode))
104return false, nil
105}
106
107options := &wait.BackoffOptions{
108BaselineDuration: time.Second,
109Limit: 10 * time.Second,
110Multiplier: 2,
111Jitter: 1,
112}
113
114ctx = context.Background()
115if err = wait.Backoff(ctx, checkFunc, options); err != nil {
116logs.Infof("Unable to start operator: %s", err.Error())
117os.Exit(1)
118}
119}
120
121func main() {
122common.InitScheme(common.Scheme)
123
124var err error
125
126env.InitEnvOperator()
127env.InitEnvWebhookServer()
128
129lib.LoggersInit(util.GetLogLevel())
130waitConnectionToKubeAPI()
131
132var metricsAddr string
133var enableLeaderElection bool
134var probeAddr string
135flag.StringVar(&metricsAddr, "metrics-bind-address", ":8080", "The address the metric endpoint binds to.")
136flag.StringVar(&probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.")
137flag.BoolVar(&enableLeaderElection, "leader-elect", false,
138"Enable leader election for controller manager. "+
139"Enabling this will ensure there is only one active controller manager.")
140flag.Parse()
141
142log := logger.FromContext(context.Background())
143
144//start server
145go func() {
146fs := afero.NewOsFs()
147
148webhookConfig, cerr := config.Load()
149if cerr != nil {
150log.Errorf("api=main, reason=webhook.NewWebhookServer, err=%v", cerr)
151wc, err := config.NewWebhookConfig()
152if err != nil {
153panic(err)
154}
155webhookConfig = wc
156}
157
158var certReloader *util.CertificatePKIReloader
159if webhookConfig.CertFilePath != "" && webhookConfig.KeyFilePath != "" {
160certReloader = util.NewCertificatePKIReloaderFull(fs, webhookConfig.CertFilePath, webhookConfig.KeyFilePath, time.Minute*15)
161}
162
163actors := modules.GetActors()
164
165whsrv := kbltwebhook.NewWebhookServer(webhookConfig, certReloader, &sync.Mutex{}, actors)
166doneListeningTLSChannel, doneListeningHTTPChannel, err := whsrv.Start()
167if err != nil {
168log.Errorf("api=main, reason=whsrv.Start, err=%v", err)
169panic(err)
170}
171
172if cerr != nil {
173whsrv.Set(kbltwebhook.ServerConfigsError)
174}
175
176// listening OS shutdown entrySignal
177signalChan := make(chan os.Signal, 1)
178signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM)
179
180stopped := false
181// Wait until we receive either a termination entrySignal, or the server stops by itself for some reason
182select {
183case entrySignal := <-signalChan:
184log.Warn("Received a termination entrySignal. SIG=", entrySignal)
185case stopped = <-doneListeningTLSChannel:
186log.Warn("TLS Server has stopped on it's own... exiting.")
187case stopped = <-doneListeningHTTPChannel:
188log.Warn("HTTP Server has stopped on it's own... exiting.")
189}
190
191if !stopped {
192whsrv.Stop()
193}
194log.Info("Webhook server exited successfully.")
195
196}()
197
198opt := ctrl.Options{
199Logger: lib.LoggerSTD,
200Scheme: common.Scheme,
201MetricsBindAddress: metricsAddr,
202HealthProbeBindAddress: probeAddr,
203WebhookServer: webhook.NewServer(webhook.Options{Port: env.KbltMainPort}),
204}
205
206if env.LocalNamespaceMode {
207opt.Cache.Namespaces = append(opt.Cache.Namespaces, env.OperatorNamespace)
208} else {
209log.Fatal(fmt.Sprintf("Invalid environment variable LOCAL_NAMESPACE_MODE: use 'true' or 'false'"))
210os.Exit(1)
211}
212
213leader.Mgr, err = ctrl.NewManager(ctrl.GetConfigOrDie(), opt)
214if err != nil {
215log.Fatal("Unable to start manager")
216os.Exit(1)
217}
218
219actors := modules.ActorsOp
220actors.Permissioner.Init(
221kclientset.NewForConfigOrDie(leader.Mgr.GetConfig()),
222env.OperatorNamespace,
223env.KbltMutator,
224env.KbltCreator,
225env.KbltValidator,
226env.KbltSideEffect,
227).StartCheck()
228leader.SetLeaderAndStart(actors)
229}
230