argo-cd

Форк
0
/
clusterinfoupdater.go 
160 строк · 4.8 Кб
1
package controller
2

3
import (
4
	"context"
5
	"fmt"
6
	"time"
7

8
	"github.com/argoproj/argo-cd/v2/util/env"
9
	"github.com/argoproj/gitops-engine/pkg/cache"
10
	"github.com/argoproj/gitops-engine/pkg/utils/kube"
11
	log "github.com/sirupsen/logrus"
12
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
13
	"k8s.io/apimachinery/pkg/labels"
14

15
	"github.com/argoproj/argo-cd/v2/controller/metrics"
16
	appv1 "github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1"
17
	"github.com/argoproj/argo-cd/v2/pkg/client/listers/application/v1alpha1"
18
	"github.com/argoproj/argo-cd/v2/util/argo"
19
	appstatecache "github.com/argoproj/argo-cd/v2/util/cache/appstate"
20
	"github.com/argoproj/argo-cd/v2/util/db"
21
)
22

23
const (
	// EnvClusterInfoTimeout is the name of the environment variable from which
	// clusterInfoTimeout is parsed.
	EnvClusterInfoTimeout = "ARGO_CD_UPDATE_CLUSTER_INFO_TIMEOUT"

	// defaultSecretUpdateInterval is passed twice to env.ParseDurationFromEnv
	// when clusterInfoTimeout is computed.
	// NOTE(review): presumably once as the default and once as the lower
	// bound - confirm against that helper's signature.
	defaultSecretUpdateInterval = 10 * time.Second
)
28

29
var (
30
	clusterInfoTimeout = env.ParseDurationFromEnv(EnvClusterInfoTimeout, defaultSecretUpdateInterval, defaultSecretUpdateInterval, 1*time.Minute)
31
)
32

33
type clusterInfoUpdater struct {
34
	infoSource    metrics.HasClustersInfo
35
	db            db.ArgoDB
36
	appLister     v1alpha1.ApplicationNamespaceLister
37
	cache         *appstatecache.Cache
38
	clusterFilter func(cluster *appv1.Cluster) bool
39
	projGetter    func(app *appv1.Application) (*appv1.AppProject, error)
40
	namespace     string
41
	lastUpdated   time.Time
42
}
43

44
func NewClusterInfoUpdater(
45
	infoSource metrics.HasClustersInfo,
46
	db db.ArgoDB,
47
	appLister v1alpha1.ApplicationNamespaceLister,
48
	cache *appstatecache.Cache,
49
	clusterFilter func(cluster *appv1.Cluster) bool,
50
	projGetter func(app *appv1.Application) (*appv1.AppProject, error),
51
	namespace string) *clusterInfoUpdater {
52

53
	return &clusterInfoUpdater{infoSource, db, appLister, cache, clusterFilter, projGetter, namespace, time.Time{}}
54
}
55

56
func (c *clusterInfoUpdater) Run(ctx context.Context) {
57
	c.updateClusters()
58
	ticker := time.NewTicker(clusterInfoTimeout)
59
	for {
60
		select {
61
		case <-ctx.Done():
62
			ticker.Stop()
63
			return
64
		case <-ticker.C:
65
			c.updateClusters()
66
		}
67
	}
68
}
69

70
// updateClusters refreshes the cached info of every cluster accepted by
// clusterFilter (all listed clusters when the filter is nil).
//
// The call is a no-op when less than clusterInfoTimeout has elapsed since the
// previous attempt finished. lastUpdated is advanced at the end of every
// attempt that gets past that check, including attempts that fail. The whole
// pass shares one context bounded by clusterInfoTimeout.
//
// Failures are logged rather than returned: a failure to list clusters aborts
// the pass, while a failure to save one cluster's info does not prevent the
// remaining clusters from being processed.
func (c *clusterInfoUpdater) updateClusters() {
	if time.Since(c.lastUpdated) < clusterInfoTimeout {
		return
	}

	ctx, cancel := context.WithTimeout(context.Background(), clusterInfoTimeout)
	defer func() {
		cancel()
		c.lastUpdated = time.Now()
	}()

	clustersInfo := c.infoSource.GetClustersInfo()
	infoByServer := make(map[string]*cache.ClusterInfo, len(clustersInfo))
	for i := range clustersInfo {
		// Copy the element so that every map entry points at its own value.
		info := clustersInfo[i]
		infoByServer[info.Server] = &info
	}

	clusters, err := c.db.ListClusters(ctx)
	if err != nil {
		log.Warnf("Failed to list clusters for cluster info update: %v", err)
		return
	}

	var clustersFiltered []appv1.Cluster
	if c.clusterFilter == nil {
		clustersFiltered = clusters.Items
	} else {
		for i := range clusters.Items {
			if c.clusterFilter(&clusters.Items[i]) {
				clustersFiltered = append(clustersFiltered, clusters.Items[i])
			}
		}
	}

	// Each worker writes only to its own index, so no extra synchronization is
	// needed; the slice is read after RunAllAsync has returned.
	failed := make([]bool, len(clustersFiltered))
	// The callback always returns nil so that one failing cluster cannot stop
	// the others; the aggregate error is therefore always nil and is ignored.
	_ = kube.RunAllAsync(len(clustersFiltered), func(i int) error {
		cluster := clustersFiltered[i]
		if err := c.updateClusterInfo(ctx, cluster, infoByServer[cluster.Server]); err != nil {
			log.Warnf("Failed to save info of cluster %s: %v", cluster.Server, err)
			failed[i] = true
		}
		return nil
	})

	failures := 0
	for _, f := range failed {
		if f {
			failures++
		}
	}
	if failures > 0 {
		log.Debugf("Saved info of %d out of %d clusters", len(clustersFiltered)-failures, len(clustersFiltered))
		return
	}
	log.Debugf("Successfully saved info of %d clusters", len(clustersFiltered))
}
111

112
func (c *clusterInfoUpdater) updateClusterInfo(ctx context.Context, cluster appv1.Cluster, info *cache.ClusterInfo) error {
113
	apps, err := c.appLister.List(labels.Everything())
114
	if err != nil {
115
		return fmt.Errorf("error while fetching the apps list: %w", err)
116
	}
117
	var appCount int64
118
	for _, a := range apps {
119
		if c.projGetter != nil {
120
			proj, err := c.projGetter(a)
121
			if err != nil || !proj.IsAppNamespacePermitted(a, c.namespace) {
122
				continue
123
			}
124
		}
125
		if err := argo.ValidateDestination(ctx, &a.Spec.Destination, c.db); err != nil {
126
			continue
127
		}
128
		if a.Spec.Destination.Server == cluster.Server {
129
			appCount += 1
130
		}
131
	}
132
	now := metav1.Now()
133
	clusterInfo := appv1.ClusterInfo{
134
		ConnectionState:   appv1.ConnectionState{ModifiedAt: &now},
135
		ApplicationsCount: appCount,
136
	}
137
	if info != nil {
138
		clusterInfo.ServerVersion = info.K8SVersion
139
		clusterInfo.APIVersions = argo.APIResourcesToStrings(info.APIResources, true)
140
		if info.LastCacheSyncTime == nil {
141
			clusterInfo.ConnectionState.Status = appv1.ConnectionStatusUnknown
142
		} else if info.SyncError == nil {
143
			clusterInfo.ConnectionState.Status = appv1.ConnectionStatusSuccessful
144
			syncTime := metav1.NewTime(*info.LastCacheSyncTime)
145
			clusterInfo.CacheInfo.LastCacheSyncTime = &syncTime
146
			clusterInfo.CacheInfo.APIsCount = int64(info.APIsCount)
147
			clusterInfo.CacheInfo.ResourcesCount = int64(info.ResourcesCount)
148
		} else {
149
			clusterInfo.ConnectionState.Status = appv1.ConnectionStatusFailed
150
			clusterInfo.ConnectionState.Message = info.SyncError.Error()
151
		}
152
	} else {
153
		clusterInfo.ConnectionState.Status = appv1.ConnectionStatusUnknown
154
		if appCount == 0 {
155
			clusterInfo.ConnectionState.Message = "Cluster has no applications and is not being monitored."
156
		}
157
	}
158

159
	return c.cache.SetClusterInfo(cluster.Server, &clusterInfo)
160
}
161

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.