// kubelatte-ce — fork of sbertech/kubelatte-ce
1package creation
2
3import (
4"context"
5"encoding/json"
6"github.com/jasonlvhit/gocron"
7"gitverse.ru/ktrntrsv/kubelatte-ce/pkg/api/v1alpha1"
8"gitverse.ru/ktrntrsv/kubelatte-ce/pkg/kubeapi"
9"gitverse.ru/ktrntrsv/kubelatte-ce/pkg/observability/logger"
10"gitverse.ru/ktrntrsv/kubelatte-ce/pkg/operator/utils"
11"k8s.io/apimachinery/pkg/api/errors"
12metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
13"sigs.k8s.io/yaml"
14"sync"
15)
16
// WatcherI is the public surface of the trigger-instance watcher:
// StartWatcher launches the resync machinery asynchronously and
// StopWatcher halts both the periodic job and the channel listeners.
type WatcherI interface {
	StartWatcher()
	StopWatcher()
}
21
22func NewWatcher(reSyncPeriod uint64) WatcherI {
23return &Watcher{reSyncPeriod: reSyncPeriod}
24}
25
// Watcher periodically resynchronizes the status of watched TriggerInstances
// via a gocron job and reacts to add/remove/update commands delivered over
// package-level channels (see startToListen).
type Watcher struct {
	reSyncPeriod  uint64                               // resync interval in seconds; defaulted in init when 0
	job           *gocron.Job                          // the periodic updateStatus job; set in init
	scheduler     *gocron.Scheduler                    // scheduler driving job; set in init
	watchObjects  map[string]v1alpha1.TriggerInstance  // watched TIs keyed by namespaced name
	rwMutex       sync.Mutex                           // guards watchObjects (a plain Mutex despite the name)
	stopListeners context.CancelFunc                   // cancels the startToListen goroutine
}
34
35func (w *Watcher) StartWatcher() {
36log := logger.FromContext(context.Background())
37log.Debug("Watcher: Start")
38go w.init()
39}
40
41func (w *Watcher) StopWatcher() {
42log := logger.FromContext(context.Background())
43log.Debug("Watcher: Stop")
44w.scheduler.Remove(w.job)
45w.stopListeners()
46}
47
// startToListen dispatches watcher commands arriving on the package-level
// utils channels until ctx is cancelled. Each command is handled on its own
// goroutine so a slow handler cannot block this dispatch loop.
func (w *Watcher) startToListen(ctx context.Context) {
	for {
		select {
		case period := <-utils.WatcherUpdateResyncPeriod:
			go w.updateSyncPeriod(period)
		case obj := <-utils.WatcherAddNewObjectTo:
			go w.addNewObject(obj)
		case obj := <-utils.WatcherRemoveObjectFrom:
			go w.removeObject(obj)
		case <-ctx.Done():
			// Cancelled by StopWatcher via stopListeners.
			return
		}
	}
}
62
63func (w *Watcher) init() {
64log := logger.FromContext(context.Background())
65w.watchObjects = map[string]v1alpha1.TriggerInstance{}
66
67ctx, stop := context.WithCancel(context.Background())
68w.stopListeners = stop
69go w.startToListen(ctx)
70
71if w.reSyncPeriod <= 0 {
72w.reSyncPeriod = 5
73}
74w.scheduler = gocron.NewScheduler()
75w.job = w.scheduler.Every(w.reSyncPeriod).Second()
76err := w.job.Do(w.updateStatus)
77if err != nil {
78log.Errorf("Watcher: Init failed: %s", err.Error())
79return
80}
81<-w.scheduler.Start()
82}
83
84func (w *Watcher) updateStatus() {
85w.rwMutex.Lock()
86defer w.rwMutex.Unlock()
87
88var mustDeleted []string
89ctx := context.Background()
90log := logger.FromContext(ctx)
91clt := kubeapi.GetClient()
92
93for tiName, triggerInstance := range w.watchObjects {
94ti, err := utils.Cl.TriggerInstance().Get(ctx, triggerInstance.Namespace, triggerInstance.Name, metav1.GetOptions{})
95if err != nil {
96if errors.IsNotFound(err) {
97mustDeleted = append(mustDeleted, tiName)
98continue
99}
100}
101originalStatusHash := ti.GetStatusHash()
102if ti.Status.ResourceStatus != nil {
103allRemoved := true
104for i := 0; i < len(ti.Status.ResourceStatus); i++ {
105name := ti.Status.ResourceStatus[i].Name
106kind := ti.Status.ResourceStatus[i].Kind
107version := ti.Status.ResourceStatus[i].ApiVersion
108if ti.Status.ResourceStatus[i].Status == utils.FactoryResourceRemoved ||
109ti.Status.ResourceStatus[i].Phase == utils.FactoryResourceCreateFailed {
110continue
111}
112resourceMeta := kubeapi.GetResourceMeta(clt, kind, version)
113raw, errKube := kubeapi.GetRawResourceFromKubeApi(ctx, clt.RESTClient(), resourceMeta, name, triggerInstance.Namespace)
114if errKube != nil {
115// if resource is not exist then we set status removed
116if errors.IsNotFound(errKube) {
117ti.UpdateResourceStatus(name, "", utils.FactoryResourceRemoved, utils.FactoryResourceRemoved, kind, version)
118log.Infof("Watcher: Resource %s deleted, set status REMOVED for this ti: %s", name, tiName)
119} else {
120log.Warnf("Watcher: Get resource %s failed, for this ti: %s error %s", name, tiName, errKube)
121}
122continue
123}
124
125var resource map[string]interface{}
126err = json.Unmarshal(raw, &resource)
127if err != nil {
128log.Errorf("Watcher: Update trigger status failed: %s", err.Error())
129return
130}
131
132var status map[string]interface{}
133phase := ""
134status, ok := resource["status"].(map[string]interface{})
135if ok {
136phs, ok := status["phase"]
137if ok {
138phase = phs.(string)
139}
140}
141if !ok {
142phase = "Apply"
143}
144
145data, err := yaml.Marshal(status)
146if err != nil {
147log.Errorf("Watcher: Update trigger status failed: %s", err.Error())
148}
149allRemoved = false
150ti.UpdateResourceStatus(name, "", phase, string(data), kind, version)
151}
152if allRemoved {
153log.Infof("Watcher: All resources deleted, mark to delete this ti: %s", tiName)
154mustDeleted = append(mustDeleted, tiName)
155}
156}
157actualStatusHash := ti.GetStatusHash()
158if originalStatusHash == actualStatusHash {
159//logs.Debug("Watcher: Ti status hash equal, no need update ti: " + tiName) add trace level
160} else {
161_, err = utils.Cl.TriggerInstance().UpdateStatus(ctx, triggerInstance.Namespace, ti, metav1.UpdateOptions{})
162if err != nil {
163log.Errorf("Watcher: Save trigger status failed: %s", err.Error())
164}
165}
166}
167
168if len(mustDeleted) > 0 {
169for _, tiName := range mustDeleted {
170delete(w.watchObjects, tiName)
171log.Debugf("Watcher: Clear trigger instance: %s", tiName)
172}
173}
174}
175
176func (w *Watcher) updateSyncPeriod(period uint64) {
177w.reSyncPeriod = period
178log := logger.FromContext(context.Background())
179log.Debugf("Watcher: updateSyncPeriod %d", w.reSyncPeriod)
180}
181
182func (w *Watcher) addNewObject(obj v1alpha1.TriggerInstance) {
183w.rwMutex.Lock()
184w.watchObjects[obj.GetNamespacedName()] = obj
185log := logger.FromContext(context.Background())
186log.Debugf("Watcher: addNewObject %s", obj.GetNamespacedName())
187w.rwMutex.Unlock()
188}
189
190func (w *Watcher) removeObject(obj v1alpha1.TriggerInstance) {
191w.rwMutex.Lock()
192delete(w.watchObjects, obj.GetNamespacedName())
193log := logger.FromContext(context.Background())
194log.Debugf("Watcher: removeObject %s", obj.GetNamespacedName())
195w.rwMutex.Unlock()
196}
197