kubelatte-ce
Форк от sbertech/kubelatte-ce
165 строк · 4.2 Кб
1package leader2
3import (4"context"5"fmt"6"github.com/go-logr/logr"7"gitverse.ru/synapse/kubelatte/pkg/kubeapi"8"gitverse.ru/synapse/kubelatte/pkg/modules"9"gitverse.ru/synapse/kubelatte/pkg/observability/logger"10"gitverse.ru/synapse/kubelatte/pkg/observability/logger/lib"11initManager "gitverse.ru/synapse/kubelatte/pkg/operator/init_manager"12"gitverse.ru/synapse/kubelatte/pkg/operator/utils"13"gitverse.ru/synapse/kubelatte/pkg/sideeffect/grpc"14"gitverse.ru/synapse/kubelatte/pkg/storage"15"gitverse.ru/synapse/kubelatte/pkg/util/env"16"net/http"17"os"18ctrl "sigs.k8s.io/controller-runtime"19s8log "sigs.k8s.io/controller-runtime/pkg/log"20"sigs.k8s.io/controller-runtime/pkg/manager"21)
22
23var Mgr manager.Manager24
25var (26initMgr initManager.InitManagerI27templateR utils.ReconcilerInterface28triggerR utils.ReconcilerInterface29scopeR utils.ReconcilerInterface30
31storageController *storage.StorageController32)
33
34func start(actors modules.ActorsOperator) {35log := logger.FromContext(context.Background())36utils.IsInit.Mu.Lock()37
38if utils.IsInit.IsInit {39log.Debug("Already init")40return41}42
43actors.Audit()44
45ctx := ctrl.SetupSignalHandler()46
47initCtx, cancelInitCtx := context.WithCancel(context.Background())48defer cancelInitCtx()49
50go actors.Initer.Start(initCtx)51
52log.Debug("[KBLT][Operator] STORAGE started")53
54log.Debugf("STORAGE started")55startStorage()56
57if utils.IsControllerNeeded("scope") {58log.Info("[KBLT][Operator] Controller SCOPE started")59startReconciler(actors.ScoperReconciler)60}61
62if utils.IsControllerNeeded("trigger") {63log.Info("[KBLT][Operator] Controller TRIGGER started")64startReconciler(actors.TriggerReconciler)65}66
67if utils.IsControllerNeeded("template") {68log.Info("[KBLT][Operator] Controller TEMPLATE started")69startReconciler(actors.TemplateReconciler)70}71
72actors.Watcher.StartWatcher()73actors.Factory.StartFactory()74
75if utils.IsControllerNeeded("triggerInstance") {76log.Info("[KBLT][Operator] Controller TRIGGER_INSTANCE started")77go startTriggerInstanceReconciler(ctx, actors.TriggerInstanceReconciler)78}79
80log.Info("[KBLT][Operator] Controller NAMESPACE started")81go startNamespaceReconciler(ctx, actors.NamespaceReconciler)82
83kbltCli, err := kubeapi.GetKBLTClient()84if err != nil {85log.Errorf("failed get KBLTKubeClient")86return87}88
89actors.SideEffectExecutor.SetKbltCli(kbltCli)90actors.SideEffectExecutor.SetKubeCli(kubeapi.GetClient())91
92stopGrpcServer := grpc.StartExecuteServer(8687, actors.SideEffectExecutor)93
94utils.IsInit.IsInit = true95utils.IsInit.Mu.Unlock()96
97ctrl.SetLogger(logr.Logger{}.WithSink(s8log.NullLogSink{}))98
99//+kubebuilder:scaffold:builder100if err := Mgr.AddHealthzCheck("healthz", Healthz); err != nil {101log.Fatalf("Unable to set up health check: %s", err.Error())102cancelInitCtx()103os.Exit(1)104}105if err := Mgr.AddReadyzCheck("readyz", Readyz); err != nil {106log.Fatal(fmt.Sprintf("Unable to set up ready check: %s", err.Error()))107cancelInitCtx()108os.Exit(1)109}110
111StartLoggerServer(ctx)112
113if err := Mgr.Start(ctx); err != nil {114log.Fatalf("Problem running manager: %s", err.Error())115stopGrpcServer()116cancelInitCtx()117os.Exit(1)118}119}
120
121func StartLoggerServer(ctx context.Context) {122log := logger.FromContext(ctx)123addr := fmt.Sprintf(":%d", env.KbltLoggerPort)124router := http.NewServeMux()125
126router.Handle("/logging", lib.AtomLevel)127log.Info(fmt.Sprintf("See log level by path %s", fmt.Sprintf("http://localhost%s/logging", addr)))128
129go func() {130if err := http.ListenAndServe(addr, router); err != nil {131log.Errorf(fmt.Sprintf("Error start debug level server: %s", err.Error()))132os.Exit(1)133}134<-ctx.Done()135log.Debugf("Stop logger server")136os.Exit(0)137}()138}
139
140type opts func()141
142func startStorage(f ...opts) {143// [S1] Must start before storage144storageController = &storage.StorageController{Sync: true}145storage.Storage = storageController146
147for _, op := range f {148op()149}150// [S1] end151
152storageController.Start(true, false)153}
154
155func SetLeaderAndStart(actors modules.ActorsOperator) {156start(actors)157setGlobals(actors)158}
159
160func setGlobals(actors modules.ActorsOperator) {161initMgr = actors.Initer162templateR = actors.TemplateReconciler163triggerR = actors.TriggerReconciler164scopeR = actors.ScoperReconciler165}
166