kubelatte-ce
Форк от sbertech/kubelatte-ce
293 строки · 7.7 Кб
1package utils2
3import (4"context"5"crypto/sha1"6"encoding/hex"7"fmt"8"github.com/prometheus/client_golang/prometheus"9"gitverse.ru/synapse/kubelatte/pkg/api/v1alpha1"10cachev1alpha1 "gitverse.ru/synapse/kubelatte/pkg/api/v1alpha1"11"gitverse.ru/synapse/kubelatte/pkg/kubeapi"12"gitverse.ru/synapse/kubelatte/pkg/observability/logger"13"gitverse.ru/synapse/kubelatte/pkg/operator/controllers/clientset"14"gitverse.ru/synapse/kubelatte/pkg/util"15"gitverse.ru/synapse/kubelatte/pkg/util/env"16types1 "gitverse.ru/synapse/kubelatte/pkg/util/types"17corev1 "k8s.io/api/core/v1"18metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"19v1 "k8s.io/apimachinery/pkg/apis/meta/v1"20"k8s.io/apimachinery/pkg/types"21"k8s.io/client-go/kubernetes"22"math/rand"23"path/filepath"24ctrl "sigs.k8s.io/controller-runtime"25"strings"26"sync"27)
28
29var (30Mux sync.Mutex31EventOperation EventsType
32Cl clientset.V1Alpha1Interface33IsInit = IsInitSync{Mu: sync.Mutex{}}34kubeClient kubernetes.Interface35)
36
37var (38PmTemplatesTotal = prometheus.NewGauge(prometheus.GaugeOpts{39Name: "kblt_templates_total",40Help: "The total number of processed templates",41})42PmTriggersTotal = prometheus.NewGauge(prometheus.GaugeOpts{43Name: "kblt_triggers_total",44Help: "The total number of processed triggers",45})46)
47
48type EventWithHash struct {49EventType EventsType
50Kind string51Name string52Uid string53Conditions []v1alpha1.Condition54Spec any
55OldSpec any
56}
57
58var (59FactoryCreateRequest = make(chan v1alpha1.TriggerInstance)60
61WatcherUpdateResyncPeriod = make(chan uint64)62WatcherAddNewObjectTo = make(chan v1alpha1.TriggerInstance)63WatcherRemoveObjectFrom = make(chan v1alpha1.TriggerInstance)64
65StorageUpdateTemplate = make(chan *v1alpha1.Template)66StorageUpdateTrigger = make(chan *v1alpha1.Trigger)67StorageUpdateScope = make(chan v1alpha1.Scope)68StorageUpdateNamespace = make(chan corev1.Namespace)69StorageDeleteTemplate = make(chan *v1alpha1.Template)70StorageDeleteTrigger = make(chan *v1alpha1.Trigger)71StorageDeleteScope = make(chan v1alpha1.Scope)72StorageDeleteNamespace = make(chan corev1.Namespace)73
74SyncUpdateReSyncPeriod = make(chan uint64)75SyncAddNewObject = make(chan map[string]interface{})76
77EventWithHashCh = make(chan EventWithHash)78EventStartedLeader = make(chan struct{})79)
80
// IsInitSync pairs a boolean flag with the mutex intended to guard it.
// Because it embeds a sync.Mutex by value, it must not be copied after first use.
type IsInitSync struct {
	// IsInit is the flag value.
	IsInit bool
	// Mu is the mutex callers lock around reads and writes of IsInit.
	Mu sync.Mutex
}
85
// configsType names the kind of configuration object; GetRqUID uses it as one
// path segment of the identifier it builds.
type configsType string

// Known configsType values.
const (
	// TriggerType is the "trigger" configuration kind.
	TriggerType configsType = "trigger"
	// TemplateType is the "template" configuration kind.
	TemplateType configsType = "template"
	// scopeType is the scope configuration kind; note its value is "rule".
	scopeType configsType = "rule"
)
93
// EventsType is the kind of change an event describes.
type EventsType string

// Known EventsType values.
const (
	// AddEvent marks a created object.
	AddEvent EventsType = "added"
	// UpdateEvent marks a modified object.
	UpdateEvent EventsType = "updated"
	// DeleteEvent marks a deleted object; note its value is "removed".
	DeleteEvent EventsType = "removed"
)
101
// String constants shared across the operator.
const (
	// LeaseLockName is "leader-operator"; presumably the leader-election lease
	// name — confirm at the use site.
	LeaseLockName = "leader-operator"
	// ServiceReceiver is the label "Openshift API server".
	ServiceReceiver = "Openshift API server"
	// FactoryResourceRemoved is the "REMOVED" status value.
	FactoryResourceRemoved = "REMOVED"
	// FactoryResourceCreateFailed is the "Failed" status value.
	FactoryResourceCreateFailed = "Failed"
)
108
109type ReconcilerInterface interface {110Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error)111SetupWithManager(mgr ctrl.Manager) error112AddOrUpdateEventHndl(ctx context.Context, req ctrl.Request) (err error)113HealthCheck() bool114DeleteEventHndl(req ctrl.Request) error115}
116
117type TemplateReconciler interface {118ReconcilerInterface
119UpdateStatus(ctx context.Context, template v1alpha1.Template) (ctrl.Result, error)120}
121type ScopeReconciler interface {122ReconcilerInterface
123UpdateStatus(ctx context.Context, scope v1alpha1.Scope) (ctrl.Result, error)124}
125
126type TriggerReconciler interface {127ReconcilerInterface
128UpdateStatus(ctx context.Context, trigger v1alpha1.Trigger) (ctrl.Result, error)129}
130
131type Status struct {132TypeConfig string `yaml:"typeConfig"`133NameConfig string `yaml:"nameConfig"`134Status v1alpha1.ConditionStatus `yaml:"status"`135Message string `yaml:"message"`136}
137
138func GetRqUID(clusterName, namespace string, typeConf configsType, nameConf string) string {139return filepath.Join(clusterName, namespace, string(typeConf), nameConf)140}
141
142func PrometheusInit() {143prometheus.MustRegister(PmTemplatesTotal)144prometheus.MustRegister(PmTriggersTotal)145}
146
147func CheckHashInStatus(obj []v1alpha1.Condition) bool {148for _, condition := range obj {149if condition.Type == v1alpha1.LastHandledHash {150return true151}152}153return false154}
155
156func CompareHashInStatus(obj []v1alpha1.Condition, hash string) bool {157for _, condition := range obj {158if condition.Type == v1alpha1.LastHandledHash && strings.Compare(condition.Message, hash) == 0 {159return false160}161}162return true163}
164
165func AddHashCondition(data []byte) v1alpha1.Condition {166return v1alpha1.Condition{167Type: v1alpha1.LastHandledHash,168Status: v1alpha1.ConditionTrue,169LastTransitionTime: metav1.Now(),170Message: GetHash(data),171}172}
173
// GetHash returns the SHA-1 digest of data as a 40-character lowercase hex
// string. A nil or empty input yields the digest of the empty message.
func GetHash(data []byte) string {
	digest := sha1.Sum(data)
	return hex.EncodeToString(digest[:])
}
180
181func GetShortHash(message string) string {182return GetHash([]byte(message))[0:10]183}
184
// GetRandomPart returns the decimal representation of a non-negative
// pseudo-random int drawn from the package-level math/rand source. It is not
// suitable for security-sensitive use.
func GetRandomPart() string {
	n := rand.Int()
	return fmt.Sprintf("%d", n)
}
188
189func IsControllerNeeded(name string) bool {190switch name {191case "scope":192return util.IsFullRole() || env.KbltMutator || env.KbltValidator193case "trigger":194return util.IsFullRole() || env.KbltMutator || env.KbltCreator195case "template":196return util.IsFullRole() || env.KbltMutator || env.KbltCreator || env.KbltValidator197case "triggerInstance":198return util.IsFullRole() || env.KbltCreator199}200return false201}
202
203func GetOwnerRef(obj map[string]interface{}) (*metav1.OwnerReference, error) {204metadata, ok := obj["metadata"]205if !ok {206return nil, fmt.Errorf("GetOwnerRef failed. Metadata not found")207}208
209name, ok := metadata.(map[string]interface{})["name"]210if !ok {211return nil, fmt.Errorf("GetOwnerRef failed. Name not found")212}213
214uid, ok := metadata.(map[string]interface{})["uid"]215if !ok {216return nil, fmt.Errorf("GetOwnerRef failed. UID not found")217}218
219apiVersion, ok := obj["apiVersion"]220if !ok {221return nil, fmt.Errorf("GetOwnerRef failed. ApiVersion not found")222}223
224kind, ok := obj["kind"]225if !ok {226return nil, fmt.Errorf("GetOwnerRef failed. Kind not found")227}228
229return &metav1.OwnerReference{230APIVersion: apiVersion.(string),231Kind: kind.(string),232Name: name.(string),233UID: types.UID(uid.(string)),234}, nil235}
236
237func RemoveCondition(conditions []v1alpha1.Condition, ct v1alpha1.ConditionType) []v1alpha1.Condition {238for i, cond := range conditions {239if cond.Type == ct {240return append(conditions[:i], conditions[i+1:]...)241}242}243return conditions244}
245
246func CheckNamespace(name string, selector *types1.NamespaceSelector) bool {247log := logger.FromContext(context.Background())248if selector == nil {249log.Warn("Namespace selector is not set")250return false251}252
253var clt kubernetes.Interface254if kubeClient != nil {255clt = kubeClient256} else {257clt = kubeapi.GetClient()258}259
260ns, err := clt.CoreV1().Namespaces().Get(context.Background(), name, v1.GetOptions{})261if err != nil {262log.Errorf("Can't fetch namespace data, error %s", err.Error())263return false264}265return selector.MatchNamespace(ns)266}
267
268func HandleUnhandledTIs(ns string) {269if !env.KbltCreator {270return271}272log := logger.FromContext(context.Background())273
274triggerInstances, err := Cl.TriggerInstance().List(context.Background(), ns, metav1.ListOptions{})275if err != nil {276log.Errorf("[CheckUnhandledTIs] Failed List triggerInstances %s", err)277return278}279
280for _, ti := range triggerInstances.Items {281if len(ti.Status.ResourceStatus) != 0 { // if ti was handled already, skip it282continue283}284
285handleTI(context.Background(), ti)286}287}
288
289func handleTI(ctx context.Context, ti cachev1alpha1.TriggerInstance) {290log := logger.FromContext(ctx)291log.Debugf("TI %v was not handled, start handling it", ti.Name)292FactoryCreateRequest <- ti293}
294