cubefs

Форк
0
/
balance_strategy.go 
1136 строк · 43.6 Кб
1
package sarama
2

3
import (
4
	"container/heap"
5
	"errors"
6
	"fmt"
7
	"math"
8
	"sort"
9
	"strings"
10
)
11

12
const (
13
	// RangeBalanceStrategyName identifies strategies that use the range partition assignment strategy
14
	RangeBalanceStrategyName = "range"
15

16
	// RoundRobinBalanceStrategyName identifies strategies that use the round-robin partition assignment strategy
17
	RoundRobinBalanceStrategyName = "roundrobin"
18

19
	// StickyBalanceStrategyName identifies strategies that use the sticky-partition assignment strategy
20
	StickyBalanceStrategyName = "sticky"
21

22
	defaultGeneration = -1
23
)
24

25
// BalanceStrategyPlan is the results of any BalanceStrategy.Plan attempt.
26
// It contains an allocation of topic/partitions by memberID in the form of
27
// a `memberID -> topic -> partitions` map.
28
type BalanceStrategyPlan map[string]map[string][]int32
29

30
// Add assigns a topic with a number partitions to a member.
31
func (p BalanceStrategyPlan) Add(memberID, topic string, partitions ...int32) {
32
	if len(partitions) == 0 {
33
		return
34
	}
35
	if _, ok := p[memberID]; !ok {
36
		p[memberID] = make(map[string][]int32, 1)
37
	}
38
	p[memberID][topic] = append(p[memberID][topic], partitions...)
39
}
40

41
// --------------------------------------------------------------------
42

43
// BalanceStrategy is used to balance topics and partitions
44
// across members of a consumer group
45
type BalanceStrategy interface {
46
	// Name uniquely identifies the strategy.
47
	Name() string
48

49
	// Plan accepts a map of `memberID -> metadata` and a map of `topic -> partitions`
50
	// and returns a distribution plan.
51
	Plan(members map[string]ConsumerGroupMemberMetadata, topics map[string][]int32) (BalanceStrategyPlan, error)
52

53
	// AssignmentData returns the serialized assignment data for the specified
54
	// memberID
55
	AssignmentData(memberID string, topics map[string][]int32, generationID int32) ([]byte, error)
56
}
57

58
// --------------------------------------------------------------------
59

60
// BalanceStrategyRange is the default and assigns partitions as ranges to consumer group members.
61
// Example with one topic T with six partitions (0..5) and two members (M1, M2):
62
//   M1: {T: [0, 1, 2]}
63
//   M2: {T: [3, 4, 5]}
64
var BalanceStrategyRange = &balanceStrategy{
65
	name: RangeBalanceStrategyName,
66
	coreFn: func(plan BalanceStrategyPlan, memberIDs []string, topic string, partitions []int32) {
67
		step := float64(len(partitions)) / float64(len(memberIDs))
68

69
		for i, memberID := range memberIDs {
70
			pos := float64(i)
71
			min := int(math.Floor(pos*step + 0.5))
72
			max := int(math.Floor((pos+1)*step + 0.5))
73
			plan.Add(memberID, topic, partitions[min:max]...)
74
		}
75
	},
76
}
77

78
// BalanceStrategySticky assigns partitions to members with an attempt to preserve earlier assignments
79
// while maintain a balanced partition distribution.
80
// Example with topic T with six partitions (0..5) and two members (M1, M2):
81
//   M1: {T: [0, 2, 4]}
82
//   M2: {T: [1, 3, 5]}
83
//
84
// On reassignment with an additional consumer, you might get an assignment plan like:
85
//   M1: {T: [0, 2]}
86
//   M2: {T: [1, 3]}
87
//   M3: {T: [4, 5]}
88
//
89
var BalanceStrategySticky = &stickyBalanceStrategy{}
90

91
// --------------------------------------------------------------------
92

93
type balanceStrategy struct {
94
	name   string
95
	coreFn func(plan BalanceStrategyPlan, memberIDs []string, topic string, partitions []int32)
96
}
97

98
// Name implements BalanceStrategy.
99
func (s *balanceStrategy) Name() string { return s.name }
100

101
// Plan implements BalanceStrategy.
102
func (s *balanceStrategy) Plan(members map[string]ConsumerGroupMemberMetadata, topics map[string][]int32) (BalanceStrategyPlan, error) {
103
	// Build members by topic map
104
	mbt := make(map[string][]string)
105
	for memberID, meta := range members {
106
		for _, topic := range meta.Topics {
107
			mbt[topic] = append(mbt[topic], memberID)
108
		}
109
	}
110

111
	// Sort members for each topic
112
	for topic, memberIDs := range mbt {
113
		sort.Sort(&balanceStrategySortable{
114
			topic:     topic,
115
			memberIDs: memberIDs,
116
		})
117
	}
118

119
	// Assemble plan
120
	plan := make(BalanceStrategyPlan, len(members))
121
	for topic, memberIDs := range mbt {
122
		s.coreFn(plan, memberIDs, topic, topics[topic])
123
	}
124
	return plan, nil
125
}
126

127
// AssignmentData simple strategies do not require any shared assignment data
128
func (s *balanceStrategy) AssignmentData(memberID string, topics map[string][]int32, generationID int32) ([]byte, error) {
129
	return nil, nil
130
}
131

132
type balanceStrategySortable struct {
133
	topic     string
134
	memberIDs []string
135
}
136

137
func (p balanceStrategySortable) Len() int { return len(p.memberIDs) }
138
func (p balanceStrategySortable) Swap(i, j int) {
139
	p.memberIDs[i], p.memberIDs[j] = p.memberIDs[j], p.memberIDs[i]
140
}
141

142
func (p balanceStrategySortable) Less(i, j int) bool {
143
	return balanceStrategyHashValue(p.topic, p.memberIDs[i]) < balanceStrategyHashValue(p.topic, p.memberIDs[j])
144
}
145

146
func balanceStrategyHashValue(vv ...string) uint32 {
147
	h := uint32(2166136261)
148
	for _, s := range vv {
149
		for _, c := range s {
150
			h ^= uint32(c)
151
			h *= 16777619
152
		}
153
	}
154
	return h
155
}
156

157
type stickyBalanceStrategy struct {
158
	movements partitionMovements
159
}
160

161
// Name implements BalanceStrategy.
162
func (s *stickyBalanceStrategy) Name() string { return StickyBalanceStrategyName }
163

164
// Plan implements BalanceStrategy.
165
func (s *stickyBalanceStrategy) Plan(members map[string]ConsumerGroupMemberMetadata, topics map[string][]int32) (BalanceStrategyPlan, error) {
166
	// track partition movements during generation of the partition assignment plan
167
	s.movements = partitionMovements{
168
		Movements:                 make(map[topicPartitionAssignment]consumerPair),
169
		PartitionMovementsByTopic: make(map[string]map[consumerPair]map[topicPartitionAssignment]bool),
170
	}
171

172
	// prepopulate the current assignment state from userdata on the consumer group members
173
	currentAssignment, prevAssignment, err := prepopulateCurrentAssignments(members)
174
	if err != nil {
175
		return nil, err
176
	}
177

178
	// determine if we're dealing with a completely fresh assignment, or if there's existing assignment state
179
	isFreshAssignment := false
180
	if len(currentAssignment) == 0 {
181
		isFreshAssignment = true
182
	}
183

184
	// create a mapping of all current topic partitions and the consumers that can be assigned to them
185
	partition2AllPotentialConsumers := make(map[topicPartitionAssignment][]string)
186
	for topic, partitions := range topics {
187
		for _, partition := range partitions {
188
			partition2AllPotentialConsumers[topicPartitionAssignment{Topic: topic, Partition: partition}] = []string{}
189
		}
190
	}
191

192
	// create a mapping of all consumers to all potential topic partitions that can be assigned to them
193
	// also, populate the mapping of partitions to potential consumers
194
	consumer2AllPotentialPartitions := make(map[string][]topicPartitionAssignment, len(members))
195
	for memberID, meta := range members {
196
		consumer2AllPotentialPartitions[memberID] = make([]topicPartitionAssignment, 0)
197
		for _, topicSubscription := range meta.Topics {
198
			// only evaluate topic subscriptions that are present in the supplied topics map
199
			if _, found := topics[topicSubscription]; found {
200
				for _, partition := range topics[topicSubscription] {
201
					topicPartition := topicPartitionAssignment{Topic: topicSubscription, Partition: partition}
202
					consumer2AllPotentialPartitions[memberID] = append(consumer2AllPotentialPartitions[memberID], topicPartition)
203
					partition2AllPotentialConsumers[topicPartition] = append(partition2AllPotentialConsumers[topicPartition], memberID)
204
				}
205
			}
206
		}
207

208
		// add this consumer to currentAssignment (with an empty topic partition assignment) if it does not already exist
209
		if _, exists := currentAssignment[memberID]; !exists {
210
			currentAssignment[memberID] = make([]topicPartitionAssignment, 0)
211
		}
212
	}
213

214
	// create a mapping of each partition to its current consumer, where possible
215
	currentPartitionConsumers := make(map[topicPartitionAssignment]string, len(currentAssignment))
216
	unvisitedPartitions := make(map[topicPartitionAssignment]bool, len(partition2AllPotentialConsumers))
217
	for partition := range partition2AllPotentialConsumers {
218
		unvisitedPartitions[partition] = true
219
	}
220
	var unassignedPartitions []topicPartitionAssignment
221
	for memberID, partitions := range currentAssignment {
222
		var keepPartitions []topicPartitionAssignment
223
		for _, partition := range partitions {
224
			// If this partition no longer exists at all, likely due to the
225
			// topic being deleted, we remove the partition from the member.
226
			if _, exists := partition2AllPotentialConsumers[partition]; !exists {
227
				continue
228
			}
229
			delete(unvisitedPartitions, partition)
230
			currentPartitionConsumers[partition] = memberID
231

232
			if !strsContains(members[memberID].Topics, partition.Topic) {
233
				unassignedPartitions = append(unassignedPartitions, partition)
234
				continue
235
			}
236
			keepPartitions = append(keepPartitions, partition)
237
		}
238
		currentAssignment[memberID] = keepPartitions
239
	}
240
	for unvisited := range unvisitedPartitions {
241
		unassignedPartitions = append(unassignedPartitions, unvisited)
242
	}
243

244
	// sort the topic partitions in order of priority for reassignment
245
	sortedPartitions := sortPartitions(currentAssignment, prevAssignment, isFreshAssignment, partition2AllPotentialConsumers, consumer2AllPotentialPartitions)
246

247
	// at this point we have preserved all valid topic partition to consumer assignments and removed
248
	// all invalid topic partitions and invalid consumers. Now we need to assign unassignedPartitions
249
	// to consumers so that the topic partition assignments are as balanced as possible.
250

251
	// an ascending sorted set of consumers based on how many topic partitions are already assigned to them
252
	sortedCurrentSubscriptions := sortMemberIDsByPartitionAssignments(currentAssignment)
253
	s.balance(currentAssignment, prevAssignment, sortedPartitions, unassignedPartitions, sortedCurrentSubscriptions, consumer2AllPotentialPartitions, partition2AllPotentialConsumers, currentPartitionConsumers)
254

255
	// Assemble plan
256
	plan := make(BalanceStrategyPlan, len(currentAssignment))
257
	for memberID, assignments := range currentAssignment {
258
		if len(assignments) == 0 {
259
			plan[memberID] = make(map[string][]int32)
260
		} else {
261
			for _, assignment := range assignments {
262
				plan.Add(memberID, assignment.Topic, assignment.Partition)
263
			}
264
		}
265
	}
266
	return plan, nil
267
}
268

269
// AssignmentData serializes the set of topics currently assigned to the
270
// specified member as part of the supplied balance plan
271
func (s *stickyBalanceStrategy) AssignmentData(memberID string, topics map[string][]int32, generationID int32) ([]byte, error) {
272
	return encode(&StickyAssignorUserDataV1{
273
		Topics:     topics,
274
		Generation: generationID,
275
	}, nil)
276
}
277

278
func strsContains(s []string, value string) bool {
279
	for _, entry := range s {
280
		if entry == value {
281
			return true
282
		}
283
	}
284
	return false
285
}
286

287
// Balance assignments across consumers for maximum fairness and stickiness.
288
func (s *stickyBalanceStrategy) balance(currentAssignment map[string][]topicPartitionAssignment, prevAssignment map[topicPartitionAssignment]consumerGenerationPair, sortedPartitions []topicPartitionAssignment, unassignedPartitions []topicPartitionAssignment, sortedCurrentSubscriptions []string, consumer2AllPotentialPartitions map[string][]topicPartitionAssignment, partition2AllPotentialConsumers map[topicPartitionAssignment][]string, currentPartitionConsumer map[topicPartitionAssignment]string) {
289
	initializing := false
290
	if len(sortedCurrentSubscriptions) == 0 || len(currentAssignment[sortedCurrentSubscriptions[0]]) == 0 {
291
		initializing = true
292
	}
293

294
	// assign all unassigned partitions
295
	for _, partition := range unassignedPartitions {
296
		// skip if there is no potential consumer for the partition
297
		if len(partition2AllPotentialConsumers[partition]) == 0 {
298
			continue
299
		}
300
		sortedCurrentSubscriptions = assignPartition(partition, sortedCurrentSubscriptions, currentAssignment, consumer2AllPotentialPartitions, currentPartitionConsumer)
301
	}
302

303
	// narrow down the reassignment scope to only those partitions that can actually be reassigned
304
	for partition := range partition2AllPotentialConsumers {
305
		if !canTopicPartitionParticipateInReassignment(partition, partition2AllPotentialConsumers) {
306
			sortedPartitions = removeTopicPartitionFromMemberAssignments(sortedPartitions, partition)
307
		}
308
	}
309

310
	// narrow down the reassignment scope to only those consumers that are subject to reassignment
311
	fixedAssignments := make(map[string][]topicPartitionAssignment)
312
	for memberID := range consumer2AllPotentialPartitions {
313
		if !canConsumerParticipateInReassignment(memberID, currentAssignment, consumer2AllPotentialPartitions, partition2AllPotentialConsumers) {
314
			fixedAssignments[memberID] = currentAssignment[memberID]
315
			delete(currentAssignment, memberID)
316
			sortedCurrentSubscriptions = sortMemberIDsByPartitionAssignments(currentAssignment)
317
		}
318
	}
319

320
	// create a deep copy of the current assignment so we can revert to it if we do not get a more balanced assignment later
321
	preBalanceAssignment := deepCopyAssignment(currentAssignment)
322
	preBalancePartitionConsumers := make(map[topicPartitionAssignment]string, len(currentPartitionConsumer))
323
	for k, v := range currentPartitionConsumer {
324
		preBalancePartitionConsumers[k] = v
325
	}
326

327
	reassignmentPerformed := s.performReassignments(sortedPartitions, currentAssignment, prevAssignment, sortedCurrentSubscriptions, consumer2AllPotentialPartitions, partition2AllPotentialConsumers, currentPartitionConsumer)
328

329
	// if we are not preserving existing assignments and we have made changes to the current assignment
330
	// make sure we are getting a more balanced assignment; otherwise, revert to previous assignment
331
	if !initializing && reassignmentPerformed && getBalanceScore(currentAssignment) >= getBalanceScore(preBalanceAssignment) {
332
		currentAssignment = deepCopyAssignment(preBalanceAssignment)
333
		currentPartitionConsumer = make(map[topicPartitionAssignment]string, len(preBalancePartitionConsumers))
334
		for k, v := range preBalancePartitionConsumers {
335
			currentPartitionConsumer[k] = v
336
		}
337
	}
338

339
	// add the fixed assignments (those that could not change) back
340
	for consumer, assignments := range fixedAssignments {
341
		currentAssignment[consumer] = assignments
342
	}
343
}
344

345
// BalanceStrategyRoundRobin assigns partitions to members in alternating order.
346
// For example, there are two topics (t0, t1) and two consumer (m0, m1), and each topic has three partitions (p0, p1, p2):
347
// M0: [t0p0, t0p2, t1p1]
348
// M1: [t0p1, t1p0, t1p2]
349
var BalanceStrategyRoundRobin = new(roundRobinBalancer)
350

351
type roundRobinBalancer struct{}
352

353
func (b *roundRobinBalancer) Name() string {
354
	return RoundRobinBalanceStrategyName
355
}
356

357
func (b *roundRobinBalancer) Plan(memberAndMetadata map[string]ConsumerGroupMemberMetadata, topics map[string][]int32) (BalanceStrategyPlan, error) {
358
	if len(memberAndMetadata) == 0 || len(topics) == 0 {
359
		return nil, errors.New("members and topics are not provided")
360
	}
361
	// sort partitions
362
	var topicPartitions []topicAndPartition
363
	for topic, partitions := range topics {
364
		for _, partition := range partitions {
365
			topicPartitions = append(topicPartitions, topicAndPartition{topic: topic, partition: partition})
366
		}
367
	}
368
	sort.SliceStable(topicPartitions, func(i, j int) bool {
369
		pi := topicPartitions[i]
370
		pj := topicPartitions[j]
371
		return pi.comparedValue() < pj.comparedValue()
372
	})
373

374
	// sort members
375
	var members []memberAndTopic
376
	for memberID, meta := range memberAndMetadata {
377
		m := memberAndTopic{
378
			memberID: memberID,
379
			topics:   make(map[string]struct{}),
380
		}
381
		for _, t := range meta.Topics {
382
			m.topics[t] = struct{}{}
383
		}
384
		members = append(members, m)
385
	}
386
	sort.SliceStable(members, func(i, j int) bool {
387
		mi := members[i]
388
		mj := members[j]
389
		return mi.memberID < mj.memberID
390
	})
391

392
	// assign partitions
393
	plan := make(BalanceStrategyPlan, len(members))
394
	i := 0
395
	n := len(members)
396
	for _, tp := range topicPartitions {
397
		m := members[i%n]
398
		for !m.hasTopic(tp.topic) {
399
			i++
400
			m = members[i%n]
401
		}
402
		plan.Add(m.memberID, tp.topic, tp.partition)
403
		i++
404
	}
405
	return plan, nil
406
}
407

408
func (b *roundRobinBalancer) AssignmentData(memberID string, topics map[string][]int32, generationID int32) ([]byte, error) {
409
	return nil, nil // do nothing for now
410
}
411

412
type topicAndPartition struct {
413
	topic     string
414
	partition int32
415
}
416

417
func (tp *topicAndPartition) comparedValue() string {
418
	return fmt.Sprintf("%s-%d", tp.topic, tp.partition)
419
}
420

421
type memberAndTopic struct {
422
	memberID string
423
	topics   map[string]struct{}
424
}
425

426
func (m *memberAndTopic) hasTopic(topic string) bool {
427
	_, isExist := m.topics[topic]
428
	return isExist
429
}
430

431
// Calculate the balance score of the given assignment, as the sum of assigned partitions size difference of all consumer pairs.
432
// A perfectly balanced assignment (with all consumers getting the same number of partitions) has a balance score of 0.
433
// Lower balance score indicates a more balanced assignment.
434
func getBalanceScore(assignment map[string][]topicPartitionAssignment) int {
435
	consumer2AssignmentSize := make(map[string]int, len(assignment))
436
	for memberID, partitions := range assignment {
437
		consumer2AssignmentSize[memberID] = len(partitions)
438
	}
439

440
	var score float64
441
	for memberID, consumerAssignmentSize := range consumer2AssignmentSize {
442
		delete(consumer2AssignmentSize, memberID)
443
		for _, otherConsumerAssignmentSize := range consumer2AssignmentSize {
444
			score += math.Abs(float64(consumerAssignmentSize - otherConsumerAssignmentSize))
445
		}
446
	}
447
	return int(score)
448
}
449

450
// Determine whether the current assignment plan is balanced.
451
func isBalanced(currentAssignment map[string][]topicPartitionAssignment, allSubscriptions map[string][]topicPartitionAssignment) bool {
452
	sortedCurrentSubscriptions := sortMemberIDsByPartitionAssignments(currentAssignment)
453
	min := len(currentAssignment[sortedCurrentSubscriptions[0]])
454
	max := len(currentAssignment[sortedCurrentSubscriptions[len(sortedCurrentSubscriptions)-1]])
455
	if min >= max-1 {
456
		// if minimum and maximum numbers of partitions assigned to consumers differ by at most one return true
457
		return true
458
	}
459

460
	// create a mapping from partitions to the consumer assigned to them
461
	allPartitions := make(map[topicPartitionAssignment]string)
462
	for memberID, partitions := range currentAssignment {
463
		for _, partition := range partitions {
464
			if _, exists := allPartitions[partition]; exists {
465
				Logger.Printf("Topic %s Partition %d is assigned more than one consumer", partition.Topic, partition.Partition)
466
			}
467
			allPartitions[partition] = memberID
468
		}
469
	}
470

471
	// for each consumer that does not have all the topic partitions it can get make sure none of the topic partitions it
472
	// could but did not get cannot be moved to it (because that would break the balance)
473
	for _, memberID := range sortedCurrentSubscriptions {
474
		consumerPartitions := currentAssignment[memberID]
475
		consumerPartitionCount := len(consumerPartitions)
476

477
		// skip if this consumer already has all the topic partitions it can get
478
		if consumerPartitionCount == len(allSubscriptions[memberID]) {
479
			continue
480
		}
481

482
		// otherwise make sure it cannot get any more
483
		potentialTopicPartitions := allSubscriptions[memberID]
484
		for _, partition := range potentialTopicPartitions {
485
			if !memberAssignmentsIncludeTopicPartition(currentAssignment[memberID], partition) {
486
				otherConsumer := allPartitions[partition]
487
				otherConsumerPartitionCount := len(currentAssignment[otherConsumer])
488
				if consumerPartitionCount < otherConsumerPartitionCount {
489
					return false
490
				}
491
			}
492
		}
493
	}
494
	return true
495
}
496

497
// Reassign all topic partitions that need reassignment until balanced.
498
func (s *stickyBalanceStrategy) performReassignments(reassignablePartitions []topicPartitionAssignment, currentAssignment map[string][]topicPartitionAssignment, prevAssignment map[topicPartitionAssignment]consumerGenerationPair, sortedCurrentSubscriptions []string, consumer2AllPotentialPartitions map[string][]topicPartitionAssignment, partition2AllPotentialConsumers map[topicPartitionAssignment][]string, currentPartitionConsumer map[topicPartitionAssignment]string) bool {
499
	reassignmentPerformed := false
500
	modified := false
501

502
	// repeat reassignment until no partition can be moved to improve the balance
503
	for {
504
		modified = false
505
		// reassign all reassignable partitions (starting from the partition with least potential consumers and if needed)
506
		// until the full list is processed or a balance is achieved
507
		for _, partition := range reassignablePartitions {
508
			if isBalanced(currentAssignment, consumer2AllPotentialPartitions) {
509
				break
510
			}
511

512
			// the partition must have at least two consumers
513
			if len(partition2AllPotentialConsumers[partition]) <= 1 {
514
				Logger.Printf("Expected more than one potential consumer for partition %s topic %d", partition.Topic, partition.Partition)
515
			}
516

517
			// the partition must have a consumer
518
			consumer := currentPartitionConsumer[partition]
519
			if consumer == "" {
520
				Logger.Printf("Expected topic %s partition %d to be assigned to a consumer", partition.Topic, partition.Partition)
521
			}
522

523
			if _, exists := prevAssignment[partition]; exists {
524
				if len(currentAssignment[consumer]) > (len(currentAssignment[prevAssignment[partition].MemberID]) + 1) {
525
					sortedCurrentSubscriptions = s.reassignPartition(partition, currentAssignment, sortedCurrentSubscriptions, currentPartitionConsumer, prevAssignment[partition].MemberID)
526
					reassignmentPerformed = true
527
					modified = true
528
					continue
529
				}
530
			}
531

532
			// check if a better-suited consumer exists for the partition; if so, reassign it
533
			for _, otherConsumer := range partition2AllPotentialConsumers[partition] {
534
				if len(currentAssignment[consumer]) > (len(currentAssignment[otherConsumer]) + 1) {
535
					sortedCurrentSubscriptions = s.reassignPartitionToNewConsumer(partition, currentAssignment, sortedCurrentSubscriptions, currentPartitionConsumer, consumer2AllPotentialPartitions)
536
					reassignmentPerformed = true
537
					modified = true
538
					break
539
				}
540
			}
541
		}
542
		if !modified {
543
			return reassignmentPerformed
544
		}
545
	}
546
}
547

548
// Identify a new consumer for a topic partition and reassign it.
549
func (s *stickyBalanceStrategy) reassignPartitionToNewConsumer(partition topicPartitionAssignment, currentAssignment map[string][]topicPartitionAssignment, sortedCurrentSubscriptions []string, currentPartitionConsumer map[topicPartitionAssignment]string, consumer2AllPotentialPartitions map[string][]topicPartitionAssignment) []string {
550
	for _, anotherConsumer := range sortedCurrentSubscriptions {
551
		if memberAssignmentsIncludeTopicPartition(consumer2AllPotentialPartitions[anotherConsumer], partition) {
552
			return s.reassignPartition(partition, currentAssignment, sortedCurrentSubscriptions, currentPartitionConsumer, anotherConsumer)
553
		}
554
	}
555
	return sortedCurrentSubscriptions
556
}
557

558
// Reassign a specific partition to a new consumer
559
func (s *stickyBalanceStrategy) reassignPartition(partition topicPartitionAssignment, currentAssignment map[string][]topicPartitionAssignment, sortedCurrentSubscriptions []string, currentPartitionConsumer map[topicPartitionAssignment]string, newConsumer string) []string {
560
	consumer := currentPartitionConsumer[partition]
561
	// find the correct partition movement considering the stickiness requirement
562
	partitionToBeMoved := s.movements.getTheActualPartitionToBeMoved(partition, consumer, newConsumer)
563
	return s.processPartitionMovement(partitionToBeMoved, newConsumer, currentAssignment, sortedCurrentSubscriptions, currentPartitionConsumer)
564
}
565

566
// Track the movement of a topic partition after assignment
567
func (s *stickyBalanceStrategy) processPartitionMovement(partition topicPartitionAssignment, newConsumer string, currentAssignment map[string][]topicPartitionAssignment, sortedCurrentSubscriptions []string, currentPartitionConsumer map[topicPartitionAssignment]string) []string {
568
	oldConsumer := currentPartitionConsumer[partition]
569
	s.movements.movePartition(partition, oldConsumer, newConsumer)
570

571
	currentAssignment[oldConsumer] = removeTopicPartitionFromMemberAssignments(currentAssignment[oldConsumer], partition)
572
	currentAssignment[newConsumer] = append(currentAssignment[newConsumer], partition)
573
	currentPartitionConsumer[partition] = newConsumer
574
	return sortMemberIDsByPartitionAssignments(currentAssignment)
575
}
576

577
// Determine whether a specific consumer should be considered for topic partition assignment.
578
func canConsumerParticipateInReassignment(memberID string, currentAssignment map[string][]topicPartitionAssignment, consumer2AllPotentialPartitions map[string][]topicPartitionAssignment, partition2AllPotentialConsumers map[topicPartitionAssignment][]string) bool {
579
	currentPartitions := currentAssignment[memberID]
580
	currentAssignmentSize := len(currentPartitions)
581
	maxAssignmentSize := len(consumer2AllPotentialPartitions[memberID])
582
	if currentAssignmentSize > maxAssignmentSize {
583
		Logger.Printf("The consumer %s is assigned more partitions than the maximum possible", memberID)
584
	}
585
	if currentAssignmentSize < maxAssignmentSize {
586
		// if a consumer is not assigned all its potential partitions it is subject to reassignment
587
		return true
588
	}
589
	for _, partition := range currentPartitions {
590
		if canTopicPartitionParticipateInReassignment(partition, partition2AllPotentialConsumers) {
591
			return true
592
		}
593
	}
594
	return false
595
}
596

597
// Only consider reassigning those topic partitions that have two or more potential consumers.
598
func canTopicPartitionParticipateInReassignment(partition topicPartitionAssignment, partition2AllPotentialConsumers map[topicPartitionAssignment][]string) bool {
599
	return len(partition2AllPotentialConsumers[partition]) >= 2
600
}
601

602
// The assignment should improve the overall balance of the partition assignments to consumers.
603
func assignPartition(partition topicPartitionAssignment, sortedCurrentSubscriptions []string, currentAssignment map[string][]topicPartitionAssignment, consumer2AllPotentialPartitions map[string][]topicPartitionAssignment, currentPartitionConsumer map[topicPartitionAssignment]string) []string {
604
	for _, memberID := range sortedCurrentSubscriptions {
605
		if memberAssignmentsIncludeTopicPartition(consumer2AllPotentialPartitions[memberID], partition) {
606
			currentAssignment[memberID] = append(currentAssignment[memberID], partition)
607
			currentPartitionConsumer[partition] = memberID
608
			break
609
		}
610
	}
611
	return sortMemberIDsByPartitionAssignments(currentAssignment)
612
}
613

614
// Deserialize topic partition assignment data to aid with creation of a sticky assignment.
615
func deserializeTopicPartitionAssignment(userDataBytes []byte) (StickyAssignorUserData, error) {
616
	userDataV1 := &StickyAssignorUserDataV1{}
617
	if err := decode(userDataBytes, userDataV1); err != nil {
618
		userDataV0 := &StickyAssignorUserDataV0{}
619
		if err := decode(userDataBytes, userDataV0); err != nil {
620
			return nil, err
621
		}
622
		return userDataV0, nil
623
	}
624
	return userDataV1, nil
625
}
626

627
// filterAssignedPartitions returns a map of consumer group members to their list of previously-assigned topic partitions, limited
628
// to those topic partitions currently reported by the Kafka cluster.
629
func filterAssignedPartitions(currentAssignment map[string][]topicPartitionAssignment, partition2AllPotentialConsumers map[topicPartitionAssignment][]string) map[string][]topicPartitionAssignment {
630
	assignments := deepCopyAssignment(currentAssignment)
631
	for memberID, partitions := range assignments {
632
		// perform in-place filtering
633
		i := 0
634
		for _, partition := range partitions {
635
			if _, exists := partition2AllPotentialConsumers[partition]; exists {
636
				partitions[i] = partition
637
				i++
638
			}
639
		}
640
		assignments[memberID] = partitions[:i]
641
	}
642
	return assignments
643
}
644

645
func removeTopicPartitionFromMemberAssignments(assignments []topicPartitionAssignment, topic topicPartitionAssignment) []topicPartitionAssignment {
646
	for i, assignment := range assignments {
647
		if assignment == topic {
648
			return append(assignments[:i], assignments[i+1:]...)
649
		}
650
	}
651
	return assignments
652
}
653

654
func memberAssignmentsIncludeTopicPartition(assignments []topicPartitionAssignment, topic topicPartitionAssignment) bool {
655
	for _, assignment := range assignments {
656
		if assignment == topic {
657
			return true
658
		}
659
	}
660
	return false
661
}
662

663
func sortPartitions(currentAssignment map[string][]topicPartitionAssignment, partitionsWithADifferentPreviousAssignment map[topicPartitionAssignment]consumerGenerationPair, isFreshAssignment bool, partition2AllPotentialConsumers map[topicPartitionAssignment][]string, consumer2AllPotentialPartitions map[string][]topicPartitionAssignment) []topicPartitionAssignment {
664
	unassignedPartitions := make(map[topicPartitionAssignment]bool, len(partition2AllPotentialConsumers))
665
	for partition := range partition2AllPotentialConsumers {
666
		unassignedPartitions[partition] = true
667
	}
668

669
	sortedPartitions := make([]topicPartitionAssignment, 0)
670
	if !isFreshAssignment && areSubscriptionsIdentical(partition2AllPotentialConsumers, consumer2AllPotentialPartitions) {
671
		// if this is a reassignment and the subscriptions are identical (all consumers can consumer from all topics)
672
		// then we just need to simply list partitions in a round robin fashion (from consumers with
673
		// most assigned partitions to those with least)
674
		assignments := filterAssignedPartitions(currentAssignment, partition2AllPotentialConsumers)
675

676
		// use priority-queue to evaluate consumer group members in descending-order based on
677
		// the number of topic partition assignments (i.e. consumers with most assignments first)
678
		pq := make(assignmentPriorityQueue, len(assignments))
679
		i := 0
680
		for consumerID, consumerAssignments := range assignments {
681
			pq[i] = &consumerGroupMember{
682
				id:          consumerID,
683
				assignments: consumerAssignments,
684
			}
685
			i++
686
		}
687
		heap.Init(&pq)
688

689
		for {
690
			// loop until no consumer-group members remain
691
			if pq.Len() == 0 {
692
				break
693
			}
694
			member := pq[0]
695

696
			// partitions that were assigned to a different consumer last time
697
			var prevPartitionIndex int
698
			for i, partition := range member.assignments {
699
				if _, exists := partitionsWithADifferentPreviousAssignment[partition]; exists {
700
					prevPartitionIndex = i
701
					break
702
				}
703
			}
704

705
			if len(member.assignments) > 0 {
706
				partition := member.assignments[prevPartitionIndex]
707
				sortedPartitions = append(sortedPartitions, partition)
708
				delete(unassignedPartitions, partition)
709
				if prevPartitionIndex == 0 {
710
					member.assignments = member.assignments[1:]
711
				} else {
712
					member.assignments = append(member.assignments[:prevPartitionIndex], member.assignments[prevPartitionIndex+1:]...)
713
				}
714
				heap.Fix(&pq, 0)
715
			} else {
716
				heap.Pop(&pq)
717
			}
718
		}
719

720
		for partition := range unassignedPartitions {
721
			sortedPartitions = append(sortedPartitions, partition)
722
		}
723
	} else {
724
		// an ascending sorted set of topic partitions based on how many consumers can potentially use them
725
		sortedPartitions = sortPartitionsByPotentialConsumerAssignments(partition2AllPotentialConsumers)
726
	}
727
	return sortedPartitions
728
}
729

730
func sortMemberIDsByPartitionAssignments(assignments map[string][]topicPartitionAssignment) []string {
731
	// sort the members by the number of partition assignments in ascending order
732
	sortedMemberIDs := make([]string, 0, len(assignments))
733
	for memberID := range assignments {
734
		sortedMemberIDs = append(sortedMemberIDs, memberID)
735
	}
736
	sort.SliceStable(sortedMemberIDs, func(i, j int) bool {
737
		ret := len(assignments[sortedMemberIDs[i]]) - len(assignments[sortedMemberIDs[j]])
738
		if ret == 0 {
739
			return sortedMemberIDs[i] < sortedMemberIDs[j]
740
		}
741
		return len(assignments[sortedMemberIDs[i]]) < len(assignments[sortedMemberIDs[j]])
742
	})
743
	return sortedMemberIDs
744
}
745

746
func sortPartitionsByPotentialConsumerAssignments(partition2AllPotentialConsumers map[topicPartitionAssignment][]string) []topicPartitionAssignment {
747
	// sort the members by the number of partition assignments in descending order
748
	sortedPartionIDs := make([]topicPartitionAssignment, len(partition2AllPotentialConsumers))
749
	i := 0
750
	for partition := range partition2AllPotentialConsumers {
751
		sortedPartionIDs[i] = partition
752
		i++
753
	}
754
	sort.Slice(sortedPartionIDs, func(i, j int) bool {
755
		if len(partition2AllPotentialConsumers[sortedPartionIDs[i]]) == len(partition2AllPotentialConsumers[sortedPartionIDs[j]]) {
756
			ret := strings.Compare(sortedPartionIDs[i].Topic, sortedPartionIDs[j].Topic)
757
			if ret == 0 {
758
				return sortedPartionIDs[i].Partition < sortedPartionIDs[j].Partition
759
			}
760
			return ret < 0
761
		}
762
		return len(partition2AllPotentialConsumers[sortedPartionIDs[i]]) < len(partition2AllPotentialConsumers[sortedPartionIDs[j]])
763
	})
764
	return sortedPartionIDs
765
}
766

767
func deepCopyAssignment(assignment map[string][]topicPartitionAssignment) map[string][]topicPartitionAssignment {
768
	m := make(map[string][]topicPartitionAssignment, len(assignment))
769
	for memberID, subscriptions := range assignment {
770
		m[memberID] = append(subscriptions[:0:0], subscriptions...)
771
	}
772
	return m
773
}
774

775
func areSubscriptionsIdentical(partition2AllPotentialConsumers map[topicPartitionAssignment][]string, consumer2AllPotentialPartitions map[string][]topicPartitionAssignment) bool {
776
	curMembers := make(map[string]int)
777
	for _, cur := range partition2AllPotentialConsumers {
778
		if len(curMembers) == 0 {
779
			for _, curMembersElem := range cur {
780
				curMembers[curMembersElem]++
781
			}
782
			continue
783
		}
784

785
		if len(curMembers) != len(cur) {
786
			return false
787
		}
788

789
		yMap := make(map[string]int)
790
		for _, yElem := range cur {
791
			yMap[yElem]++
792
		}
793

794
		for curMembersMapKey, curMembersMapVal := range curMembers {
795
			if yMap[curMembersMapKey] != curMembersMapVal {
796
				return false
797
			}
798
		}
799
	}
800

801
	curPartitions := make(map[topicPartitionAssignment]int)
802
	for _, cur := range consumer2AllPotentialPartitions {
803
		if len(curPartitions) == 0 {
804
			for _, curPartitionElem := range cur {
805
				curPartitions[curPartitionElem]++
806
			}
807
			continue
808
		}
809

810
		if len(curPartitions) != len(cur) {
811
			return false
812
		}
813

814
		yMap := make(map[topicPartitionAssignment]int)
815
		for _, yElem := range cur {
816
			yMap[yElem]++
817
		}
818

819
		for curMembersMapKey, curMembersMapVal := range curPartitions {
820
			if yMap[curMembersMapKey] != curMembersMapVal {
821
				return false
822
			}
823
		}
824
	}
825
	return true
826
}
827

828
// We need to process subscriptions' user data with each consumer's reported generation in mind
829
// higher generations overwrite lower generations in case of a conflict
830
// note that a conflict could exist only if user data is for different generations
831
func prepopulateCurrentAssignments(members map[string]ConsumerGroupMemberMetadata) (map[string][]topicPartitionAssignment, map[topicPartitionAssignment]consumerGenerationPair, error) {
832
	currentAssignment := make(map[string][]topicPartitionAssignment)
833
	prevAssignment := make(map[topicPartitionAssignment]consumerGenerationPair)
834

835
	// for each partition we create a sorted map of its consumers by generation
836
	sortedPartitionConsumersByGeneration := make(map[topicPartitionAssignment]map[int]string)
837
	for memberID, meta := range members {
838
		consumerUserData, err := deserializeTopicPartitionAssignment(meta.UserData)
839
		if err != nil {
840
			return nil, nil, err
841
		}
842
		for _, partition := range consumerUserData.partitions() {
843
			if consumers, exists := sortedPartitionConsumersByGeneration[partition]; exists {
844
				if consumerUserData.hasGeneration() {
845
					if _, generationExists := consumers[consumerUserData.generation()]; generationExists {
846
						// same partition is assigned to two consumers during the same rebalance.
847
						// log a warning and skip this record
848
						Logger.Printf("Topic %s Partition %d is assigned to multiple consumers following sticky assignment generation %d", partition.Topic, partition.Partition, consumerUserData.generation())
849
						continue
850
					} else {
851
						consumers[consumerUserData.generation()] = memberID
852
					}
853
				} else {
854
					consumers[defaultGeneration] = memberID
855
				}
856
			} else {
857
				generation := defaultGeneration
858
				if consumerUserData.hasGeneration() {
859
					generation = consumerUserData.generation()
860
				}
861
				sortedPartitionConsumersByGeneration[partition] = map[int]string{generation: memberID}
862
			}
863
		}
864
	}
865

866
	// prevAssignment holds the prior ConsumerGenerationPair (before current) of each partition
867
	// current and previous consumers are the last two consumers of each partition in the above sorted map
868
	for partition, consumers := range sortedPartitionConsumersByGeneration {
869
		// sort consumers by generation in decreasing order
870
		var generations []int
871
		for generation := range consumers {
872
			generations = append(generations, generation)
873
		}
874
		sort.Sort(sort.Reverse(sort.IntSlice(generations)))
875

876
		consumer := consumers[generations[0]]
877
		if _, exists := currentAssignment[consumer]; !exists {
878
			currentAssignment[consumer] = []topicPartitionAssignment{partition}
879
		} else {
880
			currentAssignment[consumer] = append(currentAssignment[consumer], partition)
881
		}
882

883
		// check for previous assignment, if any
884
		if len(generations) > 1 {
885
			prevAssignment[partition] = consumerGenerationPair{
886
				MemberID:   consumers[generations[1]],
887
				Generation: generations[1],
888
			}
889
		}
890
	}
891
	return currentAssignment, prevAssignment, nil
892
}
893

894
type consumerGenerationPair struct {
895
	MemberID   string
896
	Generation int
897
}
898

899
// consumerPair represents a pair of Kafka consumer ids involved in a partition reassignment.
900
type consumerPair struct {
901
	SrcMemberID string
902
	DstMemberID string
903
}
904

905
// partitionMovements maintains some data structures to simplify lookup of partition movements among consumers.
906
type partitionMovements struct {
907
	PartitionMovementsByTopic map[string]map[consumerPair]map[topicPartitionAssignment]bool
908
	Movements                 map[topicPartitionAssignment]consumerPair
909
}
910

911
func (p *partitionMovements) removeMovementRecordOfPartition(partition topicPartitionAssignment) consumerPair {
912
	pair := p.Movements[partition]
913
	delete(p.Movements, partition)
914

915
	partitionMovementsForThisTopic := p.PartitionMovementsByTopic[partition.Topic]
916
	delete(partitionMovementsForThisTopic[pair], partition)
917
	if len(partitionMovementsForThisTopic[pair]) == 0 {
918
		delete(partitionMovementsForThisTopic, pair)
919
	}
920
	if len(p.PartitionMovementsByTopic[partition.Topic]) == 0 {
921
		delete(p.PartitionMovementsByTopic, partition.Topic)
922
	}
923
	return pair
924
}
925

926
func (p *partitionMovements) addPartitionMovementRecord(partition topicPartitionAssignment, pair consumerPair) {
927
	p.Movements[partition] = pair
928
	if _, exists := p.PartitionMovementsByTopic[partition.Topic]; !exists {
929
		p.PartitionMovementsByTopic[partition.Topic] = make(map[consumerPair]map[topicPartitionAssignment]bool)
930
	}
931
	partitionMovementsForThisTopic := p.PartitionMovementsByTopic[partition.Topic]
932
	if _, exists := partitionMovementsForThisTopic[pair]; !exists {
933
		partitionMovementsForThisTopic[pair] = make(map[topicPartitionAssignment]bool)
934
	}
935
	partitionMovementsForThisTopic[pair][partition] = true
936
}
937

938
func (p *partitionMovements) movePartition(partition topicPartitionAssignment, oldConsumer, newConsumer string) {
939
	pair := consumerPair{
940
		SrcMemberID: oldConsumer,
941
		DstMemberID: newConsumer,
942
	}
943
	if _, exists := p.Movements[partition]; exists {
944
		// this partition has previously moved
945
		existingPair := p.removeMovementRecordOfPartition(partition)
946
		if existingPair.DstMemberID != oldConsumer {
947
			Logger.Printf("Existing pair DstMemberID %s was not equal to the oldConsumer ID %s", existingPair.DstMemberID, oldConsumer)
948
		}
949
		if existingPair.SrcMemberID != newConsumer {
950
			// the partition is not moving back to its previous consumer
951
			p.addPartitionMovementRecord(partition, consumerPair{
952
				SrcMemberID: existingPair.SrcMemberID,
953
				DstMemberID: newConsumer,
954
			})
955
		}
956
	} else {
957
		p.addPartitionMovementRecord(partition, pair)
958
	}
959
}
960

961
func (p *partitionMovements) getTheActualPartitionToBeMoved(partition topicPartitionAssignment, oldConsumer, newConsumer string) topicPartitionAssignment {
962
	if _, exists := p.PartitionMovementsByTopic[partition.Topic]; !exists {
963
		return partition
964
	}
965
	if _, exists := p.Movements[partition]; exists {
966
		// this partition has previously moved
967
		if oldConsumer != p.Movements[partition].DstMemberID {
968
			Logger.Printf("Partition movement DstMemberID %s was not equal to the oldConsumer ID %s", p.Movements[partition].DstMemberID, oldConsumer)
969
		}
970
		oldConsumer = p.Movements[partition].SrcMemberID
971
	}
972

973
	partitionMovementsForThisTopic := p.PartitionMovementsByTopic[partition.Topic]
974
	reversePair := consumerPair{
975
		SrcMemberID: newConsumer,
976
		DstMemberID: oldConsumer,
977
	}
978
	if _, exists := partitionMovementsForThisTopic[reversePair]; !exists {
979
		return partition
980
	}
981
	var reversePairPartition topicPartitionAssignment
982
	for otherPartition := range partitionMovementsForThisTopic[reversePair] {
983
		reversePairPartition = otherPartition
984
	}
985
	return reversePairPartition
986
}
987

988
func (p *partitionMovements) isLinked(src, dst string, pairs []consumerPair, currentPath []string) ([]string, bool) {
989
	if src == dst {
990
		return currentPath, false
991
	}
992
	if len(pairs) == 0 {
993
		return currentPath, false
994
	}
995
	for _, pair := range pairs {
996
		if src == pair.SrcMemberID && dst == pair.DstMemberID {
997
			currentPath = append(currentPath, src, dst)
998
			return currentPath, true
999
		}
1000
	}
1001

1002
	for _, pair := range pairs {
1003
		if pair.SrcMemberID == src {
1004
			// create a deep copy of the pairs, excluding the current pair
1005
			reducedSet := make([]consumerPair, len(pairs)-1)
1006
			i := 0
1007
			for _, p := range pairs {
1008
				if p != pair {
1009
					reducedSet[i] = pair
1010
					i++
1011
				}
1012
			}
1013

1014
			currentPath = append(currentPath, pair.SrcMemberID)
1015
			return p.isLinked(pair.DstMemberID, dst, reducedSet, currentPath)
1016
		}
1017
	}
1018
	return currentPath, false
1019
}
1020

1021
func (p *partitionMovements) in(cycle []string, cycles [][]string) bool {
1022
	superCycle := make([]string, len(cycle)-1)
1023
	for i := 0; i < len(cycle)-1; i++ {
1024
		superCycle[i] = cycle[i]
1025
	}
1026
	superCycle = append(superCycle, cycle...)
1027
	for _, foundCycle := range cycles {
1028
		if len(foundCycle) == len(cycle) && indexOfSubList(superCycle, foundCycle) != -1 {
1029
			return true
1030
		}
1031
	}
1032
	return false
1033
}
1034

1035
func (p *partitionMovements) hasCycles(pairs []consumerPair) bool {
1036
	cycles := make([][]string, 0)
1037
	for _, pair := range pairs {
1038
		// create a deep copy of the pairs, excluding the current pair
1039
		reducedPairs := make([]consumerPair, len(pairs)-1)
1040
		i := 0
1041
		for _, p := range pairs {
1042
			if p != pair {
1043
				reducedPairs[i] = pair
1044
				i++
1045
			}
1046
		}
1047
		if path, linked := p.isLinked(pair.DstMemberID, pair.SrcMemberID, reducedPairs, []string{pair.SrcMemberID}); linked {
1048
			if !p.in(path, cycles) {
1049
				cycles = append(cycles, path)
1050
				Logger.Printf("A cycle of length %d was found: %v", len(path)-1, path)
1051
			}
1052
		}
1053
	}
1054

1055
	// for now we want to make sure there is no partition movements of the same topic between a pair of consumers.
1056
	// the odds of finding a cycle among more than two consumers seem to be very low (according to various randomized
1057
	// tests with the given sticky algorithm) that it should not worth the added complexity of handling those cases.
1058
	for _, cycle := range cycles {
1059
		if len(cycle) == 3 {
1060
			return true
1061
		}
1062
	}
1063
	return false
1064
}
1065

1066
func (p *partitionMovements) isSticky() bool {
1067
	for topic, movements := range p.PartitionMovementsByTopic {
1068
		movementPairs := make([]consumerPair, len(movements))
1069
		i := 0
1070
		for pair := range movements {
1071
			movementPairs[i] = pair
1072
			i++
1073
		}
1074
		if p.hasCycles(movementPairs) {
1075
			Logger.Printf("Stickiness is violated for topic %s", topic)
1076
			Logger.Printf("Partition movements for this topic occurred among the following consumer pairs: %v", movements)
1077
			return false
1078
		}
1079
	}
1080
	return true
1081
}
1082

1083
func indexOfSubList(source []string, target []string) int {
1084
	targetSize := len(target)
1085
	maxCandidate := len(source) - targetSize
1086
nextCand:
1087
	for candidate := 0; candidate <= maxCandidate; candidate++ {
1088
		j := candidate
1089
		for i := 0; i < targetSize; i++ {
1090
			if target[i] != source[j] {
1091
				// Element mismatch, try next cand
1092
				continue nextCand
1093
			}
1094
			j++
1095
		}
1096
		// All elements of candidate matched target
1097
		return candidate
1098
	}
1099
	return -1
1100
}
1101

1102
type consumerGroupMember struct {
1103
	id          string
1104
	assignments []topicPartitionAssignment
1105
}
1106

1107
// assignmentPriorityQueue is a priority-queue of consumer group members that is sorted
1108
// in descending order (most assignments to least assignments).
1109
type assignmentPriorityQueue []*consumerGroupMember
1110

1111
func (pq assignmentPriorityQueue) Len() int { return len(pq) }
1112

1113
func (pq assignmentPriorityQueue) Less(i, j int) bool {
1114
	// order asssignment priority queue in descending order using assignment-count/member-id
1115
	if len(pq[i].assignments) == len(pq[j].assignments) {
1116
		return strings.Compare(pq[i].id, pq[j].id) > 0
1117
	}
1118
	return len(pq[i].assignments) > len(pq[j].assignments)
1119
}
1120

1121
func (pq assignmentPriorityQueue) Swap(i, j int) {
1122
	pq[i], pq[j] = pq[j], pq[i]
1123
}
1124

1125
func (pq *assignmentPriorityQueue) Push(x interface{}) {
1126
	member := x.(*consumerGroupMember)
1127
	*pq = append(*pq, member)
1128
}
1129

1130
func (pq *assignmentPriorityQueue) Pop() interface{} {
1131
	old := *pq
1132
	n := len(old)
1133
	member := old[n-1]
1134
	*pq = old[0 : n-1]
1135
	return member
1136
}
1137

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.