argo-cd / appcontroller.go
2226 lines · 82.3 KB

package controller

import (
	"context"
	"encoding/json"
	goerrors "errors"
	"fmt"
	"math"
	"math/rand"
	"net/http"
	"reflect"
	"runtime/debug"
	"sort"
	"strconv"
	"strings"
	"sync"
	"time"

	clustercache "github.com/argoproj/gitops-engine/pkg/cache"
	"github.com/argoproj/gitops-engine/pkg/diff"
	"github.com/argoproj/gitops-engine/pkg/health"
	synccommon "github.com/argoproj/gitops-engine/pkg/sync/common"
	resourceutil "github.com/argoproj/gitops-engine/pkg/sync/resource"
	"github.com/argoproj/gitops-engine/pkg/utils/kube"
	jsonpatch "github.com/evanphx/json-patch"
	log "github.com/sirupsen/logrus"
	"golang.org/x/sync/semaphore"
	v1 "k8s.io/api/core/v1"
	apierr "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	"k8s.io/apimachinery/pkg/labels"
	apiruntime "k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/apimachinery/pkg/watch"
	"k8s.io/client-go/informers"
	informerv1 "k8s.io/client-go/informers/apps/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/util/workqueue"

	"github.com/argoproj/argo-cd/v2/common"
	statecache "github.com/argoproj/argo-cd/v2/controller/cache"
	"github.com/argoproj/argo-cd/v2/controller/metrics"
	"github.com/argoproj/argo-cd/v2/controller/sharding"
	"github.com/argoproj/argo-cd/v2/pkg/apis/application"
	appv1 "github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1"
	appclientset "github.com/argoproj/argo-cd/v2/pkg/client/clientset/versioned"
	"github.com/argoproj/argo-cd/v2/pkg/client/informers/externalversions/application/v1alpha1"
	applisters "github.com/argoproj/argo-cd/v2/pkg/client/listers/application/v1alpha1"
	"github.com/argoproj/argo-cd/v2/reposerver/apiclient"
	"github.com/argoproj/argo-cd/v2/util/argo"
	argodiff "github.com/argoproj/argo-cd/v2/util/argo/diff"
	"github.com/argoproj/argo-cd/v2/util/env"

	kubeerrors "k8s.io/apimachinery/pkg/api/errors"

	"github.com/argoproj/argo-cd/v2/pkg/ratelimiter"
	appstatecache "github.com/argoproj/argo-cd/v2/util/cache/appstate"
	"github.com/argoproj/argo-cd/v2/util/db"
	"github.com/argoproj/argo-cd/v2/util/errors"
	"github.com/argoproj/argo-cd/v2/util/glob"
	"github.com/argoproj/argo-cd/v2/util/helm"
	logutils "github.com/argoproj/argo-cd/v2/util/log"
	settings_util "github.com/argoproj/argo-cd/v2/util/settings"
)

const (
	updateOperationStateTimeout             = 1 * time.Second
	defaultDeploymentInformerResyncDuration = 10 * time.Second
	// orphanedIndex contains applications which monitor orphaned resources, keyed by namespace
	orphanedIndex = "orphaned"
)

type CompareWith int

const (
	// Compare live application state against state defined in latest git revision with no resolved revision caching.
	CompareWithLatestForceResolve CompareWith = 3
	// Compare live application state against state defined in latest git revision.
	CompareWithLatest CompareWith = 2
	// Compare live application state against state defined using revision of most recent comparison.
	CompareWithRecent CompareWith = 1
	// Skip comparison and only refresh application resources tree
	ComparisonWithNothing CompareWith = 0
)

func (a CompareWith) Max(b CompareWith) CompareWith {
	return CompareWith(math.Max(float64(a), float64(b)))
}

func (a CompareWith) Pointer() *CompareWith {
	return &a
}

// ApplicationController is the controller for application resources.
type ApplicationController struct {
	cache                *appstatecache.Cache
	namespace            string
	kubeClientset        kubernetes.Interface
	kubectl              kube.Kubectl
	applicationClientset appclientset.Interface
	auditLogger          *argo.AuditLogger
	// queue contains app namespace/name
	appRefreshQueue workqueue.RateLimitingInterface
	// queue contains app namespace/name/comparisonType and is used to request app refresh with the predefined comparison type
	appComparisonTypeRefreshQueue workqueue.RateLimitingInterface
	appOperationQueue             workqueue.RateLimitingInterface
	projectRefreshQueue           workqueue.RateLimitingInterface
	appInformer                   cache.SharedIndexInformer
	appLister                     applisters.ApplicationLister
	projInformer                  cache.SharedIndexInformer
	appStateManager               AppStateManager
	stateCache                    statecache.LiveStateCache
	statusRefreshTimeout          time.Duration
	statusHardRefreshTimeout      time.Duration
	statusRefreshJitter           time.Duration
	selfHealTimeout               time.Duration
	repoClientset                 apiclient.Clientset
	db                            db.ArgoDB
	settingsMgr                   *settings_util.SettingsManager
	refreshRequestedApps          map[string]CompareWith
	refreshRequestedAppsMutex     *sync.Mutex
	metricsServer                 *metrics.MetricsServer
	kubectlSemaphore              *semaphore.Weighted
	clusterSharding               sharding.ClusterShardingCache
	projByNameCache               sync.Map
	applicationNamespaces         []string

	// dynamicClusterDistributionEnabled if disabled, deploymentInformer is never initialized
	dynamicClusterDistributionEnabled bool
	deploymentInformer                informerv1.DeploymentInformer
}

// NewApplicationController creates a new instance of ApplicationController.
func NewApplicationController(
	namespace string,
	settingsMgr *settings_util.SettingsManager,
	kubeClientset kubernetes.Interface,
	applicationClientset appclientset.Interface,
	repoClientset apiclient.Clientset,
	argoCache *appstatecache.Cache,
	kubectl kube.Kubectl,
	appResyncPeriod time.Duration,
	appHardResyncPeriod time.Duration,
	appResyncJitter time.Duration,
	selfHealTimeout time.Duration,
	repoErrorGracePeriod time.Duration,
	metricsPort int,
	metricsCacheExpiration time.Duration,
	metricsApplicationLabels []string,
	kubectlParallelismLimit int64,
	persistResourceHealth bool,
	clusterSharding sharding.ClusterShardingCache,
	applicationNamespaces []string,
	rateLimiterConfig *ratelimiter.AppControllerRateLimiterConfig,
	serverSideDiff bool,
	dynamicClusterDistributionEnabled bool,
) (*ApplicationController, error) {
	log.Infof("appResyncPeriod=%v, appHardResyncPeriod=%v, appResyncJitter=%v", appResyncPeriod, appHardResyncPeriod, appResyncJitter)
	db := db.NewDB(namespace, settingsMgr, kubeClientset)
	if rateLimiterConfig == nil {
		rateLimiterConfig = ratelimiter.GetDefaultAppRateLimiterConfig()
		log.Info("Using default workqueue rate limiter config")
	}
	ctrl := ApplicationController{
		cache:                             argoCache,
		namespace:                         namespace,
		kubeClientset:                     kubeClientset,
		kubectl:                           kubectl,
		applicationClientset:              applicationClientset,
		repoClientset:                     repoClientset,
		appRefreshQueue:                   workqueue.NewNamedRateLimitingQueue(ratelimiter.NewCustomAppControllerRateLimiter(rateLimiterConfig), "app_reconciliation_queue"),
		appOperationQueue:                 workqueue.NewNamedRateLimitingQueue(ratelimiter.NewCustomAppControllerRateLimiter(rateLimiterConfig), "app_operation_processing_queue"),
		projectRefreshQueue:               workqueue.NewNamedRateLimitingQueue(ratelimiter.NewCustomAppControllerRateLimiter(rateLimiterConfig), "project_reconciliation_queue"),
		appComparisonTypeRefreshQueue:     workqueue.NewRateLimitingQueue(ratelimiter.NewCustomAppControllerRateLimiter(rateLimiterConfig)),
		db:                                db,
		statusRefreshTimeout:              appResyncPeriod,
		statusHardRefreshTimeout:          appHardResyncPeriod,
		statusRefreshJitter:               appResyncJitter,
		refreshRequestedApps:              make(map[string]CompareWith),
		refreshRequestedAppsMutex:         &sync.Mutex{},
		auditLogger:                       argo.NewAuditLogger(namespace, kubeClientset, common.ApplicationController),
		settingsMgr:                       settingsMgr,
		selfHealTimeout:                   selfHealTimeout,
		clusterSharding:                   clusterSharding,
		projByNameCache:                   sync.Map{},
		applicationNamespaces:             applicationNamespaces,
		dynamicClusterDistributionEnabled: dynamicClusterDistributionEnabled,
	}
	if kubectlParallelismLimit > 0 {
		ctrl.kubectlSemaphore = semaphore.NewWeighted(kubectlParallelismLimit)
	}
	kubectl.SetOnKubectlRun(ctrl.onKubectlRun)
	appInformer, appLister := ctrl.newApplicationInformerAndLister()
	indexers := cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}
	projInformer := v1alpha1.NewAppProjectInformer(applicationClientset, namespace, appResyncPeriod, indexers)
	var err error
	_, err = projInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			if key, err := cache.MetaNamespaceKeyFunc(obj); err == nil {
				ctrl.projectRefreshQueue.AddRateLimited(key)
				if projMeta, ok := obj.(metav1.Object); ok {
					ctrl.InvalidateProjectsCache(projMeta.GetName())
				}

			}
		},
		UpdateFunc: func(old, new interface{}) {
			if key, err := cache.MetaNamespaceKeyFunc(new); err == nil {
				ctrl.projectRefreshQueue.AddRateLimited(key)
				if projMeta, ok := new.(metav1.Object); ok {
					ctrl.InvalidateProjectsCache(projMeta.GetName())
				}
			}
		},
		DeleteFunc: func(obj interface{}) {
			if key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj); err == nil {
				// immediately push to queue for deletes
				ctrl.projectRefreshQueue.Add(key)
				if projMeta, ok := obj.(metav1.Object); ok {
					ctrl.InvalidateProjectsCache(projMeta.GetName())
				}
			}
		},
	})
	if err != nil {
		return nil, err
	}

	factory := informers.NewSharedInformerFactoryWithOptions(ctrl.kubeClientset, defaultDeploymentInformerResyncDuration, informers.WithNamespace(settingsMgr.GetNamespace()))

	var deploymentInformer informerv1.DeploymentInformer

	// only initialize deployment informer if dynamic distribution is enabled
	if dynamicClusterDistributionEnabled {
		deploymentInformer = factory.Apps().V1().Deployments()
	}

	readinessHealthCheck := func(r *http.Request) error {
		if dynamicClusterDistributionEnabled {
			applicationControllerName := env.StringFromEnv(common.EnvAppControllerName, common.DefaultApplicationControllerName)
			appControllerDeployment, err := deploymentInformer.Lister().Deployments(settingsMgr.GetNamespace()).Get(applicationControllerName)
			if err != nil {
				if kubeerrors.IsNotFound(err) {
					appControllerDeployment = nil
				} else {
					return fmt.Errorf("error retrieving Application Controller Deployment: %s", err)
				}
			}
			if appControllerDeployment != nil {
				if appControllerDeployment.Spec.Replicas != nil && int(*appControllerDeployment.Spec.Replicas) <= 0 {
					return fmt.Errorf("application controller deployment replicas is not set or is less than 0, replicas: %d", *appControllerDeployment.Spec.Replicas)
				}
				shard := env.ParseNumFromEnv(common.EnvControllerShard, -1, -math.MaxInt32, math.MaxInt32)
				if _, err := sharding.GetOrUpdateShardFromConfigMap(kubeClientset.(*kubernetes.Clientset), settingsMgr, int(*appControllerDeployment.Spec.Replicas), shard); err != nil {
					return fmt.Errorf("error while updating the heartbeat to the Shard Mapping ConfigMap: %s", err)
				}
			}
		}
		return nil
	}

	metricsAddr := fmt.Sprintf("0.0.0.0:%d", metricsPort)

	ctrl.metricsServer, err = metrics.NewMetricsServer(metricsAddr, appLister, ctrl.canProcessApp, readinessHealthCheck, metricsApplicationLabels)
	if err != nil {
		return nil, err
	}
	if metricsCacheExpiration.Seconds() != 0 {
		err = ctrl.metricsServer.SetExpiration(metricsCacheExpiration)
		if err != nil {
			return nil, err
		}
	}
	stateCache := statecache.NewLiveStateCache(db, appInformer, ctrl.settingsMgr, kubectl, ctrl.metricsServer, ctrl.handleObjectUpdated, clusterSharding, argo.NewResourceTracking())
	appStateManager := NewAppStateManager(db, applicationClientset, repoClientset, namespace, kubectl, ctrl.settingsMgr, stateCache, projInformer, ctrl.metricsServer, argoCache, ctrl.statusRefreshTimeout, argo.NewResourceTracking(), persistResourceHealth, repoErrorGracePeriod, serverSideDiff)
	ctrl.appInformer = appInformer
	ctrl.appLister = appLister
	ctrl.projInformer = projInformer
	ctrl.deploymentInformer = deploymentInformer
	ctrl.appStateManager = appStateManager
	ctrl.stateCache = stateCache

	return &ctrl, nil
}

func (ctrl *ApplicationController) InvalidateProjectsCache(names ...string) {
	if len(names) > 0 {
		for _, name := range names {
			ctrl.projByNameCache.Delete(name)
		}
	} else {
		if ctrl != nil {
			ctrl.projByNameCache.Range(func(key, _ interface{}) bool {
				ctrl.projByNameCache.Delete(key)
				return true
			})
		}
	}
}

func (ctrl *ApplicationController) GetMetricsServer() *metrics.MetricsServer {
	return ctrl.metricsServer
}

func (ctrl *ApplicationController) onKubectlRun(command string) (kube.CleanupFunc, error) {
	ctrl.metricsServer.IncKubectlExec(command)
	if ctrl.kubectlSemaphore != nil {
		if err := ctrl.kubectlSemaphore.Acquire(context.Background(), 1); err != nil {
			return nil, err
		}
		ctrl.metricsServer.IncKubectlExecPending(command)
	}
	return func() {
		if ctrl.kubectlSemaphore != nil {
			ctrl.kubectlSemaphore.Release(1)
			ctrl.metricsServer.DecKubectlExecPending(command)
		}
	}, nil
}

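// isSelfReferencedApp returns whether the given object reference points at the
// Application resource itself (same UID, name, namespace, group and kind).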
func isSelfReferencedApp(app *appv1.Application, ref v1.ObjectReference) bool {
	gvk := ref.GroupVersionKind()
	return ref.UID == app.UID &&
		ref.Name == app.Name &&
		ref.Namespace == app.Namespace &&
		gvk.Group == application.Group &&
		gvk.Kind == application.ApplicationKind
}

func (ctrl *ApplicationController) newAppProjCache(name string) *appProjCache {
	return &appProjCache{name: name, ctrl: ctrl}
}

type appProjCache struct {
	name string
	ctrl *ApplicationController

	lock    sync.Mutex
	appProj *appv1.AppProject
}

// GetAppProject gets an AppProject from the cache. If the AppProject is not
// yet cached, retrieves the AppProject from the K8s control plane and stores
// in the cache.
func (projCache *appProjCache) GetAppProject(ctx context.Context) (*appv1.AppProject, error) {
	projCache.lock.Lock()
	defer projCache.lock.Unlock()
	if projCache.appProj != nil {
		return projCache.appProj, nil
	}
	proj, err := argo.GetAppProjectByName(projCache.name, applisters.NewAppProjectLister(projCache.ctrl.projInformer.GetIndexer()), projCache.ctrl.namespace, projCache.ctrl.settingsMgr, projCache.ctrl.db, ctx)
	if err != nil {
		return nil, err
	}
	projCache.appProj = proj
	return projCache.appProj, nil
}

// getAppProj gets the AppProject for the given Application app.
func (ctrl *ApplicationController) getAppProj(app *appv1.Application) (*appv1.AppProject, error) {
	projCache, _ := ctrl.projByNameCache.LoadOrStore(app.Spec.GetProject(), ctrl.newAppProjCache(app.Spec.GetProject()))
	proj, err := projCache.(*appProjCache).GetAppProject(context.TODO())
	if err != nil {
		if apierr.IsNotFound(err) {
			return nil, err
		} else {
			return nil, fmt.Errorf("could not retrieve AppProject '%s' from cache: %v", app.Spec.Project, err)
		}
	}
	if !proj.IsAppNamespacePermitted(app, ctrl.namespace) {
		return nil, argo.ErrProjectNotPermitted(app.GetName(), app.GetNamespace(), proj.GetName())
	}
	return proj, nil
}

func (ctrl *ApplicationController) handleObjectUpdated(managedByApp map[string]bool, ref v1.ObjectReference) {
	// if namespaced resource is not managed by any app it might be orphaned resource of some other apps
	if len(managedByApp) == 0 && ref.Namespace != "" {
		// retrieve applications which monitor orphaned resources in the same namespace and refresh them unless resource is denied in app project
		if objs, err := ctrl.appInformer.GetIndexer().ByIndex(orphanedIndex, ref.Namespace); err == nil {
			for i := range objs {
				app, ok := objs[i].(*appv1.Application)
				if !ok {
					continue
				}

				managedByApp[app.InstanceName(ctrl.namespace)] = true
			}
		}
	}
	for appName, isManagedResource := range managedByApp {
		// The appName is given as <namespace>_<name>, but the indexer needs it
		// in the format <namespace>/<name>
		appKey := ctrl.toAppKey(appName)
		obj, exists, err := ctrl.appInformer.GetIndexer().GetByKey(appKey)
		app, ok := obj.(*appv1.Application)
		if exists && err == nil && ok && isSelfReferencedApp(app, ref) {
			// Don't force refresh app if related resource is application itself. This prevents infinite reconciliation loop.
			continue
		}

		if !ctrl.canProcessApp(obj) {
			// Don't force refresh app if app belongs to a different controller shard or is outside the allowed namespaces.
			continue
		}

		// Enforce application's permission for the source namespace
		_, err = ctrl.getAppProj(app)
		if err != nil {
			log.Errorf("Unable to determine project for app '%s': %v", app.QualifiedName(), err)
			continue
		}

		level := ComparisonWithNothing
		if isManagedResource {
			level = CompareWithRecent
		}

		namespace := ref.Namespace
		if ref.Namespace == "" {
			namespace = "(cluster-scoped)"
		}
		log.WithFields(log.Fields{
			"application":  appKey,
			"level":        level,
			"namespace":    namespace,
			"name":         ref.Name,
			"api-version":  ref.APIVersion,
			"kind":         ref.Kind,
			"server":       app.Spec.Destination.Server,
			"cluster-name": app.Spec.Destination.Name,
		}).Debug("Requesting app refresh caused by object update")

		ctrl.requestAppRefresh(app.QualifiedName(), &level, nil)
	}
}

// setAppManagedResources will build a list of ResourceDiff based on the provided comparisonResult
// and persist app resources related data in the cache. Will return the persisted ApplicationTree.
func (ctrl *ApplicationController) setAppManagedResources(a *appv1.Application, comparisonResult *comparisonResult) (*appv1.ApplicationTree, error) {
	managedResources, err := ctrl.hideSecretData(a, comparisonResult)
	if err != nil {
		return nil, fmt.Errorf("error getting managed resources: %s", err)
	}
	tree, err := ctrl.getResourceTree(a, managedResources)
	if err != nil {
		return nil, fmt.Errorf("error getting resource tree: %s", err)
	}
	err = ctrl.cache.SetAppResourcesTree(a.InstanceName(ctrl.namespace), tree)
	if err != nil {
		return nil, fmt.Errorf("error setting app resource tree: %s", err)
	}
	err = ctrl.cache.SetAppManagedResources(a.InstanceName(ctrl.namespace), managedResources)
	if err != nil {
		return nil, fmt.Errorf("error setting app managed resources: %s", err)
	}
	return tree, nil
}

// returns true if the given resource exists in the namespace by default and is not managed by the user
func isKnownOrphanedResourceExclusion(key kube.ResourceKey, proj *appv1.AppProject) bool {
	if key.Namespace == "default" && key.Group == "" && key.Kind == kube.ServiceKind && key.Name == "kubernetes" {
		return true
	}
	if key.Group == "" && key.Kind == kube.ServiceAccountKind && key.Name == "default" {
		return true
	}
	if key.Group == "" && key.Kind == "ConfigMap" && key.Name == "kube-root-ca.crt" {
		return true
	}
	list := proj.Spec.OrphanedResources.Ignore
	for _, item := range list {
		if item.Kind == "" || glob.Match(item.Kind, key.Kind) {
			if glob.Match(item.Group, key.Group) {
				if item.Name == "" || glob.Match(item.Name, key.Name) {
					return true
				}
			}
		}
	}
	return false
}

func (ctrl *ApplicationController) getResourceTree(a *appv1.Application, managedResources []*appv1.ResourceDiff) (*appv1.ApplicationTree, error) {
	nodes := make([]appv1.ResourceNode, 0)
	proj, err := ctrl.getAppProj(a)
	if err != nil {
		return nil, fmt.Errorf("failed to get project: %w", err)
	}

	orphanedNodesMap := make(map[kube.ResourceKey]appv1.ResourceNode)
	warnOrphaned := true
	if proj.Spec.OrphanedResources != nil {
		orphanedNodesMap, err = ctrl.stateCache.GetNamespaceTopLevelResources(a.Spec.Destination.Server, a.Spec.Destination.Namespace)
		if err != nil {
			return nil, fmt.Errorf("failed to get namespace top-level resources: %w", err)
		}
		warnOrphaned = proj.Spec.OrphanedResources.IsWarn()
	}
	for i := range managedResources {
		managedResource := managedResources[i]
		delete(orphanedNodesMap, kube.NewResourceKey(managedResource.Group, managedResource.Kind, managedResource.Namespace, managedResource.Name))
		var live = &unstructured.Unstructured{}
		err := json.Unmarshal([]byte(managedResource.LiveState), &live)
		if err != nil {
			return nil, fmt.Errorf("failed to unmarshal live state of managed resources: %w", err)
		}

		if live == nil {
			var target = &unstructured.Unstructured{}
			err = json.Unmarshal([]byte(managedResource.TargetState), &target)
			if err != nil {
				return nil, fmt.Errorf("failed to unmarshal target state of managed resources: %w", err)
			}
			nodes = append(nodes, appv1.ResourceNode{
				ResourceRef: appv1.ResourceRef{
					Version:   target.GroupVersionKind().Version,
					Name:      managedResource.Name,
					Kind:      managedResource.Kind,
					Group:     managedResource.Group,
					Namespace: managedResource.Namespace,
				},
			})
		} else {
			err := ctrl.stateCache.IterateHierarchy(a.Spec.Destination.Server, kube.GetResourceKey(live), func(child appv1.ResourceNode, appName string) bool {
				permitted, _ := proj.IsResourcePermitted(schema.GroupKind{Group: child.ResourceRef.Group, Kind: child.ResourceRef.Kind}, child.Namespace, a.Spec.Destination, func(project string) ([]*appv1.Cluster, error) {
					clusters, err := ctrl.db.GetProjectClusters(context.TODO(), project)
					if err != nil {
						return nil, fmt.Errorf("failed to get project clusters: %w", err)
					}
					return clusters, nil
				})
				if !permitted {
					return false
				}
				nodes = append(nodes, child)
				return true
			})
			if err != nil {
				return nil, fmt.Errorf("failed to iterate resource hierarchy: %w", err)
			}
		}
	}
	orphanedNodes := make([]appv1.ResourceNode, 0)
	for k := range orphanedNodesMap {
		if k.Namespace != "" && proj.IsGroupKindPermitted(k.GroupKind(), true) && !isKnownOrphanedResourceExclusion(k, proj) {
			err := ctrl.stateCache.IterateHierarchy(a.Spec.Destination.Server, k, func(child appv1.ResourceNode, appName string) bool {
				belongToAnotherApp := false
				if appName != "" {
					appKey := ctrl.toAppKey(appName)
					if _, exists, err := ctrl.appInformer.GetIndexer().GetByKey(appKey); exists && err == nil {
						belongToAnotherApp = true
					}
				}

				if belongToAnotherApp {
					return false
				}

				permitted, _ := proj.IsResourcePermitted(schema.GroupKind{Group: child.ResourceRef.Group, Kind: child.ResourceRef.Kind}, child.Namespace, a.Spec.Destination, func(project string) ([]*appv1.Cluster, error) {
					return ctrl.db.GetProjectClusters(context.TODO(), project)
				})

				if !permitted {
					return false
				}
				orphanedNodes = append(orphanedNodes, child)
				return true
			})
			if err != nil {
				return nil, err
			}
		}
	}
	var conditions []appv1.ApplicationCondition
	if len(orphanedNodes) > 0 && warnOrphaned {
		conditions = []appv1.ApplicationCondition{{
			Type:    appv1.ApplicationConditionOrphanedResourceWarning,
			Message: fmt.Sprintf("Application has %d orphaned resources", len(orphanedNodes)),
		}}
	}
	a.Status.SetConditions(conditions, map[appv1.ApplicationConditionType]bool{appv1.ApplicationConditionOrphanedResourceWarning: true})
	sort.Slice(orphanedNodes, func(i, j int) bool {
		return orphanedNodes[i].ResourceRef.String() < orphanedNodes[j].ResourceRef.String()
	})

	hosts, err := ctrl.getAppHosts(a, nodes)
	if err != nil {
		return nil, fmt.Errorf("failed to get app hosts: %w", err)
	}
	return &appv1.ApplicationTree{Nodes: nodes, OrphanedNodes: orphanedNodes, Hosts: hosts}, nil
}

func (ctrl *ApplicationController) getAppHosts(a *appv1.Application, appNodes []appv1.ResourceNode) ([]appv1.HostInfo, error) {
	supportedResourceNames := map[v1.ResourceName]bool{
		v1.ResourceCPU:     true,
		v1.ResourceStorage: true,
		v1.ResourceMemory:  true,
	}
	appPods := map[kube.ResourceKey]bool{}
	for _, node := range appNodes {
		if node.Group == "" && node.Kind == kube.PodKind {
			appPods[kube.NewResourceKey(node.Group, node.Kind, node.Namespace, node.Name)] = true
		}
	}

	allNodesInfo := map[string]statecache.NodeInfo{}
	allPodsByNode := map[string][]statecache.PodInfo{}
	appPodsByNode := map[string][]statecache.PodInfo{}
	err := ctrl.stateCache.IterateResources(a.Spec.Destination.Server, func(res *clustercache.Resource, info *statecache.ResourceInfo) {
		key := res.ResourceKey()

		switch {
		case info.NodeInfo != nil && key.Group == "" && key.Kind == "Node":
			allNodesInfo[key.Name] = *info.NodeInfo
		case info.PodInfo != nil && key.Group == "" && key.Kind == kube.PodKind:
			if appPods[key] {
				appPodsByNode[info.PodInfo.NodeName] = append(appPodsByNode[info.PodInfo.NodeName], *info.PodInfo)
			} else {
				allPodsByNode[info.PodInfo.NodeName] = append(allPodsByNode[info.PodInfo.NodeName], *info.PodInfo)
			}
		}
	})
	if err != nil {
		return nil, err
	}

	var hosts []appv1.HostInfo
	for nodeName, appPods := range appPodsByNode {
		node, ok := allNodesInfo[nodeName]
		if !ok {
			continue
		}

		neighbors := allPodsByNode[nodeName]

		resources := map[v1.ResourceName]appv1.HostResourceInfo{}
		for name, resource := range node.Capacity {
			info := resources[name]
			info.ResourceName = name
			info.Capacity += resource.MilliValue()
			resources[name] = info
		}

		for _, pod := range appPods {
			for name, resource := range pod.ResourceRequests {
				if !supportedResourceNames[name] {
					continue
				}

				info := resources[name]
				info.RequestedByApp += resource.MilliValue()
				resources[name] = info
			}
		}

		for _, pod := range neighbors {
			for name, resource := range pod.ResourceRequests {
				if !supportedResourceNames[name] || pod.Phase == v1.PodSucceeded || pod.Phase == v1.PodFailed {
					continue
				}
				info := resources[name]
				info.RequestedByNeighbors += resource.MilliValue()
				resources[name] = info
			}
		}

		var resourcesInfo []appv1.HostResourceInfo
		for _, info := range resources {
			if supportedResourceNames[info.ResourceName] && info.Capacity > 0 {
				resourcesInfo = append(resourcesInfo, info)
			}
		}
		sort.Slice(resourcesInfo, func(i, j int) bool {
			return resourcesInfo[i].ResourceName < resourcesInfo[j].ResourceName
		})
		hosts = append(hosts, appv1.HostInfo{Name: nodeName, SystemInfo: node.SystemInfo, ResourcesInfo: resourcesInfo})
	}
	return hosts, nil
}

func (ctrl *ApplicationController) hideSecretData(app *appv1.Application, comparisonResult *comparisonResult) ([]*appv1.ResourceDiff, error) {
	items := make([]*appv1.ResourceDiff, len(comparisonResult.managedResources))
	for i := range comparisonResult.managedResources {
		res := comparisonResult.managedResources[i]
		item := appv1.ResourceDiff{
			Namespace:       res.Namespace,
			Name:            res.Name,
			Group:           res.Group,
			Kind:            res.Kind,
			Hook:            res.Hook,
			ResourceVersion: res.ResourceVersion,
		}

		target := res.Target
		live := res.Live
		resDiff := res.Diff
		if res.Kind == kube.SecretKind && res.Group == "" {
			var err error
			target, live, err = diff.HideSecretData(res.Target, res.Live)
			if err != nil {
				return nil, fmt.Errorf("error hiding secret data: %s", err)
			}
			compareOptions, err := ctrl.settingsMgr.GetResourceCompareOptions()
			if err != nil {
				return nil, fmt.Errorf("error getting resource compare options: %s", err)
			}
			resourceOverrides, err := ctrl.settingsMgr.GetResourceOverrides()
			if err != nil {
				return nil, fmt.Errorf("error getting resource overrides: %s", err)
			}
			appLabelKey, err := ctrl.settingsMgr.GetAppInstanceLabelKey()
			if err != nil {
				return nil, fmt.Errorf("error getting app instance label key: %s", err)
			}
			trackingMethod, err := ctrl.settingsMgr.GetTrackingMethod()
			if err != nil {
				return nil, fmt.Errorf("error getting tracking method: %s", err)
			}

			clusterCache, err := ctrl.stateCache.GetClusterCache(app.Spec.Destination.Server)
			if err != nil {
				return nil, fmt.Errorf("error getting cluster cache: %s", err)
			}
			diffConfig, err := argodiff.NewDiffConfigBuilder().
				WithDiffSettings(app.Spec.IgnoreDifferences, resourceOverrides, compareOptions.IgnoreAggregatedRoles).
				WithTracking(appLabelKey, trackingMethod).
				WithNoCache().
				WithLogger(logutils.NewLogrusLogger(logutils.NewWithCurrentConfig())).
				WithGVKParser(clusterCache.GetGVKParser()).
				Build()
			if err != nil {
				return nil, fmt.Errorf("appcontroller error building diff config: %s", err)
			}

			diffResult, err := argodiff.StateDiff(live, target, diffConfig)
			if err != nil {
				return nil, fmt.Errorf("error applying diff: %s", err)
			}
			resDiff = diffResult
		}

		if live != nil {
			data, err := json.Marshal(live)
			if err != nil {
				return nil, fmt.Errorf("error marshaling live json: %s", err)
			}
			item.LiveState = string(data)
		} else {
			item.LiveState = "null"
		}

		if target != nil {
			data, err := json.Marshal(target)
			if err != nil {
				return nil, fmt.Errorf("error marshaling target json: %s", err)
			}
			item.TargetState = string(data)
		} else {
			item.TargetState = "null"
		}
		item.PredictedLiveState = string(resDiff.PredictedLive)
		item.NormalizedLiveState = string(resDiff.NormalizedLive)
		item.Modified = resDiff.Modified

		items[i] = &item
	}
	return items, nil
}

// Run starts the Application CRD controller.
func (ctrl *ApplicationController) Run(ctx context.Context, statusProcessors int, operationProcessors int) {
	defer runtime.HandleCrash()
	defer ctrl.appRefreshQueue.ShutDown()
	defer ctrl.appComparisonTypeRefreshQueue.ShutDown()
	defer ctrl.appOperationQueue.ShutDown()
	defer ctrl.projectRefreshQueue.ShutDown()

	ctrl.metricsServer.RegisterClustersInfoSource(ctx, ctrl.stateCache)
	ctrl.RegisterClusterSecretUpdater(ctx)

	go ctrl.appInformer.Run(ctx.Done())
	go ctrl.projInformer.Run(ctx.Done())

	if ctrl.dynamicClusterDistributionEnabled {
		// only start deployment informer if dynamic distribution is enabled
		go ctrl.deploymentInformer.Informer().Run(ctx.Done())
	}

	clusters, err := ctrl.db.ListClusters(ctx)
	if err != nil {
		log.Warnf("Cannot init sharding. Error while querying clusters list from database: %v", err)
	} else {
		ctrl.clusterSharding.Init(clusters)
	}

	errors.CheckError(ctrl.stateCache.Init())

	if !cache.WaitForCacheSync(ctx.Done(), ctrl.appInformer.HasSynced, ctrl.projInformer.HasSynced) {
		log.Error("Timed out waiting for caches to sync")
		return
	}

	go func() { errors.CheckError(ctrl.stateCache.Run(ctx)) }()
	go func() { errors.CheckError(ctrl.metricsServer.ListenAndServe()) }()

	for i := 0; i < statusProcessors; i++ {
		go wait.Until(func() {
			for ctrl.processAppRefreshQueueItem() {
			}
		}, time.Second, ctx.Done())
	}

	for i := 0; i < operationProcessors; i++ {
		go wait.Until(func() {
			for ctrl.processAppOperationQueueItem() {
			}
		}, time.Second, ctx.Done())
	}

	go wait.Until(func() {
		for ctrl.processAppComparisonTypeQueueItem() {
		}
	}, time.Second, ctx.Done())

	go wait.Until(func() {
		for ctrl.processProjectQueueItem() {
		}
	}, time.Second, ctx.Done())
	<-ctx.Done()
}

// requestAppRefresh adds a request for given app to the refresh queue. appName
// needs to be the qualified name of the application, i.e. <namespace>/<name>.
func (ctrl *ApplicationController) requestAppRefresh(appName string, compareWith *CompareWith, after *time.Duration) {
	key := ctrl.toAppKey(appName)

	if compareWith != nil && after != nil {
		ctrl.appComparisonTypeRefreshQueue.AddAfter(fmt.Sprintf("%s/%d", key, *compareWith), *after)
	} else {
		if compareWith != nil {
			ctrl.refreshRequestedAppsMutex.Lock()
			ctrl.refreshRequestedApps[key] = compareWith.Max(ctrl.refreshRequestedApps[key])
			ctrl.refreshRequestedAppsMutex.Unlock()
		}
		if after != nil {
			ctrl.appRefreshQueue.AddAfter(key, *after)
			ctrl.appOperationQueue.AddAfter(key, *after)
		} else {
			ctrl.appRefreshQueue.AddRateLimited(key)
			ctrl.appOperationQueue.AddRateLimited(key)
		}
	}
}

func (ctrl *ApplicationController) isRefreshRequested(appName string) (bool, CompareWith) {
	ctrl.refreshRequestedAppsMutex.Lock()
	defer ctrl.refreshRequestedAppsMutex.Unlock()
	level, ok := ctrl.refreshRequestedApps[appName]
	if ok {
		delete(ctrl.refreshRequestedApps, appName)
	}
	return ok, level
}

func (ctrl *ApplicationController) processAppOperationQueueItem() (processNext bool) {
	appKey, shutdown := ctrl.appOperationQueue.Get()
	if shutdown {
		processNext = false
		return
	}
	processNext = true
	defer func() {
		if r := recover(); r != nil {
			log.Errorf("Recovered from panic: %+v\n%s", r, debug.Stack())
		}
		ctrl.appOperationQueue.Done(appKey)
	}()

	obj, exists, err := ctrl.appInformer.GetIndexer().GetByKey(appKey.(string))
	if err != nil {
		log.Errorf("Failed to get application '%s' from informer index: %+v", appKey, err)
		return
	}
	if !exists {
		// This happens after app was deleted, but the work queue still had an entry for it.
		return
	}
	origApp, ok := obj.(*appv1.Application)
	if !ok {
		log.Warnf("Key '%s' in index is not an application", appKey)
		return
	}
	app := origApp.DeepCopy()

	if app.Operation != nil {
		// If we get here, we are about to process an operation, but we cannot rely on the informer since it might have stale data.
		// Applications may be updated by both the application controller and the API server, so always retrieve the latest
		// version to ensure it is not stale and avoid unnecessary syncing.
		freshApp, err := ctrl.applicationClientset.ArgoprojV1alpha1().Applications(app.ObjectMeta.Namespace).Get(context.Background(), app.ObjectMeta.Name, metav1.GetOptions{})
		if err != nil {
			log.Errorf("Failed to retrieve latest application state: %v", err)
			return
		}
		app = freshApp
	}

	if app.Operation != nil {
		ctrl.processRequestedAppOperation(app)
	} else if app.DeletionTimestamp != nil {
		if err = ctrl.finalizeApplicationDeletion(app, func(project string) ([]*appv1.Cluster, error) {
			return ctrl.db.GetProjectClusters(context.Background(), project)
		}); err != nil {
			ctrl.setAppCondition(app, appv1.ApplicationCondition{
				Type:    appv1.ApplicationConditionDeletionError,
				Message: err.Error(),
			})
			message := fmt.Sprintf("Unable to delete application resources: %v", err.Error())
			ctrl.auditLogger.LogAppEvent(app, argo.EventInfo{Reason: argo.EventReasonStatusRefreshed, Type: v1.EventTypeWarning}, message, "")
		}
	}
	return
}

func (ctrl *ApplicationController) processAppComparisonTypeQueueItem() (processNext bool) {
	key, shutdown := ctrl.appComparisonTypeRefreshQueue.Get()
	processNext = true

	defer func() {
		if r := recover(); r != nil {
			log.Errorf("Recovered from panic: %+v\n%s", r, debug.Stack())
		}
		ctrl.appComparisonTypeRefreshQueue.Done(key)
	}()
	if shutdown {
		processNext = false
		return
	}

	if parts := strings.Split(key.(string), "/"); len(parts) != 3 {
		log.Warnf("Unexpected key format in appComparisonTypeRefreshQueue. Key should consist of namespace/name/comparisonType but got: %s", key.(string))
	} else {
		if compareWith, err := strconv.Atoi(parts[2]); err != nil {
			log.Warnf("Unable to parse comparison type: %v", err)
			return
		} else {
			ctrl.requestAppRefresh(ctrl.toAppQualifiedName(parts[1], parts[0]), CompareWith(compareWith).Pointer(), nil)
		}
	}
	return
}

func (ctrl *ApplicationController) processProjectQueueItem() (processNext bool) {
	key, shutdown := ctrl.projectRefreshQueue.Get()
	processNext = true

	defer func() {
		if r := recover(); r != nil {
			log.Errorf("Recovered from panic: %+v\n%s", r, debug.Stack())
		}
		ctrl.projectRefreshQueue.Done(key)
	}()
	if shutdown {
		processNext = false
		return
	}
	obj, exists, err := ctrl.projInformer.GetIndexer().GetByKey(key.(string))
	if err != nil {
		log.Errorf("Failed to get project '%s' from informer index: %+v", key, err)
		return
	}
	if !exists {
		// This happens after appproj was deleted, but the work queue still had an entry for it.
		return
	}
	origProj, ok := obj.(*appv1.AppProject)
	if !ok {
		log.Warnf("Key '%s' in index is not an appproject", key)
		return
	}

	if origProj.DeletionTimestamp != nil && origProj.HasFinalizer() {
		if err := ctrl.finalizeProjectDeletion(origProj.DeepCopy()); err != nil {
			log.Warnf("Failed to finalize project deletion: %v", err)
		}
	}
	return
}

func (ctrl *ApplicationController) finalizeProjectDeletion(proj *appv1.AppProject) error {
	apps, err := ctrl.appLister.Applications(ctrl.namespace).List(labels.Everything())
	if err != nil {
		return fmt.Errorf("error listing applications: %w", err)
	}
	appsCount := 0
	for i := range apps {
		if apps[i].Spec.GetProject() == proj.Name {
			appsCount++
		}
	}
	if appsCount == 0 {
		return ctrl.removeProjectFinalizer(proj)
	} else {
		log.Infof("Cannot remove project '%s' finalizer as it is referenced by %d applications", proj.Name, appsCount)
	}
	return nil
}

func (ctrl *ApplicationController) removeProjectFinalizer(proj *appv1.AppProject) error {
	proj.RemoveFinalizer()
	var patch []byte
	patch, _ = json.Marshal(map[string]interface{}{
		"metadata": map[string]interface{}{
			"finalizers": proj.Finalizers,
		},
	})
	_, err := ctrl.applicationClientset.ArgoprojV1alpha1().AppProjects(ctrl.namespace).Patch(context.Background(), proj.Name, types.MergePatchType, patch, metav1.PatchOptions{})
	return err
}

// shouldBeDeleted returns whether a given resource obj should be deleted on cascade delete of application app
func (ctrl *ApplicationController) shouldBeDeleted(app *appv1.Application, obj *unstructured.Unstructured) bool {
	return !kube.IsCRD(obj) && !isSelfReferencedApp(app, kube.GetObjectRef(obj)) &&
		!resourceutil.HasAnnotationOption(obj, synccommon.AnnotationSyncOptions, synccommon.SyncOptionDisableDeletion) &&
		!resourceutil.HasAnnotationOption(obj, helm.ResourcePolicyAnnotation, helm.ResourcePolicyKeep)
}

func (ctrl *ApplicationController) getPermittedAppLiveObjects(app *appv1.Application, proj *appv1.AppProject, projectClusters func(project string) ([]*appv1.Cluster, error)) (map[kube.ResourceKey]*unstructured.Unstructured, error) {
	objsMap, err := ctrl.stateCache.GetManagedLiveObjs(app, []*unstructured.Unstructured{})
	if err != nil {
		return nil, err
	}
	// Don't delete live resources which are not permitted in the app project
	for k, v := range objsMap {
		permitted, err := proj.IsLiveResourcePermitted(v, app.Spec.Destination.Server, app.Spec.Destination.Name, projectClusters)

		if err != nil {
			return nil, err
		}

		if !permitted {
			delete(objsMap, k)
		}
	}
	return objsMap, nil
}

func (ctrl *ApplicationController) isValidDestination(app *appv1.Application) (bool, *appv1.Cluster) {
	// Validate the cluster using the Application destination's `name` field, if applicable,
	// and set the Server field, if needed.
	if err := argo.ValidateDestination(context.Background(), &app.Spec.Destination, ctrl.db); err != nil {
		log.Warnf("Unable to validate destination of the Application being deleted: %v", err)
		return false, nil
	}

	cluster, err := ctrl.db.GetCluster(context.Background(), app.Spec.Destination.Server)
	if err != nil {
		log.Warnf("Unable to locate cluster URL for Application being deleted: %v", err)
		return false, nil
	}
	return true, cluster
}

func (ctrl *ApplicationController) finalizeApplicationDeletion(app *appv1.Application, projectClusters func(project string) ([]*appv1.Cluster, error)) error {
	logCtx := log.WithField("application", app.QualifiedName())
	// Get refreshed application info, since informer app copy might be stale
	app, err := ctrl.applicationClientset.ArgoprojV1alpha1().Applications(app.Namespace).Get(context.Background(), app.Name, metav1.GetOptions{})
	if err != nil {
		if !apierr.IsNotFound(err) {
			logCtx.Errorf("Unable to get refreshed application info prior deleting resources: %v", err)
		}
		return nil
	}
	proj, err := ctrl.getAppProj(app)
	if err != nil {
		return err
	}

	isValid, cluster := ctrl.isValidDestination(app)
	if !isValid {
		app.UnSetCascadedDeletion()
		app.UnSetPostDeleteFinalizer()
		if err := ctrl.updateFinalizers(app); err != nil {
			return err
		}
		logCtx.Infof("Resource entries removed from undefined cluster")
		return nil
	}
	config := metrics.AddMetricsTransportWrapper(ctrl.metricsServer, app, cluster.RESTConfig())

	if app.CascadedDeletion() {
		logCtx.Infof("Deleting resources")
		// ApplicationDestination points to a valid cluster, so we may clean up the live objects
		objs := make([]*unstructured.Unstructured, 0)
		objsMap, err := ctrl.getPermittedAppLiveObjects(app, proj, projectClusters)
		if err != nil {
			return err
		}

		for k := range objsMap {
			// Wait for objects pending deletion to complete before proceeding with next sync wave
			if objsMap[k].GetDeletionTimestamp() != nil {
				logCtx.Infof("%d objects remaining for deletion", len(objsMap))
				return nil
			}

			if ctrl.shouldBeDeleted(app, objsMap[k]) {
				objs = append(objs, objsMap[k])
			}
		}

		filteredObjs := FilterObjectsForDeletion(objs)

		propagationPolicy := metav1.DeletePropagationForeground
		if app.GetPropagationPolicy() == appv1.BackgroundPropagationPolicyFinalizer {
			propagationPolicy = metav1.DeletePropagationBackground
		}
		logCtx.Infof("Deleting application's resources with %s propagation policy", propagationPolicy)

		err = kube.RunAllAsync(len(filteredObjs), func(i int) error {
			obj := filteredObjs[i]
			return ctrl.kubectl.DeleteResource(context.Background(), config, obj.GroupVersionKind(), obj.GetName(), obj.GetNamespace(), metav1.DeleteOptions{PropagationPolicy: &propagationPolicy})
		})
		if err != nil {
			return err
		}

		objsMap, err = ctrl.getPermittedAppLiveObjects(app, proj, projectClusters)
		if err != nil {
			return err
		}

		for k, obj := range objsMap {
			if !ctrl.shouldBeDeleted(app, obj) {
				delete(objsMap, k)
			}
		}
		if len(objsMap) > 0 {
			logCtx.Infof("%d objects remaining for deletion", len(objsMap))
			return nil
		}
		logCtx.Infof("Successfully deleted %d resources", len(objs))
		app.UnSetCascadedDeletion()
		return ctrl.updateFinalizers(app)
	}

	if app.HasPostDeleteFinalizer() {
		objsMap, err := ctrl.getPermittedAppLiveObjects(app, proj, projectClusters)
		if err != nil {
			return err
		}

		done, err := ctrl.executePostDeleteHooks(app, proj, objsMap, config, logCtx)
		if err != nil {
			return err
		}
		if !done {
			return nil
		}
		app.UnSetPostDeleteFinalizer()
		return ctrl.updateFinalizers(app)
	}

	if app.HasPostDeleteFinalizer("cleanup") {
		objsMap, err := ctrl.getPermittedAppLiveObjects(app, proj, projectClusters)
		if err != nil {
			return err
		}

		done, err := ctrl.cleanupPostDeleteHooks(objsMap, config, logCtx)
		if err != nil {
			return err
		}
		if !done {
			return nil
		}
		app.UnSetPostDeleteFinalizer("cleanup")
		return ctrl.updateFinalizers(app)
	}

	if !app.CascadedDeletion() && !app.HasPostDeleteFinalizer() {
		if err := ctrl.cache.SetAppManagedResources(app.Name, nil); err != nil {
			return err
		}

		if err := ctrl.cache.SetAppResourcesTree(app.Name, nil); err != nil {
			return err
		}
		ctrl.projectRefreshQueue.Add(fmt.Sprintf("%s/%s", ctrl.namespace, app.Spec.GetProject()))
	}

	return nil
}

func (ctrl *ApplicationController) updateFinalizers(app *appv1.Application) error {
	_, err := ctrl.getAppProj(app)
	if err != nil {
		return fmt.Errorf("error getting project: %w", err)
	}

	var patch []byte
	patch, _ = json.Marshal(map[string]interface{}{
		"metadata": map[string]interface{}{
			"finalizers": app.Finalizers,
		},
	})

	_, err = ctrl.applicationClientset.ArgoprojV1alpha1().Applications(app.Namespace).Patch(context.Background(), app.Name, types.MergePatchType, patch, metav1.PatchOptions{})
	return err
}

func (ctrl *ApplicationController) setAppCondition(app *appv1.Application, condition appv1.ApplicationCondition) {
	// do nothing if the app already has the same condition
	for _, c := range app.Status.Conditions {
		if c.Message == condition.Message && c.Type == condition.Type {
			return
		}
	}

	app.Status.SetConditions([]appv1.ApplicationCondition{condition}, map[appv1.ApplicationConditionType]bool{condition.Type: true})

	var patch []byte
	patch, err := json.Marshal(map[string]interface{}{
		"status": map[string]interface{}{
			"conditions": app.Status.Conditions,
		},
	})
	if err == nil {
		_, err = ctrl.applicationClientset.ArgoprojV1alpha1().Applications(app.Namespace).Patch(context.Background(), app.Name, types.MergePatchType, patch, metav1.PatchOptions{})
	}
	if err != nil {
		log.Errorf("Unable to set application condition: %v", err)
	}
}

func (ctrl *ApplicationController) processRequestedAppOperation(app *appv1.Application) {
	logCtx := log.WithField("application", app.QualifiedName())
	var state *appv1.OperationState
	// Recover from any unexpected panics and automatically set the status to be failed
	defer func() {
		if r := recover(); r != nil {
			logCtx.Errorf("Recovered from panic: %+v\n%s", r, debug.Stack())
			state.Phase = synccommon.OperationError
			if rerr, ok := r.(error); ok {
				state.Message = rerr.Error()
			} else {
				state.Message = fmt.Sprintf("%v", r)
			}
			ctrl.setOperationState(app, state)
		}
	}()
	terminating := false
	if isOperationInProgress(app) {
		state = app.Status.OperationState.DeepCopy()
		terminating = state.Phase == synccommon.OperationTerminating
		// A failed operation with a retry strategy might still be in progress and have a completion time
		if state.FinishedAt != nil && !terminating {
			retryAt, err := app.Status.OperationState.Operation.Retry.NextRetryAt(state.FinishedAt.Time, state.RetryCount)
			if err != nil {
				state.Phase = synccommon.OperationFailed
				state.Message = err.Error()
				ctrl.setOperationState(app, state)
				return
			}
			retryAfter := time.Until(retryAt)
			if retryAfter > 0 {
				logCtx.Infof("Skipping retrying in-progress operation. Attempting again at: %s", retryAt.Format(time.RFC3339))
				ctrl.requestAppRefresh(app.QualifiedName(), CompareWithLatest.Pointer(), &retryAfter)
				return
			} else {
				// Retrying the operation: remove the previous failure time from the app, since it is used as the trigger
				// indicating that the previous attempt failed and the operation should be retried
				state.FinishedAt = nil
				ctrl.setOperationState(app, state)
				// Get rid of sync results and null out previous operation completion time
				state.SyncResult = nil
			}
		} else {
			logCtx.Infof("Resuming in-progress operation. phase: %s, message: %s", state.Phase, state.Message)
		}
	} else {
		state = &appv1.OperationState{Phase: synccommon.OperationRunning, Operation: *app.Operation, StartedAt: metav1.Now()}
		ctrl.setOperationState(app, state)
		logCtx.Infof("Initialized new operation: %v", *app.Operation)
	}

	if err := argo.ValidateDestination(context.Background(), &app.Spec.Destination, ctrl.db); err != nil {
		state.Phase = synccommon.OperationFailed
		state.Message = err.Error()
	} else {
		ctrl.appStateManager.SyncAppState(app, state)
	}

	// Check whether application is allowed to use project
	_, err := ctrl.getAppProj(app)
	if err != nil {
		state.Phase = synccommon.OperationError
		state.Message = err.Error()
	}

	if state.Phase == synccommon.OperationRunning {
		// It's possible for an app to be terminated while we were operating on it. We do not want
		// to clobber the Terminated state with Running. Get the latest app state to check for this.
		freshApp, err := ctrl.applicationClientset.ArgoprojV1alpha1().Applications(app.Namespace).Get(context.Background(), app.ObjectMeta.Name, metav1.GetOptions{})
		if err == nil {
			// App may have lost permissions to use the project meanwhile.
			_, err = ctrl.getAppProj(freshApp)
			if err != nil {
				state.Phase = synccommon.OperationFailed
				state.Message = fmt.Sprintf("operation not allowed: %v", err)
			}
			if freshApp.Status.OperationState != nil && freshApp.Status.OperationState.Phase == synccommon.OperationTerminating {
				state.Phase = synccommon.OperationTerminating
				state.Message = "operation is terminating"
				// after this, we will get requeued to the workqueue, but next time the
				// SyncAppState will operate in a Terminating phase, allowing the worker to perform
				// cleanup (e.g. delete jobs, workflows, etc...)
			}
		}
	} else if state.Phase == synccommon.OperationFailed || state.Phase == synccommon.OperationError {
		if !terminating && (state.RetryCount < state.Operation.Retry.Limit || state.Operation.Retry.Limit < 0) {
			now := metav1.Now()
			state.FinishedAt = &now
			if retryAt, err := state.Operation.Retry.NextRetryAt(now.Time, state.RetryCount); err != nil {
				state.Phase = synccommon.OperationFailed
				state.Message = fmt.Sprintf("%s (failed to retry: %v)", state.Message, err)
			} else {
				state.Phase = synccommon.OperationRunning
				state.RetryCount++
				state.Message = fmt.Sprintf("%s. Retrying attempt #%d at %s.", state.Message, state.RetryCount, retryAt.Format(time.Kitchen))
			}
		} else if state.RetryCount > 0 {
			state.Message = fmt.Sprintf("%s (retried %d times).", state.Message, state.RetryCount)
		}

	}

	ctrl.setOperationState(app, state)
	if state.Phase.Completed() && (app.Operation.Sync != nil && !app.Operation.Sync.DryRun) {
		// if we just completed an operation, force a refresh so that UI will report up-to-date
		// sync/health information
		if _, err := cache.MetaNamespaceKeyFunc(app); err == nil {
			// force app refresh using the CompareWithLatest comparison type and trigger the app reconciliation loop
			ctrl.requestAppRefresh(app.QualifiedName(), CompareWithLatest.Pointer(), nil)
		} else {
			logCtx.Warnf("Failed to requeue application: %v", err)
		}
	}
}

func (ctrl *ApplicationController) setOperationState(app *appv1.Application, state *appv1.OperationState) {
1357
	logCtx := log.WithFields(log.Fields{"application": app.Name, "appNamespace": app.Namespace, "project": app.Spec.Project})
1358

1359
	if state.Phase == "" {
1360
		// expose any bugs where we neglect to set phase
1361
		panic("no phase was set")
1362
	}
1363
	if state.Phase.Completed() {
1364
		now := metav1.Now()
1365
		state.FinishedAt = &now
1366
	}
1367
	patch := map[string]interface{}{
1368
		"status": map[string]interface{}{
1369
			"operationState": state,
1370
		},
1371
	}
1372
	if state.Phase.Completed() {
1373
		// If operation is completed, clear the operation field to indicate no operation is
1374
		// in progress.
1375
		patch["operation"] = nil
1376
	}
1377
	if reflect.DeepEqual(app.Status.OperationState, state) {
1378
		logCtx.Infof("No operation updates necessary to '%s'. Skipping patch", app.QualifiedName())
1379
		return
1380
	}
1381
	patchJSON, err := json.Marshal(patch)
1382
	if err != nil {
1383
		logCtx.Errorf("error marshaling json: %v", err)
1384
		return
1385
	}
1386
	if app.Status.OperationState != nil && app.Status.OperationState.FinishedAt != nil && state.FinishedAt == nil {
1387
		patchJSON, err = jsonpatch.MergeMergePatches(patchJSON, []byte(`{"status": {"operationState": {"finishedAt": null}}}`))
1388
		if err != nil {
1389
			logCtx.Errorf("error merging operation state patch: %v", err)
1390
			return
1391
		}
1392
	}
1393

1394
	kube.RetryUntilSucceed(context.Background(), updateOperationStateTimeout, "Update application operation state", logutils.NewLogrusLogger(logutils.NewWithCurrentConfig()), func() error {
1395
		_, err := ctrl.PatchAppWithWriteBack(context.Background(), app.Name, app.Namespace, types.MergePatchType, patchJSON, metav1.PatchOptions{})
1396
		if err != nil {
1397
			// Stop retrying updating deleted application
1398
			if apierr.IsNotFound(err) {
1399
				return nil
1400
			}
1401
			// kube.RetryUntilSucceed logs failed attempts at "debug" level, but we want to know if this fails. Log a
1402
			// warning.
1403
			logCtx.Warnf("error patching application with operation state: %v", err)
1404
			return fmt.Errorf("error patching application with operation state: %w", err)
1405
		}
1406
		return nil
1407
	})
1408

1409
	logCtx.Infof("updated '%s' operation (phase: %s)", app.QualifiedName(), state.Phase)
1410
	if state.Phase.Completed() {
1411
		eventInfo := argo.EventInfo{Reason: argo.EventReasonOperationCompleted}
1412
		var messages []string
1413
		if state.Operation.Sync != nil && len(state.Operation.Sync.Resources) > 0 {
1414
			messages = []string{"Partial sync operation"}
1415
		} else {
1416
			messages = []string{"Sync operation"}
1417
		}
1418
		if state.SyncResult != nil {
1419
			messages = append(messages, "to", state.SyncResult.Revision)
1420
		}
1421
		if state.Phase.Successful() {
1422
			eventInfo.Type = v1.EventTypeNormal
1423
			messages = append(messages, "succeeded")
1424
		} else {
1425
			eventInfo.Type = v1.EventTypeWarning
1426
			messages = append(messages, "failed:", state.Message)
1427
		}
1428
		ctrl.auditLogger.LogAppEvent(app, eventInfo, strings.Join(messages, " "), "")
1429
		ctrl.metricsServer.IncSync(app, state)
1430
	}
1431
}

// writeBackToInformer writes a recently updated App back into the informer cache.
// This prevents the situation where the controller operates on a stale app and repeats work
func (ctrl *ApplicationController) writeBackToInformer(app *appv1.Application) {
	logCtx := log.WithFields(log.Fields{"application": app.Name, "appNamespace": app.Namespace, "project": app.Spec.Project, "informer-writeBack": true})
	err := ctrl.appInformer.GetStore().Update(app)
	if err != nil {
		logCtx.Errorf("failed to update informer store: %v", err)
		return
	}
}

// PatchAppWithWriteBack patches an application and writes it back to the informer cache
func (ctrl *ApplicationController) PatchAppWithWriteBack(ctx context.Context, name, ns string, pt types.PatchType, data []byte, opts metav1.PatchOptions, subresources ...string) (result *appv1.Application, err error) {
	patchedApp, err := ctrl.applicationClientset.ArgoprojV1alpha1().Applications(ns).Patch(ctx, name, pt, data, opts, subresources...)
	if err != nil {
		return patchedApp, err
	}
	ctrl.writeBackToInformer(patchedApp)
	return patchedApp, err
}

func (ctrl *ApplicationController) processAppRefreshQueueItem() (processNext bool) {
	patchMs := time.Duration(0) // time spent in doing patch/update calls
	setOpMs := time.Duration(0) // time spent in doing Operation patch calls in autosync
	appKey, shutdown := ctrl.appRefreshQueue.Get()
	if shutdown {
		processNext = false
		return
	}
	processNext = true
	defer func() {
		if r := recover(); r != nil {
			log.Errorf("Recovered from panic: %+v\n%s", r, debug.Stack())
		}
		ctrl.appRefreshQueue.Done(appKey)
	}()
	obj, exists, err := ctrl.appInformer.GetIndexer().GetByKey(appKey.(string))
	if err != nil {
		log.Errorf("Failed to get application '%s' from informer index: %+v", appKey, err)
		return
	}
	if !exists {
		// This happens after app was deleted, but the work queue still had an entry for it.
		return
	}
	origApp, ok := obj.(*appv1.Application)
	if !ok {
		log.Warnf("Key '%s' in index is not an application", appKey)
		return
	}
	origApp = origApp.DeepCopy()
	needRefresh, refreshType, comparisonLevel := ctrl.needRefreshAppStatus(origApp, ctrl.statusRefreshTimeout, ctrl.statusHardRefreshTimeout)

	if !needRefresh {
		return
	}
	app := origApp.DeepCopy()
	logCtx := log.WithFields(log.Fields{
		"application":    app.QualifiedName(),
		"level":          comparisonLevel,
		"dest-server":    origApp.Spec.Destination.Server,
		"dest-name":      origApp.Spec.Destination.Name,
		"dest-namespace": origApp.Spec.Destination.Namespace,
	})

	startTime := time.Now()
	defer func() {
		reconcileDuration := time.Since(startTime)
		ctrl.metricsServer.IncReconcile(origApp, reconcileDuration)
		logCtx.WithFields(log.Fields{
			"time_ms":  reconcileDuration.Milliseconds(),
			"patch_ms": patchMs.Milliseconds(),
			"setop_ms": setOpMs.Milliseconds(),
		}).Info("Reconciliation completed")
	}()

	if comparisonLevel == CompareWithNothing {
		managedResources := make([]*appv1.ResourceDiff, 0)
		if err := ctrl.cache.GetAppManagedResources(app.InstanceName(ctrl.namespace), &managedResources); err != nil {
			logCtx.Warnf("Failed to get cached managed resources for tree reconciliation, falling back to full reconciliation")
		} else {
			var tree *appv1.ApplicationTree
			if tree, err = ctrl.getResourceTree(app, managedResources); err == nil {
				app.Status.Summary = tree.GetSummary(app)
				if err := ctrl.cache.SetAppResourcesTree(app.InstanceName(ctrl.namespace), tree); err != nil {
					logCtx.Errorf("Failed to cache resources tree: %v", err)
					return
				}
			}

			patchMs = ctrl.persistAppStatus(origApp, &app.Status)
			return
		}
	}

	project, hasErrors := ctrl.refreshAppConditions(app)
	if hasErrors {
		app.Status.Sync.Status = appv1.SyncStatusCodeUnknown
		app.Status.Health.Status = health.HealthStatusUnknown
		patchMs = ctrl.persistAppStatus(origApp, &app.Status)

		if err := ctrl.cache.SetAppResourcesTree(app.InstanceName(ctrl.namespace), &appv1.ApplicationTree{}); err != nil {
			log.Warnf("failed to set app resource tree: %v", err)
		}
		if err := ctrl.cache.SetAppManagedResources(app.InstanceName(ctrl.namespace), nil); err != nil {
			log.Warnf("failed to set app managed resources tree: %v", err)
		}
		return
	}

	var localManifests []string
	if opState := app.Status.OperationState; opState != nil && opState.Operation.Sync != nil {
		localManifests = opState.Operation.Sync.Manifests
	}

	revisions := make([]string, 0)
	sources := make([]appv1.ApplicationSource, 0)

	hasMultipleSources := app.Spec.HasMultipleSources()

	// If we have multiple sources, we use all the sources under `sources` field and ignore source under `source` field.
	// else we use the source under the source field.
	if hasMultipleSources {
		for _, source := range app.Spec.Sources {
			// We do not perform any filtering of duplicate sources.
			// Argo CD will apply and update the resources generated from the sources automatically
			// based on the order in which manifests were generated
			sources = append(sources, source)
			revisions = append(revisions, source.TargetRevision)
		}
		if comparisonLevel == CompareWithRecent {
			revisions = app.Status.Sync.Revisions
		}
	} else {
		revision := app.Spec.GetSource().TargetRevision
		if comparisonLevel == CompareWithRecent {
			revision = app.Status.Sync.Revision
		}
		revisions = append(revisions, revision)
		sources = append(sources, app.Spec.GetSource())
	}
	now := metav1.Now()

	compareResult, err := ctrl.appStateManager.CompareAppState(app, project, revisions, sources,
		refreshType == appv1.RefreshTypeHard,
		comparisonLevel == CompareWithLatestForceResolve, localManifests, hasMultipleSources)

	if goerrors.Is(err, CompareStateRepoError) {
		logCtx.Warnf("Ignoring temporary failed attempt to compare app state against repo: %v", err)
		return // short circuit if git error is encountered
	}

	for k, v := range compareResult.timings {
		logCtx = logCtx.WithField(k, v.Milliseconds())
	}

	ctrl.normalizeApplication(origApp, app)

	tree, err := ctrl.setAppManagedResources(app, compareResult)
	if err != nil {
		logCtx.Errorf("Failed to cache app resources: %v", err)
	} else {
		app.Status.Summary = tree.GetSummary(app)
	}

	if project.Spec.SyncWindows.Matches(app).CanSync(false) {
		syncErrCond, opMS := ctrl.autoSync(app, compareResult.syncStatus, compareResult.resources)
		setOpMs = opMS
		if syncErrCond != nil {
			app.Status.SetConditions(
				[]appv1.ApplicationCondition{*syncErrCond},
				map[appv1.ApplicationConditionType]bool{appv1.ApplicationConditionSyncError: true},
			)
		} else {
			app.Status.SetConditions(
				[]appv1.ApplicationCondition{},
				map[appv1.ApplicationConditionType]bool{appv1.ApplicationConditionSyncError: true},
			)
		}
	} else {
		logCtx.Info("Sync prevented by sync window")
	}

	if app.Status.ReconciledAt == nil || comparisonLevel >= CompareWithLatest {
		app.Status.ReconciledAt = &now
	}
	app.Status.Sync = *compareResult.syncStatus
	app.Status.Health = *compareResult.healthStatus
	app.Status.Resources = compareResult.resources
	sort.Slice(app.Status.Resources, func(i, j int) bool {
		return resourceStatusKey(app.Status.Resources[i]) < resourceStatusKey(app.Status.Resources[j])
	})
	app.Status.SourceType = compareResult.appSourceType
	app.Status.SourceTypes = compareResult.appSourceTypes
	app.Status.ControllerNamespace = ctrl.namespace
	patchMs = ctrl.persistAppStatus(origApp, &app.Status)
	if (compareResult.hasPostDeleteHooks != app.HasPostDeleteFinalizer() || compareResult.hasPostDeleteHooks != app.HasPostDeleteFinalizer("cleanup")) &&
		app.GetDeletionTimestamp() == nil {
		if compareResult.hasPostDeleteHooks {
			app.SetPostDeleteFinalizer()
			app.SetPostDeleteFinalizer("cleanup")
		} else {
			app.UnSetPostDeleteFinalizer()
			app.UnSetPostDeleteFinalizer("cleanup")
		}

		if err := ctrl.updateFinalizers(app); err != nil {
			logCtx.Errorf("Failed to update finalizers: %v", err)
		}
	}
	return
}

func resourceStatusKey(res appv1.ResourceStatus) string {
	return strings.Join([]string{res.Group, res.Kind, res.Namespace, res.Name}, "/")
}
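
// For example (illustrative values), a Deployment named "guestbook" in the
// "default" namespace yields the key "apps/Deployment/default/guestbook".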

func currentSourceEqualsSyncedSource(app *appv1.Application) bool {
	if app.Spec.HasMultipleSources() {
		return app.Spec.Sources.Equals(app.Status.Sync.ComparedTo.Sources)
	}
	return app.Spec.Source.Equals(&app.Status.Sync.ComparedTo.Source)
}

// needRefreshAppStatus answers if application status needs to be refreshed.
// Returns true if the application has never been compared, has changed, or the comparison result has expired.
// Additionally, it returns whether full refresh was requested or not.
// If full refresh is requested then target and live state should be reconciled, else only live state tree should be updated.
func (ctrl *ApplicationController) needRefreshAppStatus(app *appv1.Application, statusRefreshTimeout, statusHardRefreshTimeout time.Duration) (bool, appv1.RefreshType, CompareWith) {
	logCtx := log.WithFields(log.Fields{"application": app.QualifiedName()})
	var reason string
	compareWith := CompareWithLatest
	refreshType := appv1.RefreshTypeNormal

	softExpired := app.Status.ReconciledAt == nil || app.Status.ReconciledAt.Add(statusRefreshTimeout).Before(time.Now().UTC())
	hardExpired := (app.Status.ReconciledAt == nil || app.Status.ReconciledAt.Add(statusHardRefreshTimeout).Before(time.Now().UTC())) && statusHardRefreshTimeout.Seconds() != 0

	if requestedType, ok := app.IsRefreshRequested(); ok {
		compareWith = CompareWithLatestForceResolve
		// user requested app refresh.
		refreshType = requestedType
		reason = fmt.Sprintf("%s refresh requested", refreshType)
	} else {
		if !currentSourceEqualsSyncedSource(app) {
			reason = "spec.source differs"
			compareWith = CompareWithLatestForceResolve
			if app.Spec.HasMultipleSources() {
				reason = "at least one of the spec.sources differs"
			}
		} else if hardExpired || softExpired {
			// The commented line below mysteriously crashes if app.Status.ReconciledAt is nil
			// reason = fmt.Sprintf("comparison expired. reconciledAt: %v, expiry: %v", app.Status.ReconciledAt, statusRefreshTimeout)
			// TODO: find existing Golang bug or create a new one
			reconciledAtStr := "never"
			if app.Status.ReconciledAt != nil {
				reconciledAtStr = app.Status.ReconciledAt.String()
			}
			reason = fmt.Sprintf("comparison expired, requesting refresh. reconciledAt: %v, expiry: %v", reconciledAtStr, statusRefreshTimeout)
			if hardExpired {
				reason = fmt.Sprintf("comparison expired, requesting hard refresh. reconciledAt: %v, expiry: %v", reconciledAtStr, statusHardRefreshTimeout)
				refreshType = appv1.RefreshTypeHard
			}
		} else if !app.Spec.Destination.Equals(app.Status.Sync.ComparedTo.Destination) {
			reason = "spec.destination differs"
		} else if app.HasChangedManagedNamespaceMetadata() {
			reason = "spec.syncPolicy.managedNamespaceMetadata differs"
		} else if !app.Spec.IgnoreDifferences.Equals(app.Status.Sync.ComparedTo.IgnoreDifferences) {
			reason = "spec.ignoreDifferences differs"
		} else if requested, level := ctrl.isRefreshRequested(app.QualifiedName()); requested {
			compareWith = level
			reason = "controller refresh requested"
		}
	}

	if reason != "" {
		logCtx.Infof("Refreshing app status (%s), level (%d)", reason, compareWith)
		return true, refreshType, compareWith
	}
	return false, refreshType, compareWith
}

func (ctrl *ApplicationController) refreshAppConditions(app *appv1.Application) (*appv1.AppProject, bool) {
	errorConditions := make([]appv1.ApplicationCondition, 0)
	proj, err := ctrl.getAppProj(app)
	if err != nil {
		errorConditions = append(errorConditions, ctrl.projectErrorToCondition(err, app))
	} else {
		specConditions, err := argo.ValidatePermissions(context.Background(), &app.Spec, proj, ctrl.db)
		if err != nil {
			errorConditions = append(errorConditions, appv1.ApplicationCondition{
				Type:    appv1.ApplicationConditionUnknownError,
				Message: err.Error(),
			})
		} else {
			errorConditions = append(errorConditions, specConditions...)
		}
	}
	app.Status.SetConditions(errorConditions, map[appv1.ApplicationConditionType]bool{
		appv1.ApplicationConditionInvalidSpecError: true,
		appv1.ApplicationConditionUnknownError:     true,
	})
	return proj, len(errorConditions) > 0
}

// normalizeApplication normalizes an application.spec and additionally persists updates if it changed
func (ctrl *ApplicationController) normalizeApplication(orig, app *appv1.Application) {
	logCtx := log.WithFields(log.Fields{"application": app.QualifiedName()})
	app.Spec = *argo.NormalizeApplicationSpec(&app.Spec)

	patch, modified, err := diff.CreateTwoWayMergePatch(orig, app, appv1.Application{})

	if err != nil {
		logCtx.Errorf("error constructing app spec patch: %v", err)
	} else if modified {
		_, err := ctrl.PatchAppWithWriteBack(context.Background(), app.Name, app.Namespace, types.MergePatchType, patch, metav1.PatchOptions{})
		if err != nil {
			logCtx.Errorf("Error persisting normalized application spec: %v", err)
		} else {
			logCtx.Infof("Normalized app spec: %s", string(patch))
		}
	}
}

// persistAppStatus persists updates to application status. If no changes were made, it is a no-op
func (ctrl *ApplicationController) persistAppStatus(orig *appv1.Application, newStatus *appv1.ApplicationStatus) (patchMs time.Duration) {
	logCtx := log.WithFields(log.Fields{"application": orig.QualifiedName()})
	if orig.Status.Sync.Status != newStatus.Sync.Status {
		message := fmt.Sprintf("Updated sync status: %s -> %s", orig.Status.Sync.Status, newStatus.Sync.Status)
		ctrl.auditLogger.LogAppEvent(orig, argo.EventInfo{Reason: argo.EventReasonResourceUpdated, Type: v1.EventTypeNormal}, message, "")
	}
	if orig.Status.Health.Status != newStatus.Health.Status {
		message := fmt.Sprintf("Updated health status: %s -> %s", orig.Status.Health.Status, newStatus.Health.Status)
		ctrl.auditLogger.LogAppEvent(orig, argo.EventInfo{Reason: argo.EventReasonResourceUpdated, Type: v1.EventTypeNormal}, message, "")
	}
	var newAnnotations map[string]string
	if orig.GetAnnotations() != nil {
		newAnnotations = make(map[string]string)
		for k, v := range orig.GetAnnotations() {
			newAnnotations[k] = v
		}
		delete(newAnnotations, appv1.AnnotationKeyRefresh)
	}
	patch, modified, err := diff.CreateTwoWayMergePatch(
		&appv1.Application{ObjectMeta: metav1.ObjectMeta{Annotations: orig.GetAnnotations()}, Status: orig.Status},
		&appv1.Application{ObjectMeta: metav1.ObjectMeta{Annotations: newAnnotations}, Status: *newStatus}, appv1.Application{})
	if err != nil {
		logCtx.Errorf("Error constructing app status patch: %v", err)
		return
	}
	if !modified {
		logCtx.Infof("No status changes. Skipping patch")
		return
	}
	// calculate time spent in the patch call
	start := time.Now()
	defer func() {
		patchMs = time.Since(start)
	}()
	_, err = ctrl.PatchAppWithWriteBack(context.Background(), orig.Name, orig.Namespace, types.MergePatchType, patch, metav1.PatchOptions{})
	if err != nil {
		logCtx.Warnf("Error updating application: %v", err)
	} else {
		logCtx.Infof("Update successful")
	}
	return patchMs
}

// autoSync will initiate a sync operation for an application configured with automated sync
func (ctrl *ApplicationController) autoSync(app *appv1.Application, syncStatus *appv1.SyncStatus, resources []appv1.ResourceStatus) (*appv1.ApplicationCondition, time.Duration) {
	if app.Spec.SyncPolicy == nil || app.Spec.SyncPolicy.Automated == nil {
		return nil, 0
	}
	logCtx := log.WithFields(log.Fields{"application": app.QualifiedName()})

	if app.Operation != nil {
		logCtx.Infof("Skipping auto-sync: another operation is in progress")
		return nil, 0
	}
	if app.DeletionTimestamp != nil && !app.DeletionTimestamp.IsZero() {
		logCtx.Infof("Skipping auto-sync: deletion in progress")
		return nil, 0
	}

	// Only perform auto-sync if we detect OutOfSync status. This is to prevent us from attempting
	// a sync when application is already in a Synced or Unknown state
	if syncStatus.Status != appv1.SyncStatusCodeOutOfSync {
		logCtx.Infof("Skipping auto-sync: application status is %s", syncStatus.Status)
		return nil, 0
	}

	if !app.Spec.SyncPolicy.Automated.Prune {
		requirePruneOnly := true
		for _, r := range resources {
			if r.Status != appv1.SyncStatusCodeSynced && !r.RequiresPruning {
				requirePruneOnly = false
				break
			}
		}
		if requirePruneOnly {
			logCtx.Infof("Skipping auto-sync: need to prune extra resources only but automated prune is disabled")
			return nil, 0
		}
	}

	desiredCommitSHA := syncStatus.Revision
	desiredCommitSHAsMS := syncStatus.Revisions
	alreadyAttempted, attemptPhase := alreadyAttemptedSync(app, desiredCommitSHA, desiredCommitSHAsMS, app.Spec.HasMultipleSources())
	selfHeal := app.Spec.SyncPolicy.Automated.SelfHeal
	op := appv1.Operation{
		Sync: &appv1.SyncOperation{
			Revision:    desiredCommitSHA,
			Prune:       app.Spec.SyncPolicy.Automated.Prune,
			SyncOptions: app.Spec.SyncPolicy.SyncOptions,
			Revisions:   desiredCommitSHAsMS,
		},
		InitiatedBy: appv1.OperationInitiator{Automated: true},
		Retry:       appv1.RetryStrategy{Limit: 5},
	}
	if app.Spec.SyncPolicy.Retry != nil {
		op.Retry = *app.Spec.SyncPolicy.Retry
	}
	// It is possible for manifests to remain OutOfSync even after a sync/kubectl apply (e.g.
	// auto-sync with pruning disabled). We need to ensure that we do not keep Syncing an
	// application in an infinite loop. To detect this, we only attempt the Sync if the revision
	// and parameter overrides are different from our most recent sync operation.
	if alreadyAttempted && (!selfHeal || !attemptPhase.Successful()) {
		if !attemptPhase.Successful() {
			logCtx.Warnf("Skipping auto-sync: failed previous sync attempt to %s", desiredCommitSHA)
			message := fmt.Sprintf("Failed sync attempt to %s: %s", desiredCommitSHA, app.Status.OperationState.Message)
			return &appv1.ApplicationCondition{Type: appv1.ApplicationConditionSyncError, Message: message}, 0
		}
		logCtx.Infof("Skipping auto-sync: most recent sync already to %s", desiredCommitSHA)
		return nil, 0
	} else if alreadyAttempted && selfHeal {
		if shouldSelfHeal, retryAfter := ctrl.shouldSelfHeal(app); shouldSelfHeal {
			for _, resource := range resources {
				if resource.Status != appv1.SyncStatusCodeSynced {
					op.Sync.Resources = append(op.Sync.Resources, appv1.SyncOperationResource{
						Kind:  resource.Kind,
						Group: resource.Group,
						Name:  resource.Name,
					})
				}
			}
		} else {
			logCtx.Infof("Skipping auto-sync: already attempted sync to %s with timeout %v (retrying in %v)", desiredCommitSHA, ctrl.selfHealTimeout, retryAfter)
			ctrl.requestAppRefresh(app.QualifiedName(), CompareWithLatest.Pointer(), &retryAfter)
			return nil, 0
		}
	}

	if app.Spec.SyncPolicy.Automated.Prune && !app.Spec.SyncPolicy.Automated.AllowEmpty {
		bAllNeedPrune := true
		for _, r := range resources {
			if !r.RequiresPruning {
				bAllNeedPrune = false
			}
		}
		if bAllNeedPrune {
			message := fmt.Sprintf("Skipping sync attempt to %s: auto-sync will wipe out all resources", desiredCommitSHA)
			logCtx.Warn(message)
			return &appv1.ApplicationCondition{Type: appv1.ApplicationConditionSyncError, Message: message}, 0
		}
	}

	appIf := ctrl.applicationClientset.ArgoprojV1alpha1().Applications(app.Namespace)
	start := time.Now()
	updatedApp, err := argo.SetAppOperation(appIf, app.Name, &op)
	setOpTime := time.Since(start)
	if err != nil {
		if goerrors.Is(err, argo.ErrAnotherOperationInProgress) {
			// skipping auto-sync because another operation is in progress and was not noticed due to stale data in informer
			// it is safe to skip auto-sync because it is already running
			logCtx.Warnf("Failed to initiate auto-sync to %s: %v", desiredCommitSHA, err)
			return nil, 0
		}

		logCtx.Errorf("Failed to initiate auto-sync to %s: %v", desiredCommitSHA, err)
		return &appv1.ApplicationCondition{Type: appv1.ApplicationConditionSyncError, Message: err.Error()}, setOpTime
	} else {
		ctrl.writeBackToInformer(updatedApp)
	}
	message := fmt.Sprintf("Initiated automated sync to '%s'", desiredCommitSHA)
	ctrl.auditLogger.LogAppEvent(app, argo.EventInfo{Reason: argo.EventReasonOperationStarted, Type: v1.EventTypeNormal}, message, "")
	logCtx.Info(message)
	return nil, setOpTime
}
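
// A minimal sketch of an Application sync policy that enables this code path
// (illustrative YAML; field names per the Application CRD):
//
//	syncPolicy:
//	  automated:
//	    prune: true
//	    selfHeal: true
//	  retry:
//	    limit: 5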

// alreadyAttemptedSync returns whether the most recent sync was performed against the
// commitSHA and with the same app source config which are currently set in the app
func alreadyAttemptedSync(app *appv1.Application, commitSHA string, commitSHAsMS []string, hasMultipleSources bool) (bool, synccommon.OperationPhase) {
	if app.Status.OperationState == nil || app.Status.OperationState.Operation.Sync == nil || app.Status.OperationState.SyncResult == nil {
		return false, ""
	}
	if hasMultipleSources {
		if !reflect.DeepEqual(app.Status.OperationState.SyncResult.Revisions, commitSHAsMS) {
			return false, ""
		}
	} else {
		if app.Status.OperationState.SyncResult.Revision != commitSHA {
			return false, ""
		}
	}

	if hasMultipleSources {
		// Ignore differences in target revision, since we already just verified commitSHAs are equal,
		// and we do not want to trigger auto-sync due to things like HEAD != master
		specSources := app.Spec.Sources.DeepCopy()
		syncSources := app.Status.OperationState.SyncResult.Sources.DeepCopy()
		for i := range specSources {
			specSources[i].TargetRevision = ""
		}
		for i := range syncSources {
			syncSources[i].TargetRevision = ""
		}
		return reflect.DeepEqual(specSources, syncSources), app.Status.OperationState.Phase
	} else {
		// Ignore differences in target revision, since we already just verified commitSHAs are equal,
		// and we do not want to trigger auto-sync due to things like HEAD != master
		specSource := app.Spec.Source.DeepCopy()
		specSource.TargetRevision = ""
		syncResSource := app.Status.OperationState.SyncResult.Source.DeepCopy()
		syncResSource.TargetRevision = ""
		return reflect.DeepEqual(*specSource, *syncResSource), app.Status.OperationState.Phase
	}
}

func (ctrl *ApplicationController) shouldSelfHeal(app *appv1.Application) (bool, time.Duration) {
	if app.Status.OperationState == nil {
		return true, time.Duration(0)
	}

	var retryAfter time.Duration
	if app.Status.OperationState.FinishedAt == nil {
		retryAfter = ctrl.selfHealTimeout
	} else {
		retryAfter = ctrl.selfHealTimeout - time.Since(app.Status.OperationState.FinishedAt.Time)
	}
	return retryAfter <= 0, retryAfter
}
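
// Worked example (illustrative numbers): with selfHealTimeout = 5s and an
// operation that finished 2s ago, retryAfter = 5s - 2s = 3s, so the method
// returns (false, 3s) and the caller re-queues the app instead of self-healing now.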

// isAppNamespaceAllowed returns whether the application is allowed in the
// namespace it's residing in.
func (ctrl *ApplicationController) isAppNamespaceAllowed(app *appv1.Application) bool {
	return app.Namespace == ctrl.namespace || glob.MatchStringInList(ctrl.applicationNamespaces, app.Namespace, false)
}
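
// For example (illustrative values), with the controller running in "argocd"
// and applicationNamespaces = ["team-*"], apps in "argocd" and "team-a" are
// allowed, while apps in "kube-system" are not.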

func (ctrl *ApplicationController) canProcessApp(obj interface{}) bool {
	app, ok := obj.(*appv1.Application)
	if !ok {
		return false
	}

	// Only process given app if it exists in a watched namespace, or in the
	// control plane's namespace.
	if !ctrl.isAppNamespaceAllowed(app) {
		return false
	}

	if annotations := app.GetAnnotations(); annotations != nil {
		if skipVal, ok := annotations[common.AnnotationKeyAppSkipReconcile]; ok {
			logCtx := log.WithFields(log.Fields{"application": app.QualifiedName()})
			if skipReconcile, err := strconv.ParseBool(skipVal); err == nil {
				if skipReconcile {
					logCtx.Debugf("Skipping Application reconcile based on annotation %s", common.AnnotationKeyAppSkipReconcile)
					return false
				}
			} else {
				logCtx.Debugf("Unable to determine if Application should skip reconcile based on annotation %s: %v", common.AnnotationKeyAppSkipReconcile, err)
			}
		}
	}

	cluster, err := ctrl.db.GetCluster(context.Background(), app.Spec.Destination.Server)
	if err != nil {
		return ctrl.clusterSharding.IsManagedCluster(nil)
	}
	return ctrl.clusterSharding.IsManagedCluster(cluster)
}
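
// A rough sketch of how the skip-reconcile annotation is expected to be used
// (the annotation key is whatever common.AnnotationKeyAppSkipReconcile resolves to;
// any value parseable as boolean true opts the app out of reconciliation):
//
//	metadata:
//	  annotations:
//	    <AnnotationKeyAppSkipReconcile>: "true"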

func (ctrl *ApplicationController) newApplicationInformerAndLister() (cache.SharedIndexInformer, applisters.ApplicationLister) {
	watchNamespace := ctrl.namespace
	// If we have at least one additional namespace configured, we need to
	// watch on them all.
	if len(ctrl.applicationNamespaces) > 0 {
		watchNamespace = ""
	}
	refreshTimeout := ctrl.statusRefreshTimeout
	if ctrl.statusHardRefreshTimeout.Seconds() != 0 && (ctrl.statusHardRefreshTimeout < ctrl.statusRefreshTimeout) {
		refreshTimeout = ctrl.statusHardRefreshTimeout
	}
	informer := cache.NewSharedIndexInformer(
		&cache.ListWatch{
			ListFunc: func(options metav1.ListOptions) (apiruntime.Object, error) {
				// We are only interested in apps that exist in namespaces the
				// user wants to be enabled.
				appList, err := ctrl.applicationClientset.ArgoprojV1alpha1().Applications(watchNamespace).List(context.TODO(), options)
				if err != nil {
					return nil, err
				}
				newItems := []appv1.Application{}
				for _, app := range appList.Items {
					if ctrl.isAppNamespaceAllowed(&app) {
						newItems = append(newItems, app)
					}
				}
				appList.Items = newItems
				return appList, nil
			},
			WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
				return ctrl.applicationClientset.ArgoprojV1alpha1().Applications(watchNamespace).Watch(context.TODO(), options)
			},
		},
		&appv1.Application{},
		refreshTimeout,
		cache.Indexers{
			cache.NamespaceIndex: func(obj interface{}) ([]string, error) {
				app, ok := obj.(*appv1.Application)
				if ok {
					// We generally only work with applications that are in one of
					// the allowed namespaces.
					if ctrl.isAppNamespaceAllowed(app) {
						// If the application is not allowed to use the project,
						// log an error.
						if _, err := ctrl.getAppProj(app); err != nil {
							ctrl.setAppCondition(app, ctrl.projectErrorToCondition(err, app))
						} else {
							// This call to 'ValidateDestination' ensures that the .spec.destination field of all Applications
							// returned by the informer/lister will have server field set (if not already set) based on the name.
							// (or, if not found, an error app condition)

							// If the server field is not set, set it based on the cluster name; if the cluster name can't be found,
							// log an error as an App Condition.
							if err := argo.ValidateDestination(context.Background(), &app.Spec.Destination, ctrl.db); err != nil {
								ctrl.setAppCondition(app, appv1.ApplicationCondition{Type: appv1.ApplicationConditionInvalidSpecError, Message: err.Error()})
							}
						}
					}
				}

				return cache.MetaNamespaceIndexFunc(obj)
			},
			orphanedIndex: func(obj interface{}) (i []string, e error) {
				app, ok := obj.(*appv1.Application)
				if !ok {
					return nil, nil
				}

				if !ctrl.isAppNamespaceAllowed(app) {
					return nil, nil
				}

				proj, err := ctrl.getAppProj(app)
				if err != nil {
					return nil, nil
				}
				if proj.Spec.OrphanedResources != nil {
					return []string{app.Spec.Destination.Namespace}, nil
				}
				return nil, nil
			},
		},
	)
	lister := applisters.NewApplicationLister(informer.GetIndexer())
	_, err := informer.AddEventHandler(
		cache.ResourceEventHandlerFuncs{
			AddFunc: func(obj interface{}) {
				if !ctrl.canProcessApp(obj) {
					return
				}
				key, err := cache.MetaNamespaceKeyFunc(obj)
				if err == nil {
					ctrl.appRefreshQueue.AddRateLimited(key)
					ctrl.appOperationQueue.AddRateLimited(key)
				}
			},
			UpdateFunc: func(old, new interface{}) {
				if !ctrl.canProcessApp(new) {
					return
				}

				key, err := cache.MetaNamespaceKeyFunc(new)
				if err != nil {
					return
				}

				var compareWith *CompareWith
				var delay *time.Duration

				oldApp, oldOK := old.(*appv1.Application)
				newApp, newOK := new.(*appv1.Application)
				if oldOK && newOK {
					if automatedSyncEnabled(oldApp, newApp) {
						log.WithField("application", newApp.QualifiedName()).Info("Enabled automated sync")
						compareWith = CompareWithLatest.Pointer()
					}
					if ctrl.statusRefreshJitter != 0 && oldApp.ResourceVersion == newApp.ResourceVersion {
						// Handler is refreshing the apps, add a random jitter to spread the load and avoid spikes
						jitter := time.Duration(float64(ctrl.statusRefreshJitter) * rand.Float64())
						delay = &jitter
					}
				}

				ctrl.requestAppRefresh(newApp.QualifiedName(), compareWith, delay)
				ctrl.appOperationQueue.AddRateLimited(key)
			},
			DeleteFunc: func(obj interface{}) {
				if !ctrl.canProcessApp(obj) {
					return
				}
				// IndexerInformer uses a delta queue, therefore for deletes we have to use this
				// key function.
				key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
				if err == nil {
					// for deletes, we immediately add to the refresh queue
					ctrl.appRefreshQueue.Add(key)
				}
			},
		},
	)
	if err != nil {
		return nil, nil
	}
	return informer, lister
}

func (ctrl *ApplicationController) projectErrorToCondition(err error, app *appv1.Application) appv1.ApplicationCondition {
	var condition appv1.ApplicationCondition
	if apierr.IsNotFound(err) {
		condition = appv1.ApplicationCondition{
			Type:    appv1.ApplicationConditionInvalidSpecError,
			Message: fmt.Sprintf("Application referencing project %s which does not exist", app.Spec.Project),
		}
	} else {
		condition = appv1.ApplicationCondition{Type: appv1.ApplicationConditionUnknownError, Message: err.Error()}
	}
	return condition
}

func (ctrl *ApplicationController) RegisterClusterSecretUpdater(ctx context.Context) {
	updater := NewClusterInfoUpdater(ctrl.stateCache, ctrl.db, ctrl.appLister.Applications(""), ctrl.cache, ctrl.clusterSharding.IsManagedCluster, ctrl.getAppProj, ctrl.namespace)
	go updater.Run(ctx)
}

func isOperationInProgress(app *appv1.Application) bool {
	return app.Status.OperationState != nil && !app.Status.OperationState.Phase.Completed()
}

// automatedSyncEnabled tests if an app went from auto-sync disabled to enabled.
// if it was toggled to be enabled, the informer handler will force a refresh
func automatedSyncEnabled(oldApp *appv1.Application, newApp *appv1.Application) bool {
	oldEnabled := false
	oldSelfHealEnabled := false
	if oldApp.Spec.SyncPolicy != nil && oldApp.Spec.SyncPolicy.Automated != nil {
		oldEnabled = true
		oldSelfHealEnabled = oldApp.Spec.SyncPolicy.Automated.SelfHeal
	}

	newEnabled := false
	newSelfHealEnabled := false
	if newApp.Spec.SyncPolicy != nil && newApp.Spec.SyncPolicy.Automated != nil {
		newEnabled = true
		newSelfHealEnabled = newApp.Spec.SyncPolicy.Automated.SelfHeal
	}
	if !oldEnabled && newEnabled {
		return true
	}
	if !oldSelfHealEnabled && newSelfHealEnabled {
		return true
	}
	// nothing changed
	return false
}

// toAppKey returns the application key from a given appName, that is, it will
// replace underscores with forward-slashes to become a <namespace>/<name>
// format. If the appName is an unqualified name (such as, "app"), it will use
// the controller's namespace in the key.
func (ctrl *ApplicationController) toAppKey(appName string) string {
	if !strings.Contains(appName, "_") && !strings.Contains(appName, "/") {
		return ctrl.namespace + "/" + appName
	} else if strings.Contains(appName, "/") {
		return appName
	} else {
		return strings.ReplaceAll(appName, "_", "/")
	}
}
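
// For example (assuming the controller runs in the "argocd" namespace):
// "guestbook" -> "argocd/guestbook", "team-a_guestbook" -> "team-a/guestbook",
// and "team-a/guestbook" is returned unchanged.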

func (ctrl *ApplicationController) toAppQualifiedName(appName, appNamespace string) string {
	return fmt.Sprintf("%s/%s", appNamespace, appName)
}

type ClusterFilterFunction func(c *appv1.Cluster, distributionFunction sharding.DistributionFunction) bool