kubelatte-ce

Форк
2
Форк от sbertech/kubelatte-ce
336 строк · 10.9 Кб
1
package creation
2

3
import (
4
	"context"
5
	"encoding/json"
6
	errors2 "errors"
7
	"fmt"
8
	"gitverse.ru/synapse/kubelatte/pkg/api/v1alpha1"
9
	"gitverse.ru/synapse/kubelatte/pkg/kubeapi"
10
	"gitverse.ru/synapse/kubelatte/pkg/observability/logger"
11
	utils "gitverse.ru/synapse/kubelatte/pkg/operator/utils"
12
	"gitverse.ru/synapse/kubelatte/pkg/util"
13
	"gitverse.ru/synapse/kubelatte/pkg/util/env"
14
	"k8s.io/apimachinery/pkg/api/errors"
15
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
16
	"k8s.io/client-go/kubernetes"
17
	"sigs.k8s.io/yaml"
18
)
19

20
type FactoryI interface {
21
	StartFactory()
22
	StopFactory()
23
	Make(ctx context.Context, message v1alpha1.TriggerInstance) error
24
}
25

26
// ErrAllResourcesRemoved is the sentinel wrapped by Make when every entry in a
// TriggerInstance's resource status is marked as removed.
var ErrAllResourcesRemoved = errors2.New("err all resources removed for ti")
27

28
type FactoryController struct {
29
	Client   *kubernetes.Clientset
30
	renderer RendererI
31
	stop     context.CancelFunc
32
	act      FactoryI
33
}
34

35
// NewFactoryController builds a FactoryController around the given client and
// renderer. act is the FactoryI whose Make the listener loop calls; when act
// is nil the controller dispatches to itself.
func NewFactoryController(client *kubernetes.Clientset, renderer RendererI, act FactoryI) FactoryI {
	controller := &FactoryController{
		Client:   client,
		renderer: renderer,
		act:      act,
	}
	if controller.act == nil {
		controller.act = controller
	}
	return controller
}
43

44
// StopFactory cancels the listener context created by StartFactory and logs
// the stop. It is safe to call before StartFactory: in that case there is no
// cancel function yet and only the log line is emitted.
func (f *FactoryController) StopFactory() {
	// f.stop is nil until StartFactory has run; calling a nil func panics.
	if f.stop != nil {
		f.stop()
	}
	log := logger.FromContext(context.Background())
	log.Debug("FactoryController: Stop")
}
49

50
// StartFactory logs the start, creates a cancellable background context,
// stores its cancel function for StopFactory, and launches the listener loop
// in a new goroutine.
func (f *FactoryController) StartFactory() {
	lg := logger.FromContext(context.Background())
	lg.Debug("FactoryController: Start")

	listenCtx, cancel := context.WithCancel(context.Background())
	f.stop = cancel

	go f.startToListen(listenCtx)
}
57

58
func (f *FactoryController) startToListen(ctx context.Context) {
59
	log := logger.FromContext(ctx)
60
	for {
61
		select {
62
		case v := <-utils.FactoryCreateRequest:
63
			go func() {
64
				err := f.act.Make(ctx, v)
65
				if err != nil {
66
					log.Error(err.Error())
67
				}
68
			}()
69
		case <-ctx.Done():
70
			return
71
		}
72
	}
73
}
74

75
// Make processes one TriggerInstance.
//
// If the instance already has resource statuses, nothing is created: when
// every status is utils.FactoryResourceRemoved an error wrapping
// ErrAllResourcesRemoved is returned, otherwise the instance is only handed to
// the watcher. If there are no statuses yet, Spec.Data (when non-empty) is
// parsed as YAML into Spec.DataObj, the renderer is asked for the renders, and
// a resource is created for each of them.
//
// Creation failures do not stop the loop; all of them are joined into the
// returned error. A Spec.Data parse failure returns an error that matches
// util.ErrUnmarshalling and also carries the underlying YAML error.
//
// Unless the function returned early, tiObject is sent on
// utils.WatcherAddNewObjectTo before returning.
func (f *FactoryController) Make(ctx context.Context, tiObject v1alpha1.TriggerInstance) (err error) {
	log := logger.FromContext(ctx)
	log.Infof("FactoryController: Make %s", tiObject.GetNamespacedName())

	if len(tiObject.Status.ResourceStatus) > 0 {
		// resources were created only check status and watch
		allRemoved := true
		for _, resource := range tiObject.Status.ResourceStatus {
			if resource.Status != utils.FactoryResourceRemoved {
				allRemoved = false
				break
			}
		}
		if allRemoved {
			return fmt.Errorf("FactoryController: %w %s, ignore instance", ErrAllResourcesRemoved, tiObject.GetNamespacedName())
		}
		// one or more resources are alive so keep watching
	} else {
		// resources were not created, full logic
		if tiObject.Spec.Data != "" {
			var dataObj map[string]interface{}
			if unErr := yaml.Unmarshal([]byte(tiObject.Spec.Data), &dataObj); unErr != nil {
				// Keep the sentinel for errors.Is and attach the real cause.
				return errors2.Join(util.ErrUnmarshalling, unErr)
			}
			tiObject.Spec.DataObj = dataObj
		}

		renders := f.renderer.GetRenders(ctx, &tiObject)
		for _, templateRender := range renders {
			if creErr := f.createResource(ctx, &tiObject, templateRender); creErr != nil {
				log.Error(creErr.Error())
				// Accumulate instead of overwriting so no failure is lost.
				err = errors2.Join(err, creErr)
			}
		}
	}

	utils.WatcherAddNewObjectTo <- tiObject
	return err
}
115

116
// checkIfProcessed reports whether ti's resource status list has an entry
// named name. With checkIsRemoved set, the entry must additionally have the
// status utils.FactoryResourceRemoved to count.
func (f *FactoryController) checkIfProcessed(name string, ti *v1alpha1.TriggerInstance, checkIsRemoved bool) bool {
	for _, entry := range ti.Status.ResourceStatus {
		if entry.Name != name {
			continue
		}
		if !checkIsRemoved || entry.Status == utils.FactoryResourceRemoved {
			return true
		}
	}
	return false
}
128

129
// getResourceName picks the name for the resource to be created.
//
// When render.CreationIsUniqueName is set, the name is "<tiName>-<hash>",
// where the hash is built from a random part, the template name and kind.
// Otherwise the rendered YAML is parsed and its metadata.name is used when it
// is a string; if metadata is missing, is not a mapping, or has no string
// name, tiName is returned.
//
// An error is returned only when the rendered YAML cannot be parsed.
func (f *FactoryController) getResourceName(tiName, kind string, render util.RenderItem) (string, error) {
	if render.CreationIsUniqueName {
		suffix := utils.GetShortHash(utils.GetRandomPart() + render.Template.Name + kind)
		return fmt.Sprintf("%s-%s", tiName, suffix), nil
	}

	var template map[string]interface{}
	if err := yaml.Unmarshal([]byte(render.Render), &template); err != nil {
		return "", err
	}

	// Checked assertion: a scalar or list under "metadata" must not panic.
	metadata, ok := template["metadata"].(map[string]interface{})
	if !ok {
		return tiName, nil
	}
	if name, ok := metadata["name"].(string); ok {
		return name, nil
	}
	return tiName, nil
}
152

153
// getResourceFromKubeApi looks up the resource metadata for kind/apiVersion
// and returns whatever kubeapi.GetResource yields for name in namespace. It
// fails when no metadata can be resolved.
func (f *FactoryController) getResourceFromKubeApi(ctx context.Context, kind, apiVersion, name, namespace string) ([]byte, error) {
	meta := kubeapi.GetResourceMeta(f.Client, kind, apiVersion)
	if meta == nil {
		return nil, fmt.Errorf("failed to get meta for resource : ns %s; apiVersion %s; kind %s ", namespace, apiVersion, kind)
	}
	return kubeapi.GetResource(ctx, meta, name, namespace)
}
162

163
func (f *FactoryController) createResource(ctx context.Context, tiObject *v1alpha1.TriggerInstance, render util.RenderItem) error {
164
	log := logger.FromContext(ctx)
165

166
	kind := render.Template.Spec.Kind
167
	version := render.Template.Spec.ApiVersion
168

169
	resourceName, err := f.getResourceName(tiObject.Name, kind, render)
170
	if err != nil {
171
		return f.returnErrorWithUpdateStatus(ctx, tiObject,
172
			version, kind, resourceName,
173
			fmt.Errorf("FactoryController: failed to get name for resource : err %s, version %s, r %s ", err.Error(), version, kind),
174
		)
175
	}
176

177
	namespaceName := tiObject.Namespace
178
	_, err = f.getResourceFromKubeApi(ctx, kind, version, resourceName, namespaceName)
179
	if err != nil {
180
		if !errors.IsNotFound(err) {
181
			return f.returnErrorWithUpdateStatus(ctx, tiObject,
182
				version, kind, resourceName,
183
				fmt.Errorf("FactoryController: failed to check resource in: ns %s v %s r %s error %s", namespaceName, version, kind, err.Error()),
184
			)
185
		}
186
	} else {
187
		// resource exist so we can not move next
188
		log.Errorf("FactoryController: Resource %s kind %s exist in ns %s, fail creation", resourceName, kind, namespaceName)
189
		return nil
190
	}
191

192
	template, err := f.modifyTemplate(ctx, tiObject, render, version, kind, resourceName, namespaceName)
193
	if err != nil {
194
		return err
195
	}
196

197
	log.Debugf("FactoryController: Resource %s kind %s \nbody %v", resourceName, kind, template)
198

199
	if f.checkIfProcessed(resourceName, tiObject, false) {
200
		log.Warnf("FactoryController: Object %s already processed", resourceName)
201
		return nil
202
	}
203

204
	resourceMeta := kubeapi.GetResourceMeta(f.Client, kind, version)
205
	if resourceMeta == nil {
206
		return f.returnErrorWithUpdateStatus(ctx, tiObject,
207
			version, kind, resourceName,
208
			fmt.Errorf("FactoryController: failed to get meta for resource : ns %s v %s r %s ", namespaceName, version, kind),
209
		)
210
	}
211

212
	jsBody, err := json.Marshal(template)
213

214
	var object map[string]interface{}
215
	raw, err, fullErr := kubeapi.CreateResourceInKubeApi(ctx, f.Client.RESTClient(), resourceMeta, jsBody, namespaceName)
216

217
	log.Debugf("FactoryController: RAW Resource %s kind %s \nbody %v", resourceName, kind, string(raw))
218

219
	if err != nil {
220
		resourceMeta = kubeapi.InvalidateCacheAndTryGetResourceMeta(f.Client, kind, version)
221

222
		if resourceMeta == nil {
223
			return f.returnErrorWithUpdateStatus(ctx, tiObject,
224
				version, kind, resourceName,
225
				fmt.Errorf("FactoryController: failed to get meta for resource : ns %s v %s r %s ", namespaceName, version, kind),
226
			)
227
		}
228

229
		raw, err, fullErr = kubeapi.CreateResourceInKubeApi(ctx, f.Client.RESTClient(), resourceMeta, jsBody, namespaceName)
230
		log.Debugf("FactoryController: Updated RAW Resource %s kind %s \nbody %v", resourceName, kind, string(raw))
231
		if err != nil {
232
			return f.returnErrorWithUpdateStatus(ctx, tiObject,
233
				version, kind, resourceName,
234
				fmt.Errorf("FactoryController: Create resource failed in: ns %s v %s r %s error %s", namespaceName, version, kind, fullErr),
235
			)
236
		}
237
	}
238
	err = json.Unmarshal(raw, &object)
239
	if err != nil {
240
		return f.returnErrorWithUpdateStatus(ctx, tiObject,
241
			version, kind, resourceName,
242
			fmt.Errorf("FactoryController: Unmarshal resource failed in: ns %s v %s r %s error %s", namespaceName, version, kind, err.Error()),
243
		)
244
	}
245

246
	var status map[string]interface{}
247
	phase := "Apply"
248

249
	status, ok := object["status"].(map[string]interface{})
250
	if ok {
251
		phs, ok := status["phase"]
252
		if ok {
253
			phase = phs.(string)
254
		}
255
	}
256

257
	data, err := yaml.Marshal(status)
258
	if err != nil {
259
		log.Errorf("FactoryController: Update trigger status failed: %s", err.Error())
260
	}
261

262
	f.updateTIResourceStatus(ctx, tiObject, resourceName, "", phase, string(data), kind, version)
263

264
	return nil
265
}
266

267
func (f *FactoryController) modifyTemplate(ctx context.Context, tiObject *v1alpha1.TriggerInstance, render util.RenderItem, version string, kind string, resourceName string, namespaceName string) (map[string]interface{}, error) {
268
	var template map[string]interface{}
269
	err := yaml.Unmarshal([]byte(render.Render), &template)
270
	if err != nil {
271
		return nil, fmt.Errorf("FactoryController: failed to unmarshal template resource in: ns %s v %s r %s error %s", namespaceName, version, kind, err.Error())
272
	}
273

274
	metadata, ok := template["metadata"]
275
	if !ok || metadata == nil {
276
		template["metadata"] = make(map[string]interface{})
277
		metadata = template["metadata"]
278
	}
279

280
	metadata.(map[string]interface{})["name"] = resourceName
281
	metadata.(map[string]interface{})["namespace"] = namespaceName
282
	metadata.(map[string]interface{})["ownerReferences"] = []map[string]string{
283
		0: {
284
			"apiVersion": tiObject.APIVersion,
285
			"kind":       tiObject.Kind,
286
			"name":       tiObject.Name,
287
			"uid":        string(tiObject.UID),
288
		},
289
	}
290

291
	labels, ok := metadata.(map[string]interface{})["labels"]
292
	if !ok || labels == nil {
293
		template["metadata"].(map[string]interface{})["labels"] = make(map[string]interface{})
294
		labels = template["metadata"].(map[string]interface{})["labels"]
295
	}
296

297
	labels.(map[string]interface{})[env.KbltCreatorLabelKey] = env.KbltCreatorLabelValue
298
	if tiObject.Labels != nil {
299
		for k, v := range tiObject.Labels {
300
			labels.(map[string]interface{})[k] = v
301
		}
302
	}
303

304
	annotations, ok := metadata.(map[string]interface{})["annotations"]
305
	if !ok || annotations == nil {
306
		template["metadata"].(map[string]interface{})["annotations"] = make(map[string]interface{})
307
		annotations = template["metadata"].(map[string]interface{})["annotations"]
308
	}
309
	if tiObject.Annotations != nil {
310
		for k, v := range tiObject.Annotations {
311
			annotations.(map[string]interface{})[k] = v
312
		}
313
	}
314
	return template, err
315
}
316

317
// returnErrorWithUpdateStatus records err's message for resourceName in the
// TriggerInstance status with the phase utils.FactoryResourceCreateFailed and
// then returns err unchanged, so callers can use it directly in a return.
func (f *FactoryController) returnErrorWithUpdateStatus(ctx context.Context, tiObject *v1alpha1.TriggerInstance, version string, kind string, resourceName string, err error) error {
	failureMessage := err.Error()
	f.updateTIResourceStatus(
		ctx, tiObject,
		resourceName, failureMessage, utils.FactoryResourceCreateFailed, "",
		kind, version,
	)
	return err
}
321

322
// updateTIResourceStatus fetches the current TriggerInstance by tiObject's
// namespace and name, applies the resource status change to that copy, and
// pushes it back with UpdateStatus. Both the fetch and the update are
// best-effort: a failure is logged and nothing is returned to the caller.
func (f *FactoryController) updateTIResourceStatus(ctx context.Context, tiObject *v1alpha1.TriggerInstance, resourceName string, message string, phase string, data string, kind string, version string) {
	lg := logger.FromContext(ctx)

	current, getErr := utils.Cl.TriggerInstance().Get(ctx, tiObject.Namespace, tiObject.Name, metav1.GetOptions{})
	if getErr != nil {
		lg.Errorf("FactoryController: tiActualObject cant get: %s", getErr.Error())
		return
	}

	current.UpdateResourceStatus(resourceName, message, phase, data, kind, version)

	if _, updErr := utils.Cl.TriggerInstance().UpdateStatus(ctx, tiObject.Namespace, current, metav1.UpdateOptions{}); updErr != nil {
		lg.Errorf("FactoryController: failed to updateTIResourceStatus: %s", updErr.Error())
	}
}
337

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.