cubefs
217 строк · 7.3 Кб
1package sarama
2
3import (
4"hash"
5"hash/fnv"
6"math/rand"
7"time"
8)
9
// Partitioner is anything that, given a Kafka message and a number of partitions indexed [0...numPartitions-1],
// decides to which partition to send the message. RandomPartitioner, RoundRobinPartitioner and HashPartitioner are provided
// as simple default implementations.
type Partitioner interface {
	// Partition takes a message and partition count and chooses a partition.
	// It returns the chosen partition index, or a non-nil error when no
	// partition could be chosen for the message.
	Partition(message *ProducerMessage, numPartitions int32) (int32, error)

	// RequiresConsistency indicates to the user of the partitioner whether the
	// mapping of key->partition is consistent or not. Specifically, if a
	// partitioner requires consistency then it must be allowed to choose from all
	// partitions (even ones known to be unavailable), and its choice must be
	// respected by the caller. The obvious example is the HashPartitioner.
	RequiresConsistency() bool
}
24
// DynamicConsistencyPartitioner can optionally be implemented by Partitioners
// in order to allow more flexibility than is originally allowed by the
// RequiresConsistency method in the Partitioner interface. This allows
// partitioners to require consistency for some messages but not for others.
// It's useful for, e.g., the HashPartitioner, which does not require
// consistency if the message key is nil.
type DynamicConsistencyPartitioner interface {
	Partitioner

	// MessageRequiresConsistency is similar to Partitioner.RequiresConsistency,
	// but takes in the message being partitioned so that the partitioner can
	// make a per-message determination.
	MessageRequiresConsistency(message *ProducerMessage) bool
}
39
// PartitionerConstructor is the type for a function capable of constructing new Partitioners.
// It receives a topic string and returns the Partitioner to use. The
// constructors defined in this file accept the topic but only pass it along
// (or ignore it); none of them vary their behavior by topic.
type PartitionerConstructor func(topic string) Partitioner
42
// manualPartitioner is a stateless Partitioner whose Partition method simply
// returns the Partition field already set on the ProducerMessage.
type manualPartitioner struct{}
44
// HashPartitionerOption lets you modify default values of the partitioner.
// Each option is a function that mutates a hashPartitioner in place;
// NewCustomPartitioner applies the supplied options in order after it has
// filled in the defaults.
type HashPartitionerOption func(*hashPartitioner)
47
48// WithAbsFirst means that the partitioner handles absolute values
49// in the same way as the reference Java implementation
50func WithAbsFirst() HashPartitionerOption {
51return func(hp *hashPartitioner) {
52hp.referenceAbs = true
53}
54}
55
56// WithCustomHashFunction lets you specify what hash function to use for the partitioning
57func WithCustomHashFunction(hasher func() hash.Hash32) HashPartitionerOption {
58return func(hp *hashPartitioner) {
59hp.hasher = hasher()
60}
61}
62
63// WithCustomFallbackPartitioner lets you specify what HashPartitioner should be used in case a Distribution Key is empty
64func WithCustomFallbackPartitioner(randomHP Partitioner) HashPartitionerOption {
65return func(hp *hashPartitioner) {
66hp.random = randomHP
67}
68}
69
70// NewManualPartitioner returns a Partitioner which uses the partition manually set in the provided
71// ProducerMessage's Partition field as the partition to produce to.
72func NewManualPartitioner(topic string) Partitioner {
73return new(manualPartitioner)
74}
75
76func (p *manualPartitioner) Partition(message *ProducerMessage, numPartitions int32) (int32, error) {
77return message.Partition, nil
78}
79
80func (p *manualPartitioner) RequiresConsistency() bool {
81return true
82}
83
// randomPartitioner is a Partitioner that draws a partition index from a
// pseudo-random generator on every call. The generator field is the
// *rand.Rand it draws from; NewRandomPartitioner seeds it from the current
// time. No locking is performed around the generator in this type.
type randomPartitioner struct {
	generator *rand.Rand
}
87
88// NewRandomPartitioner returns a Partitioner which chooses a random partition each time.
89func NewRandomPartitioner(topic string) Partitioner {
90p := new(randomPartitioner)
91p.generator = rand.New(rand.NewSource(time.Now().UTC().UnixNano()))
92return p
93}
94
95func (p *randomPartitioner) Partition(message *ProducerMessage, numPartitions int32) (int32, error) {
96return int32(p.generator.Intn(int(numPartitions))), nil
97}
98
99func (p *randomPartitioner) RequiresConsistency() bool {
100return false
101}
102
// roundRobinPartitioner is a Partitioner that hands out partition indices in
// increasing order, wrapping back to 0. The partition field holds the index
// the next call to Partition will return (after being reset to 0 if it is no
// longer below the partition count passed to that call).
type roundRobinPartitioner struct {
	partition int32
}
106
107// NewRoundRobinPartitioner returns a Partitioner which walks through the available partitions one at a time.
108func NewRoundRobinPartitioner(topic string) Partitioner {
109return &roundRobinPartitioner{}
110}
111
112func (p *roundRobinPartitioner) Partition(message *ProducerMessage, numPartitions int32) (int32, error) {
113if p.partition >= numPartitions {
114p.partition = 0
115}
116ret := p.partition
117p.partition++
118return ret, nil
119}
120
121func (p *roundRobinPartitioner) RequiresConsistency() bool {
122return false
123}
124
// hashPartitioner is a Partitioner that maps a message to a partition by
// hashing the encoded message key.
//
//   - random is the fallback Partitioner used when a message's key is nil.
//   - hasher is the 32-bit hash applied to the encoded key; Partition resets
//     it before every use.
//   - referenceAbs selects how the hash is made non-negative: when true the
//     sign bit is masked off before taking the modulus; when false the
//     modulus is taken first and a negative result is negated.
type hashPartitioner struct {
	random       Partitioner
	hasher       hash.Hash32
	referenceAbs bool
}
130
131// NewCustomHashPartitioner is a wrapper around NewHashPartitioner, allowing the use of custom hasher.
132// The argument is a function providing the instance, implementing the hash.Hash32 interface. This is to ensure that
133// each partition dispatcher gets its own hasher, to avoid concurrency issues by sharing an instance.
134func NewCustomHashPartitioner(hasher func() hash.Hash32) PartitionerConstructor {
135return func(topic string) Partitioner {
136p := new(hashPartitioner)
137p.random = NewRandomPartitioner(topic)
138p.hasher = hasher()
139p.referenceAbs = false
140return p
141}
142}
143
144// NewCustomPartitioner creates a default Partitioner but lets you specify the behavior of each component via options
145func NewCustomPartitioner(options ...HashPartitionerOption) PartitionerConstructor {
146return func(topic string) Partitioner {
147p := new(hashPartitioner)
148p.random = NewRandomPartitioner(topic)
149p.hasher = fnv.New32a()
150p.referenceAbs = false
151for _, option := range options {
152option(p)
153}
154return p
155}
156}
157
158// NewHashPartitioner returns a Partitioner which behaves as follows. If the message's key is nil then a
159// random partition is chosen. Otherwise the FNV-1a hash of the encoded bytes of the message key is used,
160// modulus the number of partitions. This ensures that messages with the same key always end up on the
161// same partition.
162func NewHashPartitioner(topic string) Partitioner {
163p := new(hashPartitioner)
164p.random = NewRandomPartitioner(topic)
165p.hasher = fnv.New32a()
166p.referenceAbs = false
167return p
168}
169
170// NewReferenceHashPartitioner is like NewHashPartitioner except that it handles absolute values
171// in the same way as the reference Java implementation. NewHashPartitioner was supposed to do
172// that but it had a mistake and now there are people depending on both behaviors. This will
173// all go away on the next major version bump.
174func NewReferenceHashPartitioner(topic string) Partitioner {
175p := new(hashPartitioner)
176p.random = NewRandomPartitioner(topic)
177p.hasher = fnv.New32a()
178p.referenceAbs = true
179return p
180}
181
182func (p *hashPartitioner) Partition(message *ProducerMessage, numPartitions int32) (int32, error) {
183if message.Key == nil {
184return p.random.Partition(message, numPartitions)
185}
186bytes, err := message.Key.Encode()
187if err != nil {
188return -1, err
189}
190p.hasher.Reset()
191_, err = p.hasher.Write(bytes)
192if err != nil {
193return -1, err
194}
195var partition int32
196// Turns out we were doing our absolute value in a subtly different way from the upstream
197// implementation, but now we need to maintain backwards compat for people who started using
198// the old version; if referenceAbs is set we are compatible with the reference java client
199// but not past Sarama versions
200if p.referenceAbs {
201partition = (int32(p.hasher.Sum32()) & 0x7fffffff) % numPartitions
202} else {
203partition = int32(p.hasher.Sum32()) % numPartitions
204if partition < 0 {
205partition = -partition
206}
207}
208return partition, nil
209}
210
211func (p *hashPartitioner) RequiresConsistency() bool {
212return true
213}
214
215func (p *hashPartitioner) MessageRequiresConsistency(message *ProducerMessage) bool {
216return message.Key != nil
217}
218