kubelatte-ce
Форк от sbertech/kubelatte-ce
293 строки · 7.7 Кб
1package utils
2
3import (
4"context"
5"crypto/sha1"
6"encoding/hex"
7"fmt"
8"github.com/prometheus/client_golang/prometheus"
9"gitverse.ru/ktrntrsv/kubelatte-ce/pkg/api/v1alpha1"
10cachev1alpha1 "gitverse.ru/ktrntrsv/kubelatte-ce/pkg/api/v1alpha1"
11"gitverse.ru/ktrntrsv/kubelatte-ce/pkg/kubeapi"
12"gitverse.ru/ktrntrsv/kubelatte-ce/pkg/observability/logger"
13"gitverse.ru/ktrntrsv/kubelatte-ce/pkg/operator/controllers/clientset"
14"gitverse.ru/ktrntrsv/kubelatte-ce/pkg/util"
15"gitverse.ru/ktrntrsv/kubelatte-ce/pkg/util/env"
16types1 "gitverse.ru/ktrntrsv/kubelatte-ce/pkg/util/types"
17corev1 "k8s.io/api/core/v1"
18metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
19v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
20"k8s.io/apimachinery/pkg/types"
21"k8s.io/client-go/kubernetes"
22"math/rand"
23"path/filepath"
24ctrl "sigs.k8s.io/controller-runtime"
25"strings"
26"sync"
27)
28
29var (
30Mux sync.Mutex
31EventOperation EventsType
32Cl clientset.V1Alpha1Interface
33IsInit = IsInitSync{Mu: sync.Mutex{}}
34kubeClient kubernetes.Interface
35)
36
37var (
38PmTemplatesTotal = prometheus.NewGauge(prometheus.GaugeOpts{
39Name: "kblt_templates_total",
40Help: "The total number of processed templates",
41})
42PmTriggersTotal = prometheus.NewGauge(prometheus.GaugeOpts{
43Name: "kblt_triggers_total",
44Help: "The total number of processed triggers",
45})
46)
47
48type EventWithHash struct {
49EventType EventsType
50Kind string
51Name string
52Uid string
53Conditions []v1alpha1.Condition
54Spec any
55OldSpec any
56}
57
58var (
59FactoryCreateRequest = make(chan v1alpha1.TriggerInstance)
60
61WatcherUpdateResyncPeriod = make(chan uint64)
62WatcherAddNewObjectTo = make(chan v1alpha1.TriggerInstance)
63WatcherRemoveObjectFrom = make(chan v1alpha1.TriggerInstance)
64
65StorageUpdateTemplate = make(chan *v1alpha1.Template)
66StorageUpdateTrigger = make(chan *v1alpha1.Trigger)
67StorageUpdateScope = make(chan v1alpha1.Scope)
68StorageUpdateNamespace = make(chan corev1.Namespace)
69StorageDeleteTemplate = make(chan *v1alpha1.Template)
70StorageDeleteTrigger = make(chan *v1alpha1.Trigger)
71StorageDeleteScope = make(chan v1alpha1.Scope)
72StorageDeleteNamespace = make(chan corev1.Namespace)
73
74SyncUpdateReSyncPeriod = make(chan uint64)
75SyncAddNewObject = make(chan map[string]interface{})
76
77EventWithHashCh = make(chan EventWithHash)
78EventStartedLeader = make(chan struct{})
79)
80
81type IsInitSync struct {
82IsInit bool
83Mu sync.Mutex
84}
85
86type configsType string
87
88const (
89TriggerType configsType = "trigger"
90TemplateType configsType = "template"
91scopeType configsType = "rule"
92)
93
94type EventsType string
95
96const (
97AddEvent EventsType = "added"
98UpdateEvent EventsType = "updated"
99DeleteEvent EventsType = "removed"
100)
101
102const (
103LeaseLockName = "leader-operator"
104ServiceReceiver = "Openshift API server"
105FactoryResourceRemoved = "REMOVED"
106FactoryResourceCreateFailed = "Failed"
107)
108
109type ReconcilerInterface interface {
110Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error)
111SetupWithManager(mgr ctrl.Manager) error
112AddOrUpdateEventHndl(ctx context.Context, req ctrl.Request) (err error)
113HealthCheck() bool
114DeleteEventHndl(req ctrl.Request) error
115}
116
117type TemplateReconciler interface {
118ReconcilerInterface
119UpdateStatus(ctx context.Context, template v1alpha1.Template) (ctrl.Result, error)
120}
121type ScopeReconciler interface {
122ReconcilerInterface
123UpdateStatus(ctx context.Context, scope v1alpha1.Scope) (ctrl.Result, error)
124}
125
126type TriggerReconciler interface {
127ReconcilerInterface
128UpdateStatus(ctx context.Context, trigger v1alpha1.Trigger) (ctrl.Result, error)
129}
130
131type Status struct {
132TypeConfig string `yaml:"typeConfig"`
133NameConfig string `yaml:"nameConfig"`
134Status v1alpha1.ConditionStatus `yaml:"status"`
135Message string `yaml:"message"`
136}
137
138func GetRqUID(clusterName, namespace string, typeConf configsType, nameConf string) string {
139return filepath.Join(clusterName, namespace, string(typeConf), nameConf)
140}
141
142func PrometheusInit() {
143prometheus.MustRegister(PmTemplatesTotal)
144prometheus.MustRegister(PmTriggersTotal)
145}
146
147func CheckHashInStatus(obj []v1alpha1.Condition) bool {
148for _, condition := range obj {
149if condition.Type == v1alpha1.LastHandledHash {
150return true
151}
152}
153return false
154}
155
156func CompareHashInStatus(obj []v1alpha1.Condition, hash string) bool {
157for _, condition := range obj {
158if condition.Type == v1alpha1.LastHandledHash && strings.Compare(condition.Message, hash) == 0 {
159return false
160}
161}
162return true
163}
164
165func AddHashCondition(data []byte) v1alpha1.Condition {
166return v1alpha1.Condition{
167Type: v1alpha1.LastHandledHash,
168Status: v1alpha1.ConditionTrue,
169LastTransitionTime: metav1.Now(),
170Message: GetHash(data),
171}
172}
173
174func GetHash(data []byte) string {
175h := sha1.New()
176h.Write(data)
177sha1Hash := hex.EncodeToString(h.Sum(nil))
178return sha1Hash
179}
180
181func GetShortHash(message string) string {
182return GetHash([]byte(message))[0:10]
183}
184
185func GetRandomPart() string {
186return fmt.Sprintf("%d", rand.Int())
187}
188
189func IsControllerNeeded(name string) bool {
190switch name {
191case "scope":
192return util.IsFullRole() || env.KbltMutator || env.KbltValidator
193case "trigger":
194return util.IsFullRole() || env.KbltMutator || env.KbltCreator
195case "template":
196return util.IsFullRole() || env.KbltMutator || env.KbltCreator || env.KbltValidator
197case "triggerInstance":
198return util.IsFullRole() || env.KbltCreator
199}
200return false
201}
202
203func GetOwnerRef(obj map[string]interface{}) (*metav1.OwnerReference, error) {
204metadata, ok := obj["metadata"]
205if !ok {
206return nil, fmt.Errorf("GetOwnerRef failed. Metadata not found")
207}
208
209name, ok := metadata.(map[string]interface{})["name"]
210if !ok {
211return nil, fmt.Errorf("GetOwnerRef failed. Name not found")
212}
213
214uid, ok := metadata.(map[string]interface{})["uid"]
215if !ok {
216return nil, fmt.Errorf("GetOwnerRef failed. UID not found")
217}
218
219apiVersion, ok := obj["apiVersion"]
220if !ok {
221return nil, fmt.Errorf("GetOwnerRef failed. ApiVersion not found")
222}
223
224kind, ok := obj["kind"]
225if !ok {
226return nil, fmt.Errorf("GetOwnerRef failed. Kind not found")
227}
228
229return &metav1.OwnerReference{
230APIVersion: apiVersion.(string),
231Kind: kind.(string),
232Name: name.(string),
233UID: types.UID(uid.(string)),
234}, nil
235}
236
237func RemoveCondition(conditions []v1alpha1.Condition, ct v1alpha1.ConditionType) []v1alpha1.Condition {
238for i, cond := range conditions {
239if cond.Type == ct {
240return append(conditions[:i], conditions[i+1:]...)
241}
242}
243return conditions
244}
245
246func CheckNamespace(name string, selector *types1.NamespaceSelector) bool {
247log := logger.FromContext(context.Background())
248if selector == nil {
249log.Warn("Namespace selector is not set")
250return false
251}
252
253var clt kubernetes.Interface
254if kubeClient != nil {
255clt = kubeClient
256} else {
257clt = kubeapi.GetClient()
258}
259
260ns, err := clt.CoreV1().Namespaces().Get(context.Background(), name, v1.GetOptions{})
261if err != nil {
262log.Errorf("Can't fetch namespace data, error %s", err.Error())
263return false
264}
265return selector.MatchNamespace(ns)
266}
267
268func HandleUnhandledTIs(ns string) {
269if !env.KbltCreator {
270return
271}
272log := logger.FromContext(context.Background())
273
274triggerInstances, err := Cl.TriggerInstance().List(context.Background(), ns, metav1.ListOptions{})
275if err != nil {
276log.Errorf("[CheckUnhandledTIs] Failed List triggerInstances %s", err)
277return
278}
279
280for _, ti := range triggerInstances.Items {
281if len(ti.Status.ResourceStatus) != 0 { // if ti was handled already, skip it
282continue
283}
284
285handleTI(context.Background(), ti)
286}
287}
288
289func handleTI(ctx context.Context, ti cachev1alpha1.TriggerInstance) {
290log := logger.FromContext(ctx)
291log.Debugf("TI %v was not handled, start handling it", ti.Name)
292FactoryCreateRequest <- ti
293}
294