kubelatte-ce

Форк
2
Форк от sbertech/kubelatte-ce
165 строк · 4.2 Кб
1
package leader
2

3
import (
4
	"context"
5
	"fmt"
6
	"github.com/go-logr/logr"
7
	"gitverse.ru/synapse/kubelatte/pkg/kubeapi"
8
	"gitverse.ru/synapse/kubelatte/pkg/modules"
9
	"gitverse.ru/synapse/kubelatte/pkg/observability/logger"
10
	"gitverse.ru/synapse/kubelatte/pkg/observability/logger/lib"
11
	initManager "gitverse.ru/synapse/kubelatte/pkg/operator/init_manager"
12
	"gitverse.ru/synapse/kubelatte/pkg/operator/utils"
13
	"gitverse.ru/synapse/kubelatte/pkg/sideeffect/grpc"
14
	"gitverse.ru/synapse/kubelatte/pkg/storage"
15
	"gitverse.ru/synapse/kubelatte/pkg/util/env"
16
	"net/http"
17
	"os"
18
	ctrl "sigs.k8s.io/controller-runtime"
19
	s8log "sigs.k8s.io/controller-runtime/pkg/log"
20
	"sigs.k8s.io/controller-runtime/pkg/manager"
21
)
22

23
var Mgr manager.Manager
24

25
var (
26
	initMgr   initManager.InitManagerI
27
	templateR utils.ReconcilerInterface
28
	triggerR  utils.ReconcilerInterface
29
	scopeR    utils.ReconcilerInterface
30

31
	storageController *storage.StorageController
32
)
33

34
func start(actors modules.ActorsOperator) {
35
	log := logger.FromContext(context.Background())
36
	utils.IsInit.Mu.Lock()
37

38
	if utils.IsInit.IsInit {
39
		log.Debug("Already init")
40
		return
41
	}
42

43
	actors.Audit()
44

45
	ctx := ctrl.SetupSignalHandler()
46

47
	initCtx, cancelInitCtx := context.WithCancel(context.Background())
48
	defer cancelInitCtx()
49

50
	go actors.Initer.Start(initCtx)
51

52
	log.Debug("[KBLT][Operator] STORAGE started")
53

54
	log.Debugf("STORAGE started")
55
	startStorage()
56

57
	if utils.IsControllerNeeded("scope") {
58
		log.Info("[KBLT][Operator] Controller SCOPE started")
59
		startReconciler(actors.ScoperReconciler)
60
	}
61

62
	if utils.IsControllerNeeded("trigger") {
63
		log.Info("[KBLT][Operator] Controller TRIGGER started")
64
		startReconciler(actors.TriggerReconciler)
65
	}
66

67
	if utils.IsControllerNeeded("template") {
68
		log.Info("[KBLT][Operator] Controller TEMPLATE started")
69
		startReconciler(actors.TemplateReconciler)
70
	}
71

72
	actors.Watcher.StartWatcher()
73
	actors.Factory.StartFactory()
74

75
	if utils.IsControllerNeeded("triggerInstance") {
76
		log.Info("[KBLT][Operator] Controller TRIGGER_INSTANCE started")
77
		go startTriggerInstanceReconciler(ctx, actors.TriggerInstanceReconciler)
78
	}
79

80
	log.Info("[KBLT][Operator] Controller NAMESPACE started")
81
	go startNamespaceReconciler(ctx, actors.NamespaceReconciler)
82

83
	kbltCli, err := kubeapi.GetKBLTClient()
84
	if err != nil {
85
		log.Errorf("failed get KBLTKubeClient")
86
		return
87
	}
88

89
	actors.SideEffectExecutor.SetKbltCli(kbltCli)
90
	actors.SideEffectExecutor.SetKubeCli(kubeapi.GetClient())
91

92
	stopGrpcServer := grpc.StartExecuteServer(8687, actors.SideEffectExecutor)
93

94
	utils.IsInit.IsInit = true
95
	utils.IsInit.Mu.Unlock()
96

97
	ctrl.SetLogger(logr.Logger{}.WithSink(s8log.NullLogSink{}))
98

99
	//+kubebuilder:scaffold:builder
100
	if err := Mgr.AddHealthzCheck("healthz", Healthz); err != nil {
101
		log.Fatalf("Unable to set up health check: %s", err.Error())
102
		cancelInitCtx()
103
		os.Exit(1)
104
	}
105
	if err := Mgr.AddReadyzCheck("readyz", Readyz); err != nil {
106
		log.Fatal(fmt.Sprintf("Unable to set up ready check: %s", err.Error()))
107
		cancelInitCtx()
108
		os.Exit(1)
109
	}
110

111
	StartLoggerServer(ctx)
112

113
	if err := Mgr.Start(ctx); err != nil {
114
		log.Fatalf("Problem running manager: %s", err.Error())
115
		stopGrpcServer()
116
		cancelInitCtx()
117
		os.Exit(1)
118
	}
119
}
120

121
func StartLoggerServer(ctx context.Context) {
122
	log := logger.FromContext(ctx)
123
	addr := fmt.Sprintf(":%d", env.KbltLoggerPort)
124
	router := http.NewServeMux()
125

126
	router.Handle("/logging", lib.AtomLevel)
127
	log.Info(fmt.Sprintf("See log level by path %s", fmt.Sprintf("http://localhost%s/logging", addr)))
128

129
	go func() {
130
		if err := http.ListenAndServe(addr, router); err != nil {
131
			log.Errorf(fmt.Sprintf("Error start debug level server: %s", err.Error()))
132
			os.Exit(1)
133
		}
134
		<-ctx.Done()
135
		log.Debugf("Stop logger server")
136
		os.Exit(0)
137
	}()
138
}
139

140
type opts func()
141

142
func startStorage(f ...opts) {
143
	// [S1] Must start before storage
144
	storageController = &storage.StorageController{Sync: true}
145
	storage.Storage = storageController
146

147
	for _, op := range f {
148
		op()
149
	}
150
	// [S1] end
151

152
	storageController.Start(true, false)
153
}
154

155
func SetLeaderAndStart(actors modules.ActorsOperator) {
156
	start(actors)
157
	setGlobals(actors)
158
}
159

160
func setGlobals(actors modules.ActorsOperator) {
161
	initMgr = actors.Initer
162
	templateR = actors.TemplateReconciler
163
	triggerR = actors.TriggerReconciler
164
	scopeR = actors.ScoperReconciler
165
}
166

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.