8
"github.com/prometheus/client_golang/prometheus"
10
mesh_proto "github.com/kumahq/kuma/api/mesh/v1alpha1"
11
"github.com/kumahq/kuma/pkg/core"
12
core_mesh "github.com/kumahq/kuma/pkg/core/resources/apis/mesh"
13
"github.com/kumahq/kuma/pkg/core/resources/manager"
14
"github.com/kumahq/kuma/pkg/core/resources/model"
15
"github.com/kumahq/kuma/pkg/core/resources/store"
16
"github.com/kumahq/kuma/pkg/core/runtime/component"
17
"github.com/kumahq/kuma/pkg/core/user"
18
core_metrics "github.com/kumahq/kuma/pkg/metrics"
21
// gcLog is the logger used throughout this file, derived from core.Log under
// the name "dataplane-gc".
var gcLog = core.Log.WithName("dataplane-gc")
23
// collector bundles what the Start and cleanup methods need: a resource
// manager, an age threshold, a ticker factory and a summary metric.
type collector struct {
24
// rm is used by cleanup to list DataplaneInsight resources and to delete
// Dataplane resources.
rm manager.ResourceManager
25
// cleanupAge is the threshold compared against the time elapsed since a
// dataplane's last recorded disconnect; entries above it are queued for
// deletion.
cleanupAge time.Duration
26
// newTicker builds the ticker whose ticks trigger cleanup in Start.
newTicker func() *time.Ticker
27
// metric receives elapsed-time observations, in milliseconds, from Start.
metric prometheus.Summary
31
rm manager.ResourceManager,
32
newTicker func() *time.Ticker,
33
cleanupAge time.Duration,
34
metrics core_metrics.Metrics,
35
) (component.Component, error) {
36
metric := prometheus.NewSummary(prometheus.SummaryOpts{
37
Name: "component_dp_gc",
38
Help: "Summary of Dataplane GC component interval",
39
Objectives: core_metrics.DefaultObjectives,
41
if err := metrics.Register(metric); err != nil {
45
cleanupAge: cleanupAge,
52
// Start creates a ticker through d.newTicker and, for each tick received,
// calls d.cleanup with the tick time. A cleanup error is logged with the
// message "unable to cleanup"; the elapsed time is reported to d.metric.
//
// NOTE(review): the handling of the stop channel and the value returned are
// on lines not visible in this view — confirm before relying on them.
func (d *collector) Start(stop <-chan struct{}) error {
53
ticker := d.newTicker()
56
// ctx carries user.ControlPlane (attached via user.Ctx) and is the context
// handed to every cleanup call below.
ctx := user.Ctx(context.Background(), user.ControlPlane)
59
// The tick value itself is used as the reference time for cleanup.
case now := <-ticker.C:
61
if err := d.cleanup(ctx, now); err != nil {
62
gcLog.Error(err, "unable to cleanup")
65
// Observe the time elapsed since start, in milliseconds.
// NOTE(review): start is assigned on a line not shown here — presumably
// taken just before the cleanup call; confirm.
d.metric.Observe(float64(core.Now().Sub(start).Milliseconds()))
73
// cleanup lists DataplaneInsight resources through d.rm, collects the keys of
// those whose last subscription disconnected more than d.cleanupAge before
// now, and then deletes the Dataplane resource stored under each collected
// key.
//
// ctx is forwarded to the resource manager calls. now is the reference time
// the disconnect timestamp is compared against.
func (d *collector) cleanup(ctx context.Context, now time.Time) error {
74
dataplaneInsights := &core_mesh.DataplaneInsightResourceList{}
75
if err := d.rm.List(ctx, dataplaneInsights); err != nil {
78
// Keys are gathered first and deleted in a second pass further down.
onDelete := []model.ResourceKey{}
79
for _, di := range dataplaneInsights.Items {
80
// NOTE(review): the body of this branch is not visible here — presumably
// online dataplanes are skipped; confirm.
if di.Spec.IsOnline() {
83
// NOTE(review): this is a single-value type assertion, so it panics if
// GetLastSubscription returns a nil interface or any dynamic type other
// than *mesh_proto.DiscoverySubscription. A typed nil pointer passes the
// assertion and is filtered out by the s != nil check.
if s := di.Spec.GetLastSubscription().(*mesh_proto.DiscoverySubscription); s != nil {
84
// Validate the disconnect timestamp before it is converted with AsTime.
if err := s.GetDisconnectTime().CheckValid(); err != nil {
85
gcLog.Error(err, "unable to parse DisconnectTime", "disconnect time", s.GetDisconnectTime(), "mesh", di.GetMeta().GetMesh(), "dataplane", di.GetMeta().GetName())
88
// Strictly greater than: an entry disconnected exactly cleanupAge ago is
// not queued.
if now.Sub(s.GetDisconnectTime().AsTime()) > d.cleanupAge {
89
// The insight's own name and mesh form the key of the Dataplane to delete.
onDelete = append(onDelete, model.ResourceKey{Name: di.GetMeta().GetName(), Mesh: di.GetMeta().GetMesh()})
93
for _, rk := range onDelete {
94
gcLog.Info(fmt.Sprintf("deleting dataplane which is offline for %v", d.cleanupAge), "name", rk.Name, "mesh", rk.Mesh)
95
if err := d.rm.Delete(ctx, core_mesh.NewDataplaneResource(), store.DeleteBy(rk)); err != nil {
96
// A failed delete is logged together with the key it was attempted for.
gcLog.Error(err, "unable to delete dataplane", "name", rk.Name, "mesh", rk.Mesh)
103
func (d *collector) NeedLeaderElection() bool {