kuma

Форк
0
/
collector.go 
105 строк · 3.0 Кб
1
package gc
2

3
import (
4
	"context"
5
	"fmt"
6
	"time"
7

8
	"github.com/prometheus/client_golang/prometheus"
9

10
	mesh_proto "github.com/kumahq/kuma/api/mesh/v1alpha1"
11
	"github.com/kumahq/kuma/pkg/core"
12
	core_mesh "github.com/kumahq/kuma/pkg/core/resources/apis/mesh"
13
	"github.com/kumahq/kuma/pkg/core/resources/manager"
14
	"github.com/kumahq/kuma/pkg/core/resources/model"
15
	"github.com/kumahq/kuma/pkg/core/resources/store"
16
	"github.com/kumahq/kuma/pkg/core/runtime/component"
17
	"github.com/kumahq/kuma/pkg/core/user"
18
	core_metrics "github.com/kumahq/kuma/pkg/metrics"
19
)
20

21
var gcLog = core.Log.WithName("dataplane-gc")
22

23
type collector struct {
24
	rm         manager.ResourceManager
25
	cleanupAge time.Duration
26
	newTicker  func() *time.Ticker
27
	metric     prometheus.Summary
28
}
29

30
func NewCollector(
31
	rm manager.ResourceManager,
32
	newTicker func() *time.Ticker,
33
	cleanupAge time.Duration,
34
	metrics core_metrics.Metrics,
35
) (component.Component, error) {
36
	metric := prometheus.NewSummary(prometheus.SummaryOpts{
37
		Name:       "component_dp_gc",
38
		Help:       "Summary of Dataplane GC component interval",
39
		Objectives: core_metrics.DefaultObjectives,
40
	})
41
	if err := metrics.Register(metric); err != nil {
42
		return nil, err
43
	}
44
	return &collector{
45
		cleanupAge: cleanupAge,
46
		rm:         rm,
47
		newTicker:  newTicker,
48
		metric:     metric,
49
	}, nil
50
}
51

52
func (d *collector) Start(stop <-chan struct{}) error {
53
	ticker := d.newTicker()
54
	defer ticker.Stop()
55
	gcLog.Info("started")
56
	ctx := user.Ctx(context.Background(), user.ControlPlane)
57
	for {
58
		select {
59
		case now := <-ticker.C:
60
			start := core.Now()
61
			if err := d.cleanup(ctx, now); err != nil {
62
				gcLog.Error(err, "unable to cleanup")
63
				continue
64
			}
65
			d.metric.Observe(float64(core.Now().Sub(start).Milliseconds()))
66
		case <-stop:
67
			gcLog.Info("stopped")
68
			return nil
69
		}
70
	}
71
}
72

73
func (d *collector) cleanup(ctx context.Context, now time.Time) error {
74
	dataplaneInsights := &core_mesh.DataplaneInsightResourceList{}
75
	if err := d.rm.List(ctx, dataplaneInsights); err != nil {
76
		return err
77
	}
78
	onDelete := []model.ResourceKey{}
79
	for _, di := range dataplaneInsights.Items {
80
		if di.Spec.IsOnline() {
81
			continue
82
		}
83
		if s := di.Spec.GetLastSubscription().(*mesh_proto.DiscoverySubscription); s != nil {
84
			if err := s.GetDisconnectTime().CheckValid(); err != nil {
85
				gcLog.Error(err, "unable to parse DisconnectTime", "disconnect time", s.GetDisconnectTime(), "mesh", di.GetMeta().GetMesh(), "dataplane", di.GetMeta().GetName())
86
				continue
87
			}
88
			if now.Sub(s.GetDisconnectTime().AsTime()) > d.cleanupAge {
89
				onDelete = append(onDelete, model.ResourceKey{Name: di.GetMeta().GetName(), Mesh: di.GetMeta().GetMesh()})
90
			}
91
		}
92
	}
93
	for _, rk := range onDelete {
94
		gcLog.Info(fmt.Sprintf("deleting dataplane which is offline for %v", d.cleanupAge), "name", rk.Name, "mesh", rk.Mesh)
95
		if err := d.rm.Delete(ctx, core_mesh.NewDataplaneResource(), store.DeleteBy(rk)); err != nil {
96
			gcLog.Error(err, "unable to delete dataplane", "name", rk.Name, "mesh", rk.Mesh)
97
			continue
98
		}
99
	}
100
	return nil
101
}
102

103
func (d *collector) NeedLeaderElection() bool {
104
	return true
105
}
106

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.