package sharding

import (
	"sync"

	"github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1"
	"github.com/argoproj/argo-cd/v2/util/db"
	log "github.com/sirupsen/logrus"
)

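// ClusterShardingCache tracks which shard each known cluster is assigned to and
// answers whether a given cluster should be processed by the current shard.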
type ClusterShardingCache interface {
	Init(clusters *v1alpha1.ClusterList)
	Add(c *v1alpha1.Cluster)
	Delete(clusterServer string)
	Update(oldCluster *v1alpha1.Cluster, newCluster *v1alpha1.Cluster)
	IsManagedCluster(c *v1alpha1.Cluster) bool
	GetDistribution() map[string]int
}

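// ClusterSharding is the default ClusterShardingCache implementation. It keeps
// the known clusters and their shard assignments in memory, guarded by a
// read-write mutex, and delegates shard selection to getClusterShard.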
type ClusterSharding struct {
	Shard           int
	Replicas        int
	Shards          map[string]int
	Clusters        map[string]*v1alpha1.Cluster
	lock            sync.RWMutex
	getClusterShard DistributionFunction
}

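// NewClusterSharding builds a ClusterSharding for the given shard. With a single
// replica every cluster is handled by shard 0 via NoShardingDistributionFunction;
// with more than one replica the distribution function selected by
// shardingAlgorithm is used. The db.ArgoDB argument is currently unused.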
func NewClusterSharding(_ db.ArgoDB, shard, replicas int, shardingAlgorithm string) ClusterShardingCache {
	log.Debugf("Processing clusters from shard %d: Using filter function:  %s", shard, shardingAlgorithm)
	clusterSharding := &ClusterSharding{
		Shard:    shard,
		Replicas: replicas,
		Shards:   make(map[string]int),
		Clusters: make(map[string]*v1alpha1.Cluster),
	}
	distributionFunction := NoShardingDistributionFunction()
	if replicas > 1 {
		log.Debugf("Processing clusters from shard %d: Using filter function:  %s", shard, shardingAlgorithm)
		distributionFunction = GetDistributionFunction(clusterSharding.GetClusterAccessor(), shardingAlgorithm, replicas)
	} else {
		log.Info("Processing all cluster shards")
	}
	clusterSharding.getClusterShard = distributionFunction
	return clusterSharding
}

// IsManagedCluster returns whether or not the cluster should be processed by the current shard.
func (s *ClusterSharding) IsManagedCluster(c *v1alpha1.Cluster) bool {
	s.lock.RLock()
	defer s.lock.RUnlock()
	if c == nil { // nil cluster (in-cluster) is always managed by current clusterShard
		return true
	}
	clusterShard := 0
	if shard, ok := s.Shards[c.Server]; ok {
		clusterShard = shard
	} else {
		log.Warnf("The cluster %s has no assigned shard.", c.Server)
	}
	log.Debugf("Checking if cluster %s with clusterShard %d should be processed by shard %d", c.Server, clusterShard, s.Shard)
	return clusterShard == s.Shard
}

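// Init replaces the cached cluster set with the given list and recomputes the
// shard distribution.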
func (sharding *ClusterSharding) Init(clusters *v1alpha1.ClusterList) {
	sharding.lock.Lock()
	defer sharding.lock.Unlock()
	newClusters := make(map[string]*v1alpha1.Cluster, len(clusters.Items))
	for _, c := range clusters.Items {
		cluster := c
		newClusters[c.Server] = &cluster
	}
	sharding.Clusters = newClusters
	sharding.updateDistribution()
}

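// Add inserts or replaces the cluster in the cache and recomputes the
// distribution only when the cluster is new or its sharding-relevant fields
// changed (see hasShardingUpdates).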
func (sharding *ClusterSharding) Add(c *v1alpha1.Cluster) {
	sharding.lock.Lock()
	defer sharding.lock.Unlock()

	old, ok := sharding.Clusters[c.Server]
	sharding.Clusters[c.Server] = c
	if !ok || hasShardingUpdates(old, c) {
		sharding.updateDistribution()
	} else {
		log.Debugf("Skipping sharding distribution update. Cluster already added")
	}
}

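// Delete removes the cluster identified by its server URL from the cache and
// recomputes the distribution if it was present.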
func (sharding *ClusterSharding) Delete(clusterServer string) {
	sharding.lock.Lock()
	defer sharding.lock.Unlock()
	if _, ok := sharding.Clusters[clusterServer]; ok {
		delete(sharding.Clusters, clusterServer)
		delete(sharding.Shards, clusterServer)
		sharding.updateDistribution()
	}
}

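// Update replaces the cached entry for a cluster. If the server URL changed, the
// old entry is dropped first; the distribution is recomputed only when
// sharding-relevant fields changed.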
func (sharding *ClusterSharding) Update(oldCluster *v1alpha1.Cluster, newCluster *v1alpha1.Cluster) {
	sharding.lock.Lock()
	defer sharding.lock.Unlock()

	if _, ok := sharding.Clusters[oldCluster.Server]; ok && oldCluster.Server != newCluster.Server {
		delete(sharding.Clusters, oldCluster.Server)
		delete(sharding.Shards, oldCluster.Server)
	}
	sharding.Clusters[newCluster.Server] = newCluster
	if hasShardingUpdates(oldCluster, newCluster) {
		sharding.updateDistribution()
	} else {
		log.Debugf("Skipping sharding distribution update. No relevant changes")
	}
}

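// GetDistribution returns a copy of the current cluster-to-shard assignments,
// keyed by cluster server URL.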
func (sharding *ClusterSharding) GetDistribution() map[string]int {
	sharding.lock.RLock()
	defer sharding.lock.RUnlock()
	shards := sharding.Shards

	distribution := make(map[string]int, len(shards))
	for k, v := range shards {
		distribution[k] = v
	}
	return distribution
}

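// updateDistribution recomputes the shard for every cached cluster: an explicit
// cluster shard is honored when it is lower than the replica count, otherwise
// the configured distribution function decides. Callers must hold the write lock.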
func (sharding *ClusterSharding) updateDistribution() {
	for k, c := range sharding.Clusters {
		shard := 0
		if c.Shard != nil {
			requestedShard := int(*c.Shard)
			if requestedShard < sharding.Replicas {
				shard = requestedShard
			} else {
				log.Warnf("Specified cluster shard (%d) for cluster: %s is greater than or equal to the number of available shards (%d). Using shard 0.", requestedShard, c.Server, sharding.Replicas)
			}
		} else {
			shard = sharding.getClusterShard(c)
		}

		existingShard, ok := sharding.Shards[k]
		if ok && existingShard != shard {
			log.Infof("Cluster %s has changed shard from %d to %d", k, existingShard, shard)
		} else if !ok {
			log.Infof("Cluster %s has been assigned to shard %d", k, shard)
		} else {
			log.Debugf("Cluster %s has not changed shard", k)
		}
		sharding.Shards[k] = shard
	}
}

// hasShardingUpdates returns true if the sharding distribution has explicitly changed
func hasShardingUpdates(old, new *v1alpha1.Cluster) bool {
	if old == nil || new == nil {
		return false
	}

	// returns true if the cluster id has changed because some sharding algorithms depend on it.
	if old.ID != new.ID {
		return true
	}

	if old.Server != new.Server {
		return true
	}

	// return false if the shard field has not been modified
	if old.Shard == nil && new.Shard == nil {
		return false
	}
	return old.Shard == nil || new.Shard == nil || int64(*old.Shard) != int64(*new.Shard)
}

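// GetClusterAccessor returns a function that snapshots the cached clusters as a
// slice. It is handed to GetDistributionFunction so that distribution algorithms
// can see the full cluster set.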
func (d *ClusterSharding) GetClusterAccessor() clusterAccessor {
	return func() []*v1alpha1.Cluster {
		// no need to lock, as this is only called from the updateDistribution function
		clusters := make([]*v1alpha1.Cluster, 0, len(d.Clusters))
		for _, c := range d.Clusters {
			clusters = append(clusters, c)
		}
		return clusters
	}
}

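// Usage sketch: how a controller replica might consult this cache. This is an
// illustrative example rather than code from this package; the replica count,
// shard index, "round-robin" algorithm name, and the clusterList/cluster values
// are assumptions for the sake of the example.
//
//	cache := NewClusterSharding(nil, 0, 3, "round-robin")
//	cache.Init(clusterList)
//	if cache.IsManagedCluster(cluster) {
//		// this replica (shard 0) is responsible for reconciling this cluster
//	}
//	log.Debugf("current distribution: %v", cache.GetDistribution())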