1package controller
2
3import (
4"context"
5"encoding/json"
6goerrors "errors"
7"fmt"
8"math"
9"math/rand"
10"net/http"
11"reflect"
12"runtime/debug"
13"sort"
14"strconv"
15"strings"
16"sync"
17"time"
18
19clustercache "github.com/argoproj/gitops-engine/pkg/cache"
20"github.com/argoproj/gitops-engine/pkg/diff"
21"github.com/argoproj/gitops-engine/pkg/health"
22synccommon "github.com/argoproj/gitops-engine/pkg/sync/common"
23resourceutil "github.com/argoproj/gitops-engine/pkg/sync/resource"
24"github.com/argoproj/gitops-engine/pkg/utils/kube"
25jsonpatch "github.com/evanphx/json-patch"
26log "github.com/sirupsen/logrus"
27"golang.org/x/sync/semaphore"
28v1 "k8s.io/api/core/v1"
29apierr "k8s.io/apimachinery/pkg/api/errors"
30metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
31"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
32"k8s.io/apimachinery/pkg/labels"
33apiruntime "k8s.io/apimachinery/pkg/runtime"
34"k8s.io/apimachinery/pkg/runtime/schema"
35"k8s.io/apimachinery/pkg/types"
36"k8s.io/apimachinery/pkg/util/runtime"
37"k8s.io/apimachinery/pkg/util/wait"
38"k8s.io/apimachinery/pkg/watch"
39"k8s.io/client-go/informers"
40informerv1 "k8s.io/client-go/informers/apps/v1"
41"k8s.io/client-go/kubernetes"
42"k8s.io/client-go/tools/cache"
43"k8s.io/client-go/util/workqueue"
44
45"github.com/argoproj/argo-cd/v2/common"
46statecache "github.com/argoproj/argo-cd/v2/controller/cache"
47"github.com/argoproj/argo-cd/v2/controller/metrics"
48"github.com/argoproj/argo-cd/v2/controller/sharding"
49"github.com/argoproj/argo-cd/v2/pkg/apis/application"
50appv1 "github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1"
51appclientset "github.com/argoproj/argo-cd/v2/pkg/client/clientset/versioned"
52"github.com/argoproj/argo-cd/v2/pkg/client/informers/externalversions/application/v1alpha1"
53applisters "github.com/argoproj/argo-cd/v2/pkg/client/listers/application/v1alpha1"
54"github.com/argoproj/argo-cd/v2/reposerver/apiclient"
55"github.com/argoproj/argo-cd/v2/util/argo"
56argodiff "github.com/argoproj/argo-cd/v2/util/argo/diff"
57"github.com/argoproj/argo-cd/v2/util/env"
58
59kubeerrors "k8s.io/apimachinery/pkg/api/errors"
60
61"github.com/argoproj/argo-cd/v2/pkg/ratelimiter"
62appstatecache "github.com/argoproj/argo-cd/v2/util/cache/appstate"
63"github.com/argoproj/argo-cd/v2/util/db"
64"github.com/argoproj/argo-cd/v2/util/errors"
65"github.com/argoproj/argo-cd/v2/util/glob"
66"github.com/argoproj/argo-cd/v2/util/helm"
67logutils "github.com/argoproj/argo-cd/v2/util/log"
68settings_util "github.com/argoproj/argo-cd/v2/util/settings"
69)
70
71const (
72updateOperationStateTimeout = 1 * time.Second
73defaultDeploymentInformerResyncDuration = 10 * time.Second
// orphanedIndex contains applications which monitor orphaned resources, indexed by namespace
75orphanedIndex = "orphaned"
76)
77
78type CompareWith int
79
80const (
// Compare live application state against the state defined in the latest git revision, without caching the resolved revision.
CompareWithLatestForceResolve CompareWith = 3
// Compare live application state against the state defined in the latest git revision.
CompareWithLatest CompareWith = 2
// Compare live application state against the state defined by the revision of the most recent comparison.
CompareWithRecent CompareWith = 1
// Skip comparison and only refresh the application resource tree.
ComparisonWithNothing CompareWith = 0
89)
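// Example (illustrative sketch, not taken from a real call site): refresh requests are merged by
// keeping the strongest comparison level, e.g.
//
//	level := CompareWithRecent
//	level = level.Max(CompareWithLatest) // CompareWithLatest (2) wins over CompareWithRecent (1)
//	ctrl.requestAppRefresh(app.QualifiedName(), level.Pointer(), nil)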
90
91func (a CompareWith) Max(b CompareWith) CompareWith {
92return CompareWith(math.Max(float64(a), float64(b)))
93}
94
95func (a CompareWith) Pointer() *CompareWith {
96return &a
97}
98
99// ApplicationController is the controller for application resources.
100type ApplicationController struct {
101cache *appstatecache.Cache
102namespace string
103kubeClientset kubernetes.Interface
104kubectl kube.Kubectl
105applicationClientset appclientset.Interface
106auditLogger *argo.AuditLogger
107// queue contains app namespace/name
108appRefreshQueue workqueue.RateLimitingInterface
// queue contains app namespace/name/comparisonType and is used to request app refresh with the predefined comparison type
110appComparisonTypeRefreshQueue workqueue.RateLimitingInterface
111appOperationQueue workqueue.RateLimitingInterface
112projectRefreshQueue workqueue.RateLimitingInterface
113appInformer cache.SharedIndexInformer
114appLister applisters.ApplicationLister
115projInformer cache.SharedIndexInformer
116appStateManager AppStateManager
117stateCache statecache.LiveStateCache
118statusRefreshTimeout time.Duration
119statusHardRefreshTimeout time.Duration
120statusRefreshJitter time.Duration
121selfHealTimeout time.Duration
122repoClientset apiclient.Clientset
123db db.ArgoDB
124settingsMgr *settings_util.SettingsManager
125refreshRequestedApps map[string]CompareWith
126refreshRequestedAppsMutex *sync.Mutex
127metricsServer *metrics.MetricsServer
128kubectlSemaphore *semaphore.Weighted
129clusterSharding sharding.ClusterShardingCache
130projByNameCache sync.Map
131applicationNamespaces []string
132
// dynamicClusterDistributionEnabled toggles dynamic cluster distribution; when disabled, deploymentInformer is never initialized
134dynamicClusterDistributionEnabled bool
135deploymentInformer informerv1.DeploymentInformer
136}
137
138// NewApplicationController creates new instance of ApplicationController.
139func NewApplicationController(
140namespace string,
141settingsMgr *settings_util.SettingsManager,
142kubeClientset kubernetes.Interface,
143applicationClientset appclientset.Interface,
144repoClientset apiclient.Clientset,
145argoCache *appstatecache.Cache,
146kubectl kube.Kubectl,
147appResyncPeriod time.Duration,
148appHardResyncPeriod time.Duration,
149appResyncJitter time.Duration,
150selfHealTimeout time.Duration,
151repoErrorGracePeriod time.Duration,
152metricsPort int,
153metricsCacheExpiration time.Duration,
154metricsApplicationLabels []string,
155kubectlParallelismLimit int64,
156persistResourceHealth bool,
157clusterSharding sharding.ClusterShardingCache,
158applicationNamespaces []string,
159rateLimiterConfig *ratelimiter.AppControllerRateLimiterConfig,
160serverSideDiff bool,
161dynamicClusterDistributionEnabled bool,
162) (*ApplicationController, error) {
163log.Infof("appResyncPeriod=%v, appHardResyncPeriod=%v, appResyncJitter=%v", appResyncPeriod, appHardResyncPeriod, appResyncJitter)
164db := db.NewDB(namespace, settingsMgr, kubeClientset)
165if rateLimiterConfig == nil {
166rateLimiterConfig = ratelimiter.GetDefaultAppRateLimiterConfig()
167log.Info("Using default workqueue rate limiter config")
168}
169ctrl := ApplicationController{
170cache: argoCache,
171namespace: namespace,
172kubeClientset: kubeClientset,
173kubectl: kubectl,
174applicationClientset: applicationClientset,
175repoClientset: repoClientset,
176appRefreshQueue: workqueue.NewNamedRateLimitingQueue(ratelimiter.NewCustomAppControllerRateLimiter(rateLimiterConfig), "app_reconciliation_queue"),
177appOperationQueue: workqueue.NewNamedRateLimitingQueue(ratelimiter.NewCustomAppControllerRateLimiter(rateLimiterConfig), "app_operation_processing_queue"),
178projectRefreshQueue: workqueue.NewNamedRateLimitingQueue(ratelimiter.NewCustomAppControllerRateLimiter(rateLimiterConfig), "project_reconciliation_queue"),
179appComparisonTypeRefreshQueue: workqueue.NewRateLimitingQueue(ratelimiter.NewCustomAppControllerRateLimiter(rateLimiterConfig)),
180db: db,
181statusRefreshTimeout: appResyncPeriod,
182statusHardRefreshTimeout: appHardResyncPeriod,
183statusRefreshJitter: appResyncJitter,
184refreshRequestedApps: make(map[string]CompareWith),
185refreshRequestedAppsMutex: &sync.Mutex{},
186auditLogger: argo.NewAuditLogger(namespace, kubeClientset, common.ApplicationController),
187settingsMgr: settingsMgr,
188selfHealTimeout: selfHealTimeout,
189clusterSharding: clusterSharding,
190projByNameCache: sync.Map{},
191applicationNamespaces: applicationNamespaces,
192dynamicClusterDistributionEnabled: dynamicClusterDistributionEnabled,
193}
194if kubectlParallelismLimit > 0 {
195ctrl.kubectlSemaphore = semaphore.NewWeighted(kubectlParallelismLimit)
196}
197kubectl.SetOnKubectlRun(ctrl.onKubectlRun)
198appInformer, appLister := ctrl.newApplicationInformerAndLister()
199indexers := cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}
200projInformer := v1alpha1.NewAppProjectInformer(applicationClientset, namespace, appResyncPeriod, indexers)
201var err error
202_, err = projInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
203AddFunc: func(obj interface{}) {
204if key, err := cache.MetaNamespaceKeyFunc(obj); err == nil {
205ctrl.projectRefreshQueue.AddRateLimited(key)
206if projMeta, ok := obj.(metav1.Object); ok {
207ctrl.InvalidateProjectsCache(projMeta.GetName())
208}
209
210}
211},
212UpdateFunc: func(old, new interface{}) {
213if key, err := cache.MetaNamespaceKeyFunc(new); err == nil {
214ctrl.projectRefreshQueue.AddRateLimited(key)
215if projMeta, ok := new.(metav1.Object); ok {
216ctrl.InvalidateProjectsCache(projMeta.GetName())
217}
218}
219},
220DeleteFunc: func(obj interface{}) {
221if key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj); err == nil {
222// immediately push to queue for deletes
223ctrl.projectRefreshQueue.Add(key)
224if projMeta, ok := obj.(metav1.Object); ok {
225ctrl.InvalidateProjectsCache(projMeta.GetName())
226}
227}
228},
229})
230if err != nil {
231return nil, err
232}
233
234factory := informers.NewSharedInformerFactoryWithOptions(ctrl.kubeClientset, defaultDeploymentInformerResyncDuration, informers.WithNamespace(settingsMgr.GetNamespace()))
235
236var deploymentInformer informerv1.DeploymentInformer
237
238// only initialize deployment informer if dynamic distribution is enabled
239if dynamicClusterDistributionEnabled {
240deploymentInformer = factory.Apps().V1().Deployments()
241}
242
243readinessHealthCheck := func(r *http.Request) error {
244if dynamicClusterDistributionEnabled {
245applicationControllerName := env.StringFromEnv(common.EnvAppControllerName, common.DefaultApplicationControllerName)
246appControllerDeployment, err := deploymentInformer.Lister().Deployments(settingsMgr.GetNamespace()).Get(applicationControllerName)
247if err != nil {
248if kubeerrors.IsNotFound(err) {
249appControllerDeployment = nil
250} else {
251return fmt.Errorf("error retrieving Application Controller Deployment: %s", err)
252}
253}
254if appControllerDeployment != nil {
if appControllerDeployment.Spec.Replicas != nil && int(*appControllerDeployment.Spec.Replicas) <= 0 {
return fmt.Errorf("application controller deployment replicas is set to %d, which is less than or equal to 0", *appControllerDeployment.Spec.Replicas)
257}
258shard := env.ParseNumFromEnv(common.EnvControllerShard, -1, -math.MaxInt32, math.MaxInt32)
259if _, err := sharding.GetOrUpdateShardFromConfigMap(kubeClientset.(*kubernetes.Clientset), settingsMgr, int(*appControllerDeployment.Spec.Replicas), shard); err != nil {
return fmt.Errorf("error while updating the heartbeat to the Shard Mapping ConfigMap: %s", err)
261}
262}
263}
264return nil
265}
266
267metricsAddr := fmt.Sprintf("0.0.0.0:%d", metricsPort)
268
269ctrl.metricsServer, err = metrics.NewMetricsServer(metricsAddr, appLister, ctrl.canProcessApp, readinessHealthCheck, metricsApplicationLabels)
270if err != nil {
271return nil, err
272}
273if metricsCacheExpiration.Seconds() != 0 {
274err = ctrl.metricsServer.SetExpiration(metricsCacheExpiration)
275if err != nil {
276return nil, err
277}
278}
279stateCache := statecache.NewLiveStateCache(db, appInformer, ctrl.settingsMgr, kubectl, ctrl.metricsServer, ctrl.handleObjectUpdated, clusterSharding, argo.NewResourceTracking())
280appStateManager := NewAppStateManager(db, applicationClientset, repoClientset, namespace, kubectl, ctrl.settingsMgr, stateCache, projInformer, ctrl.metricsServer, argoCache, ctrl.statusRefreshTimeout, argo.NewResourceTracking(), persistResourceHealth, repoErrorGracePeriod, serverSideDiff)
281ctrl.appInformer = appInformer
282ctrl.appLister = appLister
283ctrl.projInformer = projInformer
284ctrl.deploymentInformer = deploymentInformer
285ctrl.appStateManager = appStateManager
286ctrl.stateCache = stateCache
287
288return &ctrl, nil
289}
290
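// InvalidateProjectsCache removes the named projects from the in-memory project cache,
// or clears the whole cache when no names are given.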
291func (ctrl *ApplicationController) InvalidateProjectsCache(names ...string) {
292if len(names) > 0 {
293for _, name := range names {
294ctrl.projByNameCache.Delete(name)
295}
296} else {
297if ctrl != nil {
298ctrl.projByNameCache.Range(func(key, _ interface{}) bool {
299ctrl.projByNameCache.Delete(key)
300return true
301})
302}
303}
304}
305
306func (ctrl *ApplicationController) GetMetricsServer() *metrics.MetricsServer {
307return ctrl.metricsServer
308}
309
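// onKubectlRun is invoked before each kubectl execution: it records exec metrics and, when a
// parallelism limit is configured, acquires the kubectl semaphore. The returned cleanup
// function releases the semaphore again.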
310func (ctrl *ApplicationController) onKubectlRun(command string) (kube.CleanupFunc, error) {
311ctrl.metricsServer.IncKubectlExec(command)
312if ctrl.kubectlSemaphore != nil {
313if err := ctrl.kubectlSemaphore.Acquire(context.Background(), 1); err != nil {
314return nil, err
315}
316ctrl.metricsServer.IncKubectlExecPending(command)
317}
318return func() {
319if ctrl.kubectlSemaphore != nil {
320ctrl.kubectlSemaphore.Release(1)
321ctrl.metricsServer.DecKubectlExecPending(command)
322}
323}, nil
324}
325
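// isSelfReferencedApp returns true if the given object reference points to the application
// itself (same UID, name, namespace and Application group/kind).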
326func isSelfReferencedApp(app *appv1.Application, ref v1.ObjectReference) bool {
327gvk := ref.GroupVersionKind()
328return ref.UID == app.UID &&
329ref.Name == app.Name &&
330ref.Namespace == app.Namespace &&
331gvk.Group == application.Group &&
332gvk.Kind == application.ApplicationKind
333}
334
335func (ctrl *ApplicationController) newAppProjCache(name string) *appProjCache {
336return &appProjCache{name: name, ctrl: ctrl}
337}
338
339type appProjCache struct {
340name string
341ctrl *ApplicationController
342
343lock sync.Mutex
344appProj *appv1.AppProject
345}
346
// GetAppProject gets an AppProject from the cache. If the AppProject is not
// yet cached, it retrieves the AppProject from the K8s control plane and stores
// it in the cache.
350func (projCache *appProjCache) GetAppProject(ctx context.Context) (*appv1.AppProject, error) {
351projCache.lock.Lock()
352defer projCache.lock.Unlock()
353if projCache.appProj != nil {
354return projCache.appProj, nil
355}
356proj, err := argo.GetAppProjectByName(projCache.name, applisters.NewAppProjectLister(projCache.ctrl.projInformer.GetIndexer()), projCache.ctrl.namespace, projCache.ctrl.settingsMgr, projCache.ctrl.db, ctx)
357if err != nil {
358return nil, err
359}
360projCache.appProj = proj
361return projCache.appProj, nil
362}
363
364// getAppProj gets the AppProject for the given Application app.
365func (ctrl *ApplicationController) getAppProj(app *appv1.Application) (*appv1.AppProject, error) {
366projCache, _ := ctrl.projByNameCache.LoadOrStore(app.Spec.GetProject(), ctrl.newAppProjCache(app.Spec.GetProject()))
367proj, err := projCache.(*appProjCache).GetAppProject(context.TODO())
368if err != nil {
369if apierr.IsNotFound(err) {
370return nil, err
371} else {
372return nil, fmt.Errorf("could not retrieve AppProject '%s' from cache: %v", app.Spec.Project, err)
373}
374}
375if !proj.IsAppNamespacePermitted(app, ctrl.namespace) {
376return nil, argo.ErrProjectNotPermitted(app.GetName(), app.GetNamespace(), proj.GetName())
377}
378return proj, nil
379}
380
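// handleObjectUpdated requests a refresh for every application that manages the updated
// resource, and for applications monitoring orphaned resources in the resource's namespace.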
381func (ctrl *ApplicationController) handleObjectUpdated(managedByApp map[string]bool, ref v1.ObjectReference) {
// if a namespaced resource is not managed by any app, it might be an orphaned resource of some other app
383if len(managedByApp) == 0 && ref.Namespace != "" {
// retrieve applications which monitor orphaned resources in the same namespace and refresh them, unless the resource is denied in the app project
385if objs, err := ctrl.appInformer.GetIndexer().ByIndex(orphanedIndex, ref.Namespace); err == nil {
386for i := range objs {
387app, ok := objs[i].(*appv1.Application)
388if !ok {
389continue
390}
391
392managedByApp[app.InstanceName(ctrl.namespace)] = true
393}
394}
395}
396for appName, isManagedResource := range managedByApp {
// The appName is given as <namespace>_<name>, but the indexer needs it
// in the format <namespace>/<name>
399appKey := ctrl.toAppKey(appName)
400obj, exists, err := ctrl.appInformer.GetIndexer().GetByKey(appKey)
401app, ok := obj.(*appv1.Application)
402if exists && err == nil && ok && isSelfReferencedApp(app, ref) {
// Don't force refresh the app if the related resource is the application itself. This prevents an infinite reconciliation loop.
404continue
405}
406
407if !ctrl.canProcessApp(obj) {
408// Don't force refresh app if app belongs to a different controller shard or is outside the allowed namespaces.
409continue
410}
411
412// Enforce application's permission for the source namespace
413_, err = ctrl.getAppProj(app)
414if err != nil {
415log.Errorf("Unable to determine project for app '%s': %v", app.QualifiedName(), err)
416continue
417}
418
419level := ComparisonWithNothing
420if isManagedResource {
421level = CompareWithRecent
422}
423
424namespace := ref.Namespace
425if ref.Namespace == "" {
426namespace = "(cluster-scoped)"
427}
428log.WithFields(log.Fields{
429"application": appKey,
430"level": level,
431"namespace": namespace,
432"name": ref.Name,
433"api-version": ref.APIVersion,
434"kind": ref.Kind,
435"server": app.Spec.Destination.Server,
436"cluster-name": app.Spec.Destination.Name,
437}).Debug("Requesting app refresh caused by object update")
438
439ctrl.requestAppRefresh(app.QualifiedName(), &level, nil)
440}
441}
442
// setAppManagedResources will build a list of ResourceDiff based on the provided comparisonResult
// and persist app resource-related data in the cache. It returns the persisted ApplicationTree.
445func (ctrl *ApplicationController) setAppManagedResources(a *appv1.Application, comparisonResult *comparisonResult) (*appv1.ApplicationTree, error) {
446managedResources, err := ctrl.hideSecretData(a, comparisonResult)
447if err != nil {
448return nil, fmt.Errorf("error getting managed resources: %s", err)
449}
450tree, err := ctrl.getResourceTree(a, managedResources)
451if err != nil {
452return nil, fmt.Errorf("error getting resource tree: %s", err)
453}
454err = ctrl.cache.SetAppResourcesTree(a.InstanceName(ctrl.namespace), tree)
455if err != nil {
456return nil, fmt.Errorf("error setting app resource tree: %s", err)
457}
458err = ctrl.cache.SetAppManagedResources(a.InstanceName(ctrl.namespace), managedResources)
459if err != nil {
460return nil, fmt.Errorf("error setting app managed resources: %s", err)
461}
462return tree, nil
463}
464
// isKnownOrphanedResourceExclusion returns true if the given resource exists in the namespace by default and is not managed by the user
466func isKnownOrphanedResourceExclusion(key kube.ResourceKey, proj *appv1.AppProject) bool {
467if key.Namespace == "default" && key.Group == "" && key.Kind == kube.ServiceKind && key.Name == "kubernetes" {
468return true
469}
470if key.Group == "" && key.Kind == kube.ServiceAccountKind && key.Name == "default" {
471return true
472}
473if key.Group == "" && key.Kind == "ConfigMap" && key.Name == "kube-root-ca.crt" {
474return true
475}
476list := proj.Spec.OrphanedResources.Ignore
477for _, item := range list {
478if item.Kind == "" || glob.Match(item.Kind, key.Kind) {
479if glob.Match(item.Group, key.Group) {
480if item.Name == "" || glob.Match(item.Name, key.Name) {
481return true
482}
483}
484}
485}
486return false
487}
488
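// getResourceTree builds the application resource tree from the managed resources and, when the
// project enables orphaned resource monitoring, appends permitted orphaned top-level resources.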
489func (ctrl *ApplicationController) getResourceTree(a *appv1.Application, managedResources []*appv1.ResourceDiff) (*appv1.ApplicationTree, error) {
490nodes := make([]appv1.ResourceNode, 0)
491proj, err := ctrl.getAppProj(a)
492if err != nil {
493return nil, fmt.Errorf("failed to get project: %w", err)
494}
495
496orphanedNodesMap := make(map[kube.ResourceKey]appv1.ResourceNode)
497warnOrphaned := true
498if proj.Spec.OrphanedResources != nil {
499orphanedNodesMap, err = ctrl.stateCache.GetNamespaceTopLevelResources(a.Spec.Destination.Server, a.Spec.Destination.Namespace)
500if err != nil {
501return nil, fmt.Errorf("failed to get namespace top-level resources: %w", err)
502}
503warnOrphaned = proj.Spec.OrphanedResources.IsWarn()
504}
505for i := range managedResources {
506managedResource := managedResources[i]
507delete(orphanedNodesMap, kube.NewResourceKey(managedResource.Group, managedResource.Kind, managedResource.Namespace, managedResource.Name))
508var live = &unstructured.Unstructured{}
509err := json.Unmarshal([]byte(managedResource.LiveState), &live)
510if err != nil {
511return nil, fmt.Errorf("failed to unmarshal live state of managed resources: %w", err)
512}
513
514if live == nil {
515var target = &unstructured.Unstructured{}
516err = json.Unmarshal([]byte(managedResource.TargetState), &target)
517if err != nil {
518return nil, fmt.Errorf("failed to unmarshal target state of managed resources: %w", err)
519}
520nodes = append(nodes, appv1.ResourceNode{
521ResourceRef: appv1.ResourceRef{
522Version: target.GroupVersionKind().Version,
523Name: managedResource.Name,
524Kind: managedResource.Kind,
525Group: managedResource.Group,
526Namespace: managedResource.Namespace,
527},
528})
529} else {
530err := ctrl.stateCache.IterateHierarchy(a.Spec.Destination.Server, kube.GetResourceKey(live), func(child appv1.ResourceNode, appName string) bool {
531permitted, _ := proj.IsResourcePermitted(schema.GroupKind{Group: child.ResourceRef.Group, Kind: child.ResourceRef.Kind}, child.Namespace, a.Spec.Destination, func(project string) ([]*appv1.Cluster, error) {
532clusters, err := ctrl.db.GetProjectClusters(context.TODO(), project)
533if err != nil {
534return nil, fmt.Errorf("failed to get project clusters: %w", err)
535}
536return clusters, nil
537})
538if !permitted {
539return false
540}
541nodes = append(nodes, child)
542return true
543})
544if err != nil {
545return nil, fmt.Errorf("failed to iterate resource hierarchy: %w", err)
546}
547}
548}
549orphanedNodes := make([]appv1.ResourceNode, 0)
550for k := range orphanedNodesMap {
551if k.Namespace != "" && proj.IsGroupKindPermitted(k.GroupKind(), true) && !isKnownOrphanedResourceExclusion(k, proj) {
552err := ctrl.stateCache.IterateHierarchy(a.Spec.Destination.Server, k, func(child appv1.ResourceNode, appName string) bool {
553belongToAnotherApp := false
554if appName != "" {
555appKey := ctrl.toAppKey(appName)
556if _, exists, err := ctrl.appInformer.GetIndexer().GetByKey(appKey); exists && err == nil {
557belongToAnotherApp = true
558}
559}
560
561if belongToAnotherApp {
562return false
563}
564
565permitted, _ := proj.IsResourcePermitted(schema.GroupKind{Group: child.ResourceRef.Group, Kind: child.ResourceRef.Kind}, child.Namespace, a.Spec.Destination, func(project string) ([]*appv1.Cluster, error) {
566return ctrl.db.GetProjectClusters(context.TODO(), project)
567})
568
569if !permitted {
570return false
571}
572orphanedNodes = append(orphanedNodes, child)
573return true
574})
575if err != nil {
576return nil, err
577}
578}
579}
580var conditions []appv1.ApplicationCondition
581if len(orphanedNodes) > 0 && warnOrphaned {
582conditions = []appv1.ApplicationCondition{{
583Type: appv1.ApplicationConditionOrphanedResourceWarning,
584Message: fmt.Sprintf("Application has %d orphaned resources", len(orphanedNodes)),
585}}
586}
587a.Status.SetConditions(conditions, map[appv1.ApplicationConditionType]bool{appv1.ApplicationConditionOrphanedResourceWarning: true})
588sort.Slice(orphanedNodes, func(i, j int) bool {
589return orphanedNodes[i].ResourceRef.String() < orphanedNodes[j].ResourceRef.String()
590})
591
592hosts, err := ctrl.getAppHosts(a, nodes)
593if err != nil {
594return nil, fmt.Errorf("failed to get app hosts: %w", err)
595}
596return &appv1.ApplicationTree{Nodes: nodes, OrphanedNodes: orphanedNodes, Hosts: hosts}, nil
597}
598
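// getAppHosts summarizes, per node hosting this application's pods, the CPU/memory/storage
// requested by the app, requested by neighboring pods, and the node capacity.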
599func (ctrl *ApplicationController) getAppHosts(a *appv1.Application, appNodes []appv1.ResourceNode) ([]appv1.HostInfo, error) {
600supportedResourceNames := map[v1.ResourceName]bool{
601v1.ResourceCPU: true,
602v1.ResourceStorage: true,
603v1.ResourceMemory: true,
604}
605appPods := map[kube.ResourceKey]bool{}
606for _, node := range appNodes {
607if node.Group == "" && node.Kind == kube.PodKind {
608appPods[kube.NewResourceKey(node.Group, node.Kind, node.Namespace, node.Name)] = true
609}
610}
611
612allNodesInfo := map[string]statecache.NodeInfo{}
613allPodsByNode := map[string][]statecache.PodInfo{}
614appPodsByNode := map[string][]statecache.PodInfo{}
615err := ctrl.stateCache.IterateResources(a.Spec.Destination.Server, func(res *clustercache.Resource, info *statecache.ResourceInfo) {
616key := res.ResourceKey()
617
618switch {
619case info.NodeInfo != nil && key.Group == "" && key.Kind == "Node":
620allNodesInfo[key.Name] = *info.NodeInfo
621case info.PodInfo != nil && key.Group == "" && key.Kind == kube.PodKind:
622if appPods[key] {
623appPodsByNode[info.PodInfo.NodeName] = append(appPodsByNode[info.PodInfo.NodeName], *info.PodInfo)
624} else {
625allPodsByNode[info.PodInfo.NodeName] = append(allPodsByNode[info.PodInfo.NodeName], *info.PodInfo)
626}
627}
628})
629if err != nil {
630return nil, err
631}
632
633var hosts []appv1.HostInfo
634for nodeName, appPods := range appPodsByNode {
635node, ok := allNodesInfo[nodeName]
636if !ok {
637continue
638}
639
640neighbors := allPodsByNode[nodeName]
641
642resources := map[v1.ResourceName]appv1.HostResourceInfo{}
643for name, resource := range node.Capacity {
644info := resources[name]
645info.ResourceName = name
646info.Capacity += resource.MilliValue()
647resources[name] = info
648}
649
650for _, pod := range appPods {
651for name, resource := range pod.ResourceRequests {
652if !supportedResourceNames[name] {
653continue
654}
655
656info := resources[name]
657info.RequestedByApp += resource.MilliValue()
658resources[name] = info
659}
660}
661
662for _, pod := range neighbors {
663for name, resource := range pod.ResourceRequests {
664if !supportedResourceNames[name] || pod.Phase == v1.PodSucceeded || pod.Phase == v1.PodFailed {
665continue
666}
667info := resources[name]
668info.RequestedByNeighbors += resource.MilliValue()
669resources[name] = info
670}
671}
672
673var resourcesInfo []appv1.HostResourceInfo
674for _, info := range resources {
675if supportedResourceNames[info.ResourceName] && info.Capacity > 0 {
676resourcesInfo = append(resourcesInfo, info)
677}
678}
679sort.Slice(resourcesInfo, func(i, j int) bool {
680return resourcesInfo[i].ResourceName < resourcesInfo[j].ResourceName
681})
682hosts = append(hosts, appv1.HostInfo{Name: nodeName, SystemInfo: node.SystemInfo, ResourcesInfo: resourcesInfo})
683}
684return hosts, nil
685}
686
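// hideSecretData converts the comparison result into ResourceDiff entries and, for Secrets,
// masks the secret values in the live, target and diff states before they are persisted.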
687func (ctrl *ApplicationController) hideSecretData(app *appv1.Application, comparisonResult *comparisonResult) ([]*appv1.ResourceDiff, error) {
688items := make([]*appv1.ResourceDiff, len(comparisonResult.managedResources))
689for i := range comparisonResult.managedResources {
690res := comparisonResult.managedResources[i]
691item := appv1.ResourceDiff{
692Namespace: res.Namespace,
693Name: res.Name,
694Group: res.Group,
695Kind: res.Kind,
696Hook: res.Hook,
697ResourceVersion: res.ResourceVersion,
698}
699
700target := res.Target
701live := res.Live
702resDiff := res.Diff
703if res.Kind == kube.SecretKind && res.Group == "" {
704var err error
705target, live, err = diff.HideSecretData(res.Target, res.Live)
706if err != nil {
707return nil, fmt.Errorf("error hiding secret data: %s", err)
708}
709compareOptions, err := ctrl.settingsMgr.GetResourceCompareOptions()
710if err != nil {
711return nil, fmt.Errorf("error getting resource compare options: %s", err)
712}
713resourceOverrides, err := ctrl.settingsMgr.GetResourceOverrides()
714if err != nil {
715return nil, fmt.Errorf("error getting resource overrides: %s", err)
716}
717appLabelKey, err := ctrl.settingsMgr.GetAppInstanceLabelKey()
718if err != nil {
719return nil, fmt.Errorf("error getting app instance label key: %s", err)
720}
721trackingMethod, err := ctrl.settingsMgr.GetTrackingMethod()
722if err != nil {
723return nil, fmt.Errorf("error getting tracking method: %s", err)
724}
725
726clusterCache, err := ctrl.stateCache.GetClusterCache(app.Spec.Destination.Server)
727if err != nil {
728return nil, fmt.Errorf("error getting cluster cache: %s", err)
729}
730diffConfig, err := argodiff.NewDiffConfigBuilder().
731WithDiffSettings(app.Spec.IgnoreDifferences, resourceOverrides, compareOptions.IgnoreAggregatedRoles).
732WithTracking(appLabelKey, trackingMethod).
733WithNoCache().
734WithLogger(logutils.NewLogrusLogger(logutils.NewWithCurrentConfig())).
735WithGVKParser(clusterCache.GetGVKParser()).
736Build()
737if err != nil {
738return nil, fmt.Errorf("appcontroller error building diff config: %s", err)
739}
740
741diffResult, err := argodiff.StateDiff(live, target, diffConfig)
742if err != nil {
743return nil, fmt.Errorf("error applying diff: %s", err)
744}
745resDiff = diffResult
746}
747
748if live != nil {
749data, err := json.Marshal(live)
750if err != nil {
751return nil, fmt.Errorf("error marshaling live json: %s", err)
752}
753item.LiveState = string(data)
754} else {
755item.LiveState = "null"
756}
757
758if target != nil {
759data, err := json.Marshal(target)
760if err != nil {
761return nil, fmt.Errorf("error marshaling target json: %s", err)
762}
763item.TargetState = string(data)
764} else {
765item.TargetState = "null"
766}
767item.PredictedLiveState = string(resDiff.PredictedLive)
768item.NormalizedLiveState = string(resDiff.NormalizedLive)
769item.Modified = resDiff.Modified
770
771items[i] = &item
772}
773return items, nil
774}
775
776// Run starts the Application CRD controller.
777func (ctrl *ApplicationController) Run(ctx context.Context, statusProcessors int, operationProcessors int) {
778defer runtime.HandleCrash()
779defer ctrl.appRefreshQueue.ShutDown()
780defer ctrl.appComparisonTypeRefreshQueue.ShutDown()
781defer ctrl.appOperationQueue.ShutDown()
782defer ctrl.projectRefreshQueue.ShutDown()
783
784ctrl.metricsServer.RegisterClustersInfoSource(ctx, ctrl.stateCache)
785ctrl.RegisterClusterSecretUpdater(ctx)
786
787go ctrl.appInformer.Run(ctx.Done())
788go ctrl.projInformer.Run(ctx.Done())
789
790if ctrl.dynamicClusterDistributionEnabled {
791// only start deployment informer if dynamic distribution is enabled
792go ctrl.deploymentInformer.Informer().Run(ctx.Done())
793}
794
795clusters, err := ctrl.db.ListClusters(ctx)
796if err != nil {
797log.Warnf("Cannot init sharding. Error while querying clusters list from database: %v", err)
798} else {
799ctrl.clusterSharding.Init(clusters)
800}
801
802errors.CheckError(ctrl.stateCache.Init())
803
804if !cache.WaitForCacheSync(ctx.Done(), ctrl.appInformer.HasSynced, ctrl.projInformer.HasSynced) {
805log.Error("Timed out waiting for caches to sync")
806return
807}
808
809go func() { errors.CheckError(ctrl.stateCache.Run(ctx)) }()
810go func() { errors.CheckError(ctrl.metricsServer.ListenAndServe()) }()
811
812for i := 0; i < statusProcessors; i++ {
813go wait.Until(func() {
814for ctrl.processAppRefreshQueueItem() {
815}
816}, time.Second, ctx.Done())
817}
818
819for i := 0; i < operationProcessors; i++ {
820go wait.Until(func() {
821for ctrl.processAppOperationQueueItem() {
822}
823}, time.Second, ctx.Done())
824}
825
826go wait.Until(func() {
827for ctrl.processAppComparisonTypeQueueItem() {
828}
829}, time.Second, ctx.Done())
830
831go wait.Until(func() {
832for ctrl.processProjectQueueItem() {
833}
834}, time.Second, ctx.Done())
835<-ctx.Done()
836}
837
838// requestAppRefresh adds a request for given app to the refresh queue. appName
839// needs to be the qualified name of the application, i.e. <namespace>/<name>.
840func (ctrl *ApplicationController) requestAppRefresh(appName string, compareWith *CompareWith, after *time.Duration) {
841key := ctrl.toAppKey(appName)
842
843if compareWith != nil && after != nil {
ctrl.appComparisonTypeRefreshQueue.AddAfter(fmt.Sprintf("%s/%d", key, *compareWith), *after)
845} else {
846if compareWith != nil {
847ctrl.refreshRequestedAppsMutex.Lock()
848ctrl.refreshRequestedApps[key] = compareWith.Max(ctrl.refreshRequestedApps[key])
849ctrl.refreshRequestedAppsMutex.Unlock()
850}
851if after != nil {
852ctrl.appRefreshQueue.AddAfter(key, *after)
853ctrl.appOperationQueue.AddAfter(key, *after)
854} else {
855ctrl.appRefreshQueue.AddRateLimited(key)
856ctrl.appOperationQueue.AddRateLimited(key)
857}
858}
859}
860
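// isRefreshRequested reports whether an explicit refresh was requested for the app and at which
// comparison level; a pending request is consumed (removed) by this call.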
861func (ctrl *ApplicationController) isRefreshRequested(appName string) (bool, CompareWith) {
862ctrl.refreshRequestedAppsMutex.Lock()
863defer ctrl.refreshRequestedAppsMutex.Unlock()
864level, ok := ctrl.refreshRequestedApps[appName]
865if ok {
866delete(ctrl.refreshRequestedApps, appName)
867}
868return ok, level
869}
870
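// processAppOperationQueueItem pops one application from the operation queue and either runs its
// pending operation or finalizes its deletion. It returns false only when the queue is shutting down.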
871func (ctrl *ApplicationController) processAppOperationQueueItem() (processNext bool) {
872appKey, shutdown := ctrl.appOperationQueue.Get()
873if shutdown {
874processNext = false
875return
876}
877processNext = true
878defer func() {
879if r := recover(); r != nil {
880log.Errorf("Recovered from panic: %+v\n%s", r, debug.Stack())
881}
882ctrl.appOperationQueue.Done(appKey)
883}()
884
885obj, exists, err := ctrl.appInformer.GetIndexer().GetByKey(appKey.(string))
886if err != nil {
887log.Errorf("Failed to get application '%s' from informer index: %+v", appKey, err)
888return
889}
890if !exists {
891// This happens after app was deleted, but the work queue still had an entry for it.
892return
893}
894origApp, ok := obj.(*appv1.Application)
895if !ok {
896log.Warnf("Key '%s' in index is not an application", appKey)
897return
898}
899app := origApp.DeepCopy()
900
901if app.Operation != nil {
// If we get here, we are about to process an operation, but we cannot rely on informer data since it
// might be stale: applications may be updated by both the application controller and the API server.
// Always retrieve the latest version to avoid operating on stale state and syncing unnecessarily.
905freshApp, err := ctrl.applicationClientset.ArgoprojV1alpha1().Applications(app.ObjectMeta.Namespace).Get(context.Background(), app.ObjectMeta.Name, metav1.GetOptions{})
906if err != nil {
907log.Errorf("Failed to retrieve latest application state: %v", err)
908return
909}
910app = freshApp
911}
912
913if app.Operation != nil {
914ctrl.processRequestedAppOperation(app)
915} else if app.DeletionTimestamp != nil {
916if err = ctrl.finalizeApplicationDeletion(app, func(project string) ([]*appv1.Cluster, error) {
917return ctrl.db.GetProjectClusters(context.Background(), project)
918}); err != nil {
919ctrl.setAppCondition(app, appv1.ApplicationCondition{
920Type: appv1.ApplicationConditionDeletionError,
921Message: err.Error(),
922})
923message := fmt.Sprintf("Unable to delete application resources: %v", err.Error())
924ctrl.auditLogger.LogAppEvent(app, argo.EventInfo{Reason: argo.EventReasonStatusRefreshed, Type: v1.EventTypeWarning}, message, "")
925}
926}
927return
928}
929
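// processAppComparisonTypeQueueItem pops a namespace/name/comparisonType key from the queue and
// translates it into a refresh request with the requested comparison level.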
930func (ctrl *ApplicationController) processAppComparisonTypeQueueItem() (processNext bool) {
931key, shutdown := ctrl.appComparisonTypeRefreshQueue.Get()
932processNext = true
933
934defer func() {
935if r := recover(); r != nil {
936log.Errorf("Recovered from panic: %+v\n%s", r, debug.Stack())
937}
938ctrl.appComparisonTypeRefreshQueue.Done(key)
939}()
940if shutdown {
941processNext = false
942return
943}
944
945if parts := strings.Split(key.(string), "/"); len(parts) != 3 {
log.Warnf("Unexpected key format in appComparisonTypeRefreshQueue. Key should consist of namespace/name/comparisonType but got: %s", key.(string))
947} else {
948if compareWith, err := strconv.Atoi(parts[2]); err != nil {
949log.Warnf("Unable to parse comparison type: %v", err)
950return
951} else {
952ctrl.requestAppRefresh(ctrl.toAppQualifiedName(parts[1], parts[0]), CompareWith(compareWith).Pointer(), nil)
953}
954}
955return
956}
957
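// processProjectQueueItem pops one AppProject from the queue and, if it is being deleted and still
// carries a finalizer, attempts to finalize the deletion.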
958func (ctrl *ApplicationController) processProjectQueueItem() (processNext bool) {
959key, shutdown := ctrl.projectRefreshQueue.Get()
960processNext = true
961
962defer func() {
963if r := recover(); r != nil {
964log.Errorf("Recovered from panic: %+v\n%s", r, debug.Stack())
965}
966ctrl.projectRefreshQueue.Done(key)
967}()
968if shutdown {
969processNext = false
970return
971}
972obj, exists, err := ctrl.projInformer.GetIndexer().GetByKey(key.(string))
973if err != nil {
974log.Errorf("Failed to get project '%s' from informer index: %+v", key, err)
975return
976}
977if !exists {
// This happens after the AppProject was deleted, but the work queue still had an entry for it.
979return
980}
981origProj, ok := obj.(*appv1.AppProject)
982if !ok {
983log.Warnf("Key '%s' in index is not an appproject", key)
984return
985}
986
987if origProj.DeletionTimestamp != nil && origProj.HasFinalizer() {
988if err := ctrl.finalizeProjectDeletion(origProj.DeepCopy()); err != nil {
989log.Warnf("Failed to finalize project deletion: %v", err)
990}
991}
992return
993}
994
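// finalizeProjectDeletion removes the project finalizer once no application references the project anymore.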
995func (ctrl *ApplicationController) finalizeProjectDeletion(proj *appv1.AppProject) error {
996apps, err := ctrl.appLister.Applications(ctrl.namespace).List(labels.Everything())
997if err != nil {
998return fmt.Errorf("error listing applications: %w", err)
999}
1000appsCount := 0
1001for i := range apps {
1002if apps[i].Spec.GetProject() == proj.Name {
1003appsCount++
1004}
1005}
1006if appsCount == 0 {
1007return ctrl.removeProjectFinalizer(proj)
1008} else {
log.Infof("Cannot remove project '%s' finalizer as it is referenced by %d applications", proj.Name, appsCount)
1010}
1011return nil
1012}
1013
1014func (ctrl *ApplicationController) removeProjectFinalizer(proj *appv1.AppProject) error {
1015proj.RemoveFinalizer()
1016var patch []byte
1017patch, _ = json.Marshal(map[string]interface{}{
1018"metadata": map[string]interface{}{
1019"finalizers": proj.Finalizers,
1020},
1021})
1022_, err := ctrl.applicationClientset.ArgoprojV1alpha1().AppProjects(ctrl.namespace).Patch(context.Background(), proj.Name, types.MergePatchType, patch, metav1.PatchOptions{})
1023return err
1024}
1025
1026// shouldBeDeleted returns whether a given resource obj should be deleted on cascade delete of application app
1027func (ctrl *ApplicationController) shouldBeDeleted(app *appv1.Application, obj *unstructured.Unstructured) bool {
1028return !kube.IsCRD(obj) && !isSelfReferencedApp(app, kube.GetObjectRef(obj)) &&
1029!resourceutil.HasAnnotationOption(obj, synccommon.AnnotationSyncOptions, synccommon.SyncOptionDisableDeletion) &&
1030!resourceutil.HasAnnotationOption(obj, helm.ResourcePolicyAnnotation, helm.ResourcePolicyKeep)
1031}
1032
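// getPermittedAppLiveObjects returns the live objects managed by the app, excluding resources that
// are not permitted by the app project.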
1033func (ctrl *ApplicationController) getPermittedAppLiveObjects(app *appv1.Application, proj *appv1.AppProject, projectClusters func(project string) ([]*appv1.Cluster, error)) (map[kube.ResourceKey]*unstructured.Unstructured, error) {
1034objsMap, err := ctrl.stateCache.GetManagedLiveObjs(app, []*unstructured.Unstructured{})
1035if err != nil {
1036return nil, err
1037}
1038// Don't delete live resources which are not permitted in the app project
1039for k, v := range objsMap {
1040permitted, err := proj.IsLiveResourcePermitted(v, app.Spec.Destination.Server, app.Spec.Destination.Name, projectClusters)
1041
1042if err != nil {
1043return nil, err
1044}
1045
1046if !permitted {
1047delete(objsMap, k)
1048}
1049}
1050return objsMap, nil
1051}
1052
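// isValidDestination validates the application's destination and resolves its cluster; it returns
// false (and no cluster) when validation or the cluster lookup fails.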
1053func (ctrl *ApplicationController) isValidDestination(app *appv1.Application) (bool, *appv1.Cluster) {
1054// Validate the cluster using the Application destination's `name` field, if applicable,
1055// and set the Server field, if needed.
1056if err := argo.ValidateDestination(context.Background(), &app.Spec.Destination, ctrl.db); err != nil {
1057log.Warnf("Unable to validate destination of the Application being deleted: %v", err)
1058return false, nil
1059}
1060
1061cluster, err := ctrl.db.GetCluster(context.Background(), app.Spec.Destination.Server)
1062if err != nil {
1063log.Warnf("Unable to locate cluster URL for Application being deleted: %v", err)
1064return false, nil
1065}
1066return true, cluster
1067}
1068
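// finalizeApplicationDeletion drives application deletion: it cascades deletion of permitted live
// resources, runs and cleans up post-delete hooks, and removes the corresponding finalizers as each
// stage completes.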
1069func (ctrl *ApplicationController) finalizeApplicationDeletion(app *appv1.Application, projectClusters func(project string) ([]*appv1.Cluster, error)) error {
1070logCtx := log.WithField("application", app.QualifiedName())
1071// Get refreshed application info, since informer app copy might be stale
1072app, err := ctrl.applicationClientset.ArgoprojV1alpha1().Applications(app.Namespace).Get(context.Background(), app.Name, metav1.GetOptions{})
1073if err != nil {
1074if !apierr.IsNotFound(err) {
logCtx.Errorf("Unable to get refreshed application info prior to deleting resources: %v", err)
1076}
1077return nil
1078}
1079proj, err := ctrl.getAppProj(app)
1080if err != nil {
1081return err
1082}
1083
1084isValid, cluster := ctrl.isValidDestination(app)
1085if !isValid {
1086app.UnSetCascadedDeletion()
1087app.UnSetPostDeleteFinalizer()
1088if err := ctrl.updateFinalizers(app); err != nil {
1089return err
1090}
1091logCtx.Infof("Resource entries removed from undefined cluster")
1092return nil
1093}
1094config := metrics.AddMetricsTransportWrapper(ctrl.metricsServer, app, cluster.RESTConfig())
1095
1096if app.CascadedDeletion() {
1097logCtx.Infof("Deleting resources")
1098// ApplicationDestination points to a valid cluster, so we may clean up the live objects
1099objs := make([]*unstructured.Unstructured, 0)
1100objsMap, err := ctrl.getPermittedAppLiveObjects(app, proj, projectClusters)
1101if err != nil {
1102return err
1103}
1104
1105for k := range objsMap {
1106// Wait for objects pending deletion to complete before proceeding with next sync wave
1107if objsMap[k].GetDeletionTimestamp() != nil {
1108logCtx.Infof("%d objects remaining for deletion", len(objsMap))
1109return nil
1110}
1111
1112if ctrl.shouldBeDeleted(app, objsMap[k]) {
1113objs = append(objs, objsMap[k])
1114}
1115}
1116
1117filteredObjs := FilterObjectsForDeletion(objs)
1118
1119propagationPolicy := metav1.DeletePropagationForeground
1120if app.GetPropagationPolicy() == appv1.BackgroundPropagationPolicyFinalizer {
1121propagationPolicy = metav1.DeletePropagationBackground
1122}
1123logCtx.Infof("Deleting application's resources with %s propagation policy", propagationPolicy)
1124
1125err = kube.RunAllAsync(len(filteredObjs), func(i int) error {
1126obj := filteredObjs[i]
1127return ctrl.kubectl.DeleteResource(context.Background(), config, obj.GroupVersionKind(), obj.GetName(), obj.GetNamespace(), metav1.DeleteOptions{PropagationPolicy: &propagationPolicy})
1128})
1129if err != nil {
1130return err
1131}
1132
1133objsMap, err = ctrl.getPermittedAppLiveObjects(app, proj, projectClusters)
1134if err != nil {
1135return err
1136}
1137
1138for k, obj := range objsMap {
1139if !ctrl.shouldBeDeleted(app, obj) {
1140delete(objsMap, k)
1141}
1142}
1143if len(objsMap) > 0 {
1144logCtx.Infof("%d objects remaining for deletion", len(objsMap))
1145return nil
1146}
1147logCtx.Infof("Successfully deleted %d resources", len(objs))
1148app.UnSetCascadedDeletion()
1149return ctrl.updateFinalizers(app)
1150}
1151
1152if app.HasPostDeleteFinalizer() {
1153objsMap, err := ctrl.getPermittedAppLiveObjects(app, proj, projectClusters)
1154if err != nil {
1155return err
1156}
1157
1158done, err := ctrl.executePostDeleteHooks(app, proj, objsMap, config, logCtx)
1159if err != nil {
1160return err
1161}
1162if !done {
1163return nil
1164}
1165app.UnSetPostDeleteFinalizer()
1166return ctrl.updateFinalizers(app)
1167}
1168
1169if app.HasPostDeleteFinalizer("cleanup") {
1170objsMap, err := ctrl.getPermittedAppLiveObjects(app, proj, projectClusters)
1171if err != nil {
1172return err
1173}
1174
1175done, err := ctrl.cleanupPostDeleteHooks(objsMap, config, logCtx)
1176if err != nil {
1177return err
1178}
1179if !done {
1180return nil
1181}
1182app.UnSetPostDeleteFinalizer("cleanup")
1183return ctrl.updateFinalizers(app)
1184}
1185
1186if !app.CascadedDeletion() && !app.HasPostDeleteFinalizer() {
1187if err := ctrl.cache.SetAppManagedResources(app.Name, nil); err != nil {
1188return err
1189}
1190
1191if err := ctrl.cache.SetAppResourcesTree(app.Name, nil); err != nil {
1192return err
1193}
1194ctrl.projectRefreshQueue.Add(fmt.Sprintf("%s/%s", ctrl.namespace, app.Spec.GetProject()))
1195}
1196
1197return nil
1198}
1199
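// updateFinalizers patches the application's metadata.finalizers with the current in-memory list,
// after verifying that the app's project can still be resolved.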
1200func (ctrl *ApplicationController) updateFinalizers(app *appv1.Application) error {
1201_, err := ctrl.getAppProj(app)
1202if err != nil {
1203return fmt.Errorf("error getting project: %w", err)
1204}
1205
1206var patch []byte
1207patch, _ = json.Marshal(map[string]interface{}{
1208"metadata": map[string]interface{}{
1209"finalizers": app.Finalizers,
1210},
1211})
1212
1213_, err = ctrl.applicationClientset.ArgoprojV1alpha1().Applications(app.Namespace).Patch(context.Background(), app.Name, types.MergePatchType, patch, metav1.PatchOptions{})
1214return err
1215}
1216
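// setAppCondition sets the given condition on the application status and patches it, unless an
// identical condition is already present.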
1217func (ctrl *ApplicationController) setAppCondition(app *appv1.Application, condition appv1.ApplicationCondition) {
1218// do nothing if app already has same condition
1219for _, c := range app.Status.Conditions {
1220if c.Message == condition.Message && c.Type == condition.Type {
1221return
1222}
1223}
1224
1225app.Status.SetConditions([]appv1.ApplicationCondition{condition}, map[appv1.ApplicationConditionType]bool{condition.Type: true})
1226
1227var patch []byte
1228patch, err := json.Marshal(map[string]interface{}{
1229"status": map[string]interface{}{
1230"conditions": app.Status.Conditions,
1231},
1232})
1233if err == nil {
1234_, err = ctrl.applicationClientset.ArgoprojV1alpha1().Applications(app.Namespace).Patch(context.Background(), app.Name, types.MergePatchType, patch, metav1.PatchOptions{})
1235}
1236if err != nil {
1237log.Errorf("Unable to set application condition: %v", err)
1238}
1239}
1240
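// processRequestedAppOperation runs, resumes or retries the operation recorded in app.Operation and
// persists the resulting operation state.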
1241func (ctrl *ApplicationController) processRequestedAppOperation(app *appv1.Application) {
1242logCtx := log.WithField("application", app.QualifiedName())
1243var state *appv1.OperationState
1244// Recover from any unexpected panics and automatically set the status to be failed
1245defer func() {
1246if r := recover(); r != nil {
1247logCtx.Errorf("Recovered from panic: %+v\n%s", r, debug.Stack())
1248state.Phase = synccommon.OperationError
1249if rerr, ok := r.(error); ok {
1250state.Message = rerr.Error()
1251} else {
1252state.Message = fmt.Sprintf("%v", r)
1253}
1254ctrl.setOperationState(app, state)
1255}
1256}()
1257terminating := false
1258if isOperationInProgress(app) {
1259state = app.Status.OperationState.DeepCopy()
1260terminating = state.Phase == synccommon.OperationTerminating
// A failed operation with a retry strategy might still be in-progress and have a completion time
1262if state.FinishedAt != nil && !terminating {
1263retryAt, err := app.Status.OperationState.Operation.Retry.NextRetryAt(state.FinishedAt.Time, state.RetryCount)
1264if err != nil {
1265state.Phase = synccommon.OperationFailed
1266state.Message = err.Error()
1267ctrl.setOperationState(app, state)
1268return
1269}
1270retryAfter := time.Until(retryAt)
1271if retryAfter > 0 {
1272logCtx.Infof("Skipping retrying in-progress operation. Attempting again at: %s", retryAt.Format(time.RFC3339))
1273ctrl.requestAppRefresh(app.QualifiedName(), CompareWithLatest.Pointer(), &retryAfter)
1274return
1275} else {
// retrying operation. remove the previous failure time from the app since it is used as a trigger
// that the previous attempt failed and the operation should be retried
1278state.FinishedAt = nil
1279ctrl.setOperationState(app, state)
1280// Get rid of sync results and null out previous operation completion time
1281state.SyncResult = nil
1282}
1283} else {
1284logCtx.Infof("Resuming in-progress operation. phase: %s, message: %s", state.Phase, state.Message)
1285}
1286} else {
1287state = &appv1.OperationState{Phase: synccommon.OperationRunning, Operation: *app.Operation, StartedAt: metav1.Now()}
1288ctrl.setOperationState(app, state)
1289logCtx.Infof("Initialized new operation: %v", *app.Operation)
1290}
1291
1292if err := argo.ValidateDestination(context.Background(), &app.Spec.Destination, ctrl.db); err != nil {
1293state.Phase = synccommon.OperationFailed
1294state.Message = err.Error()
1295} else {
1296ctrl.appStateManager.SyncAppState(app, state)
1297}
1298
1299// Check whether application is allowed to use project
1300_, err := ctrl.getAppProj(app)
1301if err != nil {
1302state.Phase = synccommon.OperationError
1303state.Message = err.Error()
1304}
1305
1306if state.Phase == synccommon.OperationRunning {
1307// It's possible for an app to be terminated while we were operating on it. We do not want
1308// to clobber the Terminated state with Running. Get the latest app state to check for this.
1309freshApp, err := ctrl.applicationClientset.ArgoprojV1alpha1().Applications(app.Namespace).Get(context.Background(), app.ObjectMeta.Name, metav1.GetOptions{})
1310if err == nil {
// The app may have lost permission to use the project in the meantime.
1312_, err = ctrl.getAppProj(freshApp)
1313if err != nil {
1314state.Phase = synccommon.OperationFailed
1315state.Message = fmt.Sprintf("operation not allowed: %v", err)
1316}
1317if freshApp.Status.OperationState != nil && freshApp.Status.OperationState.Phase == synccommon.OperationTerminating {
1318state.Phase = synccommon.OperationTerminating
1319state.Message = "operation is terminating"
1320// after this, we will get requeued to the workqueue, but next time the
1321// SyncAppState will operate in a Terminating phase, allowing the worker to perform
1322// cleanup (e.g. delete jobs, workflows, etc...)
1323}
1324}
1325} else if state.Phase == synccommon.OperationFailed || state.Phase == synccommon.OperationError {
1326if !terminating && (state.RetryCount < state.Operation.Retry.Limit || state.Operation.Retry.Limit < 0) {
1327now := metav1.Now()
1328state.FinishedAt = &now
1329if retryAt, err := state.Operation.Retry.NextRetryAt(now.Time, state.RetryCount); err != nil {
1330state.Phase = synccommon.OperationFailed
1331state.Message = fmt.Sprintf("%s (failed to retry: %v)", state.Message, err)
1332} else {
1333state.Phase = synccommon.OperationRunning
1334state.RetryCount++
1335state.Message = fmt.Sprintf("%s. Retrying attempt #%d at %s.", state.Message, state.RetryCount, retryAt.Format(time.Kitchen))
1336}
1337} else if state.RetryCount > 0 {
1338state.Message = fmt.Sprintf("%s (retried %d times).", state.Message, state.RetryCount)
1339}
1340
1341}
1342
1343ctrl.setOperationState(app, state)
1344if state.Phase.Completed() && (app.Operation.Sync != nil && !app.Operation.Sync.DryRun) {
1345// if we just completed an operation, force a refresh so that UI will report up-to-date
1346// sync/health information
1347if _, err := cache.MetaNamespaceKeyFunc(app); err == nil {
// force app refresh using the CompareWithLatest comparison type and trigger the app reconciliation loop
1349ctrl.requestAppRefresh(app.QualifiedName(), CompareWithLatest.Pointer(), nil)
1350} else {
logCtx.Warnf("Failed to requeue application: %v", err)
1352}
1353}
1354}
1355
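// setOperationState patches status.operationState (and clears the operation field once the
// operation completes), then emits an audit event and records sync metrics for completed operations.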
1356func (ctrl *ApplicationController) setOperationState(app *appv1.Application, state *appv1.OperationState) {
1357logCtx := log.WithFields(log.Fields{"application": app.Name, "appNamespace": app.Namespace, "project": app.Spec.Project})
1358
1359if state.Phase == "" {
1360// expose any bugs where we neglect to set phase
1361panic("no phase was set")
1362}
1363if state.Phase.Completed() {
1364now := metav1.Now()
1365state.FinishedAt = &now
1366}
1367patch := map[string]interface{}{
1368"status": map[string]interface{}{
1369"operationState": state,
1370},
1371}
1372if state.Phase.Completed() {
1373// If operation is completed, clear the operation field to indicate no operation is
1374// in progress.
1375patch["operation"] = nil
1376}
1377if reflect.DeepEqual(app.Status.OperationState, state) {
1378logCtx.Infof("No operation updates necessary to '%s'. Skipping patch", app.QualifiedName())
1379return
1380}
1381patchJSON, err := json.Marshal(patch)
1382if err != nil {
1383logCtx.Errorf("error marshaling json: %v", err)
1384return
1385}
1386if app.Status.OperationState != nil && app.Status.OperationState.FinishedAt != nil && state.FinishedAt == nil {
1387patchJSON, err = jsonpatch.MergeMergePatches(patchJSON, []byte(`{"status": {"operationState": {"finishedAt": null}}}`))
1388if err != nil {
1389logCtx.Errorf("error merging operation state patch: %v", err)
1390return
1391}
1392}
1393
1394kube.RetryUntilSucceed(context.Background(), updateOperationStateTimeout, "Update application operation state", logutils.NewLogrusLogger(logutils.NewWithCurrentConfig()), func() error {
1395_, err := ctrl.PatchAppWithWriteBack(context.Background(), app.Name, app.Namespace, types.MergePatchType, patchJSON, metav1.PatchOptions{})
1396if err != nil {
1397// Stop retrying updating deleted application
1398if apierr.IsNotFound(err) {
1399return nil
1400}
1401// kube.RetryUntilSucceed logs failed attempts at "debug" level, but we want to know if this fails. Log a
1402// warning.
1403logCtx.Warnf("error patching application with operation state: %v", err)
1404return fmt.Errorf("error patching application with operation state: %w", err)
1405}
1406return nil
1407})
1408
1409logCtx.Infof("updated '%s' operation (phase: %s)", app.QualifiedName(), state.Phase)
1410if state.Phase.Completed() {
1411eventInfo := argo.EventInfo{Reason: argo.EventReasonOperationCompleted}
1412var messages []string
1413if state.Operation.Sync != nil && len(state.Operation.Sync.Resources) > 0 {
1414messages = []string{"Partial sync operation"}
1415} else {
1416messages = []string{"Sync operation"}
1417}
1418if state.SyncResult != nil {
1419messages = append(messages, "to", state.SyncResult.Revision)
1420}
1421if state.Phase.Successful() {
1422eventInfo.Type = v1.EventTypeNormal
1423messages = append(messages, "succeeded")
1424} else {
1425eventInfo.Type = v1.EventTypeWarning
1426messages = append(messages, "failed:", state.Message)
1427}
1428ctrl.auditLogger.LogAppEvent(app, eventInfo, strings.Join(messages, " "), "")
1429ctrl.metricsServer.IncSync(app, state)
1430}
1431}
1432
// writeBackToInformer writes a recently updated App back into the informer cache.
// This prevents the controller from operating on a stale app and repeating work.
1435func (ctrl *ApplicationController) writeBackToInformer(app *appv1.Application) {
1436logCtx := log.WithFields(log.Fields{"application": app.Name, "appNamespace": app.Namespace, "project": app.Spec.Project, "informer-writeBack": true})
1437err := ctrl.appInformer.GetStore().Update(app)
1438if err != nil {
1439logCtx.Errorf("failed to update informer store: %v", err)
1440return
1441}
1442}
1443
1444// PatchAppWithWriteBack patches an application and writes it back to the informer cache
1445func (ctrl *ApplicationController) PatchAppWithWriteBack(ctx context.Context, name, ns string, pt types.PatchType, data []byte, opts metav1.PatchOptions, subresources ...string) (result *appv1.Application, err error) {
1446patchedApp, err := ctrl.applicationClientset.ArgoprojV1alpha1().Applications(ns).Patch(ctx, name, pt, data, opts, subresources...)
1447if err != nil {
1448return patchedApp, err
1449}
1450ctrl.writeBackToInformer(patchedApp)
1451return patchedApp, err
1452}
1453
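// processAppRefreshQueueItem pops one application from the refresh queue, compares its live and
// target state at the requested comparison level, and persists the resulting status and resource tree.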
func (ctrl *ApplicationController) processAppRefreshQueueItem() (processNext bool) {
	patchMs := time.Duration(0) // time spent in doing patch/update calls
	setOpMs := time.Duration(0) // time spent in doing Operation patch calls in autosync
	appKey, shutdown := ctrl.appRefreshQueue.Get()
	if shutdown {
		processNext = false
		return
	}
	processNext = true
	defer func() {
		if r := recover(); r != nil {
			log.Errorf("Recovered from panic: %+v\n%s", r, debug.Stack())
		}
		ctrl.appRefreshQueue.Done(appKey)
	}()
	obj, exists, err := ctrl.appInformer.GetIndexer().GetByKey(appKey.(string))
	if err != nil {
		log.Errorf("Failed to get application '%s' from informer index: %+v", appKey, err)
		return
	}
	if !exists {
		// This happens after app was deleted, but the work queue still had an entry for it.
		return
	}
	origApp, ok := obj.(*appv1.Application)
	if !ok {
		log.Warnf("Key '%s' in index is not an application", appKey)
		return
	}
	origApp = origApp.DeepCopy()
	needRefresh, refreshType, comparisonLevel := ctrl.needRefreshAppStatus(origApp, ctrl.statusRefreshTimeout, ctrl.statusHardRefreshTimeout)

	if !needRefresh {
		return
	}
	app := origApp.DeepCopy()
	logCtx := log.WithFields(log.Fields{
		"application":    app.QualifiedName(),
		"level":          comparisonLevel,
		"dest-server":    origApp.Spec.Destination.Server,
		"dest-name":      origApp.Spec.Destination.Name,
		"dest-namespace": origApp.Spec.Destination.Namespace,
	})

	startTime := time.Now()
	defer func() {
		reconcileDuration := time.Since(startTime)
		ctrl.metricsServer.IncReconcile(origApp, reconcileDuration)
		logCtx.WithFields(log.Fields{
			"time_ms":  reconcileDuration.Milliseconds(),
			"patch_ms": patchMs.Milliseconds(),
			"setop_ms": setOpMs.Milliseconds(),
		}).Info("Reconciliation completed")
	}()

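	// A ComparisonWithNothing refresh only rebuilds the resource tree from the cached
	// managed resources; if the cache lookup fails we fall through to a full comparison.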
	if comparisonLevel == ComparisonWithNothing {
		managedResources := make([]*appv1.ResourceDiff, 0)
		if err := ctrl.cache.GetAppManagedResources(app.InstanceName(ctrl.namespace), &managedResources); err != nil {
			logCtx.Warnf("Failed to get cached managed resources for tree reconciliation, fall back to full reconciliation")
		} else {
			var tree *appv1.ApplicationTree
			if tree, err = ctrl.getResourceTree(app, managedResources); err == nil {
				app.Status.Summary = tree.GetSummary(app)
				if err := ctrl.cache.SetAppResourcesTree(app.InstanceName(ctrl.namespace), tree); err != nil {
					logCtx.Errorf("Failed to cache resources tree: %v", err)
					return
				}
			}

			patchMs = ctrl.persistAppStatus(origApp, &app.Status)
			return
		}
	}

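	// Validate the project and spec before comparing. On validation errors the app is
	// marked Unknown and its cached resource tree and managed resources are cleared.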
	project, hasErrors := ctrl.refreshAppConditions(app)
	if hasErrors {
		app.Status.Sync.Status = appv1.SyncStatusCodeUnknown
		app.Status.Health.Status = health.HealthStatusUnknown
		patchMs = ctrl.persistAppStatus(origApp, &app.Status)

		if err := ctrl.cache.SetAppResourcesTree(app.InstanceName(ctrl.namespace), &appv1.ApplicationTree{}); err != nil {
			log.Warnf("failed to set app resource tree: %v", err)
		}
		if err := ctrl.cache.SetAppManagedResources(app.InstanceName(ctrl.namespace), nil); err != nil {
			log.Warnf("failed to set app managed resources tree: %v", err)
		}
		return
	}

	var localManifests []string
	if opState := app.Status.OperationState; opState != nil && opState.Operation.Sync != nil {
		localManifests = opState.Operation.Sync.Manifests
	}

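	// Collect the source(s) and target revision(s) to compare against. For
	// CompareWithRecent the previously recorded revisions are reused instead of
	// resolving the target revision again.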
	revisions := make([]string, 0)
	sources := make([]appv1.ApplicationSource, 0)

	hasMultipleSources := app.Spec.HasMultipleSources()

	// If we have multiple sources, we use all the sources under the `sources` field and ignore the source under the `source` field.
	// Otherwise, we use the source under the `source` field.
	if hasMultipleSources {
		for _, source := range app.Spec.Sources {
			// We do not perform any filtering of duplicate sources.
			// Argo CD will apply and update the resources generated from the sources automatically
			// based on the order in which manifests were generated
			sources = append(sources, source)
			revisions = append(revisions, source.TargetRevision)
		}
		if comparisonLevel == CompareWithRecent {
			revisions = app.Status.Sync.Revisions
		}
	} else {
		revision := app.Spec.GetSource().TargetRevision
		if comparisonLevel == CompareWithRecent {
			revision = app.Status.Sync.Revision
		}
		revisions = append(revisions, revision)
		sources = append(sources, app.Spec.GetSource())
	}
	now := metav1.Now()

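	// Render the target state and diff it against the live state. Transient repo
	// errors abort this attempt; the app will be compared again on the next refresh.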
	compareResult, err := ctrl.appStateManager.CompareAppState(app, project, revisions, sources,
		refreshType == appv1.RefreshTypeHard,
		comparisonLevel == CompareWithLatestForceResolve, localManifests, hasMultipleSources)

	if goerrors.Is(err, CompareStateRepoError) {
		logCtx.Warnf("Ignoring temporary failed attempt to compare app state against repo: %v", err)
		return // short circuit if git error is encountered
	}

	for k, v := range compareResult.timings {
		logCtx = logCtx.WithField(k, v.Milliseconds())
	}

	ctrl.normalizeApplication(origApp, app)

	tree, err := ctrl.setAppManagedResources(app, compareResult)
	if err != nil {
		logCtx.Errorf("Failed to cache app resources: %v", err)
	} else {
		app.Status.Summary = tree.GetSummary(app)
	}

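	// Attempt auto-sync only when the project's sync windows currently allow it;
	// a failed attempt is surfaced as a SyncError condition on the application.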
	if project.Spec.SyncWindows.Matches(app).CanSync(false) {
		syncErrCond, opMS := ctrl.autoSync(app, compareResult.syncStatus, compareResult.resources)
		setOpMs = opMS
		if syncErrCond != nil {
			app.Status.SetConditions(
				[]appv1.ApplicationCondition{*syncErrCond},
				map[appv1.ApplicationConditionType]bool{appv1.ApplicationConditionSyncError: true},
			)
		} else {
			app.Status.SetConditions(
				[]appv1.ApplicationCondition{},
				map[appv1.ApplicationConditionType]bool{appv1.ApplicationConditionSyncError: true},
			)
		}
	} else {
		logCtx.Info("Sync prevented by sync window")
	}

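	// Persist the comparison outcome. Resources are sorted by group/kind/namespace/name
	// so repeated reconciliations produce a stable status and avoid needless patches.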
	if app.Status.ReconciledAt == nil || comparisonLevel >= CompareWithLatest {
		app.Status.ReconciledAt = &now
	}
	app.Status.Sync = *compareResult.syncStatus
	app.Status.Health = *compareResult.healthStatus
	app.Status.Resources = compareResult.resources
	sort.Slice(app.Status.Resources, func(i, j int) bool {
		return resourceStatusKey(app.Status.Resources[i]) < resourceStatusKey(app.Status.Resources[j])
	})
	app.Status.SourceType = compareResult.appSourceType
	app.Status.SourceTypes = compareResult.appSourceTypes
	app.Status.ControllerNamespace = ctrl.namespace
	patchMs = ctrl.persistAppStatus(origApp, &app.Status)
	if (compareResult.hasPostDeleteHooks != app.HasPostDeleteFinalizer() || compareResult.hasPostDeleteHooks != app.HasPostDeleteFinalizer("cleanup")) &&
		app.GetDeletionTimestamp() == nil {
		if compareResult.hasPostDeleteHooks {
			app.SetPostDeleteFinalizer()
			app.SetPostDeleteFinalizer("cleanup")
		} else {
			app.UnSetPostDeleteFinalizer()
			app.UnSetPostDeleteFinalizer("cleanup")
		}

		if err := ctrl.updateFinalizers(app); err != nil {
			logCtx.Errorf("Failed to update finalizers: %v", err)
		}
	}
	return
}

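// resourceStatusKey returns a stable sort key for a resource status entry.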
func resourceStatusKey(res appv1.ResourceStatus) string {
	return strings.Join([]string{res.Group, res.Kind, res.Namespace, res.Name}, "/")
}

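// currentSourceEqualsSyncedSource reports whether the source(s) in the app spec still
// match the source(s) recorded by the most recent comparison.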
func currentSourceEqualsSyncedSource(app *appv1.Application) bool {
	if app.Spec.HasMultipleSources() {
		return app.Spec.Sources.Equals(app.Status.Sync.ComparedTo.Sources)
	}
	return app.Spec.Source.Equals(&app.Status.Sync.ComparedTo.Source)
}

// needRefreshAppStatus answers if application status needs to be refreshed.
// Returns true if the application has never been compared, has changed, or the comparison result has expired.
// Additionally, it returns whether a full refresh was requested or not.
// If a full refresh is requested then target and live state are reconciled, otherwise only the live state tree is updated.
func (ctrl *ApplicationController) needRefreshAppStatus(app *appv1.Application, statusRefreshTimeout, statusHardRefreshTimeout time.Duration) (bool, appv1.RefreshType, CompareWith) {
	logCtx := log.WithFields(log.Fields{"application": app.QualifiedName()})
	var reason string
	compareWith := CompareWithLatest
	refreshType := appv1.RefreshTypeNormal

	softExpired := app.Status.ReconciledAt == nil || app.Status.ReconciledAt.Add(statusRefreshTimeout).Before(time.Now().UTC())
	hardExpired := (app.Status.ReconciledAt == nil || app.Status.ReconciledAt.Add(statusHardRefreshTimeout).Before(time.Now().UTC())) && statusHardRefreshTimeout.Seconds() != 0

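	// An explicit refresh request always wins and forces revision re-resolution;
	// otherwise the first matching reason below determines the refresh level.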
	if requestedType, ok := app.IsRefreshRequested(); ok {
		compareWith = CompareWithLatestForceResolve
		// user requested app refresh.
		refreshType = requestedType
		reason = fmt.Sprintf("%s refresh requested", refreshType)
	} else {
		if !currentSourceEqualsSyncedSource(app) {
			reason = "spec.source differs"
			compareWith = CompareWithLatestForceResolve
			if app.Spec.HasMultipleSources() {
				reason = "at least one of the spec.sources differs"
			}
		} else if hardExpired || softExpired {
			// The commented line below mysteriously crashes if app.Status.ReconciledAt is nil
			// reason = fmt.Sprintf("comparison expired. reconciledAt: %v, expiry: %v", app.Status.ReconciledAt, statusRefreshTimeout)
			// TODO: find existing Golang bug or create a new one
			reconciledAtStr := "never"
			if app.Status.ReconciledAt != nil {
				reconciledAtStr = app.Status.ReconciledAt.String()
			}
			reason = fmt.Sprintf("comparison expired, requesting refresh. reconciledAt: %v, expiry: %v", reconciledAtStr, statusRefreshTimeout)
			if hardExpired {
				reason = fmt.Sprintf("comparison expired, requesting hard refresh. reconciledAt: %v, expiry: %v", reconciledAtStr, statusHardRefreshTimeout)
				refreshType = appv1.RefreshTypeHard
			}
		} else if !app.Spec.Destination.Equals(app.Status.Sync.ComparedTo.Destination) {
			reason = "spec.destination differs"
		} else if app.HasChangedManagedNamespaceMetadata() {
			reason = "spec.syncPolicy.managedNamespaceMetadata differs"
		} else if !app.Spec.IgnoreDifferences.Equals(app.Status.Sync.ComparedTo.IgnoreDifferences) {
			reason = "spec.ignoreDifferences differs"
		} else if requested, level := ctrl.isRefreshRequested(app.QualifiedName()); requested {
			compareWith = level
			reason = "controller refresh requested"
		}
	}

	if reason != "" {
		logCtx.Infof("Refreshing app status (%s), level (%d)", reason, compareWith)
		return true, refreshType, compareWith
	}
	return false, refreshType, compareWith
}

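// refreshAppConditions validates the app's project and spec permissions and replaces any
// InvalidSpecError/UnknownError conditions on the app accordingly. It returns the app's
// project and whether any error conditions were recorded.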
func (ctrl *ApplicationController) refreshAppConditions(app *appv1.Application) (*appv1.AppProject, bool) {
	errorConditions := make([]appv1.ApplicationCondition, 0)
	proj, err := ctrl.getAppProj(app)
	if err != nil {
		errorConditions = append(errorConditions, ctrl.projectErrorToCondition(err, app))
	} else {
		specConditions, err := argo.ValidatePermissions(context.Background(), &app.Spec, proj, ctrl.db)
		if err != nil {
			errorConditions = append(errorConditions, appv1.ApplicationCondition{
				Type:    appv1.ApplicationConditionUnknownError,
				Message: err.Error(),
			})
		} else {
			errorConditions = append(errorConditions, specConditions...)
		}
	}
	app.Status.SetConditions(errorConditions, map[appv1.ApplicationConditionType]bool{
		appv1.ApplicationConditionInvalidSpecError: true,
		appv1.ApplicationConditionUnknownError:     true,
	})
	return proj, len(errorConditions) > 0
}

// normalizeApplication normalizes an application.spec and additionally persists updates if it changed
func (ctrl *ApplicationController) normalizeApplication(orig, app *appv1.Application) {
	logCtx := log.WithFields(log.Fields{"application": app.QualifiedName()})
	app.Spec = *argo.NormalizeApplicationSpec(&app.Spec)

	patch, modified, err := diff.CreateTwoWayMergePatch(orig, app, appv1.Application{})

	if err != nil {
		logCtx.Errorf("error constructing app spec patch: %v", err)
	} else if modified {
		_, err := ctrl.PatchAppWithWriteBack(context.Background(), app.Name, app.Namespace, types.MergePatchType, patch, metav1.PatchOptions{})
		if err != nil {
			logCtx.Errorf("Error persisting normalized application spec: %v", err)
		} else {
			logCtx.Infof("Normalized app spec: %s", string(patch))
		}
	}
}

// persistAppStatus persists updates to application status. If no changes were made, it is a no-op
func (ctrl *ApplicationController) persistAppStatus(orig *appv1.Application, newStatus *appv1.ApplicationStatus) (patchMs time.Duration) {
	logCtx := log.WithFields(log.Fields{"application": orig.QualifiedName()})
	if orig.Status.Sync.Status != newStatus.Sync.Status {
		message := fmt.Sprintf("Updated sync status: %s -> %s", orig.Status.Sync.Status, newStatus.Sync.Status)
		ctrl.auditLogger.LogAppEvent(orig, argo.EventInfo{Reason: argo.EventReasonResourceUpdated, Type: v1.EventTypeNormal}, message, "")
	}
	if orig.Status.Health.Status != newStatus.Health.Status {
		message := fmt.Sprintf("Updated health status: %s -> %s", orig.Status.Health.Status, newStatus.Health.Status)
		ctrl.auditLogger.LogAppEvent(orig, argo.EventInfo{Reason: argo.EventReasonResourceUpdated, Type: v1.EventTypeNormal}, message, "")
	}
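	// Build the desired annotations with the refresh annotation removed so that the
	// status patch below also clears a pending refresh request.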
	var newAnnotations map[string]string
	if orig.GetAnnotations() != nil {
		newAnnotations = make(map[string]string)
		for k, v := range orig.GetAnnotations() {
			newAnnotations[k] = v
		}
		delete(newAnnotations, appv1.AnnotationKeyRefresh)
	}
	patch, modified, err := diff.CreateTwoWayMergePatch(
		&appv1.Application{ObjectMeta: metav1.ObjectMeta{Annotations: orig.GetAnnotations()}, Status: orig.Status},
		&appv1.Application{ObjectMeta: metav1.ObjectMeta{Annotations: newAnnotations}, Status: *newStatus}, appv1.Application{})
	if err != nil {
		logCtx.Errorf("Error constructing app status patch: %v", err)
		return
	}
	if !modified {
		logCtx.Infof("No status changes. Skipping patch")
		return
	}
	// measure the duration of the patch call
	start := time.Now()
	defer func() {
		patchMs = time.Since(start)
	}()
	_, err = ctrl.PatchAppWithWriteBack(context.Background(), orig.Name, orig.Namespace, types.MergePatchType, patch, metav1.PatchOptions{})
	if err != nil {
		logCtx.Warnf("Error updating application: %v", err)
	} else {
		logCtx.Infof("Update successful")
	}
	return patchMs
}

// autoSync will initiate a sync operation for an application configured with automated sync
func (ctrl *ApplicationController) autoSync(app *appv1.Application, syncStatus *appv1.SyncStatus, resources []appv1.ResourceStatus) (*appv1.ApplicationCondition, time.Duration) {
	if app.Spec.SyncPolicy == nil || app.Spec.SyncPolicy.Automated == nil {
		return nil, 0
	}
	logCtx := log.WithFields(log.Fields{"application": app.QualifiedName()})

	if app.Operation != nil {
		logCtx.Infof("Skipping auto-sync: another operation is in progress")
		return nil, 0
	}
	if app.DeletionTimestamp != nil && !app.DeletionTimestamp.IsZero() {
		logCtx.Infof("Skipping auto-sync: deletion in progress")
		return nil, 0
	}

	// Only perform auto-sync if we detect OutOfSync status. This is to prevent us from attempting
	// a sync when application is already in a Synced or Unknown state
	if syncStatus.Status != appv1.SyncStatusCodeOutOfSync {
		logCtx.Infof("Skipping auto-sync: application status is %s", syncStatus.Status)
		return nil, 0
	}

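	// When automated pruning is disabled, skip auto-sync if the only remaining work
	// would be pruning extra resources.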
	if !app.Spec.SyncPolicy.Automated.Prune {
		requirePruneOnly := true
		for _, r := range resources {
			if r.Status != appv1.SyncStatusCodeSynced && !r.RequiresPruning {
				requirePruneOnly = false
				break
			}
		}
		if requirePruneOnly {
			logCtx.Infof("Skipping auto-sync: need to prune extra resources only but automated prune is disabled")
			return nil, 0
		}
	}

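	// Determine the revision(s) to sync to and whether the previous operation already
	// attempted the same revision and source configuration.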
	desiredCommitSHA := syncStatus.Revision
	desiredCommitSHAsMS := syncStatus.Revisions
	alreadyAttempted, attemptPhase := alreadyAttemptedSync(app, desiredCommitSHA, desiredCommitSHAsMS, app.Spec.HasMultipleSources())
	selfHeal := app.Spec.SyncPolicy.Automated.SelfHeal
	op := appv1.Operation{
		Sync: &appv1.SyncOperation{
			Revision:    desiredCommitSHA,
			Prune:       app.Spec.SyncPolicy.Automated.Prune,
			SyncOptions: app.Spec.SyncPolicy.SyncOptions,
			Revisions:   desiredCommitSHAsMS,
		},
		InitiatedBy: appv1.OperationInitiator{Automated: true},
		Retry:       appv1.RetryStrategy{Limit: 5},
	}
	if app.Spec.SyncPolicy.Retry != nil {
		op.Retry = *app.Spec.SyncPolicy.Retry
	}
	// It is possible for manifests to remain OutOfSync even after a sync/kubectl apply (e.g.
	// auto-sync with pruning disabled). We need to ensure that we do not keep Syncing an
	// application in an infinite loop. To detect this, we only attempt the Sync if the revision
	// and parameter overrides are different from our most recent sync operation.
	if alreadyAttempted && (!selfHeal || !attemptPhase.Successful()) {
		if !attemptPhase.Successful() {
			logCtx.Warnf("Skipping auto-sync: failed previous sync attempt to %s", desiredCommitSHA)
			message := fmt.Sprintf("Failed sync attempt to %s: %s", desiredCommitSHA, app.Status.OperationState.Message)
			return &appv1.ApplicationCondition{Type: appv1.ApplicationConditionSyncError, Message: message}, 0
		}
		logCtx.Infof("Skipping auto-sync: most recent sync already to %s", desiredCommitSHA)
		return nil, 0
	} else if alreadyAttempted && selfHeal {
		if shouldSelfHeal, retryAfter := ctrl.shouldSelfHeal(app); shouldSelfHeal {
			for _, resource := range resources {
				if resource.Status != appv1.SyncStatusCodeSynced {
					op.Sync.Resources = append(op.Sync.Resources, appv1.SyncOperationResource{
						Kind:  resource.Kind,
						Group: resource.Group,
						Name:  resource.Name,
					})
				}
			}
		} else {
			logCtx.Infof("Skipping auto-sync: already attempted sync to %s with timeout %v (retrying in %v)", desiredCommitSHA, ctrl.selfHealTimeout, retryAfter)
			ctrl.requestAppRefresh(app.QualifiedName(), CompareWithLatest.Pointer(), &retryAfter)
			return nil, 0
		}
	}

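	// Safety check: when automated pruning is enabled but allowEmpty is not, refuse to
	// start a sync that would delete every managed resource.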
	if app.Spec.SyncPolicy.Automated.Prune && !app.Spec.SyncPolicy.Automated.AllowEmpty {
		bAllNeedPrune := true
		for _, r := range resources {
			if !r.RequiresPruning {
				bAllNeedPrune = false
			}
		}
		if bAllNeedPrune {
			message := fmt.Sprintf("Skipping sync attempt to %s: auto-sync will wipe out all resources", desiredCommitSHA)
			logCtx.Warnf(message)
			return &appv1.ApplicationCondition{Type: appv1.ApplicationConditionSyncError, Message: message}, 0
		}
	}

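	// Record the sync operation on the application. If another operation was started
	// concurrently (stale informer data), the attempt is skipped because the desired
	// sync is effectively already running.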
	appIf := ctrl.applicationClientset.ArgoprojV1alpha1().Applications(app.Namespace)
	start := time.Now()
	updatedApp, err := argo.SetAppOperation(appIf, app.Name, &op)
	setOpTime := time.Since(start)
	if err != nil {
		if goerrors.Is(err, argo.ErrAnotherOperationInProgress) {
			// skipping auto-sync because another operation is in progress and was not noticed due to stale data in informer
			// it is safe to skip auto-sync because it is already running
			logCtx.Warnf("Failed to initiate auto-sync to %s: %v", desiredCommitSHA, err)
			return nil, 0
		}

		logCtx.Errorf("Failed to initiate auto-sync to %s: %v", desiredCommitSHA, err)
		return &appv1.ApplicationCondition{Type: appv1.ApplicationConditionSyncError, Message: err.Error()}, setOpTime
	} else {
		ctrl.writeBackToInformer(updatedApp)
	}
	message := fmt.Sprintf("Initiated automated sync to '%s'", desiredCommitSHA)
	ctrl.auditLogger.LogAppEvent(app, argo.EventInfo{Reason: argo.EventReasonOperationStarted, Type: v1.EventTypeNormal}, message, "")
	logCtx.Info(message)
	return nil, setOpTime
}

// alreadyAttemptedSync returns whether the most recent sync was performed against the
// commitSHA and with the same app source config that is currently set in the app
func alreadyAttemptedSync(app *appv1.Application, commitSHA string, commitSHAsMS []string, hasMultipleSources bool) (bool, synccommon.OperationPhase) {
	if app.Status.OperationState == nil || app.Status.OperationState.Operation.Sync == nil || app.Status.OperationState.SyncResult == nil {
		return false, ""
	}
	if hasMultipleSources {
		if !reflect.DeepEqual(app.Status.OperationState.SyncResult.Revisions, commitSHAsMS) {
			return false, ""
		}
	} else {
		if app.Status.OperationState.SyncResult.Revision != commitSHA {
			return false, ""
		}
	}

	if hasMultipleSources {
		// Ignore differences in target revision, since we already just verified commitSHAs are equal,
		// and we do not want to trigger auto-sync due to things like HEAD != master
		specSources := app.Spec.Sources.DeepCopy()
		syncSources := app.Status.OperationState.SyncResult.Sources.DeepCopy()
		for i := range specSources {
			specSources[i].TargetRevision = ""
		}
		for i := range syncSources {
			syncSources[i].TargetRevision = ""
		}
		return reflect.DeepEqual(specSources, syncSources), app.Status.OperationState.Phase
	} else {
		// Ignore differences in target revision, since we already just verified commitSHAs are equal,
		// and we do not want to trigger auto-sync due to things like HEAD != master
		specSource := app.Spec.Source.DeepCopy()
		specSource.TargetRevision = ""
		syncResSource := app.Status.OperationState.SyncResult.Source.DeepCopy()
		syncResSource.TargetRevision = ""
		return reflect.DeepEqual(specSource, syncResSource), app.Status.OperationState.Phase
	}
}

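// shouldSelfHeal returns whether enough time has passed since the last sync operation
// finished for self-heal to run again, and if not, how long to wait before retrying.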
func (ctrl *ApplicationController) shouldSelfHeal(app *appv1.Application) (bool, time.Duration) {
	if app.Status.OperationState == nil {
		return true, time.Duration(0)
	}

	var retryAfter time.Duration
	if app.Status.OperationState.FinishedAt == nil {
		retryAfter = ctrl.selfHealTimeout
	} else {
		retryAfter = ctrl.selfHealTimeout - time.Since(app.Status.OperationState.FinishedAt.Time)
	}
	return retryAfter <= 0, retryAfter
}

// isAppNamespaceAllowed returns whether the application is allowed in the
// namespace it's residing in.
func (ctrl *ApplicationController) isAppNamespaceAllowed(app *appv1.Application) bool {
	return app.Namespace == ctrl.namespace || glob.MatchStringInList(ctrl.applicationNamespaces, app.Namespace, false)
}

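// canProcessApp reports whether this controller instance should reconcile the given
// object: it must be an Application in an allowed namespace, must not opt out via the
// skip-reconcile annotation, and its destination cluster must belong to this shard.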
func (ctrl *ApplicationController) canProcessApp(obj interface{}) bool {
	app, ok := obj.(*appv1.Application)
	if !ok {
		return false
	}

	// Only process given app if it exists in a watched namespace, or in the
	// control plane's namespace.
	if !ctrl.isAppNamespaceAllowed(app) {
		return false
	}

	if annotations := app.GetAnnotations(); annotations != nil {
		if skipVal, ok := annotations[common.AnnotationKeyAppSkipReconcile]; ok {
			logCtx := log.WithFields(log.Fields{"application": app.QualifiedName()})
			if skipReconcile, err := strconv.ParseBool(skipVal); err == nil {
				if skipReconcile {
					logCtx.Debugf("Skipping Application reconcile based on annotation %s", common.AnnotationKeyAppSkipReconcile)
					return false
				}
			} else {
				logCtx.Debugf("Unable to determine if Application should skip reconcile based on annotation %s: %v", common.AnnotationKeyAppSkipReconcile, err)
			}
		}
	}

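	// Finally, defer to the sharding configuration: only process apps whose destination
	// cluster is managed by this controller shard. If the cluster lookup fails, fall
	// back to the decision for a nil cluster.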
	cluster, err := ctrl.db.GetCluster(context.Background(), app.Spec.Destination.Server)
	if err != nil {
		return ctrl.clusterSharding.IsManagedCluster(nil)
	}
	return ctrl.clusterSharding.IsManagedCluster(cluster)
}

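// newApplicationInformerAndLister returns the shared informer and lister used to watch
// Applications in the allowed namespaces, with indexers for namespace and orphaned
// resource monitoring plus event handlers that feed the refresh and operation queues.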
func (ctrl *ApplicationController) newApplicationInformerAndLister() (cache.SharedIndexInformer, applisters.ApplicationLister) {
	watchNamespace := ctrl.namespace
	// If we have at least one additional namespace configured, we need to
	// watch on them all.
	if len(ctrl.applicationNamespaces) > 0 {
		watchNamespace = ""
	}
	refreshTimeout := ctrl.statusRefreshTimeout
	if ctrl.statusHardRefreshTimeout.Seconds() != 0 && (ctrl.statusHardRefreshTimeout < ctrl.statusRefreshTimeout) {
		refreshTimeout = ctrl.statusHardRefreshTimeout
	}
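	// The informer resync period doubles as the periodic refresh interval, so use the
	// hard refresh timeout when it is shorter than the normal refresh timeout.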
	informer := cache.NewSharedIndexInformer(
		&cache.ListWatch{
			ListFunc: func(options metav1.ListOptions) (apiruntime.Object, error) {
				// We are only interested in apps that exist in namespaces the
				// user wants to be enabled.
				appList, err := ctrl.applicationClientset.ArgoprojV1alpha1().Applications(watchNamespace).List(context.TODO(), options)
				if err != nil {
					return nil, err
				}
				newItems := []appv1.Application{}
				for _, app := range appList.Items {
					if ctrl.isAppNamespaceAllowed(&app) {
						newItems = append(newItems, app)
					}
				}
				appList.Items = newItems
				return appList, nil
			},
			WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
				return ctrl.applicationClientset.ArgoprojV1alpha1().Applications(watchNamespace).Watch(context.TODO(), options)
			},
		},
		&appv1.Application{},
		refreshTimeout,
		cache.Indexers{
			cache.NamespaceIndex: func(obj interface{}) ([]string, error) {
				app, ok := obj.(*appv1.Application)
				if ok {
					// We generally only work with applications that are in one of
					// the allowed namespaces.
					if ctrl.isAppNamespaceAllowed(app) {
						// If the application is not allowed to use the project,
						// log an error.
						if _, err := ctrl.getAppProj(app); err != nil {
							ctrl.setAppCondition(app, ctrl.projectErrorToCondition(err, app))
						} else {
							// This call to 'ValidateDestination' ensures that the .spec.destination field of all Applications
							// returned by the informer/lister will have server field set (if not already set) based on the name.
							// (or, if not found, an error app condition)

							// If the server field is not set, set it based on the cluster name; if the cluster name can't be found,
							// log an error as an App Condition.
							if err := argo.ValidateDestination(context.Background(), &app.Spec.Destination, ctrl.db); err != nil {
								ctrl.setAppCondition(app, appv1.ApplicationCondition{Type: appv1.ApplicationConditionInvalidSpecError, Message: err.Error()})
							}
						}
					}
				}

				return cache.MetaNamespaceIndexFunc(obj)
			},
			orphanedIndex: func(obj interface{}) (i []string, e error) {
				app, ok := obj.(*appv1.Application)
				if !ok {
					return nil, nil
				}

				if !ctrl.isAppNamespaceAllowed(app) {
					return nil, nil
				}

				proj, err := ctrl.getAppProj(app)
				if err != nil {
					return nil, nil
				}
				if proj.Spec.OrphanedResources != nil {
					return []string{app.Spec.Destination.Namespace}, nil
				}
				return nil, nil
			},
		},
	)
	lister := applisters.NewApplicationLister(informer.GetIndexer())
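	// Informer events are translated into work-queue items: adds and updates enqueue
	// both refresh and operation work, while deletes go straight onto the refresh queue.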
	_, err := informer.AddEventHandler(
		cache.ResourceEventHandlerFuncs{
			AddFunc: func(obj interface{}) {
				if !ctrl.canProcessApp(obj) {
					return
				}
				key, err := cache.MetaNamespaceKeyFunc(obj)
				if err == nil {
					ctrl.appRefreshQueue.AddRateLimited(key)
					ctrl.appOperationQueue.AddRateLimited(key)
				}
			},
			UpdateFunc: func(old, new interface{}) {
				if !ctrl.canProcessApp(new) {
					return
				}

				key, err := cache.MetaNamespaceKeyFunc(new)
				if err != nil {
					return
				}

				var compareWith *CompareWith
				var delay *time.Duration

				oldApp, oldOK := old.(*appv1.Application)
				newApp, newOK := new.(*appv1.Application)
				if oldOK && newOK {
					if automatedSyncEnabled(oldApp, newApp) {
						log.WithField("application", newApp.QualifiedName()).Info("Enabled automated sync")
						compareWith = CompareWithLatest.Pointer()
					}
					if ctrl.statusRefreshJitter != 0 && oldApp.ResourceVersion == newApp.ResourceVersion {
						// Handler is refreshing the apps, add a random jitter to spread the load and avoid spikes
						jitter := time.Duration(float64(ctrl.statusRefreshJitter) * rand.Float64())
						delay = &jitter
					}
				}

				ctrl.requestAppRefresh(newApp.QualifiedName(), compareWith, delay)
				ctrl.appOperationQueue.AddRateLimited(key)
			},
			DeleteFunc: func(obj interface{}) {
				if !ctrl.canProcessApp(obj) {
					return
				}
				// IndexerInformer uses a delta queue, therefore for deletes we have to use this
				// key function.
				key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
				if err == nil {
					// for deletes, we immediately add to the refresh queue
					ctrl.appRefreshQueue.Add(key)
				}
			},
		},
	)
	if err != nil {
		return nil, nil
	}
	return informer, lister
}

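// projectErrorToCondition converts a project lookup error into the corresponding
// application condition (InvalidSpecError for a missing project, UnknownError otherwise).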
func (ctrl *ApplicationController) projectErrorToCondition(err error, app *appv1.Application) appv1.ApplicationCondition {
	var condition appv1.ApplicationCondition
	if apierr.IsNotFound(err) {
		condition = appv1.ApplicationCondition{
			Type:    appv1.ApplicationConditionInvalidSpecError,
			Message: fmt.Sprintf("Application referencing project %s which does not exist", app.Spec.Project),
		}
	} else {
		condition = appv1.ApplicationCondition{Type: appv1.ApplicationConditionUnknownError, Message: err.Error()}
	}
	return condition
}

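// RegisterClusterSecretUpdater starts a background updater that keeps cluster info in
// the cache up to date for the clusters managed by this controller.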
func (ctrl *ApplicationController) RegisterClusterSecretUpdater(ctx context.Context) {
	updater := NewClusterInfoUpdater(ctrl.stateCache, ctrl.db, ctrl.appLister.Applications(""), ctrl.cache, ctrl.clusterSharding.IsManagedCluster, ctrl.getAppProj, ctrl.namespace)
	go updater.Run(ctx)
}

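// isOperationInProgress returns whether the application has an operation that has not
// yet completed.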
func isOperationInProgress(app *appv1.Application) bool {
	return app.Status.OperationState != nil && !app.Status.OperationState.Phase.Completed()
}

// automatedSyncEnabled tests if an app went from auto-sync disabled to enabled.
// If it was toggled to be enabled, the informer handler will force a refresh.
func automatedSyncEnabled(oldApp *appv1.Application, newApp *appv1.Application) bool {
	oldEnabled := false
	oldSelfHealEnabled := false
	if oldApp.Spec.SyncPolicy != nil && oldApp.Spec.SyncPolicy.Automated != nil {
		oldEnabled = true
		oldSelfHealEnabled = oldApp.Spec.SyncPolicy.Automated.SelfHeal
	}

	newEnabled := false
	newSelfHealEnabled := false
	if newApp.Spec.SyncPolicy != nil && newApp.Spec.SyncPolicy.Automated != nil {
		newEnabled = true
		newSelfHealEnabled = newApp.Spec.SyncPolicy.Automated.SelfHeal
	}
	if !oldEnabled && newEnabled {
		return true
	}
	if !oldSelfHealEnabled && newSelfHealEnabled {
		return true
	}
	// nothing changed
	return false
}

// toAppKey returns the application key from a given appName, that is, it will
// replace underscores with forward-slashes to become a <namespace>/<name>
// format. If the appName is an unqualified name (such as "app"), it will use
// the controller's namespace in the key.
func (ctrl *ApplicationController) toAppKey(appName string) string {
	if !strings.Contains(appName, "_") && !strings.Contains(appName, "/") {
		return ctrl.namespace + "/" + appName
	} else if strings.Contains(appName, "/") {
		return appName
	} else {
		return strings.ReplaceAll(appName, "_", "/")
	}
}

func (ctrl *ApplicationController) toAppQualifiedName(appName, appNamespace string) string {
	return fmt.Sprintf("%s/%s", appNamespace, appName)
}

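// ClusterFilterFunction decides whether a cluster should be processed, given a sharding
// distribution function.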
type ClusterFilterFunction func(c *appv1.Cluster, distributionFunction sharding.DistributionFunction) bool
