kubelatte-ce
Форк от sbertech/kubelatte-ce
165 строк · 4.2 Кб
1package leader
2
3import (
4"context"
5"fmt"
6"github.com/go-logr/logr"
7"gitverse.ru/ktrntrsv/kubelatte-ce/pkg/kubeapi"
8"gitverse.ru/ktrntrsv/kubelatte-ce/pkg/modules"
9"gitverse.ru/ktrntrsv/kubelatte-ce/pkg/observability/logger"
10"gitverse.ru/ktrntrsv/kubelatte-ce/pkg/observability/logger/lib"
11initManager "gitverse.ru/ktrntrsv/kubelatte-ce/pkg/operator/init_manager"
12"gitverse.ru/ktrntrsv/kubelatte-ce/pkg/operator/utils"
13"gitverse.ru/ktrntrsv/kubelatte-ce/pkg/sideeffect/grpc"
14"gitverse.ru/ktrntrsv/kubelatte-ce/pkg/storage"
15"gitverse.ru/ktrntrsv/kubelatte-ce/pkg/util/env"
16"net/http"
17"os"
18ctrl "sigs.k8s.io/controller-runtime"
19s8log "sigs.k8s.io/controller-runtime/pkg/log"
20"sigs.k8s.io/controller-runtime/pkg/manager"
21)
22
23var Mgr manager.Manager
24
25var (
26initMgr initManager.InitManagerI
27templateR utils.ReconcilerInterface
28triggerR utils.ReconcilerInterface
29scopeR utils.ReconcilerInterface
30
31storageController *storage.StorageController
32)
33
34func start(actors modules.ActorsOperator) {
35log := logger.FromContext(context.Background())
36utils.IsInit.Mu.Lock()
37
38if utils.IsInit.IsInit {
39log.Debug("Already init")
40return
41}
42
43actors.Audit()
44
45ctx := ctrl.SetupSignalHandler()
46
47initCtx, cancelInitCtx := context.WithCancel(context.Background())
48defer cancelInitCtx()
49
50go actors.Initer.Start(initCtx)
51
52log.Debug("[KBLT][Operator] STORAGE started")
53
54log.Debugf("STORAGE started")
55startStorage()
56
57if utils.IsControllerNeeded("scope") {
58log.Info("[KBLT][Operator] Controller SCOPE started")
59startReconciler(actors.ScoperReconciler)
60}
61
62if utils.IsControllerNeeded("trigger") {
63log.Info("[KBLT][Operator] Controller TRIGGER started")
64startReconciler(actors.TriggerReconciler)
65}
66
67if utils.IsControllerNeeded("template") {
68log.Info("[KBLT][Operator] Controller TEMPLATE started")
69startReconciler(actors.TemplateReconciler)
70}
71
72actors.Watcher.StartWatcher()
73actors.Factory.StartFactory()
74
75if utils.IsControllerNeeded("triggerInstance") {
76log.Info("[KBLT][Operator] Controller TRIGGER_INSTANCE started")
77go startTriggerInstanceReconciler(ctx, actors.TriggerInstanceReconciler)
78}
79
80log.Info("[KBLT][Operator] Controller NAMESPACE started")
81go startNamespaceReconciler(ctx, actors.NamespaceReconciler)
82
83kbltCli, err := kubeapi.GetKBLTClient()
84if err != nil {
85log.Errorf("failed get KBLTKubeClient")
86return
87}
88
89actors.SideEffectExecutor.SetKbltCli(kbltCli)
90actors.SideEffectExecutor.SetKubeCli(kubeapi.GetClient())
91
92stopGrpcServer := grpc.StartExecuteServer(8687, actors.SideEffectExecutor)
93
94utils.IsInit.IsInit = true
95utils.IsInit.Mu.Unlock()
96
97ctrl.SetLogger(logr.Logger{}.WithSink(s8log.NullLogSink{}))
98
99//+kubebuilder:scaffold:builder
100if err := Mgr.AddHealthzCheck("healthz", Healthz); err != nil {
101log.Fatalf("Unable to set up health check: %s", err.Error())
102cancelInitCtx()
103os.Exit(1)
104}
105if err := Mgr.AddReadyzCheck("readyz", Readyz); err != nil {
106log.Fatal(fmt.Sprintf("Unable to set up ready check: %s", err.Error()))
107cancelInitCtx()
108os.Exit(1)
109}
110
111StartLoggerServer(ctx)
112
113if err := Mgr.Start(ctx); err != nil {
114log.Fatalf("Problem running manager: %s", err.Error())
115stopGrpcServer()
116cancelInitCtx()
117os.Exit(1)
118}
119}
120
121func StartLoggerServer(ctx context.Context) {
122log := logger.FromContext(ctx)
123addr := fmt.Sprintf(":%d", env.KbltLoggerPort)
124router := http.NewServeMux()
125
126router.Handle("/logging", lib.AtomLevel)
127log.Info(fmt.Sprintf("See log level by path %s", fmt.Sprintf("http://localhost%s/logging", addr)))
128
129go func() {
130if err := http.ListenAndServe(addr, router); err != nil {
131log.Errorf(fmt.Sprintf("Error start debug level server: %s", err.Error()))
132os.Exit(1)
133}
134<-ctx.Done()
135log.Debugf("Stop logger server")
136os.Exit(0)
137}()
138}
139
140type opts func()
141
142func startStorage(f ...opts) {
143// [S1] Must start before storage
144storageController = &storage.StorageController{Sync: true}
145storage.Storage = storageController
146
147for _, op := range f {
148op()
149}
150// [S1] end
151
152storageController.Start(true, false)
153}
154
155func SetLeaderAndStart(actors modules.ActorsOperator) {
156start(actors)
157setGlobals(actors)
158}
159
160func setGlobals(actors modules.ActorsOperator) {
161initMgr = actors.Initer
162templateR = actors.TemplateReconciler
163triggerR = actors.TriggerReconciler
164scopeR = actors.ScoperReconciler
165}
166