argo-cd
423 lines · 16.7 KB
1package sharding
2
3import (
4"context"
5"fmt"
6"hash/fnv"
7"math"
8"os"
9"sort"
10"strconv"
11"strings"
12"time"
13
14"encoding/json"
15
16"github.com/argoproj/argo-cd/v2/common"
17"github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1"
18v1 "k8s.io/api/core/v1"
19metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
20"k8s.io/client-go/kubernetes"
21
22"github.com/argoproj/argo-cd/v2/util/db"
23"github.com/argoproj/argo-cd/v2/util/env"
24"github.com/argoproj/argo-cd/v2/util/errors"
25"github.com/argoproj/argo-cd/v2/util/settings"
26log "github.com/sirupsen/logrus"
27kubeerrors "k8s.io/apimachinery/pkg/api/errors"
28)
29
// Make it overridable for testing
var osHostnameFunction = os.Hostname

// Make it overridable for testing
var heartbeatCurrentTime = metav1.Now

var (
	// HeartbeatDuration is the interval in seconds (clamped to [10, 60]) at which an
	// application controller refreshes its entry in the shard mapping ConfigMap.
	HeartbeatDuration = env.ParseNumFromEnv(common.EnvControllerHeartbeatTime, 10, 10, 60)
	// HeartbeatTimeout is the number of seconds after which a controller's last
	// heartbeat is considered stale and its shard may be claimed by another controller.
	HeartbeatTimeout = 3 * HeartbeatDuration
)
40
// ShardControllerMappingKey is the ConfigMap data key under which the JSON-encoded
// shard-to-controller mapping is stored.
const ShardControllerMappingKey = "shardControllerMapping"

// DistributionFunction maps a cluster to the number of the shard that should process it.
type DistributionFunction func(c *v1alpha1.Cluster) int

// ClusterFilterFunction reports whether a cluster should be processed by a given shard.
type ClusterFilterFunction func(c *v1alpha1.Cluster) bool

// clusterAccessor returns the current list of clusters.
type clusterAccessor func() []*v1alpha1.Cluster

// shardApplicationControllerMapping stores the mapping of Shard Number to Application Controller in ConfigMap.
// It also stores the heartbeat of last synced time of the application controller.
type shardApplicationControllerMapping struct {
	ShardNumber    int         // shard index this entry describes
	ControllerName string      // hostname of the controller currently owning the shard ("" if unassigned)
	HeartbeatTime  metav1.Time // last time the owning controller refreshed this entry
}
54
55// GetClusterFilter returns a ClusterFilterFunction which is a function taking a cluster as a parameter
56// and returns wheter or not the cluster should be processed by a given shard. It calls the distributionFunction
57// to determine which shard will process the cluster, and if the given shard is equal to the calculated shard
58// the function will return true.
59func GetClusterFilter(db db.ArgoDB, distributionFunction DistributionFunction, replicas, shard int) ClusterFilterFunction {
60return func(c *v1alpha1.Cluster) bool {
61clusterShard := 0
62if c != nil && c.Shard != nil {
63requestedShard := int(*c.Shard)
64if requestedShard < replicas {
65clusterShard = requestedShard
66} else {
67log.Warnf("Specified cluster shard (%d) for cluster: %s is greater than the number of available shard. Assigning automatically.", requestedShard, c.Name)
68}
69} else {
70clusterShard = distributionFunction(c)
71}
72return clusterShard == shard
73}
74}
75
76// GetDistributionFunction returns which DistributionFunction should be used based on the passed algorithm and
77// the current datas.
78func GetDistributionFunction(clusters clusterAccessor, shardingAlgorithm string, replicasCount int) DistributionFunction {
79log.Debugf("Using filter function: %s", shardingAlgorithm)
80distributionFunction := LegacyDistributionFunction(replicasCount)
81switch shardingAlgorithm {
82case common.RoundRobinShardingAlgorithm:
83distributionFunction = RoundRobinDistributionFunction(clusters, replicasCount)
84case common.LegacyShardingAlgorithm:
85distributionFunction = LegacyDistributionFunction(replicasCount)
86default:
87log.Warnf("distribution type %s is not supported, defaulting to %s", shardingAlgorithm, common.DefaultShardingAlgorithm)
88}
89return distributionFunction
90}
91
92// LegacyDistributionFunction returns a DistributionFunction using a stable distribution algorithm:
93// for a given cluster the function will return the shard number based on the cluster id. This function
94// is lightweight and can be distributed easily, however, it does not ensure an homogenous distribution as
95// some shards may get assigned more clusters than others. It is the legacy function distribution that is
96// kept for compatibility reasons
97func LegacyDistributionFunction(replicas int) DistributionFunction {
98return func(c *v1alpha1.Cluster) int {
99if replicas == 0 {
100log.Debugf("Replicas count is : %d, returning -1", replicas)
101return -1
102}
103if c == nil {
104log.Debug("In-cluster: returning 0")
105return 0
106}
107// if Shard is manually set and the assigned value is lower than the number of replicas,
108// then its value is returned otherwise it is the default calculated value
109if c.Shard != nil && int(*c.Shard) < replicas {
110return int(*c.Shard)
111}
112id := c.ID
113log.Debugf("Calculating cluster shard for cluster id: %s", id)
114if id == "" {
115return 0
116} else {
117h := fnv.New32a()
118_, _ = h.Write([]byte(id))
119shard := int32(h.Sum32() % uint32(replicas))
120log.Debugf("Cluster with id=%s will be processed by shard %d", id, shard)
121return int(shard)
122}
123}
124}
125
126// RoundRobinDistributionFunction returns a DistributionFunction using an homogeneous distribution algorithm:
127// for a given cluster the function will return the shard number based on the modulo of the cluster rank in
128// the cluster's list sorted by uid on the shard number.
129// This function ensures an homogenous distribution: each shards got assigned the same number of
130// clusters +/-1 , but with the drawback of a reshuffling of clusters accross shards in case of some changes
131// in the cluster list
132
133func RoundRobinDistributionFunction(clusters clusterAccessor, replicas int) DistributionFunction {
134return func(c *v1alpha1.Cluster) int {
135if replicas > 0 {
136if c == nil { // in-cluster does not necessarly have a secret assigned. So we are receiving a nil cluster here.
137return 0
138}
139// if Shard is manually set and the assigned value is lower than the number of replicas,
140// then its value is returned otherwise it is the default calculated value
141if c.Shard != nil && int(*c.Shard) < replicas {
142return int(*c.Shard)
143} else {
144clusterIndexdByClusterIdMap := createClusterIndexByClusterIdMap(clusters)
145clusterIndex, ok := clusterIndexdByClusterIdMap[c.ID]
146if !ok {
147log.Warnf("Cluster with id=%s not found in cluster map.", c.ID)
148return -1
149}
150shard := int(clusterIndex % replicas)
151log.Debugf("Cluster with id=%s will be processed by shard %d", c.ID, shard)
152return shard
153}
154}
155log.Warnf("The number of replicas (%d) is lower than 1", replicas)
156return -1
157}
158}
159
160// NoShardingDistributionFunction returns a DistributionFunction that will process all cluster by shard 0
161// the function is created for API compatibility purposes and is not supposed to be activated.
162func NoShardingDistributionFunction() DistributionFunction {
163return func(c *v1alpha1.Cluster) int { return 0 }
164}
165
166// InferShard extracts the shard index based on its hostname.
167func InferShard() (int, error) {
168hostname, err := osHostnameFunction()
169if err != nil {
170return -1, err
171}
172parts := strings.Split(hostname, "-")
173if len(parts) == 0 {
174log.Warnf("hostname should end with shard number separated by '-' but got: %s", hostname)
175return 0, nil
176}
177shard, err := strconv.Atoi(parts[len(parts)-1])
178if err != nil {
179log.Warnf("hostname should end with shard number separated by '-' but got: %s", hostname)
180return 0, nil
181}
182return int(shard), nil
183}
184
185func getSortedClustersList(getCluster clusterAccessor) []*v1alpha1.Cluster {
186clusters := getCluster()
187sort.Slice(clusters, func(i, j int) bool {
188return clusters[i].ID < clusters[j].ID
189})
190return clusters
191}
192
193func createClusterIndexByClusterIdMap(getCluster clusterAccessor) map[string]int {
194clusters := getSortedClustersList(getCluster)
195log.Debugf("ClustersList has %d items", len(clusters))
196clusterById := make(map[string]*v1alpha1.Cluster)
197clusterIndexedByClusterId := make(map[string]int)
198for i, cluster := range clusters {
199log.Debugf("Adding cluster with id=%s and name=%s to cluster's map", cluster.ID, cluster.Name)
200clusterById[cluster.ID] = cluster
201clusterIndexedByClusterId[cluster.ID] = i
202}
203return clusterIndexedByClusterId
204}
205
206// GetOrUpdateShardFromConfigMap finds the shard number from the shard mapping configmap. If the shard mapping configmap does not exist,
207// the function creates the shard mapping configmap.
208// The function takes the shard number from the environment variable (default value -1, if not set) and passes it to this function.
209// If the shard value passed to this function is -1, that is, the shard was not set as an environment variable,
210// we default the shard number to 0 for computing the default config map.
211func GetOrUpdateShardFromConfigMap(kubeClient kubernetes.Interface, settingsMgr *settings.SettingsManager, replicas, shard int) (int, error) {
212hostname, err := osHostnameFunction()
213if err != nil {
214return -1, err
215}
216
217// fetch the shard mapping configMap
218shardMappingCM, err := kubeClient.CoreV1().ConfigMaps(settingsMgr.GetNamespace()).Get(context.Background(), common.ArgoCDAppControllerShardConfigMapName, metav1.GetOptions{})
219
220if err != nil {
221if !kubeerrors.IsNotFound(err) {
222return -1, fmt.Errorf("error getting sharding config map: %s", err)
223}
224log.Infof("shard mapping configmap %s not found. Creating default shard mapping configmap.", common.ArgoCDAppControllerShardConfigMapName)
225
226// if the shard is not set as an environment variable, set the default value of shard to 0 for generating default CM
227if shard == -1 {
228shard = 0
229}
230shardMappingCM, err = generateDefaultShardMappingCM(settingsMgr.GetNamespace(), hostname, replicas, shard)
231if err != nil {
232return -1, fmt.Errorf("error generating default shard mapping configmap %s", err)
233}
234if _, err = kubeClient.CoreV1().ConfigMaps(settingsMgr.GetNamespace()).Create(context.Background(), shardMappingCM, metav1.CreateOptions{}); err != nil {
235return -1, fmt.Errorf("error creating shard mapping configmap %s", err)
236}
237// return 0 as the controller is assigned to shard 0 while generating default shard mapping ConfigMap
238return shard, nil
239} else {
240// Identify the available shard and update the ConfigMap
241data := shardMappingCM.Data[ShardControllerMappingKey]
242var shardMappingData []shardApplicationControllerMapping
243err := json.Unmarshal([]byte(data), &shardMappingData)
244if err != nil {
245return -1, fmt.Errorf("error unmarshalling shard config map data: %s", err)
246}
247
248shard, shardMappingData := getOrUpdateShardNumberForController(shardMappingData, hostname, replicas, shard)
249updatedShardMappingData, err := json.Marshal(shardMappingData)
250if err != nil {
251return -1, fmt.Errorf("error marshalling data of shard mapping ConfigMap: %s", err)
252}
253shardMappingCM.Data[ShardControllerMappingKey] = string(updatedShardMappingData)
254
255_, err = kubeClient.CoreV1().ConfigMaps(settingsMgr.GetNamespace()).Update(context.Background(), shardMappingCM, metav1.UpdateOptions{})
256if err != nil {
257return -1, err
258}
259return shard, nil
260}
261}
262
// getOrUpdateShardNumberForController takes list of shardApplicationControllerMapping and performs computation to find the matching or empty shard number
// for the controller identified by hostname. It returns the chosen shard (-1 if none is
// available) together with the updated mapping slice.
func getOrUpdateShardNumberForController(shardMappingData []shardApplicationControllerMapping, hostname string, replicas, shard int) (int, []shardApplicationControllerMapping) {

	// if current length of shardMappingData in shard mapping configMap is less than the number of replicas,
	// create additional empty entries for missing shard numbers in shardMappingDataconfigMap
	if len(shardMappingData) < replicas {
		// generate extra default mappings
		for currentShard := len(shardMappingData); currentShard < replicas; currentShard++ {
			shardMappingData = append(shardMappingData, shardApplicationControllerMapping{
				ShardNumber: currentShard,
			})
		}
	}

	// if current length of shardMappingData in shard mapping configMap is more than the number of replicas,
	// we replace the config map with default config map and let controllers self assign the new shard to itself
	if len(shardMappingData) > replicas {
		shardMappingData = getDefaultShardMappingData(replicas)
	}

	if shard != -1 && shard < replicas {
		// The shard was set explicitly (env var) and is in range: claim that entry,
		// overwriting any previous controller name, and refresh the heartbeat.
		log.Debugf("update heartbeat for shard %d", shard)
		for i := range shardMappingData {
			shardMapping := shardMappingData[i]
			if shardMapping.ShardNumber == shard {
				log.Debugf("Shard found. Updating heartbeat!!")
				shardMapping.ControllerName = hostname
				shardMapping.HeartbeatTime = heartbeatCurrentTime()
				shardMappingData[i] = shardMapping
				break
			}
		}
	} else {
		// find the matching shard with assigned controllerName
		for i := range shardMappingData {
			shardMapping := shardMappingData[i]
			if shardMapping.ControllerName == hostname {
				log.Debugf("Shard matched. Updating heartbeat!!")
				shard = int(shardMapping.ShardNumber)
				shardMapping.HeartbeatTime = heartbeatCurrentTime()
				shardMappingData[i] = shardMapping
				break
			}
		}
	}

	// at this point, we have still not found a shard with matching hostname.
	// So, find a shard with either no controller assigned or assigned controller
	// with heartbeat past threshold
	if shard == -1 {
		for i := range shardMappingData {
			shardMapping := shardMappingData[i]
			// NOTE(review): staleness is checked with metav1.Now() directly, not the
			// test-overridable heartbeatCurrentTime — presumably intentional; confirm.
			if (shardMapping.ControllerName == "") || (metav1.Now().After(shardMapping.HeartbeatTime.Add(time.Duration(HeartbeatTimeout) * time.Second))) {
				shard = int(shardMapping.ShardNumber)
				log.Debugf("Empty shard found %d", shard)
				shardMapping.ControllerName = hostname
				shardMapping.HeartbeatTime = heartbeatCurrentTime()
				shardMappingData[i] = shardMapping
				break
			}
		}
	}
	return shard, shardMappingData
}
327
328// generateDefaultShardMappingCM creates a default shard mapping configMap. Assigns current controller to shard 0.
329func generateDefaultShardMappingCM(namespace, hostname string, replicas, shard int) (*v1.ConfigMap, error) {
330
331shardingCM := &v1.ConfigMap{
332ObjectMeta: metav1.ObjectMeta{
333Name: common.ArgoCDAppControllerShardConfigMapName,
334Namespace: namespace,
335},
336Data: map[string]string{},
337}
338
339shardMappingData := getDefaultShardMappingData(replicas)
340
341// if shard is not assigned to a controller, we use shard 0
342if shard == -1 || shard > replicas {
343shard = 0
344}
345shardMappingData[shard].ControllerName = hostname
346shardMappingData[shard].HeartbeatTime = heartbeatCurrentTime()
347
348data, err := json.Marshal(shardMappingData)
349if err != nil {
350return nil, fmt.Errorf("error generating default ConfigMap: %s", err)
351}
352shardingCM.Data[ShardControllerMappingKey] = string(data)
353
354return shardingCM, nil
355}
356
357func getDefaultShardMappingData(replicas int) []shardApplicationControllerMapping {
358shardMappingData := make([]shardApplicationControllerMapping, 0)
359
360for i := 0; i < replicas; i++ {
361mapping := shardApplicationControllerMapping{
362ShardNumber: i,
363}
364shardMappingData = append(shardMappingData, mapping)
365}
366return shardMappingData
367}
368
369func GetClusterSharding(kubeClient kubernetes.Interface, settingsMgr *settings.SettingsManager, shardingAlgorithm string, enableDynamicClusterDistribution bool) (ClusterShardingCache, error) {
370var replicasCount int
371if enableDynamicClusterDistribution {
372applicationControllerName := env.StringFromEnv(common.EnvAppControllerName, common.DefaultApplicationControllerName)
373appControllerDeployment, err := kubeClient.AppsV1().Deployments(settingsMgr.GetNamespace()).Get(context.Background(), applicationControllerName, metav1.GetOptions{})
374
375// if app controller deployment is not found when dynamic cluster distribution is enabled error out
376if err != nil {
377return nil, fmt.Errorf("(dynamic cluster distribution) failed to get app controller deployment: %v", err)
378}
379
380if appControllerDeployment != nil && appControllerDeployment.Spec.Replicas != nil {
381replicasCount = int(*appControllerDeployment.Spec.Replicas)
382} else {
383return nil, fmt.Errorf("(dynamic cluster distribution) failed to get app controller deployment replica count")
384}
385
386} else {
387replicasCount = env.ParseNumFromEnv(common.EnvControllerReplicas, 0, 0, math.MaxInt32)
388}
389shardNumber := env.ParseNumFromEnv(common.EnvControllerShard, -1, -math.MaxInt32, math.MaxInt32)
390if replicasCount > 1 {
391// check for shard mapping using configmap if application-controller is a deployment
392// else use existing logic to infer shard from pod name if application-controller is a statefulset
393if enableDynamicClusterDistribution {
394var err error
395// retry 3 times if we find a conflict while updating shard mapping configMap.
396// If we still see conflicts after the retries, wait for next iteration of heartbeat process.
397for i := 0; i <= common.AppControllerHeartbeatUpdateRetryCount; i++ {
398shardNumber, err = GetOrUpdateShardFromConfigMap(kubeClient, settingsMgr, replicasCount, shardNumber)
399if err != nil && !kubeerrors.IsConflict(err) {
400err = fmt.Errorf("unable to get shard due to error updating the sharding config map: %s", err)
401break
402}
403log.Warnf("conflict when getting shard from shard mapping configMap. Retrying (%d/3)", i)
404}
405errors.CheckError(err)
406} else {
407if shardNumber < 0 {
408var err error
409shardNumber, err = InferShard()
410errors.CheckError(err)
411}
412if shardNumber > replicasCount {
413log.Warnf("Calculated shard number %d is greated than the number of replicas count. Defaulting to 0", shardNumber)
414shardNumber = 0
415}
416}
417} else {
418log.Info("Processing all cluster shards")
419shardNumber = 0
420}
421db := db.NewDB(settingsMgr.GetNamespace(), settingsMgr, kubeClient)
422return NewClusterSharding(db, shardNumber, replicasCount, shardingAlgorithm), nil
423}
424