kubelatte-ce

Форк
2
Форк от sbertech/kubelatte-ce
/
storage.go 
301 строка · 8.2 Кб
1
package storage
2

3
import (
4
	"context"
5
	"github.com/open-policy-agent/opa/rego"
6
	"gitverse.ru/ktrntrsv/kubelatte-ce/pkg/api/v1alpha1"
7
	"gitverse.ru/ktrntrsv/kubelatte-ce/pkg/observability/logger"
8
	"gitverse.ru/ktrntrsv/kubelatte-ce/pkg/operator/utils"
9
	"gitverse.ru/ktrntrsv/kubelatte-ce/pkg/util"
10
	"gitverse.ru/ktrntrsv/kubelatte-ce/pkg/util/env"
11
	"gitverse.ru/ktrntrsv/kubelatte-ce/pkg/util/opa"
12
	corev1 "k8s.io/api/core/v1"
13
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
14
	"sync"
15
)
16

17
var Storage *StorageController
18

19
type StorageController struct {
20
	triggers   map[string]v1alpha1.Trigger
21
	templates  map[string]v1alpha1.Template
22
	scopes     map[string]v1alpha1.Scope
23
	namespaces map[string]corev1.Namespace
24
	regoRules  map[string]rego.PreparedEvalQuery
25
	mutex      sync.Mutex
26
	initedFlag bool
27
	senderMode bool
28
	Sync       bool
29
	stop       context.CancelFunc
30
}
31

32
func (r *StorageController) Start(withInit bool, senderMode bool) {
33
	log := logger.FromContext(context.Background())
34
	r.triggers = map[string]v1alpha1.Trigger{}
35
	r.templates = map[string]v1alpha1.Template{}
36
	r.scopes = map[string]v1alpha1.Scope{}
37
	r.namespaces = map[string]corev1.Namespace{}
38
	r.regoRules = map[string]rego.PreparedEvalQuery{}
39
	if senderMode {
40
		r.senderMode = !util.IsCreatorOnlyRole()
41
	} else {
42
		r.senderMode = senderMode
43
	}
44

45
	ctx, stop := context.WithCancel(context.Background())
46
	r.stop = stop
47

48
	go r.startToListen(ctx)
49
	if withInit {
50
		r.InitStorage()
51
	}
52
	r.initedFlag = true
53
	log.Debug("StorageController: Start")
54
}
55

56
func (r *StorageController) Stop() {
57
	log := logger.FromContext(context.Background())
58
	r.stop()
59
	log.Debug("StorageController: Stop")
60
}
61

62
func (r *StorageController) startToListen(ctx context.Context) {
63
	for {
64
		select {
65
		case trigger := <-utils.StorageUpdateTrigger:
66
			go r.UpdateTrigger(trigger)
67
		case template := <-utils.StorageUpdateTemplate:
68
			go r.UpdateTemplate(template)
69
		case scope := <-utils.StorageUpdateScope:
70
			go r.UpdateScope(scope)
71
		case namespace := <-utils.StorageUpdateNamespace:
72
			go r.UpdateNamespace(namespace)
73
		case trigger := <-utils.StorageDeleteTrigger:
74
			go r.DeleteTrigger(trigger)
75
		case template := <-utils.StorageDeleteTemplate:
76
			go r.DeleteTemplate(template)
77
		case scope := <-utils.StorageDeleteScope:
78
			go r.DeleteScope(scope)
79
		case namespace := <-utils.StorageDeleteNamespace:
80
			go r.DeleteNamespace(namespace)
81
		case <-ctx.Done():
82
			return
83
		}
84
	}
85
}
86

87
func (r *StorageController) InitStorage() {
88
	r.mutex.Lock()
89
	ctx := context.Background()
90
	log := logger.FromContext(ctx)
91
	if utils.IsControllerNeeded("trigger") {
92
		//fix read from global cluster
93
		list, err := utils.Cl.Trigger(env.OperatorNamespace).List(ctx, metav1.ListOptions{})
94
		if err != nil {
95
			log.Errorf("[StorageController] Failed initialize %s", err.Error())
96
			return
97
		}
98
		for _, trigger := range list.Items {
99
			r.triggers[trigger.GetNamespacedName()] = trigger
100
			if r.senderMode {
101
				utils.SyncAddNewObject <- map[string]interface{}{"type": "update", "obj": trigger}
102
			}
103
		}
104
	}
105
	if utils.IsControllerNeeded("template") {
106
		//fix read from global cluster
107
		tms, err := utils.Cl.Template(env.OperatorNamespace).List(ctx, metav1.ListOptions{})
108
		if err != nil {
109
			log.Errorf("[StorageController] Failed initialize %s", err.Error())
110
			return
111
		}
112
		for _, tm := range tms.Items {
113
			r.templates[tm.GetNamespacedName()] = tm
114
			if r.senderMode {
115
				utils.SyncAddNewObject <- map[string]interface{}{"type": "update", "obj": tm}
116
			}
117
		}
118
	}
119

120
	if utils.IsControllerNeeded("scope") {
121
		ss, err := utils.Cl.Scope(env.OperatorNamespace).List(ctx, metav1.ListOptions{})
122
		if err != nil {
123
			log.Errorf("[StorageController] Failed initialize %s", err.Error())
124
			return
125
		}
126
		for _, s := range ss.Items {
127
			r.scopes[s.GetNamespacedName()] = s
128
			if r.senderMode {
129
				utils.SyncAddNewObject <- map[string]interface{}{"type": "update", "obj": s}
130
			}
131
		}
132
	}
133
	r.mutex.Unlock()
134
}
135

136
func (r *StorageController) UpdateTrigger(trigger *v1alpha1.Trigger) {
137
	r.mutex.Lock()
138
	r.triggers[trigger.GetNamespacedName()] = *trigger
139
	r.mutex.Unlock()
140
	if r.senderMode {
141
		utils.SyncAddNewObject <- map[string]interface{}{"type": "update", "obj": *trigger}
142
	}
143
}
144

145
func (r *StorageController) UpdateTemplate(template *v1alpha1.Template) {
146
	r.mutex.Lock()
147
	r.templates[template.GetNamespacedName()] = *template
148
	r.mutex.Unlock()
149

150
	if !r.senderMode {
151
		if template.Spec.Type == "rego" {
152
			r.UpdateRegoRule(template.GetNamespacedName(), template.Spec)
153
		} else {
154
			_, ok := r.regoRules[template.GetNamespacedName()]
155
			if ok {
156
				r.DeleteRegoRule(template.GetNamespacedName())
157
			}
158
		}
159
	}
160
	if r.senderMode {
161
		utils.SyncAddNewObject <- map[string]interface{}{"type": "update", "obj": *template}
162
	}
163
}
164

165
func (r *StorageController) UpdateRegoRule(namespacedName string, ts v1alpha1.TemplateSpec) {
166
	precompileRule, _ := opa.Precompile(ts.Data)
167
	r.mutex.Lock()
168
	r.regoRules[namespacedName] = precompileRule
169
	r.mutex.Unlock()
170
}
171

172
func (r *StorageController) UpdateScope(scope v1alpha1.Scope) {
173
	r.mutex.Lock()
174
	r.scopes[scope.GetNamespacedName()] = scope
175
	r.mutex.Unlock()
176
	if r.senderMode {
177
		utils.SyncAddNewObject <- map[string]interface{}{"type": "update", "obj": scope}
178
	}
179
}
180

181
func (r *StorageController) UpdateNamespace(namespace corev1.Namespace) {
182
	r.mutex.Lock()
183
	ns := corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: namespace.ObjectMeta.Name,
184
		Namespace: namespace.ObjectMeta.Namespace, Labels: namespace.ObjectMeta.Labels,
185
	}}
186
	r.namespaces[namespace.ObjectMeta.Name] = ns
187
	r.mutex.Unlock()
188
	if r.senderMode {
189
		utils.SyncAddNewObject <- map[string]interface{}{"type": "update", "obj": namespace}
190
	}
191
}
192

193
func (r *StorageController) DeleteTrigger(trigger *v1alpha1.Trigger) {
194
	r.mutex.Lock()
195
	delete(r.triggers, trigger.GetNamespacedName())
196
	r.mutex.Unlock()
197
	if r.senderMode {
198
		utils.SyncAddNewObject <- map[string]interface{}{"type": "remove", "obj": *trigger}
199
	}
200
}
201

202
func (r *StorageController) DeleteTemplate(template *v1alpha1.Template) {
203
	r.mutex.Lock()
204
	delete(r.templates, template.GetNamespacedName())
205
	r.mutex.Unlock()
206

207
	if !r.senderMode {
208
		_, ok := r.regoRules[template.GetNamespacedName()]
209
		if ok {
210
			r.DeleteRegoRule(template.GetNamespacedName())
211
		}
212
	}
213
	if r.senderMode {
214
		utils.SyncAddNewObject <- map[string]interface{}{"type": "remove", "obj": *template}
215
	}
216
}
217

218
func (r *StorageController) DeleteRegoRule(namespacedName string) {
219
	r.mutex.Lock()
220
	delete(r.regoRules, namespacedName)
221
	r.mutex.Unlock()
222
}
223

224
func (r *StorageController) DeleteScope(scope v1alpha1.Scope) {
225
	r.mutex.Lock()
226
	delete(r.scopes, scope.GetNamespacedName())
227
	r.mutex.Unlock()
228
	if r.senderMode {
229
		utils.SyncAddNewObject <- map[string]interface{}{"type": "remove", "obj": scope}
230
	}
231
}
232

233
func (r *StorageController) DeleteNamespace(namespace corev1.Namespace) {
234
	r.mutex.Lock()
235
	delete(r.namespaces, namespace.ObjectMeta.Name)
236
	r.mutex.Unlock()
237
	if r.senderMode {
238
		utils.SyncAddNewObject <- map[string]interface{}{"type": "remove", "obj": namespace}
239
	}
240
}
241

242
func (r *StorageController) CheckTriggerTemplatePair() {
243
	log := logger.FromContext(context.Background())
244
	for _, trigger := range r.triggers {
245
		for _, config := range trigger.Spec.CreationConfigs {
246
			for _, ref := range config.TemplateRefs {
247
				_, ok := r.templates[ref]
248
				if !ok {
249
					log.Errorf("StorageController: CheckTriggerTemplatePair Trigger[%s] contains invalid template ref %s", trigger.Name, ref)
250
				}
251
			}
252
		}
253
	}
254
}
255

256
func (r *StorageController) GetScopes() map[string]v1alpha1.Scope {
257
	defer r.mutex.Unlock()
258
	r.mutex.Lock()
259
	return r.scopes
260
}
261

262
func (r *StorageController) GetTemplates() map[string]v1alpha1.Template {
263
	defer r.mutex.Unlock()
264
	r.mutex.Lock()
265
	return r.templates
266
}
267

268
func (r *StorageController) GetRegoRules() map[string]rego.PreparedEvalQuery {
269
	defer r.mutex.Unlock()
270
	r.mutex.Lock()
271
	return r.regoRules
272
}
273

274
func (r *StorageController) GetTriggers() map[string]v1alpha1.Trigger {
275
	defer r.mutex.Unlock()
276
	r.mutex.Lock()
277
	return r.triggers
278
}
279

280
func (r *StorageController) GetNamespaces() map[string]corev1.Namespace {
281
	defer r.mutex.Unlock()
282
	r.mutex.Lock()
283
	return r.namespaces
284
}
285

286
func (r *StorageController) GetScopeItems(scType string) []v1alpha1.Item {
287
	defer r.mutex.Unlock()
288
	r.mutex.Lock()
289
	var result []v1alpha1.Item
290
	for _, val := range r.scopes {
291
		if val.Spec.Type == scType {
292
			result = append(result, val.Spec.Items...)
293
		}
294
	}
295

296
	return result
297
}
298

299
func (r *StorageController) HealthCheck() bool {
300
	return r.initedFlag
301
}
302

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.