argo-cd
160 строк · 4.8 Кб
1package controller
2
3import (
4"context"
5"fmt"
6"time"
7
8"github.com/argoproj/argo-cd/v2/util/env"
9"github.com/argoproj/gitops-engine/pkg/cache"
10"github.com/argoproj/gitops-engine/pkg/utils/kube"
11log "github.com/sirupsen/logrus"
12metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
13"k8s.io/apimachinery/pkg/labels"
14
15"github.com/argoproj/argo-cd/v2/controller/metrics"
16appv1 "github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1"
17"github.com/argoproj/argo-cd/v2/pkg/client/listers/application/v1alpha1"
18"github.com/argoproj/argo-cd/v2/util/argo"
19appstatecache "github.com/argoproj/argo-cd/v2/util/cache/appstate"
20"github.com/argoproj/argo-cd/v2/util/db"
21)
22
23const (
24defaultSecretUpdateInterval = 10 * time.Second
25
26EnvClusterInfoTimeout = "ARGO_CD_UPDATE_CLUSTER_INFO_TIMEOUT"
27)
28
29var (
30clusterInfoTimeout = env.ParseDurationFromEnv(EnvClusterInfoTimeout, defaultSecretUpdateInterval, defaultSecretUpdateInterval, 1*time.Minute)
31)
32
33type clusterInfoUpdater struct {
34infoSource metrics.HasClustersInfo
35db db.ArgoDB
36appLister v1alpha1.ApplicationNamespaceLister
37cache *appstatecache.Cache
38clusterFilter func(cluster *appv1.Cluster) bool
39projGetter func(app *appv1.Application) (*appv1.AppProject, error)
40namespace string
41lastUpdated time.Time
42}
43
44func NewClusterInfoUpdater(
45infoSource metrics.HasClustersInfo,
46db db.ArgoDB,
47appLister v1alpha1.ApplicationNamespaceLister,
48cache *appstatecache.Cache,
49clusterFilter func(cluster *appv1.Cluster) bool,
50projGetter func(app *appv1.Application) (*appv1.AppProject, error),
51namespace string) *clusterInfoUpdater {
52
53return &clusterInfoUpdater{infoSource, db, appLister, cache, clusterFilter, projGetter, namespace, time.Time{}}
54}
55
56func (c *clusterInfoUpdater) Run(ctx context.Context) {
57c.updateClusters()
58ticker := time.NewTicker(clusterInfoTimeout)
59for {
60select {
61case <-ctx.Done():
62ticker.Stop()
63return
64case <-ticker.C:
65c.updateClusters()
66}
67}
68}
69
70func (c *clusterInfoUpdater) updateClusters() {
71if time.Since(c.lastUpdated) < clusterInfoTimeout {
72return
73}
74
75ctx, cancel := context.WithTimeout(context.Background(), clusterInfoTimeout)
76defer func() {
77cancel()
78c.lastUpdated = time.Now()
79}()
80
81infoByServer := make(map[string]*cache.ClusterInfo)
82clustersInfo := c.infoSource.GetClustersInfo()
83for i := range clustersInfo {
84info := clustersInfo[i]
85infoByServer[info.Server] = &info
86}
87clusters, err := c.db.ListClusters(ctx)
88if err != nil {
89log.Warnf("Failed to save clusters info: %v", err)
90return
91}
92var clustersFiltered []appv1.Cluster
93if c.clusterFilter == nil {
94clustersFiltered = clusters.Items
95} else {
96for i := range clusters.Items {
97if c.clusterFilter(&clusters.Items[i]) {
98clustersFiltered = append(clustersFiltered, clusters.Items[i])
99}
100}
101}
102_ = kube.RunAllAsync(len(clustersFiltered), func(i int) error {
103cluster := clustersFiltered[i]
104if err := c.updateClusterInfo(ctx, cluster, infoByServer[cluster.Server]); err != nil {
105log.Warnf("Failed to save clusters info: %v", err)
106}
107return nil
108})
109log.Debugf("Successfully saved info of %d clusters", len(clustersFiltered))
110}
111
112func (c *clusterInfoUpdater) updateClusterInfo(ctx context.Context, cluster appv1.Cluster, info *cache.ClusterInfo) error {
113apps, err := c.appLister.List(labels.Everything())
114if err != nil {
115return fmt.Errorf("error while fetching the apps list: %w", err)
116}
117var appCount int64
118for _, a := range apps {
119if c.projGetter != nil {
120proj, err := c.projGetter(a)
121if err != nil || !proj.IsAppNamespacePermitted(a, c.namespace) {
122continue
123}
124}
125if err := argo.ValidateDestination(ctx, &a.Spec.Destination, c.db); err != nil {
126continue
127}
128if a.Spec.Destination.Server == cluster.Server {
129appCount += 1
130}
131}
132now := metav1.Now()
133clusterInfo := appv1.ClusterInfo{
134ConnectionState: appv1.ConnectionState{ModifiedAt: &now},
135ApplicationsCount: appCount,
136}
137if info != nil {
138clusterInfo.ServerVersion = info.K8SVersion
139clusterInfo.APIVersions = argo.APIResourcesToStrings(info.APIResources, true)
140if info.LastCacheSyncTime == nil {
141clusterInfo.ConnectionState.Status = appv1.ConnectionStatusUnknown
142} else if info.SyncError == nil {
143clusterInfo.ConnectionState.Status = appv1.ConnectionStatusSuccessful
144syncTime := metav1.NewTime(*info.LastCacheSyncTime)
145clusterInfo.CacheInfo.LastCacheSyncTime = &syncTime
146clusterInfo.CacheInfo.APIsCount = int64(info.APIsCount)
147clusterInfo.CacheInfo.ResourcesCount = int64(info.ResourcesCount)
148} else {
149clusterInfo.ConnectionState.Status = appv1.ConnectionStatusFailed
150clusterInfo.ConnectionState.Message = info.SyncError.Error()
151}
152} else {
153clusterInfo.ConnectionState.Status = appv1.ConnectionStatusUnknown
154if appCount == 0 {
155clusterInfo.ConnectionState.Message = "Cluster has no applications and is not being monitored."
156}
157}
158
159return c.cache.SetClusterInfo(cluster.Server, &clusterInfo)
160}
161