kubelatte-ce
Форк от sbertech/kubelatte-ce
301 строка · 8.2 Кб
1package storage2
3import (4"context"5"github.com/open-policy-agent/opa/rego"6"gitverse.ru/ktrntrsv/kubelatte-ce/pkg/api/v1alpha1"7"gitverse.ru/ktrntrsv/kubelatte-ce/pkg/observability/logger"8"gitverse.ru/ktrntrsv/kubelatte-ce/pkg/operator/utils"9"gitverse.ru/ktrntrsv/kubelatte-ce/pkg/util"10"gitverse.ru/ktrntrsv/kubelatte-ce/pkg/util/env"11"gitverse.ru/ktrntrsv/kubelatte-ce/pkg/util/opa"12corev1 "k8s.io/api/core/v1"13metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"14"sync"15)
16
17var Storage *StorageController18
19type StorageController struct {20triggers map[string]v1alpha1.Trigger21templates map[string]v1alpha1.Template22scopes map[string]v1alpha1.Scope23namespaces map[string]corev1.Namespace24regoRules map[string]rego.PreparedEvalQuery25mutex sync.Mutex26initedFlag bool27senderMode bool28Sync bool29stop context.CancelFunc30}
31
32func (r *StorageController) Start(withInit bool, senderMode bool) {33log := logger.FromContext(context.Background())34r.triggers = map[string]v1alpha1.Trigger{}35r.templates = map[string]v1alpha1.Template{}36r.scopes = map[string]v1alpha1.Scope{}37r.namespaces = map[string]corev1.Namespace{}38r.regoRules = map[string]rego.PreparedEvalQuery{}39if senderMode {40r.senderMode = !util.IsCreatorOnlyRole()41} else {42r.senderMode = senderMode43}44
45ctx, stop := context.WithCancel(context.Background())46r.stop = stop47
48go r.startToListen(ctx)49if withInit {50r.InitStorage()51}52r.initedFlag = true53log.Debug("StorageController: Start")54}
55
56func (r *StorageController) Stop() {57log := logger.FromContext(context.Background())58r.stop()59log.Debug("StorageController: Stop")60}
61
62func (r *StorageController) startToListen(ctx context.Context) {63for {64select {65case trigger := <-utils.StorageUpdateTrigger:66go r.UpdateTrigger(trigger)67case template := <-utils.StorageUpdateTemplate:68go r.UpdateTemplate(template)69case scope := <-utils.StorageUpdateScope:70go r.UpdateScope(scope)71case namespace := <-utils.StorageUpdateNamespace:72go r.UpdateNamespace(namespace)73case trigger := <-utils.StorageDeleteTrigger:74go r.DeleteTrigger(trigger)75case template := <-utils.StorageDeleteTemplate:76go r.DeleteTemplate(template)77case scope := <-utils.StorageDeleteScope:78go r.DeleteScope(scope)79case namespace := <-utils.StorageDeleteNamespace:80go r.DeleteNamespace(namespace)81case <-ctx.Done():82return83}84}85}
86
87func (r *StorageController) InitStorage() {88r.mutex.Lock()89ctx := context.Background()90log := logger.FromContext(ctx)91if utils.IsControllerNeeded("trigger") {92//fix read from global cluster93list, err := utils.Cl.Trigger(env.OperatorNamespace).List(ctx, metav1.ListOptions{})94if err != nil {95log.Errorf("[StorageController] Failed initialize %s", err.Error())96return97}98for _, trigger := range list.Items {99r.triggers[trigger.GetNamespacedName()] = trigger100if r.senderMode {101utils.SyncAddNewObject <- map[string]interface{}{"type": "update", "obj": trigger}102}103}104}105if utils.IsControllerNeeded("template") {106//fix read from global cluster107tms, err := utils.Cl.Template(env.OperatorNamespace).List(ctx, metav1.ListOptions{})108if err != nil {109log.Errorf("[StorageController] Failed initialize %s", err.Error())110return111}112for _, tm := range tms.Items {113r.templates[tm.GetNamespacedName()] = tm114if r.senderMode {115utils.SyncAddNewObject <- map[string]interface{}{"type": "update", "obj": tm}116}117}118}119
120if utils.IsControllerNeeded("scope") {121ss, err := utils.Cl.Scope(env.OperatorNamespace).List(ctx, metav1.ListOptions{})122if err != nil {123log.Errorf("[StorageController] Failed initialize %s", err.Error())124return125}126for _, s := range ss.Items {127r.scopes[s.GetNamespacedName()] = s128if r.senderMode {129utils.SyncAddNewObject <- map[string]interface{}{"type": "update", "obj": s}130}131}132}133r.mutex.Unlock()134}
135
136func (r *StorageController) UpdateTrigger(trigger *v1alpha1.Trigger) {137r.mutex.Lock()138r.triggers[trigger.GetNamespacedName()] = *trigger139r.mutex.Unlock()140if r.senderMode {141utils.SyncAddNewObject <- map[string]interface{}{"type": "update", "obj": *trigger}142}143}
144
145func (r *StorageController) UpdateTemplate(template *v1alpha1.Template) {146r.mutex.Lock()147r.templates[template.GetNamespacedName()] = *template148r.mutex.Unlock()149
150if !r.senderMode {151if template.Spec.Type == "rego" {152r.UpdateRegoRule(template.GetNamespacedName(), template.Spec)153} else {154_, ok := r.regoRules[template.GetNamespacedName()]155if ok {156r.DeleteRegoRule(template.GetNamespacedName())157}158}159}160if r.senderMode {161utils.SyncAddNewObject <- map[string]interface{}{"type": "update", "obj": *template}162}163}
164
165func (r *StorageController) UpdateRegoRule(namespacedName string, ts v1alpha1.TemplateSpec) {166precompileRule, _ := opa.Precompile(ts.Data)167r.mutex.Lock()168r.regoRules[namespacedName] = precompileRule169r.mutex.Unlock()170}
171
172func (r *StorageController) UpdateScope(scope v1alpha1.Scope) {173r.mutex.Lock()174r.scopes[scope.GetNamespacedName()] = scope175r.mutex.Unlock()176if r.senderMode {177utils.SyncAddNewObject <- map[string]interface{}{"type": "update", "obj": scope}178}179}
180
181func (r *StorageController) UpdateNamespace(namespace corev1.Namespace) {182r.mutex.Lock()183ns := corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: namespace.ObjectMeta.Name,184Namespace: namespace.ObjectMeta.Namespace, Labels: namespace.ObjectMeta.Labels,185}}186r.namespaces[namespace.ObjectMeta.Name] = ns187r.mutex.Unlock()188if r.senderMode {189utils.SyncAddNewObject <- map[string]interface{}{"type": "update", "obj": namespace}190}191}
192
193func (r *StorageController) DeleteTrigger(trigger *v1alpha1.Trigger) {194r.mutex.Lock()195delete(r.triggers, trigger.GetNamespacedName())196r.mutex.Unlock()197if r.senderMode {198utils.SyncAddNewObject <- map[string]interface{}{"type": "remove", "obj": *trigger}199}200}
201
202func (r *StorageController) DeleteTemplate(template *v1alpha1.Template) {203r.mutex.Lock()204delete(r.templates, template.GetNamespacedName())205r.mutex.Unlock()206
207if !r.senderMode {208_, ok := r.regoRules[template.GetNamespacedName()]209if ok {210r.DeleteRegoRule(template.GetNamespacedName())211}212}213if r.senderMode {214utils.SyncAddNewObject <- map[string]interface{}{"type": "remove", "obj": *template}215}216}
217
218func (r *StorageController) DeleteRegoRule(namespacedName string) {219r.mutex.Lock()220delete(r.regoRules, namespacedName)221r.mutex.Unlock()222}
223
224func (r *StorageController) DeleteScope(scope v1alpha1.Scope) {225r.mutex.Lock()226delete(r.scopes, scope.GetNamespacedName())227r.mutex.Unlock()228if r.senderMode {229utils.SyncAddNewObject <- map[string]interface{}{"type": "remove", "obj": scope}230}231}
232
233func (r *StorageController) DeleteNamespace(namespace corev1.Namespace) {234r.mutex.Lock()235delete(r.namespaces, namespace.ObjectMeta.Name)236r.mutex.Unlock()237if r.senderMode {238utils.SyncAddNewObject <- map[string]interface{}{"type": "remove", "obj": namespace}239}240}
241
242func (r *StorageController) CheckTriggerTemplatePair() {243log := logger.FromContext(context.Background())244for _, trigger := range r.triggers {245for _, config := range trigger.Spec.CreationConfigs {246for _, ref := range config.TemplateRefs {247_, ok := r.templates[ref]248if !ok {249log.Errorf("StorageController: CheckTriggerTemplatePair Trigger[%s] contains invalid template ref %s", trigger.Name, ref)250}251}252}253}254}
255
256func (r *StorageController) GetScopes() map[string]v1alpha1.Scope {257defer r.mutex.Unlock()258r.mutex.Lock()259return r.scopes260}
261
262func (r *StorageController) GetTemplates() map[string]v1alpha1.Template {263defer r.mutex.Unlock()264r.mutex.Lock()265return r.templates266}
267
268func (r *StorageController) GetRegoRules() map[string]rego.PreparedEvalQuery {269defer r.mutex.Unlock()270r.mutex.Lock()271return r.regoRules272}
273
274func (r *StorageController) GetTriggers() map[string]v1alpha1.Trigger {275defer r.mutex.Unlock()276r.mutex.Lock()277return r.triggers278}
279
280func (r *StorageController) GetNamespaces() map[string]corev1.Namespace {281defer r.mutex.Unlock()282r.mutex.Lock()283return r.namespaces284}
285
286func (r *StorageController) GetScopeItems(scType string) []v1alpha1.Item {287defer r.mutex.Unlock()288r.mutex.Lock()289var result []v1alpha1.Item290for _, val := range r.scopes {291if val.Spec.Type == scType {292result = append(result, val.Spec.Items...)293}294}295
296return result297}
298
299func (r *StorageController) HealthCheck() bool {300return r.initedFlag301}
302