kubelatte-ce

Форк
2
Форк от sbertech/kubelatte-ce
293 строки · 7.7 Кб
1
package utils
2

3
import (
4
	"context"
5
	"crypto/sha1"
6
	"encoding/hex"
7
	"fmt"
8
	"github.com/prometheus/client_golang/prometheus"
9
	"gitverse.ru/synapse/kubelatte/pkg/api/v1alpha1"
10
	cachev1alpha1 "gitverse.ru/synapse/kubelatte/pkg/api/v1alpha1"
11
	"gitverse.ru/synapse/kubelatte/pkg/kubeapi"
12
	"gitverse.ru/synapse/kubelatte/pkg/observability/logger"
13
	"gitverse.ru/synapse/kubelatte/pkg/operator/controllers/clientset"
14
	"gitverse.ru/synapse/kubelatte/pkg/util"
15
	"gitverse.ru/synapse/kubelatte/pkg/util/env"
16
	types1 "gitverse.ru/synapse/kubelatte/pkg/util/types"
17
	corev1 "k8s.io/api/core/v1"
18
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
19
	v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
20
	"k8s.io/apimachinery/pkg/types"
21
	"k8s.io/client-go/kubernetes"
22
	"math/rand"
23
	"path/filepath"
24
	ctrl "sigs.k8s.io/controller-runtime"
25
	"strings"
26
	"sync"
27
)
28

29
var (
30
	Mux            sync.Mutex
31
	EventOperation EventsType
32
	Cl             clientset.V1Alpha1Interface
33
	IsInit         = IsInitSync{Mu: sync.Mutex{}}
34
	kubeClient     kubernetes.Interface
35
)
36

37
var (
38
	PmTemplatesTotal = prometheus.NewGauge(prometheus.GaugeOpts{
39
		Name: "kblt_templates_total",
40
		Help: "The total number of processed templates",
41
	})
42
	PmTriggersTotal = prometheus.NewGauge(prometheus.GaugeOpts{
43
		Name: "kblt_triggers_total",
44
		Help: "The total number of processed triggers",
45
	})
46
)
47

48
type EventWithHash struct {
49
	EventType  EventsType
50
	Kind       string
51
	Name       string
52
	Uid        string
53
	Conditions []v1alpha1.Condition
54
	Spec       any
55
	OldSpec    any
56
}
57

58
var (
59
	FactoryCreateRequest = make(chan v1alpha1.TriggerInstance)
60

61
	WatcherUpdateResyncPeriod = make(chan uint64)
62
	WatcherAddNewObjectTo     = make(chan v1alpha1.TriggerInstance)
63
	WatcherRemoveObjectFrom   = make(chan v1alpha1.TriggerInstance)
64

65
	StorageUpdateTemplate  = make(chan *v1alpha1.Template)
66
	StorageUpdateTrigger   = make(chan *v1alpha1.Trigger)
67
	StorageUpdateScope     = make(chan v1alpha1.Scope)
68
	StorageUpdateNamespace = make(chan corev1.Namespace)
69
	StorageDeleteTemplate  = make(chan *v1alpha1.Template)
70
	StorageDeleteTrigger   = make(chan *v1alpha1.Trigger)
71
	StorageDeleteScope     = make(chan v1alpha1.Scope)
72
	StorageDeleteNamespace = make(chan corev1.Namespace)
73

74
	SyncUpdateReSyncPeriod = make(chan uint64)
75
	SyncAddNewObject       = make(chan map[string]interface{})
76

77
	EventWithHashCh    = make(chan EventWithHash)
78
	EventStartedLeader = make(chan struct{})
79
)
80

81
type IsInitSync struct {
82
	IsInit bool
83
	Mu     sync.Mutex
84
}
85

86
type configsType string
87

88
const (
89
	TriggerType  configsType = "trigger"
90
	TemplateType configsType = "template"
91
	scopeType    configsType = "rule"
92
)
93

94
type EventsType string
95

96
const (
97
	AddEvent    EventsType = "added"
98
	UpdateEvent EventsType = "updated"
99
	DeleteEvent EventsType = "removed"
100
)
101

102
const (
103
	LeaseLockName               = "leader-operator"
104
	ServiceReceiver             = "Openshift API server"
105
	FactoryResourceRemoved      = "REMOVED"
106
	FactoryResourceCreateFailed = "Failed"
107
)
108

109
type ReconcilerInterface interface {
110
	Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error)
111
	SetupWithManager(mgr ctrl.Manager) error
112
	AddOrUpdateEventHndl(ctx context.Context, req ctrl.Request) (err error)
113
	HealthCheck() bool
114
	DeleteEventHndl(req ctrl.Request) error
115
}
116

117
type TemplateReconciler interface {
118
	ReconcilerInterface
119
	UpdateStatus(ctx context.Context, template v1alpha1.Template) (ctrl.Result, error)
120
}
121
type ScopeReconciler interface {
122
	ReconcilerInterface
123
	UpdateStatus(ctx context.Context, scope v1alpha1.Scope) (ctrl.Result, error)
124
}
125

126
type TriggerReconciler interface {
127
	ReconcilerInterface
128
	UpdateStatus(ctx context.Context, trigger v1alpha1.Trigger) (ctrl.Result, error)
129
}
130

131
type Status struct {
132
	TypeConfig string                   `yaml:"typeConfig"`
133
	NameConfig string                   `yaml:"nameConfig"`
134
	Status     v1alpha1.ConditionStatus `yaml:"status"`
135
	Message    string                   `yaml:"message"`
136
}
137

138
func GetRqUID(clusterName, namespace string, typeConf configsType, nameConf string) string {
139
	return filepath.Join(clusterName, namespace, string(typeConf), nameConf)
140
}
141

142
func PrometheusInit() {
143
	prometheus.MustRegister(PmTemplatesTotal)
144
	prometheus.MustRegister(PmTriggersTotal)
145
}
146

147
func CheckHashInStatus(obj []v1alpha1.Condition) bool {
148
	for _, condition := range obj {
149
		if condition.Type == v1alpha1.LastHandledHash {
150
			return true
151
		}
152
	}
153
	return false
154
}
155

156
func CompareHashInStatus(obj []v1alpha1.Condition, hash string) bool {
157
	for _, condition := range obj {
158
		if condition.Type == v1alpha1.LastHandledHash && strings.Compare(condition.Message, hash) == 0 {
159
			return false
160
		}
161
	}
162
	return true
163
}
164

165
func AddHashCondition(data []byte) v1alpha1.Condition {
166
	return v1alpha1.Condition{
167
		Type:               v1alpha1.LastHandledHash,
168
		Status:             v1alpha1.ConditionTrue,
169
		LastTransitionTime: metav1.Now(),
170
		Message:            GetHash(data),
171
	}
172
}
173

174
func GetHash(data []byte) string {
175
	h := sha1.New()
176
	h.Write(data)
177
	sha1Hash := hex.EncodeToString(h.Sum(nil))
178
	return sha1Hash
179
}
180

181
func GetShortHash(message string) string {
182
	return GetHash([]byte(message))[0:10]
183
}
184

185
func GetRandomPart() string {
186
	return fmt.Sprintf("%d", rand.Int())
187
}
188

189
func IsControllerNeeded(name string) bool {
190
	switch name {
191
	case "scope":
192
		return util.IsFullRole() || env.KbltMutator || env.KbltValidator
193
	case "trigger":
194
		return util.IsFullRole() || env.KbltMutator || env.KbltCreator
195
	case "template":
196
		return util.IsFullRole() || env.KbltMutator || env.KbltCreator || env.KbltValidator
197
	case "triggerInstance":
198
		return util.IsFullRole() || env.KbltCreator
199
	}
200
	return false
201
}
202

203
func GetOwnerRef(obj map[string]interface{}) (*metav1.OwnerReference, error) {
204
	metadata, ok := obj["metadata"]
205
	if !ok {
206
		return nil, fmt.Errorf("GetOwnerRef failed. Metadata not found")
207
	}
208

209
	name, ok := metadata.(map[string]interface{})["name"]
210
	if !ok {
211
		return nil, fmt.Errorf("GetOwnerRef failed. Name not found")
212
	}
213

214
	uid, ok := metadata.(map[string]interface{})["uid"]
215
	if !ok {
216
		return nil, fmt.Errorf("GetOwnerRef failed. UID not found")
217
	}
218

219
	apiVersion, ok := obj["apiVersion"]
220
	if !ok {
221
		return nil, fmt.Errorf("GetOwnerRef failed. ApiVersion not found")
222
	}
223

224
	kind, ok := obj["kind"]
225
	if !ok {
226
		return nil, fmt.Errorf("GetOwnerRef failed. Kind not found")
227
	}
228

229
	return &metav1.OwnerReference{
230
		APIVersion: apiVersion.(string),
231
		Kind:       kind.(string),
232
		Name:       name.(string),
233
		UID:        types.UID(uid.(string)),
234
	}, nil
235
}
236

237
func RemoveCondition(conditions []v1alpha1.Condition, ct v1alpha1.ConditionType) []v1alpha1.Condition {
238
	for i, cond := range conditions {
239
		if cond.Type == ct {
240
			return append(conditions[:i], conditions[i+1:]...)
241
		}
242
	}
243
	return conditions
244
}
245

246
func CheckNamespace(name string, selector *types1.NamespaceSelector) bool {
247
	log := logger.FromContext(context.Background())
248
	if selector == nil {
249
		log.Warn("Namespace selector is not set")
250
		return false
251
	}
252

253
	var clt kubernetes.Interface
254
	if kubeClient != nil {
255
		clt = kubeClient
256
	} else {
257
		clt = kubeapi.GetClient()
258
	}
259

260
	ns, err := clt.CoreV1().Namespaces().Get(context.Background(), name, v1.GetOptions{})
261
	if err != nil {
262
		log.Errorf("Can't fetch namespace data, error %s", err.Error())
263
		return false
264
	}
265
	return selector.MatchNamespace(ns)
266
}
267

268
func HandleUnhandledTIs(ns string) {
269
	if !env.KbltCreator {
270
		return
271
	}
272
	log := logger.FromContext(context.Background())
273

274
	triggerInstances, err := Cl.TriggerInstance().List(context.Background(), ns, metav1.ListOptions{})
275
	if err != nil {
276
		log.Errorf("[CheckUnhandledTIs] Failed List triggerInstances %s", err)
277
		return
278
	}
279

280
	for _, ti := range triggerInstances.Items {
281
		if len(ti.Status.ResourceStatus) != 0 { // if ti was handled already, skip it
282
			continue
283
		}
284

285
		handleTI(context.Background(), ti)
286
	}
287
}
288

289
func handleTI(ctx context.Context, ti cachev1alpha1.TriggerInstance) {
290
	log := logger.FromContext(ctx)
291
	log.Debugf("TI %v was not handled, start handling it", ti.Name)
292
	FactoryCreateRequest <- ti
293
}
294

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.