package sharding

import (
	"sync"

	"github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1"
	"github.com/argoproj/argo-cd/v2/util/db"
	log "github.com/sirupsen/logrus"
)

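// ClusterShardingCache maintains the cluster-to-shard assignments for an
// application controller replica and answers whether a given cluster should
// be processed by the current shard.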
type ClusterShardingCache interface {
	Init(clusters *v1alpha1.ClusterList)
	Add(c *v1alpha1.Cluster)
	Delete(clusterServer string)
	Update(oldCluster *v1alpha1.Cluster, newCluster *v1alpha1.Cluster)
	IsManagedCluster(c *v1alpha1.Cluster) bool
	GetDistribution() map[string]int
}

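// ClusterSharding is the default ClusterShardingCache implementation. It
// tracks the known clusters and their computed shard assignments, guarded by
// a read-write mutex.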
type ClusterSharding struct {
	Shard           int
	Replicas        int
	Shards          map[string]int
	Clusters        map[string]*v1alpha1.Cluster
	lock            sync.RWMutex
	getClusterShard DistributionFunction
}

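// NewClusterSharding builds a ClusterShardingCache for the given shard number,
// replica count, and sharding algorithm. With a single replica, a no-op
// distribution function is used and every cluster maps to shard 0.
//
// A minimal usage sketch (the argoDB handle, the clusterList, and the
// "round-robin" algorithm name are assumptions for illustration):
//
//	cache := NewClusterSharding(argoDB, 0, 3, "round-robin")
//	cache.Init(clusterList)
//	if cache.IsManagedCluster(cluster) {
//		// this replica is responsible for reconciling the cluster
//	}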
func NewClusterSharding(_ db.ArgoDB, shard, replicas int, shardingAlgorithm string) ClusterShardingCache {
	log.Debugf("Processing clusters from shard %d: Using filter function: %s", shard, shardingAlgorithm)
	clusterSharding := &ClusterSharding{
		Shard:    shard,
		Replicas: replicas,
		Shards:   make(map[string]int),
		Clusters: make(map[string]*v1alpha1.Cluster),
	}
	distributionFunction := NoShardingDistributionFunction()
	if replicas > 1 {
		distributionFunction = GetDistributionFunction(clusterSharding.GetClusterAccessor(), shardingAlgorithm, replicas)
	} else {
		log.Info("Processing all cluster shards")
	}
	clusterSharding.getClusterShard = distributionFunction
	return clusterSharding
}

// IsManagedCluster returns whether or not the cluster should be processed by a given shard.
func (s *ClusterSharding) IsManagedCluster(c *v1alpha1.Cluster) bool {
	s.lock.RLock()
	defer s.lock.RUnlock()
	if c == nil { // nil cluster (in-cluster) is always managed by current clusterShard
		return true
	}
	clusterShard := 0
	if shard, ok := s.Shards[c.Server]; ok {
		clusterShard = shard
	} else {
		log.Warnf("The cluster %s has no assigned shard.", c.Server)
	}
	log.Debugf("Checking if cluster %s with clusterShard %d should be processed by shard %d", c.Server, clusterShard, s.Shard)
	return clusterShard == s.Shard
}

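// Init replaces the cached cluster set with the given cluster list and
// recomputes the shard distribution.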
func (sharding *ClusterSharding) Init(clusters *v1alpha1.ClusterList) {
	sharding.lock.Lock()
	defer sharding.lock.Unlock()
	newClusters := make(map[string]*v1alpha1.Cluster, len(clusters.Items))
	for _, c := range clusters.Items {
		cluster := c // copy the loop variable so each map entry gets its own pointer
		newClusters[c.Server] = &cluster
	}
	sharding.Clusters = newClusters
	sharding.updateDistribution()
}

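// Add caches the given cluster and recomputes the distribution when the
// cluster is new or its sharding-relevant fields changed.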
func (sharding *ClusterSharding) Add(c *v1alpha1.Cluster) {
	sharding.lock.Lock()
	defer sharding.lock.Unlock()

	old, ok := sharding.Clusters[c.Server]
	sharding.Clusters[c.Server] = c
	if !ok || hasShardingUpdates(old, c) {
		sharding.updateDistribution()
	} else {
		log.Debugf("Skipping sharding distribution update. Cluster already added")
	}
}

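// Delete removes the cluster identified by its server URL from the cache and
// recomputes the distribution.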
func (sharding *ClusterSharding) Delete(clusterServer string) {
	sharding.lock.Lock()
	defer sharding.lock.Unlock()
	if _, ok := sharding.Clusters[clusterServer]; ok {
		delete(sharding.Clusters, clusterServer)
		delete(sharding.Shards, clusterServer)
		sharding.updateDistribution()
	}
}

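// Update replaces oldCluster with newCluster in the cache, dropping the old
// entries when the server URL changed, and recomputes the distribution when
// sharding-relevant fields changed.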
func (sharding *ClusterSharding) Update(oldCluster *v1alpha1.Cluster, newCluster *v1alpha1.Cluster) {
	sharding.lock.Lock()
	defer sharding.lock.Unlock()

	if _, ok := sharding.Clusters[oldCluster.Server]; ok && oldCluster.Server != newCluster.Server {
		delete(sharding.Clusters, oldCluster.Server)
		delete(sharding.Shards, oldCluster.Server)
	}
	sharding.Clusters[newCluster.Server] = newCluster
	if hasShardingUpdates(oldCluster, newCluster) {
		sharding.updateDistribution()
	} else {
		log.Debugf("Skipping sharding distribution update. No relevant changes")
	}
}

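// GetDistribution returns a copy of the current cluster-to-shard assignments,
// keyed by cluster server URL.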
func (sharding *ClusterSharding) GetDistribution() map[string]int {
	sharding.lock.RLock()
	defer sharding.lock.RUnlock()
	shards := sharding.Shards

	distribution := make(map[string]int, len(shards))
	for k, v := range shards {
		distribution[k] = v
	}
	return distribution
}

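// updateDistribution recomputes the shard assignment for every cached
// cluster. An explicitly requested shard (cluster.Shard) wins as long as it
// is within the replica range; otherwise the distribution function decides.
// Callers must hold the write lock.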
func (sharding *ClusterSharding) updateDistribution() {
	for k, c := range sharding.Clusters {
		shard := 0
		if c.Shard != nil {
			requestedShard := int(*c.Shard)
			if requestedShard < sharding.Replicas {
				shard = requestedShard
			} else {
				log.Warnf("Specified cluster shard (%d) for cluster: %s is greater than the number of available shards (%d). Using shard 0.", requestedShard, c.Server, sharding.Replicas)
			}
		} else {
			shard = sharding.getClusterShard(c)
		}

		existingShard, ok := sharding.Shards[k]
		if ok && existingShard != shard {
			log.Infof("Cluster %s has changed shard from %d to %d", k, existingShard, shard)
		} else if !ok {
			log.Infof("Cluster %s has been assigned to shard %d", k, shard)
		} else {
			log.Debugf("Cluster %s has not changed shard", k)
		}
		sharding.Shards[k] = shard
	}
}

// hasShardingUpdates returns true if the sharding distribution has explicitly changed
func hasShardingUpdates(old, new *v1alpha1.Cluster) bool {
	if old == nil || new == nil {
		return false
	}

	// returns true if the cluster id has changed because some sharding algorithms depend on it.
	if old.ID != new.ID {
		return true
	}

	if old.Server != new.Server {
		return true
	}

	// return false if the shard field has not been modified
	if old.Shard == nil && new.Shard == nil {
		return false
	}
	return old.Shard == nil || new.Shard == nil || int64(*old.Shard) != int64(*new.Shard)
}

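// GetClusterAccessor returns a function that lists the cached clusters. It is
// handed to GetDistributionFunction so the distribution algorithms can see
// the current cluster set.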
func (sharding *ClusterSharding) GetClusterAccessor() clusterAccessor {
	return func() []*v1alpha1.Cluster {
		// no need to lock, as this is only called from the updateDistribution function
		clusters := make([]*v1alpha1.Cluster, 0, len(sharding.Clusters))
		for _, c := range sharding.Clusters {
			clusters = append(clusters, c)
		}
		return clusters
	}
}