cubefs
1136 строк · 43.6 Кб
1package sarama2
3import (4"container/heap"5"errors"6"fmt"7"math"8"sort"9"strings"10)
11
const (
	// RangeBalanceStrategyName identifies strategies that use the range partition assignment strategy.
	RangeBalanceStrategyName = "range"

	// RoundRobinBalanceStrategyName identifies strategies that use the round-robin partition assignment strategy.
	RoundRobinBalanceStrategyName = "roundrobin"

	// StickyBalanceStrategyName identifies strategies that use the sticky-partition assignment strategy.
	StickyBalanceStrategyName = "sticky"

	// defaultGeneration is a sentinel generation number.
	// NOTE(review): presumably the generation assumed when a member's user
	// data carries none; its use is outside this part of the file — confirm.
	defaultGeneration = -1
)
24
// BalanceStrategyPlan holds the outcome of a BalanceStrategy.Plan call:
// an allocation of topic partitions per member, laid out as a
// `memberID -> topic -> partitions` map.
type BalanceStrategyPlan map[string]map[string][]int32

// Add appends the given partitions of topic to the allocation of memberID.
// Calling it without any partition is a no-op and creates no entry.
func (p BalanceStrategyPlan) Add(memberID, topic string, partitions ...int32) {
	if len(partitions) == 0 {
		return
	}

	byTopic, known := p[memberID]
	if !known {
		byTopic = make(map[string][]int32, 1)
		p[memberID] = byTopic
	}
	byTopic[topic] = append(byTopic[topic], partitions...)
}
40
41// --------------------------------------------------------------------
42
// BalanceStrategy is used to balance topics and partitions
// across members of a consumer group.
type BalanceStrategy interface {
	// Name uniquely identifies the strategy.
	Name() string

	// Plan accepts a map of `memberID -> metadata` and a map of `topic -> partitions`
	// and returns a distribution plan.
	Plan(members map[string]ConsumerGroupMemberMetadata, topics map[string][]int32) (BalanceStrategyPlan, error)

	// AssignmentData returns the serialized assignment data for the specified
	// memberID. Implementations that need no such data may return nil.
	AssignmentData(memberID string, topics map[string][]int32, generationID int32) ([]byte, error)
}
57
58// --------------------------------------------------------------------
59
60// BalanceStrategyRange is the default and assigns partitions as ranges to consumer group members.
61// Example with one topic T with six partitions (0..5) and two members (M1, M2):
62// M1: {T: [0, 1, 2]}
63// M2: {T: [3, 4, 5]}
64var BalanceStrategyRange = &balanceStrategy{65name: RangeBalanceStrategyName,66coreFn: func(plan BalanceStrategyPlan, memberIDs []string, topic string, partitions []int32) {67step := float64(len(partitions)) / float64(len(memberIDs))68
69for i, memberID := range memberIDs {70pos := float64(i)71min := int(math.Floor(pos*step + 0.5))72max := int(math.Floor((pos+1)*step + 0.5))73plan.Add(memberID, topic, partitions[min:max]...)74}75},76}
77
// BalanceStrategySticky assigns partitions to members with an attempt to preserve earlier assignments
// while maintaining a balanced partition distribution.
// Example with topic T with six partitions (0..5) and two members (M1, M2):
//
//	M1: {T: [0, 2, 4]}
//	M2: {T: [1, 3, 5]}
//
// On reassignment with an additional consumer, you might get an assignment plan like:
//
//	M1: {T: [0, 2]}
//	M2: {T: [1, 3]}
//	M3: {T: [4, 5]}
var BalanceStrategySticky = &stickyBalanceStrategy{}
91// --------------------------------------------------------------------
92
// balanceStrategy is a BalanceStrategy whose per-topic distribution logic is
// delegated to coreFn.
type balanceStrategy struct {
	// name is the value reported by Name.
	name string
	// coreFn adds to plan the allocation of one topic's partitions among the
	// given memberIDs; Plan invokes it once per subscribed topic.
	coreFn func(plan BalanceStrategyPlan, memberIDs []string, topic string, partitions []int32)
}

// Name implements BalanceStrategy.
func (s *balanceStrategy) Name() string { return s.name }
101// Plan implements BalanceStrategy.
102func (s *balanceStrategy) Plan(members map[string]ConsumerGroupMemberMetadata, topics map[string][]int32) (BalanceStrategyPlan, error) {103// Build members by topic map104mbt := make(map[string][]string)105for memberID, meta := range members {106for _, topic := range meta.Topics {107mbt[topic] = append(mbt[topic], memberID)108}109}110
111// Sort members for each topic112for topic, memberIDs := range mbt {113sort.Sort(&balanceStrategySortable{114topic: topic,115memberIDs: memberIDs,116})117}118
119// Assemble plan120plan := make(BalanceStrategyPlan, len(members))121for topic, memberIDs := range mbt {122s.coreFn(plan, memberIDs, topic, topics[topic])123}124return plan, nil125}
126
// AssignmentData implements BalanceStrategy. Simple strategies do not require
// any shared assignment data, so it always returns nil data and a nil error.
func (s *balanceStrategy) AssignmentData(memberID string, topics map[string][]int32, generationID int32) ([]byte, error) {
	return nil, nil
}
131
132type balanceStrategySortable struct {133topic string134memberIDs []string135}
136
137func (p balanceStrategySortable) Len() int { return len(p.memberIDs) }138func (p balanceStrategySortable) Swap(i, j int) {139p.memberIDs[i], p.memberIDs[j] = p.memberIDs[j], p.memberIDs[i]140}
141
142func (p balanceStrategySortable) Less(i, j int) bool {143return balanceStrategyHashValue(p.topic, p.memberIDs[i]) < balanceStrategyHashValue(p.topic, p.memberIDs[j])144}
145
// balanceStrategyHashValue folds all given strings, in order, into a single
// 32-bit hash: starting from 2166136261, every rune is XORed into the state,
// which is then multiplied by 16777619 (wrapping on overflow).
// The strings are hashed as if they were concatenated.
func balanceStrategyHashValue(vv ...string) uint32 {
	const (
		seed       uint32 = 2166136261
		multiplier uint32 = 16777619
	)

	hash := seed
	for _, str := range vv {
		for _, r := range str {
			hash = (hash ^ uint32(r)) * multiplier
		}
	}
	return hash
}
156
// stickyBalanceStrategy is the BalanceStrategy behind BalanceStrategySticky.
type stickyBalanceStrategy struct {
	// movements records partition movements; Plan replaces it with a fresh
	// tracker on every call and processPartitionMovement updates it.
	movements partitionMovements
}

// Name implements BalanceStrategy.
func (s *stickyBalanceStrategy) Name() string { return StickyBalanceStrategyName }
164// Plan implements BalanceStrategy.
165func (s *stickyBalanceStrategy) Plan(members map[string]ConsumerGroupMemberMetadata, topics map[string][]int32) (BalanceStrategyPlan, error) {166// track partition movements during generation of the partition assignment plan167s.movements = partitionMovements{168Movements: make(map[topicPartitionAssignment]consumerPair),169PartitionMovementsByTopic: make(map[string]map[consumerPair]map[topicPartitionAssignment]bool),170}171
172// prepopulate the current assignment state from userdata on the consumer group members173currentAssignment, prevAssignment, err := prepopulateCurrentAssignments(members)174if err != nil {175return nil, err176}177
178// determine if we're dealing with a completely fresh assignment, or if there's existing assignment state179isFreshAssignment := false180if len(currentAssignment) == 0 {181isFreshAssignment = true182}183
184// create a mapping of all current topic partitions and the consumers that can be assigned to them185partition2AllPotentialConsumers := make(map[topicPartitionAssignment][]string)186for topic, partitions := range topics {187for _, partition := range partitions {188partition2AllPotentialConsumers[topicPartitionAssignment{Topic: topic, Partition: partition}] = []string{}189}190}191
192// create a mapping of all consumers to all potential topic partitions that can be assigned to them193// also, populate the mapping of partitions to potential consumers194consumer2AllPotentialPartitions := make(map[string][]topicPartitionAssignment, len(members))195for memberID, meta := range members {196consumer2AllPotentialPartitions[memberID] = make([]topicPartitionAssignment, 0)197for _, topicSubscription := range meta.Topics {198// only evaluate topic subscriptions that are present in the supplied topics map199if _, found := topics[topicSubscription]; found {200for _, partition := range topics[topicSubscription] {201topicPartition := topicPartitionAssignment{Topic: topicSubscription, Partition: partition}202consumer2AllPotentialPartitions[memberID] = append(consumer2AllPotentialPartitions[memberID], topicPartition)203partition2AllPotentialConsumers[topicPartition] = append(partition2AllPotentialConsumers[topicPartition], memberID)204}205}206}207
208// add this consumer to currentAssignment (with an empty topic partition assignment) if it does not already exist209if _, exists := currentAssignment[memberID]; !exists {210currentAssignment[memberID] = make([]topicPartitionAssignment, 0)211}212}213
214// create a mapping of each partition to its current consumer, where possible215currentPartitionConsumers := make(map[topicPartitionAssignment]string, len(currentAssignment))216unvisitedPartitions := make(map[topicPartitionAssignment]bool, len(partition2AllPotentialConsumers))217for partition := range partition2AllPotentialConsumers {218unvisitedPartitions[partition] = true219}220var unassignedPartitions []topicPartitionAssignment221for memberID, partitions := range currentAssignment {222var keepPartitions []topicPartitionAssignment223for _, partition := range partitions {224// If this partition no longer exists at all, likely due to the225// topic being deleted, we remove the partition from the member.226if _, exists := partition2AllPotentialConsumers[partition]; !exists {227continue228}229delete(unvisitedPartitions, partition)230currentPartitionConsumers[partition] = memberID231
232if !strsContains(members[memberID].Topics, partition.Topic) {233unassignedPartitions = append(unassignedPartitions, partition)234continue235}236keepPartitions = append(keepPartitions, partition)237}238currentAssignment[memberID] = keepPartitions239}240for unvisited := range unvisitedPartitions {241unassignedPartitions = append(unassignedPartitions, unvisited)242}243
244// sort the topic partitions in order of priority for reassignment245sortedPartitions := sortPartitions(currentAssignment, prevAssignment, isFreshAssignment, partition2AllPotentialConsumers, consumer2AllPotentialPartitions)246
247// at this point we have preserved all valid topic partition to consumer assignments and removed248// all invalid topic partitions and invalid consumers. Now we need to assign unassignedPartitions249// to consumers so that the topic partition assignments are as balanced as possible.250
251// an ascending sorted set of consumers based on how many topic partitions are already assigned to them252sortedCurrentSubscriptions := sortMemberIDsByPartitionAssignments(currentAssignment)253s.balance(currentAssignment, prevAssignment, sortedPartitions, unassignedPartitions, sortedCurrentSubscriptions, consumer2AllPotentialPartitions, partition2AllPotentialConsumers, currentPartitionConsumers)254
255// Assemble plan256plan := make(BalanceStrategyPlan, len(currentAssignment))257for memberID, assignments := range currentAssignment {258if len(assignments) == 0 {259plan[memberID] = make(map[string][]int32)260} else {261for _, assignment := range assignments {262plan.Add(memberID, assignment.Topic, assignment.Partition)263}264}265}266return plan, nil267}
268
269// AssignmentData serializes the set of topics currently assigned to the
270// specified member as part of the supplied balance plan
271func (s *stickyBalanceStrategy) AssignmentData(memberID string, topics map[string][]int32, generationID int32) ([]byte, error) {272return encode(&StickyAssignorUserDataV1{273Topics: topics,274Generation: generationID,275}, nil)276}
277
// strsContains reports whether value is one of the elements of s.
func strsContains(s []string, value string) bool {
	for i := range s {
		if s[i] == value {
			return true
		}
	}
	return false
}
286
287// Balance assignments across consumers for maximum fairness and stickiness.
288func (s *stickyBalanceStrategy) balance(currentAssignment map[string][]topicPartitionAssignment, prevAssignment map[topicPartitionAssignment]consumerGenerationPair, sortedPartitions []topicPartitionAssignment, unassignedPartitions []topicPartitionAssignment, sortedCurrentSubscriptions []string, consumer2AllPotentialPartitions map[string][]topicPartitionAssignment, partition2AllPotentialConsumers map[topicPartitionAssignment][]string, currentPartitionConsumer map[topicPartitionAssignment]string) {289initializing := false290if len(sortedCurrentSubscriptions) == 0 || len(currentAssignment[sortedCurrentSubscriptions[0]]) == 0 {291initializing = true292}293
294// assign all unassigned partitions295for _, partition := range unassignedPartitions {296// skip if there is no potential consumer for the partition297if len(partition2AllPotentialConsumers[partition]) == 0 {298continue299}300sortedCurrentSubscriptions = assignPartition(partition, sortedCurrentSubscriptions, currentAssignment, consumer2AllPotentialPartitions, currentPartitionConsumer)301}302
303// narrow down the reassignment scope to only those partitions that can actually be reassigned304for partition := range partition2AllPotentialConsumers {305if !canTopicPartitionParticipateInReassignment(partition, partition2AllPotentialConsumers) {306sortedPartitions = removeTopicPartitionFromMemberAssignments(sortedPartitions, partition)307}308}309
310// narrow down the reassignment scope to only those consumers that are subject to reassignment311fixedAssignments := make(map[string][]topicPartitionAssignment)312for memberID := range consumer2AllPotentialPartitions {313if !canConsumerParticipateInReassignment(memberID, currentAssignment, consumer2AllPotentialPartitions, partition2AllPotentialConsumers) {314fixedAssignments[memberID] = currentAssignment[memberID]315delete(currentAssignment, memberID)316sortedCurrentSubscriptions = sortMemberIDsByPartitionAssignments(currentAssignment)317}318}319
320// create a deep copy of the current assignment so we can revert to it if we do not get a more balanced assignment later321preBalanceAssignment := deepCopyAssignment(currentAssignment)322preBalancePartitionConsumers := make(map[topicPartitionAssignment]string, len(currentPartitionConsumer))323for k, v := range currentPartitionConsumer {324preBalancePartitionConsumers[k] = v325}326
327reassignmentPerformed := s.performReassignments(sortedPartitions, currentAssignment, prevAssignment, sortedCurrentSubscriptions, consumer2AllPotentialPartitions, partition2AllPotentialConsumers, currentPartitionConsumer)328
329// if we are not preserving existing assignments and we have made changes to the current assignment330// make sure we are getting a more balanced assignment; otherwise, revert to previous assignment331if !initializing && reassignmentPerformed && getBalanceScore(currentAssignment) >= getBalanceScore(preBalanceAssignment) {332currentAssignment = deepCopyAssignment(preBalanceAssignment)333currentPartitionConsumer = make(map[topicPartitionAssignment]string, len(preBalancePartitionConsumers))334for k, v := range preBalancePartitionConsumers {335currentPartitionConsumer[k] = v336}337}338
339// add the fixed assignments (those that could not change) back340for consumer, assignments := range fixedAssignments {341currentAssignment[consumer] = assignments342}343}
344
// BalanceStrategyRoundRobin assigns partitions to members in alternating order.
// For example, given two topics (t0, t1) with three partitions each (p0, p1, p2)
// and two consumers (m0, m1):
//
//	M0: [t0p0, t0p2, t1p1]
//	M1: [t0p1, t1p0, t1p2]
var BalanceStrategyRoundRobin = new(roundRobinBalancer)
// roundRobinBalancer is the stateless BalanceStrategy behind
// BalanceStrategyRoundRobin.
type roundRobinBalancer struct{}

// Name implements BalanceStrategy.
func (b *roundRobinBalancer) Name() string {
	return RoundRobinBalanceStrategyName
}
356
357func (b *roundRobinBalancer) Plan(memberAndMetadata map[string]ConsumerGroupMemberMetadata, topics map[string][]int32) (BalanceStrategyPlan, error) {358if len(memberAndMetadata) == 0 || len(topics) == 0 {359return nil, errors.New("members and topics are not provided")360}361// sort partitions362var topicPartitions []topicAndPartition363for topic, partitions := range topics {364for _, partition := range partitions {365topicPartitions = append(topicPartitions, topicAndPartition{topic: topic, partition: partition})366}367}368sort.SliceStable(topicPartitions, func(i, j int) bool {369pi := topicPartitions[i]370pj := topicPartitions[j]371return pi.comparedValue() < pj.comparedValue()372})373
374// sort members375var members []memberAndTopic376for memberID, meta := range memberAndMetadata {377m := memberAndTopic{378memberID: memberID,379topics: make(map[string]struct{}),380}381for _, t := range meta.Topics {382m.topics[t] = struct{}{}383}384members = append(members, m)385}386sort.SliceStable(members, func(i, j int) bool {387mi := members[i]388mj := members[j]389return mi.memberID < mj.memberID390})391
392// assign partitions393plan := make(BalanceStrategyPlan, len(members))394i := 0395n := len(members)396for _, tp := range topicPartitions {397m := members[i%n]398for !m.hasTopic(tp.topic) {399i++400m = members[i%n]401}402plan.Add(m.memberID, tp.topic, tp.partition)403i++404}405return plan, nil406}
407
// AssignmentData implements BalanceStrategy. The round-robin strategy
// currently produces no assignment data: it always returns nil, nil.
func (b *roundRobinBalancer) AssignmentData(memberID string, topics map[string][]int32, generationID int32) ([]byte, error) {
	return nil, nil // do nothing for now
}
411
// topicAndPartition identifies a single partition of a topic.
type topicAndPartition struct {
	topic     string
	partition int32
}

// comparedValue returns the "<topic>-<partition>" string used as sort key.
// Keys are compared as strings, so e.g. "t-10" sorts before "t-2".
func (tp *topicAndPartition) comparedValue() string {
	return fmt.Sprintf("%s-%d", tp.topic, tp.partition)
}
420
// memberAndTopic pairs a member ID with the set of topics the member
// subscribes to.
type memberAndTopic struct {
	memberID string
	// topics is used as a set; only key presence matters.
	topics map[string]struct{}
}

// hasTopic reports whether topic is in the member's topic set.
func (m *memberAndTopic) hasTopic(topic string) bool {
	_, isExist := m.topics[topic]
	return isExist
}
430
431// Calculate the balance score of the given assignment, as the sum of assigned partitions size difference of all consumer pairs.
432// A perfectly balanced assignment (with all consumers getting the same number of partitions) has a balance score of 0.
433// Lower balance score indicates a more balanced assignment.
434func getBalanceScore(assignment map[string][]topicPartitionAssignment) int {435consumer2AssignmentSize := make(map[string]int, len(assignment))436for memberID, partitions := range assignment {437consumer2AssignmentSize[memberID] = len(partitions)438}439
440var score float64441for memberID, consumerAssignmentSize := range consumer2AssignmentSize {442delete(consumer2AssignmentSize, memberID)443for _, otherConsumerAssignmentSize := range consumer2AssignmentSize {444score += math.Abs(float64(consumerAssignmentSize - otherConsumerAssignmentSize))445}446}447return int(score)448}
449
450// Determine whether the current assignment plan is balanced.
451func isBalanced(currentAssignment map[string][]topicPartitionAssignment, allSubscriptions map[string][]topicPartitionAssignment) bool {452sortedCurrentSubscriptions := sortMemberIDsByPartitionAssignments(currentAssignment)453min := len(currentAssignment[sortedCurrentSubscriptions[0]])454max := len(currentAssignment[sortedCurrentSubscriptions[len(sortedCurrentSubscriptions)-1]])455if min >= max-1 {456// if minimum and maximum numbers of partitions assigned to consumers differ by at most one return true457return true458}459
460// create a mapping from partitions to the consumer assigned to them461allPartitions := make(map[topicPartitionAssignment]string)462for memberID, partitions := range currentAssignment {463for _, partition := range partitions {464if _, exists := allPartitions[partition]; exists {465Logger.Printf("Topic %s Partition %d is assigned more than one consumer", partition.Topic, partition.Partition)466}467allPartitions[partition] = memberID468}469}470
471// for each consumer that does not have all the topic partitions it can get make sure none of the topic partitions it472// could but did not get cannot be moved to it (because that would break the balance)473for _, memberID := range sortedCurrentSubscriptions {474consumerPartitions := currentAssignment[memberID]475consumerPartitionCount := len(consumerPartitions)476
477// skip if this consumer already has all the topic partitions it can get478if consumerPartitionCount == len(allSubscriptions[memberID]) {479continue480}481
482// otherwise make sure it cannot get any more483potentialTopicPartitions := allSubscriptions[memberID]484for _, partition := range potentialTopicPartitions {485if !memberAssignmentsIncludeTopicPartition(currentAssignment[memberID], partition) {486otherConsumer := allPartitions[partition]487otherConsumerPartitionCount := len(currentAssignment[otherConsumer])488if consumerPartitionCount < otherConsumerPartitionCount {489return false490}491}492}493}494return true495}
496
497// Reassign all topic partitions that need reassignment until balanced.
498func (s *stickyBalanceStrategy) performReassignments(reassignablePartitions []topicPartitionAssignment, currentAssignment map[string][]topicPartitionAssignment, prevAssignment map[topicPartitionAssignment]consumerGenerationPair, sortedCurrentSubscriptions []string, consumer2AllPotentialPartitions map[string][]topicPartitionAssignment, partition2AllPotentialConsumers map[topicPartitionAssignment][]string, currentPartitionConsumer map[topicPartitionAssignment]string) bool {499reassignmentPerformed := false500modified := false501
502// repeat reassignment until no partition can be moved to improve the balance503for {504modified = false505// reassign all reassignable partitions (starting from the partition with least potential consumers and if needed)506// until the full list is processed or a balance is achieved507for _, partition := range reassignablePartitions {508if isBalanced(currentAssignment, consumer2AllPotentialPartitions) {509break510}511
512// the partition must have at least two consumers513if len(partition2AllPotentialConsumers[partition]) <= 1 {514Logger.Printf("Expected more than one potential consumer for partition %s topic %d", partition.Topic, partition.Partition)515}516
517// the partition must have a consumer518consumer := currentPartitionConsumer[partition]519if consumer == "" {520Logger.Printf("Expected topic %s partition %d to be assigned to a consumer", partition.Topic, partition.Partition)521}522
523if _, exists := prevAssignment[partition]; exists {524if len(currentAssignment[consumer]) > (len(currentAssignment[prevAssignment[partition].MemberID]) + 1) {525sortedCurrentSubscriptions = s.reassignPartition(partition, currentAssignment, sortedCurrentSubscriptions, currentPartitionConsumer, prevAssignment[partition].MemberID)526reassignmentPerformed = true527modified = true528continue529}530}531
532// check if a better-suited consumer exists for the partition; if so, reassign it533for _, otherConsumer := range partition2AllPotentialConsumers[partition] {534if len(currentAssignment[consumer]) > (len(currentAssignment[otherConsumer]) + 1) {535sortedCurrentSubscriptions = s.reassignPartitionToNewConsumer(partition, currentAssignment, sortedCurrentSubscriptions, currentPartitionConsumer, consumer2AllPotentialPartitions)536reassignmentPerformed = true537modified = true538break539}540}541}542if !modified {543return reassignmentPerformed544}545}546}
547
548// Identify a new consumer for a topic partition and reassign it.
549func (s *stickyBalanceStrategy) reassignPartitionToNewConsumer(partition topicPartitionAssignment, currentAssignment map[string][]topicPartitionAssignment, sortedCurrentSubscriptions []string, currentPartitionConsumer map[topicPartitionAssignment]string, consumer2AllPotentialPartitions map[string][]topicPartitionAssignment) []string {550for _, anotherConsumer := range sortedCurrentSubscriptions {551if memberAssignmentsIncludeTopicPartition(consumer2AllPotentialPartitions[anotherConsumer], partition) {552return s.reassignPartition(partition, currentAssignment, sortedCurrentSubscriptions, currentPartitionConsumer, anotherConsumer)553}554}555return sortedCurrentSubscriptions556}
557
558// Reassign a specific partition to a new consumer
559func (s *stickyBalanceStrategy) reassignPartition(partition topicPartitionAssignment, currentAssignment map[string][]topicPartitionAssignment, sortedCurrentSubscriptions []string, currentPartitionConsumer map[topicPartitionAssignment]string, newConsumer string) []string {560consumer := currentPartitionConsumer[partition]561// find the correct partition movement considering the stickiness requirement562partitionToBeMoved := s.movements.getTheActualPartitionToBeMoved(partition, consumer, newConsumer)563return s.processPartitionMovement(partitionToBeMoved, newConsumer, currentAssignment, sortedCurrentSubscriptions, currentPartitionConsumer)564}
565
566// Track the movement of a topic partition after assignment
567func (s *stickyBalanceStrategy) processPartitionMovement(partition topicPartitionAssignment, newConsumer string, currentAssignment map[string][]topicPartitionAssignment, sortedCurrentSubscriptions []string, currentPartitionConsumer map[topicPartitionAssignment]string) []string {568oldConsumer := currentPartitionConsumer[partition]569s.movements.movePartition(partition, oldConsumer, newConsumer)570
571currentAssignment[oldConsumer] = removeTopicPartitionFromMemberAssignments(currentAssignment[oldConsumer], partition)572currentAssignment[newConsumer] = append(currentAssignment[newConsumer], partition)573currentPartitionConsumer[partition] = newConsumer574return sortMemberIDsByPartitionAssignments(currentAssignment)575}
576
577// Determine whether a specific consumer should be considered for topic partition assignment.
578func canConsumerParticipateInReassignment(memberID string, currentAssignment map[string][]topicPartitionAssignment, consumer2AllPotentialPartitions map[string][]topicPartitionAssignment, partition2AllPotentialConsumers map[topicPartitionAssignment][]string) bool {579currentPartitions := currentAssignment[memberID]580currentAssignmentSize := len(currentPartitions)581maxAssignmentSize := len(consumer2AllPotentialPartitions[memberID])582if currentAssignmentSize > maxAssignmentSize {583Logger.Printf("The consumer %s is assigned more partitions than the maximum possible", memberID)584}585if currentAssignmentSize < maxAssignmentSize {586// if a consumer is not assigned all its potential partitions it is subject to reassignment587return true588}589for _, partition := range currentPartitions {590if canTopicPartitionParticipateInReassignment(partition, partition2AllPotentialConsumers) {591return true592}593}594return false595}
596
597// Only consider reassigning those topic partitions that have two or more potential consumers.
598func canTopicPartitionParticipateInReassignment(partition topicPartitionAssignment, partition2AllPotentialConsumers map[topicPartitionAssignment][]string) bool {599return len(partition2AllPotentialConsumers[partition]) >= 2600}
601
602// The assignment should improve the overall balance of the partition assignments to consumers.
603func assignPartition(partition topicPartitionAssignment, sortedCurrentSubscriptions []string, currentAssignment map[string][]topicPartitionAssignment, consumer2AllPotentialPartitions map[string][]topicPartitionAssignment, currentPartitionConsumer map[topicPartitionAssignment]string) []string {604for _, memberID := range sortedCurrentSubscriptions {605if memberAssignmentsIncludeTopicPartition(consumer2AllPotentialPartitions[memberID], partition) {606currentAssignment[memberID] = append(currentAssignment[memberID], partition)607currentPartitionConsumer[partition] = memberID608break609}610}611return sortMemberIDsByPartitionAssignments(currentAssignment)612}
613
614// Deserialize topic partition assignment data to aid with creation of a sticky assignment.
615func deserializeTopicPartitionAssignment(userDataBytes []byte) (StickyAssignorUserData, error) {616userDataV1 := &StickyAssignorUserDataV1{}617if err := decode(userDataBytes, userDataV1); err != nil {618userDataV0 := &StickyAssignorUserDataV0{}619if err := decode(userDataBytes, userDataV0); err != nil {620return nil, err621}622return userDataV0, nil623}624return userDataV1, nil625}
626
627// filterAssignedPartitions returns a map of consumer group members to their list of previously-assigned topic partitions, limited
628// to those topic partitions currently reported by the Kafka cluster.
629func filterAssignedPartitions(currentAssignment map[string][]topicPartitionAssignment, partition2AllPotentialConsumers map[topicPartitionAssignment][]string) map[string][]topicPartitionAssignment {630assignments := deepCopyAssignment(currentAssignment)631for memberID, partitions := range assignments {632// perform in-place filtering633i := 0634for _, partition := range partitions {635if _, exists := partition2AllPotentialConsumers[partition]; exists {636partitions[i] = partition637i++638}639}640assignments[memberID] = partitions[:i]641}642return assignments643}
644
645func removeTopicPartitionFromMemberAssignments(assignments []topicPartitionAssignment, topic topicPartitionAssignment) []topicPartitionAssignment {646for i, assignment := range assignments {647if assignment == topic {648return append(assignments[:i], assignments[i+1:]...)649}650}651return assignments652}
653
654func memberAssignmentsIncludeTopicPartition(assignments []topicPartitionAssignment, topic topicPartitionAssignment) bool {655for _, assignment := range assignments {656if assignment == topic {657return true658}659}660return false661}
662
663func sortPartitions(currentAssignment map[string][]topicPartitionAssignment, partitionsWithADifferentPreviousAssignment map[topicPartitionAssignment]consumerGenerationPair, isFreshAssignment bool, partition2AllPotentialConsumers map[topicPartitionAssignment][]string, consumer2AllPotentialPartitions map[string][]topicPartitionAssignment) []topicPartitionAssignment {664unassignedPartitions := make(map[topicPartitionAssignment]bool, len(partition2AllPotentialConsumers))665for partition := range partition2AllPotentialConsumers {666unassignedPartitions[partition] = true667}668
669sortedPartitions := make([]topicPartitionAssignment, 0)670if !isFreshAssignment && areSubscriptionsIdentical(partition2AllPotentialConsumers, consumer2AllPotentialPartitions) {671// if this is a reassignment and the subscriptions are identical (all consumers can consumer from all topics)672// then we just need to simply list partitions in a round robin fashion (from consumers with673// most assigned partitions to those with least)674assignments := filterAssignedPartitions(currentAssignment, partition2AllPotentialConsumers)675
676// use priority-queue to evaluate consumer group members in descending-order based on677// the number of topic partition assignments (i.e. consumers with most assignments first)678pq := make(assignmentPriorityQueue, len(assignments))679i := 0680for consumerID, consumerAssignments := range assignments {681pq[i] = &consumerGroupMember{682id: consumerID,683assignments: consumerAssignments,684}685i++686}687heap.Init(&pq)688
689for {690// loop until no consumer-group members remain691if pq.Len() == 0 {692break693}694member := pq[0]695
696// partitions that were assigned to a different consumer last time697var prevPartitionIndex int698for i, partition := range member.assignments {699if _, exists := partitionsWithADifferentPreviousAssignment[partition]; exists {700prevPartitionIndex = i701break702}703}704
705if len(member.assignments) > 0 {706partition := member.assignments[prevPartitionIndex]707sortedPartitions = append(sortedPartitions, partition)708delete(unassignedPartitions, partition)709if prevPartitionIndex == 0 {710member.assignments = member.assignments[1:]711} else {712member.assignments = append(member.assignments[:prevPartitionIndex], member.assignments[prevPartitionIndex+1:]...)713}714heap.Fix(&pq, 0)715} else {716heap.Pop(&pq)717}718}719
720for partition := range unassignedPartitions {721sortedPartitions = append(sortedPartitions, partition)722}723} else {724// an ascending sorted set of topic partitions based on how many consumers can potentially use them725sortedPartitions = sortPartitionsByPotentialConsumerAssignments(partition2AllPotentialConsumers)726}727return sortedPartitions728}
729
730func sortMemberIDsByPartitionAssignments(assignments map[string][]topicPartitionAssignment) []string {731// sort the members by the number of partition assignments in ascending order732sortedMemberIDs := make([]string, 0, len(assignments))733for memberID := range assignments {734sortedMemberIDs = append(sortedMemberIDs, memberID)735}736sort.SliceStable(sortedMemberIDs, func(i, j int) bool {737ret := len(assignments[sortedMemberIDs[i]]) - len(assignments[sortedMemberIDs[j]])738if ret == 0 {739return sortedMemberIDs[i] < sortedMemberIDs[j]740}741return len(assignments[sortedMemberIDs[i]]) < len(assignments[sortedMemberIDs[j]])742})743return sortedMemberIDs744}
745
746func sortPartitionsByPotentialConsumerAssignments(partition2AllPotentialConsumers map[topicPartitionAssignment][]string) []topicPartitionAssignment {747// sort the members by the number of partition assignments in descending order748sortedPartionIDs := make([]topicPartitionAssignment, len(partition2AllPotentialConsumers))749i := 0750for partition := range partition2AllPotentialConsumers {751sortedPartionIDs[i] = partition752i++753}754sort.Slice(sortedPartionIDs, func(i, j int) bool {755if len(partition2AllPotentialConsumers[sortedPartionIDs[i]]) == len(partition2AllPotentialConsumers[sortedPartionIDs[j]]) {756ret := strings.Compare(sortedPartionIDs[i].Topic, sortedPartionIDs[j].Topic)757if ret == 0 {758return sortedPartionIDs[i].Partition < sortedPartionIDs[j].Partition759}760return ret < 0761}762return len(partition2AllPotentialConsumers[sortedPartionIDs[i]]) < len(partition2AllPotentialConsumers[sortedPartionIDs[j]])763})764return sortedPartionIDs765}
766
767func deepCopyAssignment(assignment map[string][]topicPartitionAssignment) map[string][]topicPartitionAssignment {768m := make(map[string][]topicPartitionAssignment, len(assignment))769for memberID, subscriptions := range assignment {770m[memberID] = append(subscriptions[:0:0], subscriptions...)771}772return m773}
774
775func areSubscriptionsIdentical(partition2AllPotentialConsumers map[topicPartitionAssignment][]string, consumer2AllPotentialPartitions map[string][]topicPartitionAssignment) bool {776curMembers := make(map[string]int)777for _, cur := range partition2AllPotentialConsumers {778if len(curMembers) == 0 {779for _, curMembersElem := range cur {780curMembers[curMembersElem]++781}782continue783}784
785if len(curMembers) != len(cur) {786return false787}788
789yMap := make(map[string]int)790for _, yElem := range cur {791yMap[yElem]++792}793
794for curMembersMapKey, curMembersMapVal := range curMembers {795if yMap[curMembersMapKey] != curMembersMapVal {796return false797}798}799}800
801curPartitions := make(map[topicPartitionAssignment]int)802for _, cur := range consumer2AllPotentialPartitions {803if len(curPartitions) == 0 {804for _, curPartitionElem := range cur {805curPartitions[curPartitionElem]++806}807continue808}809
810if len(curPartitions) != len(cur) {811return false812}813
814yMap := make(map[topicPartitionAssignment]int)815for _, yElem := range cur {816yMap[yElem]++817}818
819for curMembersMapKey, curMembersMapVal := range curPartitions {820if yMap[curMembersMapKey] != curMembersMapVal {821return false822}823}824}825return true826}
827
828// We need to process subscriptions' user data with each consumer's reported generation in mind
829// higher generations overwrite lower generations in case of a conflict
830// note that a conflict could exist only if user data is for different generations
831func prepopulateCurrentAssignments(members map[string]ConsumerGroupMemberMetadata) (map[string][]topicPartitionAssignment, map[topicPartitionAssignment]consumerGenerationPair, error) {832currentAssignment := make(map[string][]topicPartitionAssignment)833prevAssignment := make(map[topicPartitionAssignment]consumerGenerationPair)834
835// for each partition we create a sorted map of its consumers by generation836sortedPartitionConsumersByGeneration := make(map[topicPartitionAssignment]map[int]string)837for memberID, meta := range members {838consumerUserData, err := deserializeTopicPartitionAssignment(meta.UserData)839if err != nil {840return nil, nil, err841}842for _, partition := range consumerUserData.partitions() {843if consumers, exists := sortedPartitionConsumersByGeneration[partition]; exists {844if consumerUserData.hasGeneration() {845if _, generationExists := consumers[consumerUserData.generation()]; generationExists {846// same partition is assigned to two consumers during the same rebalance.847// log a warning and skip this record848Logger.Printf("Topic %s Partition %d is assigned to multiple consumers following sticky assignment generation %d", partition.Topic, partition.Partition, consumerUserData.generation())849continue850} else {851consumers[consumerUserData.generation()] = memberID852}853} else {854consumers[defaultGeneration] = memberID855}856} else {857generation := defaultGeneration858if consumerUserData.hasGeneration() {859generation = consumerUserData.generation()860}861sortedPartitionConsumersByGeneration[partition] = map[int]string{generation: memberID}862}863}864}865
866// prevAssignment holds the prior ConsumerGenerationPair (before current) of each partition867// current and previous consumers are the last two consumers of each partition in the above sorted map868for partition, consumers := range sortedPartitionConsumersByGeneration {869// sort consumers by generation in decreasing order870var generations []int871for generation := range consumers {872generations = append(generations, generation)873}874sort.Sort(sort.Reverse(sort.IntSlice(generations)))875
876consumer := consumers[generations[0]]877if _, exists := currentAssignment[consumer]; !exists {878currentAssignment[consumer] = []topicPartitionAssignment{partition}879} else {880currentAssignment[consumer] = append(currentAssignment[consumer], partition)881}882
883// check for previous assignment, if any884if len(generations) > 1 {885prevAssignment[partition] = consumerGenerationPair{886MemberID: consumers[generations[1]],887Generation: generations[1],888}889}890}891return currentAssignment, prevAssignment, nil892}
893
// consumerGenerationPair couples a consumer group member with the generation
// under which it reported owning a partition.
type consumerGenerationPair struct {
	// MemberID is the ID of the consumer group member.
	MemberID string
	// Generation is the generation reported by that member.
	Generation int
}
898
// consumerPair represents a pair of Kafka consumer ids involved in a
// partition reassignment: the partition moves from SrcMemberID to DstMemberID.
type consumerPair struct {
	// SrcMemberID is the member the partition is taken from.
	SrcMemberID string
	// DstMemberID is the member the partition is given to.
	DstMemberID string
}
904
905// partitionMovements maintains some data structures to simplify lookup of partition movements among consumers.
906type partitionMovements struct {907PartitionMovementsByTopic map[string]map[consumerPair]map[topicPartitionAssignment]bool908Movements map[topicPartitionAssignment]consumerPair909}
910
911func (p *partitionMovements) removeMovementRecordOfPartition(partition topicPartitionAssignment) consumerPair {912pair := p.Movements[partition]913delete(p.Movements, partition)914
915partitionMovementsForThisTopic := p.PartitionMovementsByTopic[partition.Topic]916delete(partitionMovementsForThisTopic[pair], partition)917if len(partitionMovementsForThisTopic[pair]) == 0 {918delete(partitionMovementsForThisTopic, pair)919}920if len(p.PartitionMovementsByTopic[partition.Topic]) == 0 {921delete(p.PartitionMovementsByTopic, partition.Topic)922}923return pair924}
925
926func (p *partitionMovements) addPartitionMovementRecord(partition topicPartitionAssignment, pair consumerPair) {927p.Movements[partition] = pair928if _, exists := p.PartitionMovementsByTopic[partition.Topic]; !exists {929p.PartitionMovementsByTopic[partition.Topic] = make(map[consumerPair]map[topicPartitionAssignment]bool)930}931partitionMovementsForThisTopic := p.PartitionMovementsByTopic[partition.Topic]932if _, exists := partitionMovementsForThisTopic[pair]; !exists {933partitionMovementsForThisTopic[pair] = make(map[topicPartitionAssignment]bool)934}935partitionMovementsForThisTopic[pair][partition] = true936}
937
938func (p *partitionMovements) movePartition(partition topicPartitionAssignment, oldConsumer, newConsumer string) {939pair := consumerPair{940SrcMemberID: oldConsumer,941DstMemberID: newConsumer,942}943if _, exists := p.Movements[partition]; exists {944// this partition has previously moved945existingPair := p.removeMovementRecordOfPartition(partition)946if existingPair.DstMemberID != oldConsumer {947Logger.Printf("Existing pair DstMemberID %s was not equal to the oldConsumer ID %s", existingPair.DstMemberID, oldConsumer)948}949if existingPair.SrcMemberID != newConsumer {950// the partition is not moving back to its previous consumer951p.addPartitionMovementRecord(partition, consumerPair{952SrcMemberID: existingPair.SrcMemberID,953DstMemberID: newConsumer,954})955}956} else {957p.addPartitionMovementRecord(partition, pair)958}959}
960
961func (p *partitionMovements) getTheActualPartitionToBeMoved(partition topicPartitionAssignment, oldConsumer, newConsumer string) topicPartitionAssignment {962if _, exists := p.PartitionMovementsByTopic[partition.Topic]; !exists {963return partition964}965if _, exists := p.Movements[partition]; exists {966// this partition has previously moved967if oldConsumer != p.Movements[partition].DstMemberID {968Logger.Printf("Partition movement DstMemberID %s was not equal to the oldConsumer ID %s", p.Movements[partition].DstMemberID, oldConsumer)969}970oldConsumer = p.Movements[partition].SrcMemberID971}972
973partitionMovementsForThisTopic := p.PartitionMovementsByTopic[partition.Topic]974reversePair := consumerPair{975SrcMemberID: newConsumer,976DstMemberID: oldConsumer,977}978if _, exists := partitionMovementsForThisTopic[reversePair]; !exists {979return partition980}981var reversePairPartition topicPartitionAssignment982for otherPartition := range partitionMovementsForThisTopic[reversePair] {983reversePairPartition = otherPartition984}985return reversePairPartition986}
987
988func (p *partitionMovements) isLinked(src, dst string, pairs []consumerPair, currentPath []string) ([]string, bool) {989if src == dst {990return currentPath, false991}992if len(pairs) == 0 {993return currentPath, false994}995for _, pair := range pairs {996if src == pair.SrcMemberID && dst == pair.DstMemberID {997currentPath = append(currentPath, src, dst)998return currentPath, true999}1000}1001
1002for _, pair := range pairs {1003if pair.SrcMemberID == src {1004// create a deep copy of the pairs, excluding the current pair1005reducedSet := make([]consumerPair, len(pairs)-1)1006i := 01007for _, p := range pairs {1008if p != pair {1009reducedSet[i] = pair1010i++1011}1012}1013
1014currentPath = append(currentPath, pair.SrcMemberID)1015return p.isLinked(pair.DstMemberID, dst, reducedSet, currentPath)1016}1017}1018return currentPath, false1019}
1020
1021func (p *partitionMovements) in(cycle []string, cycles [][]string) bool {1022superCycle := make([]string, len(cycle)-1)1023for i := 0; i < len(cycle)-1; i++ {1024superCycle[i] = cycle[i]1025}1026superCycle = append(superCycle, cycle...)1027for _, foundCycle := range cycles {1028if len(foundCycle) == len(cycle) && indexOfSubList(superCycle, foundCycle) != -1 {1029return true1030}1031}1032return false1033}
1034
1035func (p *partitionMovements) hasCycles(pairs []consumerPair) bool {1036cycles := make([][]string, 0)1037for _, pair := range pairs {1038// create a deep copy of the pairs, excluding the current pair1039reducedPairs := make([]consumerPair, len(pairs)-1)1040i := 01041for _, p := range pairs {1042if p != pair {1043reducedPairs[i] = pair1044i++1045}1046}1047if path, linked := p.isLinked(pair.DstMemberID, pair.SrcMemberID, reducedPairs, []string{pair.SrcMemberID}); linked {1048if !p.in(path, cycles) {1049cycles = append(cycles, path)1050Logger.Printf("A cycle of length %d was found: %v", len(path)-1, path)1051}1052}1053}1054
1055// for now we want to make sure there is no partition movements of the same topic between a pair of consumers.1056// the odds of finding a cycle among more than two consumers seem to be very low (according to various randomized1057// tests with the given sticky algorithm) that it should not worth the added complexity of handling those cases.1058for _, cycle := range cycles {1059if len(cycle) == 3 {1060return true1061}1062}1063return false1064}
1065
1066func (p *partitionMovements) isSticky() bool {1067for topic, movements := range p.PartitionMovementsByTopic {1068movementPairs := make([]consumerPair, len(movements))1069i := 01070for pair := range movements {1071movementPairs[i] = pair1072i++1073}1074if p.hasCycles(movementPairs) {1075Logger.Printf("Stickiness is violated for topic %s", topic)1076Logger.Printf("Partition movements for this topic occurred among the following consumer pairs: %v", movements)1077return false1078}1079}1080return true1081}
1082
// indexOfSubList returns the index in source at which the first occurrence of
// target starts, or -1 if target does not occur as a contiguous run in
// source. An empty target matches at index 0.
func indexOfSubList(source []string, target []string) int {
	width := len(target)
	for start := 0; start+width <= len(source); start++ {
		matched := true
		for offset := 0; offset < width; offset++ {
			if source[start+offset] != target[offset] {
				// element mismatch, try the next starting position
				matched = false
				break
			}
		}
		if matched {
			return start
		}
	}
	return -1
}
1101
1102type consumerGroupMember struct {1103id string1104assignments []topicPartitionAssignment1105}
1106
1107// assignmentPriorityQueue is a priority-queue of consumer group members that is sorted
1108// in descending order (most assignments to least assignments).
1109type assignmentPriorityQueue []*consumerGroupMember1110
1111func (pq assignmentPriorityQueue) Len() int { return len(pq) }1112
1113func (pq assignmentPriorityQueue) Less(i, j int) bool {1114// order asssignment priority queue in descending order using assignment-count/member-id1115if len(pq[i].assignments) == len(pq[j].assignments) {1116return strings.Compare(pq[i].id, pq[j].id) > 01117}1118return len(pq[i].assignments) > len(pq[j].assignments)1119}
1120
1121func (pq assignmentPriorityQueue) Swap(i, j int) {1122pq[i], pq[j] = pq[j], pq[i]1123}
1124
1125func (pq *assignmentPriorityQueue) Push(x interface{}) {1126member := x.(*consumerGroupMember)1127*pq = append(*pq, member)1128}
1129
1130func (pq *assignmentPriorityQueue) Pop() interface{} {1131old := *pq1132n := len(old)1133member := old[n-1]1134*pq = old[0 : n-1]1135return member1136}
1137