В 22:00 МСК будет объявлен перерыв - 10 минут. Вы отдыхаете - мы обновляем!

kubelatte-ce

Форк от sbertech/kubelatte-ce
Форк
2
196 строк · 5.6 Кб
1
package creation
2

3
import (
4
	"context"
5
	"encoding/json"
6
	"github.com/jasonlvhit/gocron"
7
	"gitverse.ru/synapse/kubelatte/pkg/api/v1alpha1"
8
	"gitverse.ru/synapse/kubelatte/pkg/kubeapi"
9
	"gitverse.ru/synapse/kubelatte/pkg/observability/logger"
10
	"gitverse.ru/synapse/kubelatte/pkg/operator/utils"
11
	"k8s.io/apimachinery/pkg/api/errors"
12
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
13
	"sigs.k8s.io/yaml"
14
	"sync"
15
)
16

17
// WatcherI is the lifecycle contract of the trigger-instance watcher.
type WatcherI interface {
	// StartWatcher launches the watcher.
	StartWatcher()
	// StopWatcher shuts the watcher down.
	StopWatcher()
}
21

22
// NewWatcher builds a Watcher that uses reSyncPeriod as its sync interval.
// The remaining fields are populated later, when the watcher is started.
func NewWatcher(reSyncPeriod uint64) WatcherI {
	w := new(Watcher)
	w.reSyncPeriod = reSyncPeriod
	return w
}
25

26
type Watcher struct {
27
	reSyncPeriod  uint64
28
	job           *gocron.Job
29
	scheduler     *gocron.Scheduler
30
	watchObjects  map[string]v1alpha1.TriggerInstance
31
	rwMutex       sync.Mutex
32
	stopListeners context.CancelFunc
33
}
34

35
// StartWatcher logs the start and runs the watcher initialisation in a
// background goroutine, so it returns before initialisation has completed.
func (w *Watcher) StartWatcher() {
	logger.FromContext(context.Background()).Debug("Watcher: Start")
	go w.init()
}
40

41
// StopWatcher unschedules the periodic status sync and cancels the event
// listener goroutine.
//
// StartWatcher initialises the scheduler and the cancel function in a
// background goroutine, so either may still be nil when StopWatcher is called
// early (or without a prior start); both are checked before use.
// NOTE(review): these checks only avoid the nil dereference; access to the
// fields is still not synchronised with init.
func (w *Watcher) StopWatcher() {
	log := logger.FromContext(context.Background())
	log.Debug("Watcher: Stop")

	if w.scheduler != nil {
		// gocron's Remove identifies a job by the function it was scheduled
		// with, so pass that function rather than the *Job handle.
		w.scheduler.Remove(w.updateStatus)
	}
	if w.stopListeners != nil {
		w.stopListeners()
	}
}
47

48
func (w *Watcher) startToListen(ctx context.Context) {
49
	for {
50
		select {
51
		case period := <-utils.WatcherUpdateResyncPeriod:
52
			go w.updateSyncPeriod(period)
53
		case obj := <-utils.WatcherAddNewObjectTo:
54
			go w.addNewObject(obj)
55
		case obj := <-utils.WatcherRemoveObjectFrom:
56
			go w.removeObject(obj)
57
		case <-ctx.Done():
58
			return
59
		}
60
	}
61
}
62

63
// init prepares the watcher state, starts the event listener goroutine and
// schedules updateStatus to run every reSyncPeriod seconds. A zero period
// falls back to 5 seconds.
//
// The final receive blocks on the channel returned by the scheduler's Start,
// so this method is expected to run in its own goroutine (see StartWatcher).
func (w *Watcher) init() {
	log := logger.FromContext(context.Background())
	w.watchObjects = map[string]v1alpha1.TriggerInstance{}

	ctx, stop := context.WithCancel(context.Background())
	w.stopListeners = stop
	go w.startToListen(ctx)

	// reSyncPeriod is unsigned, so "not set" can only mean zero.
	if w.reSyncPeriod == 0 {
		w.reSyncPeriod = 5
	}

	w.scheduler = gocron.NewScheduler()
	// Seconds (plural) is the gocron unit for arbitrary intervals; the
	// singular Second is intended for Every(1) only.
	w.job = w.scheduler.Every(w.reSyncPeriod).Seconds()
	if err := w.job.Do(w.updateStatus); err != nil {
		log.Errorf("Watcher: Init failed: %s", err.Error())
		return
	}

	<-w.scheduler.Start()
}
83

84
// updateStatus is the periodic sync job. For every watched trigger instance it
// re-reads the instance, refreshes the status of each tracked resource from
// the Kubernetes API and saves the instance status when its hash has changed.
// Instances that no longer exist, or whose resources are all removed/failed,
// are dropped from the watch list at the end of the pass.
//
// The watcher mutex is held for the whole pass, so addNewObject and
// removeObject wait until it finishes.
func (w *Watcher) updateStatus() {
	w.rwMutex.Lock()
	defer w.rwMutex.Unlock()

	var mustDeleted []string
	ctx := context.Background()
	log := logger.FromContext(ctx)
	clt := kubeapi.GetClient()

	for tiName, triggerInstance := range w.watchObjects {
		ti, err := utils.Cl.TriggerInstance().Get(ctx, triggerInstance.Namespace, triggerInstance.Name, metav1.GetOptions{})
		if err != nil {
			if errors.IsNotFound(err) {
				mustDeleted = append(mustDeleted, tiName)
				continue
			}
			// Any other error means ti must not be used; keep the instance
			// in the watch list and try again on the next pass.
			log.Errorf("Watcher: Get trigger instance %s failed: %s", tiName, err.Error())
			continue
		}

		originalStatusHash := ti.GetStatusHash()
		if ti.Status.ResourceStatus != nil {
			allRemoved := true
			// len is re-read on every iteration on purpose: the slice belongs
			// to ti and is touched by UpdateResourceStatus below.
			for i := 0; i < len(ti.Status.ResourceStatus); i++ {
				name := ti.Status.ResourceStatus[i].Name
				kind := ti.Status.ResourceStatus[i].Kind
				version := ti.Status.ResourceStatus[i].ApiVersion
				if ti.Status.ResourceStatus[i].Status == utils.FactoryResourceRemoved ||
					ti.Status.ResourceStatus[i].Phase == utils.FactoryResourceCreateFailed {
					continue
				}

				resourceMeta := kubeapi.GetResourceMeta(clt, kind, version)
				raw, errKube := kubeapi.GetRawResourceFromKubeApi(ctx, clt.RESTClient(), resourceMeta, name, triggerInstance.Namespace)
				if errKube != nil {
					if errors.IsNotFound(errKube) {
						// The resource does not exist any more: record it as removed.
						ti.UpdateResourceStatus(name, "", utils.FactoryResourceRemoved, utils.FactoryResourceRemoved, kind, version)
						log.Infof("Watcher: Resource %s deleted, set status REMOVED for this ti: %s", name, tiName)
					} else {
						// The resource state is unknown, not removed, so the
						// instance must stay in the watch list.
						allRemoved = false
						log.Warnf("Watcher: Get resource %s failed, for this ti: %s error %s", name, tiName, errKube)
					}
					continue
				}

				// The resource was fetched, so it exists regardless of
				// whether its payload can be decoded.
				allRemoved = false

				var resource map[string]interface{}
				if errJSON := json.Unmarshal(raw, &resource); errJSON != nil {
					// Skip only this resource; the remaining resources and
					// instances are still processed in this pass.
					log.Errorf("Watcher: Update trigger status failed: %s", errJSON.Error())
					continue
				}

				status, phase := watcherResourcePhase(resource)

				data, errYAML := yaml.Marshal(status)
				if errYAML != nil {
					// Best effort: the phase is still recorded, with an empty
					// status payload.
					log.Errorf("Watcher: Update trigger status failed: %s", errYAML.Error())
				}
				ti.UpdateResourceStatus(name, "", phase, string(data), kind, version)
			}
			if allRemoved {
				log.Infof("Watcher: All resources deleted, mark to delete this ti: %s", tiName)
				mustDeleted = append(mustDeleted, tiName)
			}
		}

		// Only write to the API when the status actually changed.
		if ti.GetStatusHash() != originalStatusHash {
			if _, err = utils.Cl.TriggerInstance().UpdateStatus(ctx, triggerInstance.Namespace, ti, metav1.UpdateOptions{}); err != nil {
				log.Errorf("Watcher: Save trigger status failed: %s", err.Error())
			}
		}
	}

	// Deletions are deferred to here so the map is not modified while it is
	// being ranged over.
	for _, tiName := range mustDeleted {
		delete(w.watchObjects, tiName)
		log.Debugf("Watcher: Clear trigger instance: %s", tiName)
	}
}

// watcherResourcePhase returns the "status" object of a decoded resource and
// the phase to record for it:
//   - no "status" object: nil status and phase "Apply";
//   - "status" with a string "phase": that phase;
//   - "status" without a usable "phase": empty phase.
//
// NOTE(review): an existing status without a phase yields "" rather than
// "Apply", as in the previous inline code — confirm this is intended.
func watcherResourcePhase(resource map[string]interface{}) (map[string]interface{}, string) {
	status, ok := resource["status"].(map[string]interface{})
	if !ok {
		return nil, "Apply"
	}
	// Checked assertion: a non-string or null phase must not panic the job.
	phase, _ := status["phase"].(string)
	return status, phase
}
175

176
// updateSyncPeriod stores the new sync period and logs it.
// NOTE(review): this only updates the field; nothing here reschedules the
// job that init created with the previous period — confirm whether the new
// value is expected to take effect on a running watcher.
func (w *Watcher) updateSyncPeriod(period uint64) {
	w.reSyncPeriod = period
	logger.FromContext(context.Background()).Debugf("Watcher: updateSyncPeriod %d", w.reSyncPeriod)
}
181

182
// addNewObject puts obj into the watch list under its namespaced name,
// replacing any entry already stored under that key.
func (w *Watcher) addNewObject(obj v1alpha1.TriggerInstance) {
	w.rwMutex.Lock()
	defer w.rwMutex.Unlock()

	key := obj.GetNamespacedName()
	w.watchObjects[key] = obj
	logger.FromContext(context.Background()).Debugf("Watcher: addNewObject %s", key)
}
189

190
// removeObject drops the entry stored under obj's namespaced name from the
// watch list; a missing entry is a no-op.
func (w *Watcher) removeObject(obj v1alpha1.TriggerInstance) {
	w.rwMutex.Lock()
	defer w.rwMutex.Unlock()

	key := obj.GetNamespacedName()
	delete(w.watchObjects, key)
	logger.FromContext(context.Background()).Debugf("Watcher: removeObject %s", key)
}
197

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.