argo-cd

Форк
0
423 строки · 16.7 Кб
1
package sharding
2

3
import (
4
	"context"
5
	"fmt"
6
	"hash/fnv"
7
	"math"
8
	"os"
9
	"sort"
10
	"strconv"
11
	"strings"
12
	"time"
13

14
	"encoding/json"
15

16
	"github.com/argoproj/argo-cd/v2/common"
17
	"github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1"
18
	v1 "k8s.io/api/core/v1"
19
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
20
	"k8s.io/client-go/kubernetes"
21

22
	"github.com/argoproj/argo-cd/v2/util/db"
23
	"github.com/argoproj/argo-cd/v2/util/env"
24
	"github.com/argoproj/argo-cd/v2/util/errors"
25
	"github.com/argoproj/argo-cd/v2/util/settings"
26
	log "github.com/sirupsen/logrus"
27
	kubeerrors "k8s.io/apimachinery/pkg/api/errors"
28
)
29

30
// Make it overridable for testing
var osHostnameFunction = os.Hostname

// Make it overridable for testing
var heartbeatCurrentTime = metav1.Now

var (
	// HeartbeatDuration is the heartbeat interval in seconds, read from the
	// controller heartbeat environment variable (default 10, clamped to 10-60).
	HeartbeatDuration = env.ParseNumFromEnv(common.EnvControllerHeartbeatTime, 10, 10, 60)
	// HeartbeatTimeout is the staleness threshold in seconds: a controller whose
	// heartbeat is older than this may have its shard claimed by another controller.
	HeartbeatTimeout = 3 * HeartbeatDuration
)

// ShardControllerMappingKey is the ConfigMap data key under which the
// JSON-encoded shard-to-controller mapping is stored.
const ShardControllerMappingKey = "shardControllerMapping"

// DistributionFunction maps a cluster to the shard number that should process
// it; implementations return -1 when no shard can be determined.
type DistributionFunction func(c *v1alpha1.Cluster) int

// ClusterFilterFunction reports whether a cluster should be processed by the
// current shard.
type ClusterFilterFunction func(c *v1alpha1.Cluster) bool

// clusterAccessor returns the current list of clusters.
type clusterAccessor func() []*v1alpha1.Cluster

// shardApplicationControllerMapping stores the mapping of Shard Number to Application Controller in ConfigMap.
// It also stores the heartbeat of last synced time of the application controller.
type shardApplicationControllerMapping struct {
	ShardNumber    int
	ControllerName string
	HeartbeatTime  metav1.Time
}
54

55
// GetClusterFilter returns a ClusterFilterFunction which is a function taking a cluster as a parameter
56
// and returns wheter or not the cluster should be processed by a given shard. It calls the distributionFunction
57
// to determine which shard will process the cluster, and if the given shard is equal to the calculated shard
58
// the function will return true.
59
func GetClusterFilter(db db.ArgoDB, distributionFunction DistributionFunction, replicas, shard int) ClusterFilterFunction {
60
	return func(c *v1alpha1.Cluster) bool {
61
		clusterShard := 0
62
		if c != nil && c.Shard != nil {
63
			requestedShard := int(*c.Shard)
64
			if requestedShard < replicas {
65
				clusterShard = requestedShard
66
			} else {
67
				log.Warnf("Specified cluster shard (%d) for cluster: %s is greater than the number of available shard. Assigning automatically.", requestedShard, c.Name)
68
			}
69
		} else {
70
			clusterShard = distributionFunction(c)
71
		}
72
		return clusterShard == shard
73
	}
74
}
75

76
// GetDistributionFunction returns which DistributionFunction should be used based on the passed algorithm and
77
// the current datas.
78
func GetDistributionFunction(clusters clusterAccessor, shardingAlgorithm string, replicasCount int) DistributionFunction {
79
	log.Debugf("Using filter function:  %s", shardingAlgorithm)
80
	distributionFunction := LegacyDistributionFunction(replicasCount)
81
	switch shardingAlgorithm {
82
	case common.RoundRobinShardingAlgorithm:
83
		distributionFunction = RoundRobinDistributionFunction(clusters, replicasCount)
84
	case common.LegacyShardingAlgorithm:
85
		distributionFunction = LegacyDistributionFunction(replicasCount)
86
	default:
87
		log.Warnf("distribution type %s is not supported, defaulting to %s", shardingAlgorithm, common.DefaultShardingAlgorithm)
88
	}
89
	return distributionFunction
90
}
91

92
// LegacyDistributionFunction returns a DistributionFunction using a stable distribution algorithm:
93
// for a given cluster the function will return the shard number based on the cluster id. This function
94
// is lightweight and can be distributed easily, however, it does not ensure an homogenous distribution as
95
// some shards may get assigned more clusters than others. It is the legacy function distribution that is
96
// kept for compatibility reasons
97
func LegacyDistributionFunction(replicas int) DistributionFunction {
98
	return func(c *v1alpha1.Cluster) int {
99
		if replicas == 0 {
100
			log.Debugf("Replicas count is : %d, returning -1", replicas)
101
			return -1
102
		}
103
		if c == nil {
104
			log.Debug("In-cluster: returning 0")
105
			return 0
106
		}
107
		// if Shard is manually set and the assigned value is lower than the number of replicas,
108
		// then its value is returned otherwise it is the default calculated value
109
		if c.Shard != nil && int(*c.Shard) < replicas {
110
			return int(*c.Shard)
111
		}
112
		id := c.ID
113
		log.Debugf("Calculating cluster shard for cluster id: %s", id)
114
		if id == "" {
115
			return 0
116
		} else {
117
			h := fnv.New32a()
118
			_, _ = h.Write([]byte(id))
119
			shard := int32(h.Sum32() % uint32(replicas))
120
			log.Debugf("Cluster with id=%s will be processed by shard %d", id, shard)
121
			return int(shard)
122
		}
123
	}
124
}
125

126
// RoundRobinDistributionFunction returns a DistributionFunction using an homogeneous distribution algorithm:
127
// for a given cluster the function will return the shard number based on the modulo of the cluster rank in
128
// the cluster's list sorted by uid on the shard number.
129
// This function ensures an homogenous distribution: each shards got assigned the same number of
130
// clusters +/-1 , but with the drawback of a reshuffling of clusters accross shards in case of some changes
131
// in the cluster list
132

133
func RoundRobinDistributionFunction(clusters clusterAccessor, replicas int) DistributionFunction {
134
	return func(c *v1alpha1.Cluster) int {
135
		if replicas > 0 {
136
			if c == nil { // in-cluster does not necessarly have a secret assigned. So we are receiving a nil cluster here.
137
				return 0
138
			}
139
			// if Shard is manually set and the assigned value is lower than the number of replicas,
140
			// then its value is returned otherwise it is the default calculated value
141
			if c.Shard != nil && int(*c.Shard) < replicas {
142
				return int(*c.Shard)
143
			} else {
144
				clusterIndexdByClusterIdMap := createClusterIndexByClusterIdMap(clusters)
145
				clusterIndex, ok := clusterIndexdByClusterIdMap[c.ID]
146
				if !ok {
147
					log.Warnf("Cluster with id=%s not found in cluster map.", c.ID)
148
					return -1
149
				}
150
				shard := int(clusterIndex % replicas)
151
				log.Debugf("Cluster with id=%s will be processed by shard %d", c.ID, shard)
152
				return shard
153
			}
154
		}
155
		log.Warnf("The number of replicas (%d) is lower than 1", replicas)
156
		return -1
157
	}
158
}
159

160
// NoShardingDistributionFunction returns a DistributionFunction that will process all cluster by shard 0
161
// the function is created for API compatibility purposes and is not supposed to be activated.
162
func NoShardingDistributionFunction() DistributionFunction {
163
	return func(c *v1alpha1.Cluster) int { return 0 }
164
}
165

166
// InferShard extracts the shard index based on its hostname.
167
func InferShard() (int, error) {
168
	hostname, err := osHostnameFunction()
169
	if err != nil {
170
		return -1, err
171
	}
172
	parts := strings.Split(hostname, "-")
173
	if len(parts) == 0 {
174
		log.Warnf("hostname should end with shard number separated by '-' but got: %s", hostname)
175
		return 0, nil
176
	}
177
	shard, err := strconv.Atoi(parts[len(parts)-1])
178
	if err != nil {
179
		log.Warnf("hostname should end with shard number separated by '-' but got: %s", hostname)
180
		return 0, nil
181
	}
182
	return int(shard), nil
183
}
184

185
// getSortedClustersList fetches the clusters via the accessor and returns them
// sorted by cluster ID, so every replica computes the same ordering.
func getSortedClustersList(getCluster clusterAccessor) []*v1alpha1.Cluster {
	list := getCluster()
	sort.Slice(list, func(a, b int) bool { return list[a].ID < list[b].ID })
	return list
}
192

193
// createClusterIndexByClusterIdMap returns a map of cluster ID to the rank of
// that cluster in the ID-sorted cluster list. The rank is what the round-robin
// distribution uses to compute the shard assignment.
func createClusterIndexByClusterIdMap(getCluster clusterAccessor) map[string]int {
	clusters := getSortedClustersList(getCluster)
	log.Debugf("ClustersList has %d items", len(clusters))
	// Note: a second clusterById map used to be built here but was never read;
	// it has been removed as dead code.
	clusterIndexedByClusterId := make(map[string]int, len(clusters))
	for i, cluster := range clusters {
		log.Debugf("Adding cluster with id=%s and name=%s to cluster's map", cluster.ID, cluster.Name)
		clusterIndexedByClusterId[cluster.ID] = i
	}
	return clusterIndexedByClusterId
}
205

206
// GetOrUpdateShardFromConfigMap finds the shard number from the shard mapping configmap. If the shard mapping configmap does not exist,
207
// the function creates the shard mapping configmap.
208
// The function takes the shard number from the environment variable (default value -1, if not set) and passes it to this function.
209
// If the shard value passed to this function is -1, that is, the shard was not set as an environment variable,
210
// we default the shard number to 0 for computing the default config map.
211
func GetOrUpdateShardFromConfigMap(kubeClient kubernetes.Interface, settingsMgr *settings.SettingsManager, replicas, shard int) (int, error) {
212
	hostname, err := osHostnameFunction()
213
	if err != nil {
214
		return -1, err
215
	}
216

217
	// fetch the shard mapping configMap
218
	shardMappingCM, err := kubeClient.CoreV1().ConfigMaps(settingsMgr.GetNamespace()).Get(context.Background(), common.ArgoCDAppControllerShardConfigMapName, metav1.GetOptions{})
219

220
	if err != nil {
221
		if !kubeerrors.IsNotFound(err) {
222
			return -1, fmt.Errorf("error getting sharding config map: %s", err)
223
		}
224
		log.Infof("shard mapping configmap %s not found. Creating default shard mapping configmap.", common.ArgoCDAppControllerShardConfigMapName)
225

226
		// if the shard is not set as an environment variable, set the default value of shard to 0 for generating default CM
227
		if shard == -1 {
228
			shard = 0
229
		}
230
		shardMappingCM, err = generateDefaultShardMappingCM(settingsMgr.GetNamespace(), hostname, replicas, shard)
231
		if err != nil {
232
			return -1, fmt.Errorf("error generating default shard mapping configmap %s", err)
233
		}
234
		if _, err = kubeClient.CoreV1().ConfigMaps(settingsMgr.GetNamespace()).Create(context.Background(), shardMappingCM, metav1.CreateOptions{}); err != nil {
235
			return -1, fmt.Errorf("error creating shard mapping configmap %s", err)
236
		}
237
		// return 0 as the controller is assigned to shard 0 while generating default shard mapping ConfigMap
238
		return shard, nil
239
	} else {
240
		// Identify the available shard and update the ConfigMap
241
		data := shardMappingCM.Data[ShardControllerMappingKey]
242
		var shardMappingData []shardApplicationControllerMapping
243
		err := json.Unmarshal([]byte(data), &shardMappingData)
244
		if err != nil {
245
			return -1, fmt.Errorf("error unmarshalling shard config map data: %s", err)
246
		}
247

248
		shard, shardMappingData := getOrUpdateShardNumberForController(shardMappingData, hostname, replicas, shard)
249
		updatedShardMappingData, err := json.Marshal(shardMappingData)
250
		if err != nil {
251
			return -1, fmt.Errorf("error marshalling data of shard mapping ConfigMap: %s", err)
252
		}
253
		shardMappingCM.Data[ShardControllerMappingKey] = string(updatedShardMappingData)
254

255
		_, err = kubeClient.CoreV1().ConfigMaps(settingsMgr.GetNamespace()).Update(context.Background(), shardMappingCM, metav1.UpdateOptions{})
256
		if err != nil {
257
			return -1, err
258
		}
259
		return shard, nil
260
	}
261
}
262

263
// getOrUpdateShardNumberForController takes list of shardApplicationControllerMapping and performs computation to find the matching or empty shard number.
//
// It works in three phases:
//  1. Resize the mapping to exactly one entry per replica: missing entries are
//     appended empty; if the mapping is larger than replicas it is reset to the
//     default (empty) mapping so controllers re-claim their shards.
//  2. If shard was explicitly requested (shard != -1 and in range), refresh the
//     heartbeat on that entry; otherwise look for an entry already owned by this
//     hostname and adopt its shard number.
//  3. If still unassigned (shard == -1), claim the first entry that has no
//     controller or whose controller's heartbeat is older than HeartbeatTimeout
//     seconds.
//
// Returns the resulting shard number (-1 when no shard could be claimed) and the
// updated mapping to be written back to the ConfigMap.
func getOrUpdateShardNumberForController(shardMappingData []shardApplicationControllerMapping, hostname string, replicas, shard int) (int, []shardApplicationControllerMapping) {

	// if current length of shardMappingData in shard mapping configMap is less than the number of replicas,
	// create additional empty entries for missing shard numbers in shardMappingDataconfigMap
	if len(shardMappingData) < replicas {
		// generate extra default mappings
		for currentShard := len(shardMappingData); currentShard < replicas; currentShard++ {
			shardMappingData = append(shardMappingData, shardApplicationControllerMapping{
				ShardNumber: currentShard,
			})
		}
	}

	// if current length of shardMappingData in shard mapping configMap is more than the number of replicas,
	// we replace the config map with default config map and let controllers self assign the new shard to itself
	if len(shardMappingData) > replicas {
		shardMappingData = getDefaultShardMappingData(replicas)
	}

	if shard != -1 && shard < replicas {
		// an explicit, in-range shard was requested: just refresh its heartbeat
		log.Debugf("update heartbeat for shard %d", shard)
		for i := range shardMappingData {
			shardMapping := shardMappingData[i]
			if shardMapping.ShardNumber == shard {
				log.Debugf("Shard found. Updating heartbeat!!")
				shardMapping.ControllerName = hostname
				shardMapping.HeartbeatTime = heartbeatCurrentTime()
				shardMappingData[i] = shardMapping
				break
			}
		}
	} else {
		// find the matching shard with assigned controllerName
		for i := range shardMappingData {
			shardMapping := shardMappingData[i]
			if shardMapping.ControllerName == hostname {
				log.Debugf("Shard matched. Updating heartbeat!!")
				shard = int(shardMapping.ShardNumber)
				shardMapping.HeartbeatTime = heartbeatCurrentTime()
				shardMappingData[i] = shardMapping
				break
			}
		}
	}

	// at this point, we have still not found a shard with matching hostname.
	// So, find a shard with either no controller assigned or assigned controller
	// with heartbeat past threshold
	if shard == -1 {
		for i := range shardMappingData {
			shardMapping := shardMappingData[i]
			// claim the shard when it is unowned, or when its owner's heartbeat
			// is older than HeartbeatTimeout seconds (stale controller)
			if (shardMapping.ControllerName == "") || (metav1.Now().After(shardMapping.HeartbeatTime.Add(time.Duration(HeartbeatTimeout) * time.Second))) {
				shard = int(shardMapping.ShardNumber)
				log.Debugf("Empty shard found %d", shard)
				shardMapping.ControllerName = hostname
				shardMapping.HeartbeatTime = heartbeatCurrentTime()
				shardMappingData[i] = shardMapping
				break
			}
		}
	}
	return shard, shardMappingData
}
327

328
// generateDefaultShardMappingCM creates a default shard mapping configMap. Assigns current controller to shard 0.
329
func generateDefaultShardMappingCM(namespace, hostname string, replicas, shard int) (*v1.ConfigMap, error) {
330

331
	shardingCM := &v1.ConfigMap{
332
		ObjectMeta: metav1.ObjectMeta{
333
			Name:      common.ArgoCDAppControllerShardConfigMapName,
334
			Namespace: namespace,
335
		},
336
		Data: map[string]string{},
337
	}
338

339
	shardMappingData := getDefaultShardMappingData(replicas)
340

341
	// if shard is not assigned to a controller, we use shard 0
342
	if shard == -1 || shard > replicas {
343
		shard = 0
344
	}
345
	shardMappingData[shard].ControllerName = hostname
346
	shardMappingData[shard].HeartbeatTime = heartbeatCurrentTime()
347

348
	data, err := json.Marshal(shardMappingData)
349
	if err != nil {
350
		return nil, fmt.Errorf("error generating default ConfigMap: %s", err)
351
	}
352
	shardingCM.Data[ShardControllerMappingKey] = string(data)
353

354
	return shardingCM, nil
355
}
356

357
func getDefaultShardMappingData(replicas int) []shardApplicationControllerMapping {
358
	shardMappingData := make([]shardApplicationControllerMapping, 0)
359

360
	for i := 0; i < replicas; i++ {
361
		mapping := shardApplicationControllerMapping{
362
			ShardNumber: i,
363
		}
364
		shardMappingData = append(shardMappingData, mapping)
365
	}
366
	return shardMappingData
367
}
368

369
// GetClusterSharding determines the replicas count and the shard number for the
// current application controller and returns the ClusterShardingCache built from
// them. With dynamic cluster distribution enabled, the replicas count is read
// from the app controller Deployment and the shard is negotiated through the
// shard mapping ConfigMap; otherwise the replicas count comes from the
// environment and the shard is inferred from the pod hostname.
func GetClusterSharding(kubeClient kubernetes.Interface, settingsMgr *settings.SettingsManager, shardingAlgorithm string, enableDynamicClusterDistribution bool) (ClusterShardingCache, error) {
	var replicasCount int
	if enableDynamicClusterDistribution {
		applicationControllerName := env.StringFromEnv(common.EnvAppControllerName, common.DefaultApplicationControllerName)
		appControllerDeployment, err := kubeClient.AppsV1().Deployments(settingsMgr.GetNamespace()).Get(context.Background(), applicationControllerName, metav1.GetOptions{})
		// if app controller deployment is not found when dynamic cluster distribution is enabled error out
		if err != nil {
			return nil, fmt.Errorf("(dynamic cluster distribution) failed to get app controller deployment: %v", err)
		}

		if appControllerDeployment != nil && appControllerDeployment.Spec.Replicas != nil {
			replicasCount = int(*appControllerDeployment.Spec.Replicas)
		} else {
			return nil, fmt.Errorf("(dynamic cluster distribution) failed to get app controller deployment replica count")
		}
	} else {
		replicasCount = env.ParseNumFromEnv(common.EnvControllerReplicas, 0, 0, math.MaxInt32)
	}
	shardNumber := env.ParseNumFromEnv(common.EnvControllerShard, -1, -math.MaxInt32, math.MaxInt32)
	if replicasCount > 1 {
		// check for shard mapping using configmap if application-controller is a deployment
		// else use existing logic to infer shard from pod name if application-controller is a statefulset
		if enableDynamicClusterDistribution {
			var err error
			// retry 3 times if we find a conflict while updating shard mapping configMap.
			// If we still see conflicts after the retries, wait for next iteration of heartbeat process.
			// Note: the loop previously never broke on success, so it always ran all
			// retries and logged a spurious conflict warning on every iteration.
			for i := 0; i <= common.AppControllerHeartbeatUpdateRetryCount; i++ {
				shardNumber, err = GetOrUpdateShardFromConfigMap(kubeClient, settingsMgr, replicasCount, shardNumber)
				if err == nil {
					break
				}
				if !kubeerrors.IsConflict(err) {
					err = fmt.Errorf("unable to get shard due to error updating the sharding config map: %s", err)
					break
				}
				log.Warnf("conflict when getting shard from shard mapping configMap. Retrying (%d/3)", i)
			}
			errors.CheckError(err)
		} else {
			if shardNumber < 0 {
				var err error
				shardNumber, err = InferShard()
				errors.CheckError(err)
			}
			// Valid shard indexes are 0..replicasCount-1, so >= (not >) is the
			// correct bound: a shard equal to the replicas count would never be
			// assigned any cluster.
			if shardNumber >= replicasCount {
				log.Warnf("Calculated shard number %d is greater than the number of replicas count. Defaulting to 0", shardNumber)
				shardNumber = 0
			}
		}
	} else {
		log.Info("Processing all cluster shards")
		shardNumber = 0
	}
	db := db.NewDB(settingsMgr.GetNamespace(), settingsMgr, kubeClient)
	return NewClusterSharding(db, shardNumber, replicasCount, shardingAlgorithm), nil
}
424

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.