kubelatte-ce
Форк от sbertech/kubelatte-ce
336 строк · 10.9 Кб
1package creation
2
3import (
4"context"
5"encoding/json"
6errors2 "errors"
7"fmt"
8"gitverse.ru/synapse/kubelatte/pkg/api/v1alpha1"
9"gitverse.ru/synapse/kubelatte/pkg/kubeapi"
10"gitverse.ru/synapse/kubelatte/pkg/observability/logger"
11utils "gitverse.ru/synapse/kubelatte/pkg/operator/utils"
12"gitverse.ru/synapse/kubelatte/pkg/util"
13"gitverse.ru/synapse/kubelatte/pkg/util/env"
14"k8s.io/apimachinery/pkg/api/errors"
15metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
16"k8s.io/client-go/kubernetes"
17"sigs.k8s.io/yaml"
18)
19
20type FactoryI interface {
21StartFactory()
22StopFactory()
23Make(ctx context.Context, message v1alpha1.TriggerInstance) error
24}
25
// ErrAllResourcesRemoved is the sentinel wrapped by Make when every resource
// recorded in a TriggerInstance status is already marked as removed.
var ErrAllResourcesRemoved = errors2.New("err all resources removed for ti")
27
28type FactoryController struct {
29Client *kubernetes.Clientset
30renderer RendererI
31stop context.CancelFunc
32act FactoryI
33}
34
35func NewFactoryController(client *kubernetes.Clientset, renderer RendererI, act FactoryI) FactoryI {
36factory := &FactoryController{Client: client, renderer: renderer}
37if act == nil {
38act = factory
39}
40factory.act = act
41return factory
42}
43
44func (f *FactoryController) StopFactory() {
45f.stop()
46log := logger.FromContext(context.Background())
47log.Debug("FactoryController: Stop")
48}
49
50func (f *FactoryController) StartFactory() {
51log := logger.FromContext(context.Background())
52log.Debug("FactoryController: Start")
53ctx, stop := context.WithCancel(context.Background())
54f.stop = stop
55go f.startToListen(ctx)
56}
57
58func (f *FactoryController) startToListen(ctx context.Context) {
59log := logger.FromContext(ctx)
60for {
61select {
62case v := <-utils.FactoryCreateRequest:
63go func() {
64err := f.act.Make(ctx, v)
65if err != nil {
66log.Error(err.Error())
67}
68}()
69case <-ctx.Done():
70return
71}
72}
73}
74
75func (f *FactoryController) Make(ctx context.Context, tiObject v1alpha1.TriggerInstance) (err error) {
76log := logger.FromContext(ctx)
77log.Infof("FactoryController: Make %s", tiObject.GetNamespacedName())
78
79if len(tiObject.Status.ResourceStatus) > 0 {
80// resources were created only check status and watch
81allRemoved := true
82for _, resource := range tiObject.Status.ResourceStatus {
83if resource.Status != utils.FactoryResourceRemoved {
84allRemoved = false
85}
86}
87if allRemoved {
88return fmt.Errorf("FactoryController: %w %s, ignore instance", ErrAllResourcesRemoved, tiObject.GetNamespacedName())
89}
90// one or more resources are alive so keep watching
91} else {
92// resources were not created, full logic
93if tiObject.Spec.Data != "" {
94var dataObj map[string]interface{}
95unErr := yaml.Unmarshal([]byte(tiObject.Spec.Data), &dataObj)
96if unErr != nil {
97return errors2.Join(util.ErrUnmarshalling)
98}
99tiObject.Spec.DataObj = dataObj
100}
101
102renders := f.renderer.GetRenders(ctx, &tiObject)
103for _, templateRender := range renders {
104creErr := f.createResource(ctx, &tiObject, templateRender)
105if creErr != nil {
106log.Error(creErr.Error())
107err = errors2.Join(creErr)
108}
109}
110}
111
112utils.WatcherAddNewObjectTo <- tiObject
113return err
114}
115
116func (f *FactoryController) checkIfProcessed(name string, ti *v1alpha1.TriggerInstance, checkIsRemoved bool) bool {
117for i := 0; i < len(ti.Status.ResourceStatus); i++ {
118if ti.Status.ResourceStatus[i].Name == name {
119if checkIsRemoved && ti.Status.ResourceStatus[i].Status == utils.FactoryResourceRemoved {
120return true
121} else if !checkIsRemoved {
122return true
123}
124}
125}
126return false
127}
128
129func (f *FactoryController) getResourceName(tiName, kind string, render util.RenderItem) (string, error) {
130resourceName := tiName
131
132if render.CreationIsUniqueName {
133resourceName = fmt.Sprintf("%s-%s", tiName, utils.GetShortHash(utils.GetRandomPart()+render.Template.Name+kind))
134} else {
135var template map[string]interface{}
136err := yaml.Unmarshal([]byte(render.Render), &template)
137if err != nil {
138return "", err
139}
140
141metadata, ok := template["metadata"]
142if ok && metadata != nil {
143name, ok := metadata.(map[string]interface{})["name"].(string)
144if ok {
145resourceName = name
146}
147}
148}
149
150return resourceName, nil
151}
152
153func (f *FactoryController) getResourceFromKubeApi(ctx context.Context, kind, apiVersion, name, namespace string) ([]byte, error) {
154resourceMeta := kubeapi.GetResourceMeta(f.Client, kind, apiVersion)
155if resourceMeta == nil {
156return nil, fmt.Errorf("failed to get meta for resource : ns %s; apiVersion %s; kind %s ", namespace, apiVersion, kind)
157}
158
159res, err := kubeapi.GetResource(ctx, resourceMeta, name, namespace)
160return res, err
161}
162
163func (f *FactoryController) createResource(ctx context.Context, tiObject *v1alpha1.TriggerInstance, render util.RenderItem) error {
164log := logger.FromContext(ctx)
165
166kind := render.Template.Spec.Kind
167version := render.Template.Spec.ApiVersion
168
169resourceName, err := f.getResourceName(tiObject.Name, kind, render)
170if err != nil {
171return f.returnErrorWithUpdateStatus(ctx, tiObject,
172version, kind, resourceName,
173fmt.Errorf("FactoryController: failed to get name for resource : err %s, version %s, r %s ", err.Error(), version, kind),
174)
175}
176
177namespaceName := tiObject.Namespace
178_, err = f.getResourceFromKubeApi(ctx, kind, version, resourceName, namespaceName)
179if err != nil {
180if !errors.IsNotFound(err) {
181return f.returnErrorWithUpdateStatus(ctx, tiObject,
182version, kind, resourceName,
183fmt.Errorf("FactoryController: failed to check resource in: ns %s v %s r %s error %s", namespaceName, version, kind, err.Error()),
184)
185}
186} else {
187// resource exist so we can not move next
188log.Errorf("FactoryController: Resource %s kind %s exist in ns %s, fail creation", resourceName, kind, namespaceName)
189return nil
190}
191
192template, err := f.modifyTemplate(ctx, tiObject, render, version, kind, resourceName, namespaceName)
193if err != nil {
194return err
195}
196
197log.Debugf("FactoryController: Resource %s kind %s \nbody %v", resourceName, kind, template)
198
199if f.checkIfProcessed(resourceName, tiObject, false) {
200log.Warnf("FactoryController: Object %s already processed", resourceName)
201return nil
202}
203
204resourceMeta := kubeapi.GetResourceMeta(f.Client, kind, version)
205if resourceMeta == nil {
206return f.returnErrorWithUpdateStatus(ctx, tiObject,
207version, kind, resourceName,
208fmt.Errorf("FactoryController: failed to get meta for resource : ns %s v %s r %s ", namespaceName, version, kind),
209)
210}
211
212jsBody, err := json.Marshal(template)
213
214var object map[string]interface{}
215raw, err, fullErr := kubeapi.CreateResourceInKubeApi(ctx, f.Client.RESTClient(), resourceMeta, jsBody, namespaceName)
216
217log.Debugf("FactoryController: RAW Resource %s kind %s \nbody %v", resourceName, kind, string(raw))
218
219if err != nil {
220resourceMeta = kubeapi.InvalidateCacheAndTryGetResourceMeta(f.Client, kind, version)
221
222if resourceMeta == nil {
223return f.returnErrorWithUpdateStatus(ctx, tiObject,
224version, kind, resourceName,
225fmt.Errorf("FactoryController: failed to get meta for resource : ns %s v %s r %s ", namespaceName, version, kind),
226)
227}
228
229raw, err, fullErr = kubeapi.CreateResourceInKubeApi(ctx, f.Client.RESTClient(), resourceMeta, jsBody, namespaceName)
230log.Debugf("FactoryController: Updated RAW Resource %s kind %s \nbody %v", resourceName, kind, string(raw))
231if err != nil {
232return f.returnErrorWithUpdateStatus(ctx, tiObject,
233version, kind, resourceName,
234fmt.Errorf("FactoryController: Create resource failed in: ns %s v %s r %s error %s", namespaceName, version, kind, fullErr),
235)
236}
237}
238err = json.Unmarshal(raw, &object)
239if err != nil {
240return f.returnErrorWithUpdateStatus(ctx, tiObject,
241version, kind, resourceName,
242fmt.Errorf("FactoryController: Unmarshal resource failed in: ns %s v %s r %s error %s", namespaceName, version, kind, err.Error()),
243)
244}
245
246var status map[string]interface{}
247phase := "Apply"
248
249status, ok := object["status"].(map[string]interface{})
250if ok {
251phs, ok := status["phase"]
252if ok {
253phase = phs.(string)
254}
255}
256
257data, err := yaml.Marshal(status)
258if err != nil {
259log.Errorf("FactoryController: Update trigger status failed: %s", err.Error())
260}
261
262f.updateTIResourceStatus(ctx, tiObject, resourceName, "", phase, string(data), kind, version)
263
264return nil
265}
266
267func (f *FactoryController) modifyTemplate(ctx context.Context, tiObject *v1alpha1.TriggerInstance, render util.RenderItem, version string, kind string, resourceName string, namespaceName string) (map[string]interface{}, error) {
268var template map[string]interface{}
269err := yaml.Unmarshal([]byte(render.Render), &template)
270if err != nil {
271return nil, fmt.Errorf("FactoryController: failed to unmarshal template resource in: ns %s v %s r %s error %s", namespaceName, version, kind, err.Error())
272}
273
274metadata, ok := template["metadata"]
275if !ok || metadata == nil {
276template["metadata"] = make(map[string]interface{})
277metadata = template["metadata"]
278}
279
280metadata.(map[string]interface{})["name"] = resourceName
281metadata.(map[string]interface{})["namespace"] = namespaceName
282metadata.(map[string]interface{})["ownerReferences"] = []map[string]string{
2830: {
284"apiVersion": tiObject.APIVersion,
285"kind": tiObject.Kind,
286"name": tiObject.Name,
287"uid": string(tiObject.UID),
288},
289}
290
291labels, ok := metadata.(map[string]interface{})["labels"]
292if !ok || labels == nil {
293template["metadata"].(map[string]interface{})["labels"] = make(map[string]interface{})
294labels = template["metadata"].(map[string]interface{})["labels"]
295}
296
297labels.(map[string]interface{})[env.KbltCreatorLabelKey] = env.KbltCreatorLabelValue
298if tiObject.Labels != nil {
299for k, v := range tiObject.Labels {
300labels.(map[string]interface{})[k] = v
301}
302}
303
304annotations, ok := metadata.(map[string]interface{})["annotations"]
305if !ok || annotations == nil {
306template["metadata"].(map[string]interface{})["annotations"] = make(map[string]interface{})
307annotations = template["metadata"].(map[string]interface{})["annotations"]
308}
309if tiObject.Annotations != nil {
310for k, v := range tiObject.Annotations {
311annotations.(map[string]interface{})[k] = v
312}
313}
314return template, err
315}
316
317func (f *FactoryController) returnErrorWithUpdateStatus(ctx context.Context, tiObject *v1alpha1.TriggerInstance, version string, kind string, resourceName string, err error) error {
318f.updateTIResourceStatus(ctx, tiObject, resourceName, err.Error(), utils.FactoryResourceCreateFailed, "", kind, version)
319return err
320}
321
322func (f *FactoryController) updateTIResourceStatus(ctx context.Context, tiObject *v1alpha1.TriggerInstance, resourceName string, message string, phase string, data string, kind string, version string) {
323log := logger.FromContext(ctx)
324
325tiActualObject, err := utils.Cl.TriggerInstance().Get(ctx, tiObject.Namespace, tiObject.Name, metav1.GetOptions{})
326if err != nil {
327log.Errorf("FactoryController: tiActualObject cant get: %s", err.Error())
328return
329}
330tiActualObject.UpdateResourceStatus(resourceName, message, phase, data, kind, version)
331_, err = utils.Cl.TriggerInstance().UpdateStatus(ctx, tiObject.Namespace, tiActualObject, metav1.UpdateOptions{})
332if err != nil {
333log.Errorf("FactoryController: failed to updateTIResourceStatus: %s", err.Error())
334return
335}
336}
337