В 22:00 МСК будет объявлен перерыв - 10 минут. Вы отдыхаете - мы обновляем!

kubelatte-ce

Форк от sbertech/kubelatte-ce
Форк
2
229 строк · 6.1 Кб
1
package main
2

3
import (
4
	"context"
5
	"crypto/tls"
6
	"crypto/x509"
7
	"flag"
8
	"fmt"
9
	wait "github.com/asaf-shitrit/go-wait"
10
	"github.com/spf13/afero"
11
	"gitverse.ru/synapse/kubelatte/pkg/api/common"
12
	"gitverse.ru/synapse/kubelatte/pkg/modules"
13
	"gitverse.ru/synapse/kubelatte/pkg/observability/logger"
14
	"gitverse.ru/synapse/kubelatte/pkg/observability/logger/lib"
15
	"gitverse.ru/synapse/kubelatte/pkg/util"
16
	"gitverse.ru/synapse/kubelatte/pkg/util/env"
17
	kbltwebhook "gitverse.ru/synapse/kubelatte/pkg/webhook"
18
	"gitverse.ru/synapse/kubelatte/pkg/webhook/config"
19
	"io"
20
	kclientset "k8s.io/client-go/kubernetes"
21
	"net/http"
22
	"os"
23
	"os/signal"
24
	"sigs.k8s.io/controller-runtime/pkg/webhook"
25
	"sync"
26
	"syscall"
27
	"time"
28

29
	"gitverse.ru/synapse/kubelatte/pkg/operator/leader"
30

31
	_ "k8s.io/client-go/plugin/pkg/client/auth"
32
	ctrl "sigs.k8s.io/controller-runtime"
33
	//+kubebuilder:scaffold:imports
34
)
35

36
func waitConnectionToKubeAPI() {
37
	ctx := context.Background()
38
	logs := logger.FromContext(ctx)
39

40
	cfg, err := ctrl.GetConfig()
41
	if err != nil {
42
		logs.Error(err.Error())
43
		os.Exit(1)
44
	}
45

46
	t := &http.Transport{}
47

48
	if !env.WithoutCert {
49
		var cert tls.Certificate
50
		if cfg.CertFile != "" && cfg.KeyFile != "" {
51
			cert, err = tls.LoadX509KeyPair(cfg.CertFile, cfg.KeyFile)
52
			if err != nil {
53
				logs.Fatal(fmt.Sprintf("Error creating x509 keypair from client cert file %s and client key file %s",
54
					cfg.CertFile, cfg.KeyFile))
55
				os.Exit(1)
56
			}
57
		}
58

59
		caCert, err := os.ReadFile(cfg.CAFile)
60
		if err != nil {
61
			logs.Fatal(fmt.Sprintf("Error opening cert file %s, Error: %s", cfg.CAFile, err))
62
			os.Exit(1)
63
		}
64
		caCertPool := x509.NewCertPool()
65
		caCertPool.AppendCertsFromPEM(caCert)
66

67
		t = &http.Transport{
68
			TLSClientConfig: &tls.Config{
69
				Certificates: []tls.Certificate{cert},
70
				RootCAs:      caCertPool,
71
			},
72
		}
73
	}
74

75
	client := http.Client{Transport: t, Timeout: 15 * time.Second}
76
	var bearer = "Bearer " + cfg.BearerToken
77
	var uri = cfg.Host + "/livez"
78

79
	checkFunc := func() (bool, error) {
80
		rq, err := http.NewRequest("GET", uri, nil)
81
		if err != nil {
82
			logs.Error(err.Error())
83
			return false, nil
84
		}
85

86
		if cfg.BearerToken != "" {
87
			rq.Header.Add("Authorization", bearer)
88
		}
89

90
		resp, err := client.Do(rq)
91
		if err != nil {
92
			logs.Error(err.Error())
93
			return false, nil
94
		}
95
		defer func(Body io.ReadCloser) {
96
			_ = Body.Close()
97
		}(resp.Body)
98

99
		if resp.StatusCode == http.StatusOK {
100
			logs.Infof("Kubernetes APIServer available. Status request: %d", resp.StatusCode)
101
			return true, nil
102
		}
103
		logs.Error(fmt.Sprintf("Kubernetes APIServer unavailable. Status request: %d. Reconecting...", resp.StatusCode))
104
		return false, nil
105
	}
106

107
	options := &wait.BackoffOptions{
108
		BaselineDuration: time.Second,
109
		Limit:            10 * time.Second,
110
		Multiplier:       2,
111
		Jitter:           1,
112
	}
113

114
	ctx = context.Background()
115
	if err = wait.Backoff(ctx, checkFunc, options); err != nil {
116
		logs.Infof("Unable to start operator: %s", err.Error())
117
		os.Exit(1)
118
	}
119
}
120

121
func main() {
122
	common.InitScheme(common.Scheme)
123

124
	var err error
125

126
	env.InitEnvOperator()
127
	env.InitEnvWebhookServer()
128

129
	lib.LoggersInit(util.GetLogLevel())
130
	waitConnectionToKubeAPI()
131

132
	var metricsAddr string
133
	var enableLeaderElection bool
134
	var probeAddr string
135
	flag.StringVar(&metricsAddr, "metrics-bind-address", ":8080", "The address the metric endpoint binds to.")
136
	flag.StringVar(&probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.")
137
	flag.BoolVar(&enableLeaderElection, "leader-elect", false,
138
		"Enable leader election for controller manager. "+
139
			"Enabling this will ensure there is only one active controller manager.")
140
	flag.Parse()
141

142
	log := logger.FromContext(context.Background())
143

144
	//start server
145
	go func() {
146
		fs := afero.NewOsFs()
147

148
		webhookConfig, cerr := config.Load()
149
		if cerr != nil {
150
			log.Errorf("api=main, reason=webhook.NewWebhookServer, err=%v", cerr)
151
			wc, err := config.NewWebhookConfig()
152
			if err != nil {
153
				panic(err)
154
			}
155
			webhookConfig = wc
156
		}
157

158
		var certReloader *util.CertificatePKIReloader
159
		if webhookConfig.CertFilePath != "" && webhookConfig.KeyFilePath != "" {
160
			certReloader = util.NewCertificatePKIReloaderFull(fs, webhookConfig.CertFilePath, webhookConfig.KeyFilePath, time.Minute*15)
161
		}
162

163
		actors := modules.GetActors()
164

165
		whsrv := kbltwebhook.NewWebhookServer(webhookConfig, certReloader, &sync.Mutex{}, actors)
166
		doneListeningTLSChannel, doneListeningHTTPChannel, err := whsrv.Start()
167
		if err != nil {
168
			log.Errorf("api=main, reason=whsrv.Start, err=%v", err)
169
			panic(err)
170
		}
171

172
		if cerr != nil {
173
			whsrv.Set(kbltwebhook.ServerConfigsError)
174
		}
175

176
		// listening OS shutdown entrySignal
177
		signalChan := make(chan os.Signal, 1)
178
		signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM)
179

180
		stopped := false
181
		// Wait until we receive either a termination entrySignal, or the server stops by itself for some reason
182
		select {
183
		case entrySignal := <-signalChan:
184
			log.Warn("Received a termination entrySignal. SIG=", entrySignal)
185
		case stopped = <-doneListeningTLSChannel:
186
			log.Warn("TLS Server has stopped on it's own... exiting.")
187
		case stopped = <-doneListeningHTTPChannel:
188
			log.Warn("HTTP Server has stopped on it's own... exiting.")
189
		}
190

191
		if !stopped {
192
			whsrv.Stop()
193
		}
194
		log.Info("Webhook server exited successfully.")
195

196
	}()
197

198
	opt := ctrl.Options{
199
		Logger:                 lib.LoggerSTD,
200
		Scheme:                 common.Scheme,
201
		MetricsBindAddress:     metricsAddr,
202
		HealthProbeBindAddress: probeAddr,
203
		WebhookServer:          webhook.NewServer(webhook.Options{Port: env.KbltMainPort}),
204
	}
205

206
	if env.LocalNamespaceMode {
207
		opt.Cache.Namespaces = append(opt.Cache.Namespaces, env.OperatorNamespace)
208
	} else {
209
		log.Fatal(fmt.Sprintf("Invalid environment variable LOCAL_NAMESPACE_MODE: use 'true' or 'false'"))
210
		os.Exit(1)
211
	}
212

213
	leader.Mgr, err = ctrl.NewManager(ctrl.GetConfigOrDie(), opt)
214
	if err != nil {
215
		log.Fatal("Unable to start manager")
216
		os.Exit(1)
217
	}
218

219
	actors := modules.ActorsOp
220
	actors.Permissioner.Init(
221
		kclientset.NewForConfigOrDie(leader.Mgr.GetConfig()),
222
		env.OperatorNamespace,
223
		env.KbltMutator,
224
		env.KbltCreator,
225
		env.KbltValidator,
226
		env.KbltSideEffect,
227
	).StartCheck()
228
	leader.SetLeaderAndStart(actors)
229
}
230

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.