kubelatte-ce
Форк от sbertech/kubelatte-ce
301 строка · 8.2 Кб
1package storage
2
3import (
4"context"
5"github.com/open-policy-agent/opa/rego"
6"gitverse.ru/synapse/kubelatte/pkg/api/v1alpha1"
7"gitverse.ru/synapse/kubelatte/pkg/observability/logger"
8"gitverse.ru/synapse/kubelatte/pkg/operator/utils"
9"gitverse.ru/synapse/kubelatte/pkg/util"
10"gitverse.ru/synapse/kubelatte/pkg/util/env"
11"gitverse.ru/synapse/kubelatte/pkg/util/opa"
12corev1 "k8s.io/api/core/v1"
13metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
14"sync"
15)
16
17var Storage *StorageController
18
19type StorageController struct {
20triggers map[string]v1alpha1.Trigger
21templates map[string]v1alpha1.Template
22scopes map[string]v1alpha1.Scope
23namespaces map[string]corev1.Namespace
24regoRules map[string]rego.PreparedEvalQuery
25mutex sync.Mutex
26initedFlag bool
27senderMode bool
28Sync bool
29stop context.CancelFunc
30}
31
32func (r *StorageController) Start(withInit bool, senderMode bool) {
33log := logger.FromContext(context.Background())
34r.triggers = map[string]v1alpha1.Trigger{}
35r.templates = map[string]v1alpha1.Template{}
36r.scopes = map[string]v1alpha1.Scope{}
37r.namespaces = map[string]corev1.Namespace{}
38r.regoRules = map[string]rego.PreparedEvalQuery{}
39if senderMode {
40r.senderMode = !util.IsCreatorOnlyRole()
41} else {
42r.senderMode = senderMode
43}
44
45ctx, stop := context.WithCancel(context.Background())
46r.stop = stop
47
48go r.startToListen(ctx)
49if withInit {
50r.InitStorage()
51}
52r.initedFlag = true
53log.Debug("StorageController: Start")
54}
55
56func (r *StorageController) Stop() {
57log := logger.FromContext(context.Background())
58r.stop()
59log.Debug("StorageController: Stop")
60}
61
62func (r *StorageController) startToListen(ctx context.Context) {
63for {
64select {
65case trigger := <-utils.StorageUpdateTrigger:
66go r.UpdateTrigger(trigger)
67case template := <-utils.StorageUpdateTemplate:
68go r.UpdateTemplate(template)
69case scope := <-utils.StorageUpdateScope:
70go r.UpdateScope(scope)
71case namespace := <-utils.StorageUpdateNamespace:
72go r.UpdateNamespace(namespace)
73case trigger := <-utils.StorageDeleteTrigger:
74go r.DeleteTrigger(trigger)
75case template := <-utils.StorageDeleteTemplate:
76go r.DeleteTemplate(template)
77case scope := <-utils.StorageDeleteScope:
78go r.DeleteScope(scope)
79case namespace := <-utils.StorageDeleteNamespace:
80go r.DeleteNamespace(namespace)
81case <-ctx.Done():
82return
83}
84}
85}
86
87func (r *StorageController) InitStorage() {
88r.mutex.Lock()
89ctx := context.Background()
90log := logger.FromContext(ctx)
91if utils.IsControllerNeeded("trigger") {
92//fix read from global cluster
93list, err := utils.Cl.Trigger(env.OperatorNamespace).List(ctx, metav1.ListOptions{})
94if err != nil {
95log.Errorf("[StorageController] Failed initialize %s", err.Error())
96return
97}
98for _, trigger := range list.Items {
99r.triggers[trigger.GetNamespacedName()] = trigger
100if r.senderMode {
101utils.SyncAddNewObject <- map[string]interface{}{"type": "update", "obj": trigger}
102}
103}
104}
105if utils.IsControllerNeeded("template") {
106//fix read from global cluster
107tms, err := utils.Cl.Template(env.OperatorNamespace).List(ctx, metav1.ListOptions{})
108if err != nil {
109log.Errorf("[StorageController] Failed initialize %s", err.Error())
110return
111}
112for _, tm := range tms.Items {
113r.templates[tm.GetNamespacedName()] = tm
114if r.senderMode {
115utils.SyncAddNewObject <- map[string]interface{}{"type": "update", "obj": tm}
116}
117}
118}
119
120if utils.IsControllerNeeded("scope") {
121ss, err := utils.Cl.Scope(env.OperatorNamespace).List(ctx, metav1.ListOptions{})
122if err != nil {
123log.Errorf("[StorageController] Failed initialize %s", err.Error())
124return
125}
126for _, s := range ss.Items {
127r.scopes[s.GetNamespacedName()] = s
128if r.senderMode {
129utils.SyncAddNewObject <- map[string]interface{}{"type": "update", "obj": s}
130}
131}
132}
133r.mutex.Unlock()
134}
135
136func (r *StorageController) UpdateTrigger(trigger *v1alpha1.Trigger) {
137r.mutex.Lock()
138r.triggers[trigger.GetNamespacedName()] = *trigger
139r.mutex.Unlock()
140if r.senderMode {
141utils.SyncAddNewObject <- map[string]interface{}{"type": "update", "obj": *trigger}
142}
143}
144
145func (r *StorageController) UpdateTemplate(template *v1alpha1.Template) {
146r.mutex.Lock()
147r.templates[template.GetNamespacedName()] = *template
148r.mutex.Unlock()
149
150if !r.senderMode {
151if template.Spec.Type == "rego" {
152r.UpdateRegoRule(template.GetNamespacedName(), template.Spec)
153} else {
154_, ok := r.regoRules[template.GetNamespacedName()]
155if ok {
156r.DeleteRegoRule(template.GetNamespacedName())
157}
158}
159}
160if r.senderMode {
161utils.SyncAddNewObject <- map[string]interface{}{"type": "update", "obj": *template}
162}
163}
164
165func (r *StorageController) UpdateRegoRule(namespacedName string, ts v1alpha1.TemplateSpec) {
166precompileRule, _ := opa.Precompile(ts.Data)
167r.mutex.Lock()
168r.regoRules[namespacedName] = precompileRule
169r.mutex.Unlock()
170}
171
172func (r *StorageController) UpdateScope(scope v1alpha1.Scope) {
173r.mutex.Lock()
174r.scopes[scope.GetNamespacedName()] = scope
175r.mutex.Unlock()
176if r.senderMode {
177utils.SyncAddNewObject <- map[string]interface{}{"type": "update", "obj": scope}
178}
179}
180
181func (r *StorageController) UpdateNamespace(namespace corev1.Namespace) {
182r.mutex.Lock()
183ns := corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: namespace.ObjectMeta.Name,
184Namespace: namespace.ObjectMeta.Namespace, Labels: namespace.ObjectMeta.Labels,
185}}
186r.namespaces[namespace.ObjectMeta.Name] = ns
187r.mutex.Unlock()
188if r.senderMode {
189utils.SyncAddNewObject <- map[string]interface{}{"type": "update", "obj": namespace}
190}
191}
192
193func (r *StorageController) DeleteTrigger(trigger *v1alpha1.Trigger) {
194r.mutex.Lock()
195delete(r.triggers, trigger.GetNamespacedName())
196r.mutex.Unlock()
197if r.senderMode {
198utils.SyncAddNewObject <- map[string]interface{}{"type": "remove", "obj": *trigger}
199}
200}
201
202func (r *StorageController) DeleteTemplate(template *v1alpha1.Template) {
203r.mutex.Lock()
204delete(r.templates, template.GetNamespacedName())
205r.mutex.Unlock()
206
207if !r.senderMode {
208_, ok := r.regoRules[template.GetNamespacedName()]
209if ok {
210r.DeleteRegoRule(template.GetNamespacedName())
211}
212}
213if r.senderMode {
214utils.SyncAddNewObject <- map[string]interface{}{"type": "remove", "obj": *template}
215}
216}
217
218func (r *StorageController) DeleteRegoRule(namespacedName string) {
219r.mutex.Lock()
220delete(r.regoRules, namespacedName)
221r.mutex.Unlock()
222}
223
224func (r *StorageController) DeleteScope(scope v1alpha1.Scope) {
225r.mutex.Lock()
226delete(r.scopes, scope.GetNamespacedName())
227r.mutex.Unlock()
228if r.senderMode {
229utils.SyncAddNewObject <- map[string]interface{}{"type": "remove", "obj": scope}
230}
231}
232
233func (r *StorageController) DeleteNamespace(namespace corev1.Namespace) {
234r.mutex.Lock()
235delete(r.namespaces, namespace.ObjectMeta.Name)
236r.mutex.Unlock()
237if r.senderMode {
238utils.SyncAddNewObject <- map[string]interface{}{"type": "remove", "obj": namespace}
239}
240}
241
242func (r *StorageController) CheckTriggerTemplatePair() {
243log := logger.FromContext(context.Background())
244for _, trigger := range r.triggers {
245for _, config := range trigger.Spec.CreationConfigs {
246for _, ref := range config.TemplateRefs {
247_, ok := r.templates[ref]
248if !ok {
249log.Errorf("StorageController: CheckTriggerTemplatePair Trigger[%s] contains invalid template ref %s", trigger.Name, ref)
250}
251}
252}
253}
254}
255
256func (r *StorageController) GetScopes() map[string]v1alpha1.Scope {
257defer r.mutex.Unlock()
258r.mutex.Lock()
259return r.scopes
260}
261
262func (r *StorageController) GetTemplates() map[string]v1alpha1.Template {
263defer r.mutex.Unlock()
264r.mutex.Lock()
265return r.templates
266}
267
268func (r *StorageController) GetRegoRules() map[string]rego.PreparedEvalQuery {
269defer r.mutex.Unlock()
270r.mutex.Lock()
271return r.regoRules
272}
273
274func (r *StorageController) GetTriggers() map[string]v1alpha1.Trigger {
275defer r.mutex.Unlock()
276r.mutex.Lock()
277return r.triggers
278}
279
280func (r *StorageController) GetNamespaces() map[string]corev1.Namespace {
281defer r.mutex.Unlock()
282r.mutex.Lock()
283return r.namespaces
284}
285
286func (r *StorageController) GetScopeItems(scType string) []v1alpha1.Item {
287defer r.mutex.Unlock()
288r.mutex.Lock()
289var result []v1alpha1.Item
290for _, val := range r.scopes {
291if val.Spec.Type == scType {
292result = append(result, val.Spec.Items...)
293}
294}
295
296return result
297}
298
299func (r *StorageController) HealthCheck() bool {
300return r.initedFlag
301}
302