kubelatte-ce
Форк от sbertech/kubelatte-ce
196 строк · 5.6 Кб
1package creation2
3import (4"context"5"encoding/json"6"github.com/jasonlvhit/gocron"7"gitverse.ru/synapse/kubelatte/pkg/api/v1alpha1"8"gitverse.ru/synapse/kubelatte/pkg/kubeapi"9"gitverse.ru/synapse/kubelatte/pkg/observability/logger"10"gitverse.ru/synapse/kubelatte/pkg/operator/utils"11"k8s.io/apimachinery/pkg/api/errors"12metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"13"sigs.k8s.io/yaml"14"sync"15)
16
// WatcherI is the lifecycle contract exposed by the watcher: it can be
// started and it can be stopped.
type WatcherI interface {
	// StartWatcher begins watching.
	StartWatcher()
	// StopWatcher ends watching.
	StopWatcher()
}
21
22func NewWatcher(reSyncPeriod uint64) WatcherI {23return &Watcher{reSyncPeriod: reSyncPeriod}24}
25
26type Watcher struct {27reSyncPeriod uint6428job *gocron.Job29scheduler *gocron.Scheduler30watchObjects map[string]v1alpha1.TriggerInstance31rwMutex sync.Mutex32stopListeners context.CancelFunc33}
34
35func (w *Watcher) StartWatcher() {36log := logger.FromContext(context.Background())37log.Debug("Watcher: Start")38go w.init()39}
40
41func (w *Watcher) StopWatcher() {42log := logger.FromContext(context.Background())43log.Debug("Watcher: Stop")44w.scheduler.Remove(w.job)45w.stopListeners()46}
47
48func (w *Watcher) startToListen(ctx context.Context) {49for {50select {51case period := <-utils.WatcherUpdateResyncPeriod:52go w.updateSyncPeriod(period)53case obj := <-utils.WatcherAddNewObjectTo:54go w.addNewObject(obj)55case obj := <-utils.WatcherRemoveObjectFrom:56go w.removeObject(obj)57case <-ctx.Done():58return59}60}61}
62
63func (w *Watcher) init() {64log := logger.FromContext(context.Background())65w.watchObjects = map[string]v1alpha1.TriggerInstance{}66
67ctx, stop := context.WithCancel(context.Background())68w.stopListeners = stop69go w.startToListen(ctx)70
71if w.reSyncPeriod <= 0 {72w.reSyncPeriod = 573}74w.scheduler = gocron.NewScheduler()75w.job = w.scheduler.Every(w.reSyncPeriod).Second()76err := w.job.Do(w.updateStatus)77if err != nil {78log.Errorf("Watcher: Init failed: %s", err.Error())79return80}81<-w.scheduler.Start()82}
83
84func (w *Watcher) updateStatus() {85w.rwMutex.Lock()86defer w.rwMutex.Unlock()87
88var mustDeleted []string89ctx := context.Background()90log := logger.FromContext(ctx)91clt := kubeapi.GetClient()92
93for tiName, triggerInstance := range w.watchObjects {94ti, err := utils.Cl.TriggerInstance().Get(ctx, triggerInstance.Namespace, triggerInstance.Name, metav1.GetOptions{})95if err != nil {96if errors.IsNotFound(err) {97mustDeleted = append(mustDeleted, tiName)98continue99}100}101originalStatusHash := ti.GetStatusHash()102if ti.Status.ResourceStatus != nil {103allRemoved := true104for i := 0; i < len(ti.Status.ResourceStatus); i++ {105name := ti.Status.ResourceStatus[i].Name106kind := ti.Status.ResourceStatus[i].Kind107version := ti.Status.ResourceStatus[i].ApiVersion108if ti.Status.ResourceStatus[i].Status == utils.FactoryResourceRemoved ||109ti.Status.ResourceStatus[i].Phase == utils.FactoryResourceCreateFailed {110continue111}112resourceMeta := kubeapi.GetResourceMeta(clt, kind, version)113raw, errKube := kubeapi.GetRawResourceFromKubeApi(ctx, clt.RESTClient(), resourceMeta, name, triggerInstance.Namespace)114if errKube != nil {115// if resource is not exist then we set status removed116if errors.IsNotFound(errKube) {117ti.UpdateResourceStatus(name, "", utils.FactoryResourceRemoved, utils.FactoryResourceRemoved, kind, version)118log.Infof("Watcher: Resource %s deleted, set status REMOVED for this ti: %s", name, tiName)119} else {120log.Warnf("Watcher: Get resource %s failed, for this ti: %s error %s", name, tiName, errKube)121}122continue123}124
125var resource map[string]interface{}126err = json.Unmarshal(raw, &resource)127if err != nil {128log.Errorf("Watcher: Update trigger status failed: %s", err.Error())129return130}131
132var status map[string]interface{}133phase := ""134status, ok := resource["status"].(map[string]interface{})135if ok {136phs, ok := status["phase"]137if ok {138phase = phs.(string)139}140}141if !ok {142phase = "Apply"143}144
145data, err := yaml.Marshal(status)146if err != nil {147log.Errorf("Watcher: Update trigger status failed: %s", err.Error())148}149allRemoved = false150ti.UpdateResourceStatus(name, "", phase, string(data), kind, version)151}152if allRemoved {153log.Infof("Watcher: All resources deleted, mark to delete this ti: %s", tiName)154mustDeleted = append(mustDeleted, tiName)155}156}157actualStatusHash := ti.GetStatusHash()158if originalStatusHash == actualStatusHash {159//logs.Debug("Watcher: Ti status hash equal, no need update ti: " + tiName) add trace level160} else {161_, err = utils.Cl.TriggerInstance().UpdateStatus(ctx, triggerInstance.Namespace, ti, metav1.UpdateOptions{})162if err != nil {163log.Errorf("Watcher: Save trigger status failed: %s", err.Error())164}165}166}167
168if len(mustDeleted) > 0 {169for _, tiName := range mustDeleted {170delete(w.watchObjects, tiName)171log.Debugf("Watcher: Clear trigger instance: %s", tiName)172}173}174}
175
176func (w *Watcher) updateSyncPeriod(period uint64) {177w.reSyncPeriod = period178log := logger.FromContext(context.Background())179log.Debugf("Watcher: updateSyncPeriod %d", w.reSyncPeriod)180}
181
182func (w *Watcher) addNewObject(obj v1alpha1.TriggerInstance) {183w.rwMutex.Lock()184w.watchObjects[obj.GetNamespacedName()] = obj185log := logger.FromContext(context.Background())186log.Debugf("Watcher: addNewObject %s", obj.GetNamespacedName())187w.rwMutex.Unlock()188}
189
190func (w *Watcher) removeObject(obj v1alpha1.TriggerInstance) {191w.rwMutex.Lock()192delete(w.watchObjects, obj.GetNamespacedName())193log := logger.FromContext(context.Background())194log.Debugf("Watcher: removeObject %s", obj.GetNamespacedName())195w.rwMutex.Unlock()196}
197