cubefs

Форк
0
217 строк · 7.3 Кб
1
package sarama
2

3
import (
	"errors"
	"hash"
	"hash/fnv"
	"math/rand"
	"time"
)
9

10
// Partitioner is anything that, given a Kafka message and a number of partitions indexed [0...numPartitions-1],
11
// decides to which partition to send the message. RandomPartitioner, RoundRobinPartitioner and HashPartitioner are provided
12
// as simple default implementations.
13
type Partitioner interface {
14
	// Partition takes a message and partition count and chooses a partition
15
	Partition(message *ProducerMessage, numPartitions int32) (int32, error)
16

17
	// RequiresConsistency indicates to the user of the partitioner whether the
18
	// mapping of key->partition is consistent or not. Specifically, if a
19
	// partitioner requires consistency then it must be allowed to choose from all
20
	// partitions (even ones known to be unavailable), and its choice must be
21
	// respected by the caller. The obvious example is the HashPartitioner.
22
	RequiresConsistency() bool
23
}
24

25
// DynamicConsistencyPartitioner can optionally be implemented by Partitioners
26
// in order to allow more flexibility than is originally allowed by the
27
// RequiresConsistency method in the Partitioner interface. This allows
28
// partitioners to require consistency sometimes, but not all times. It's useful
29
// for, e.g., the HashPartitioner, which does not require consistency if the
30
// message key is nil.
31
type DynamicConsistencyPartitioner interface {
32
	Partitioner
33

34
	// MessageRequiresConsistency is similar to Partitioner.RequiresConsistency,
35
	// but takes in the message being partitioned so that the partitioner can
36
	// make a per-message determination.
37
	MessageRequiresConsistency(message *ProducerMessage) bool
38
}
39

40
// PartitionerConstructor is the type for a function capable of constructing new Partitioners.
41
type PartitionerConstructor func(topic string) Partitioner
42

43
// manualPartitioner is a stateless Partitioner that hands back whatever
// partition is already recorded on the message.
type manualPartitioner struct{}
44

45
// HashPartitionerOption lets you modify default values of the partitioner
46
type HashPartitionerOption func(*hashPartitioner)
47

48
// WithAbsFirst means that the partitioner handles absolute values
49
// in the same way as the reference Java implementation
50
func WithAbsFirst() HashPartitionerOption {
51
	return func(hp *hashPartitioner) {
52
		hp.referenceAbs = true
53
	}
54
}
55

56
// WithCustomHashFunction lets you specify what hash function to use for the partitioning
57
func WithCustomHashFunction(hasher func() hash.Hash32) HashPartitionerOption {
58
	return func(hp *hashPartitioner) {
59
		hp.hasher = hasher()
60
	}
61
}
62

63
// WithCustomFallbackPartitioner lets you specify what HashPartitioner should be used in case a Distribution Key is empty
64
func WithCustomFallbackPartitioner(randomHP Partitioner) HashPartitionerOption {
65
	return func(hp *hashPartitioner) {
66
		hp.random = randomHP
67
	}
68
}
69

70
// NewManualPartitioner returns a Partitioner which uses the partition manually set in the provided
71
// ProducerMessage's Partition field as the partition to produce to.
72
func NewManualPartitioner(topic string) Partitioner {
73
	return new(manualPartitioner)
74
}
75

76
// Partition returns the message's own Partition field unchanged and a nil
// error; numPartitions is not consulted.
func (p *manualPartitioner) Partition(message *ProducerMessage, numPartitions int32) (int32, error) {
	chosen := message.Partition
	return chosen, nil
}
79

80
// RequiresConsistency always reports true for the manual partitioner.
func (p *manualPartitioner) RequiresConsistency() bool {
	return true
}
83

84
// randomPartitioner is a Partitioner that draws partitions from its own
// pseudo-random generator.
type randomPartitioner struct {
	// generator is the source of the random partition indices.
	generator *rand.Rand
}
87

88
// NewRandomPartitioner returns a Partitioner which chooses a random partition each time.
89
func NewRandomPartitioner(topic string) Partitioner {
90
	p := new(randomPartitioner)
91
	p.generator = rand.New(rand.NewSource(time.Now().UTC().UnixNano()))
92
	return p
93
}
94

95
// Partition returns a pseudo-random partition in [0, numPartitions), drawn
// from the partitioner's own generator. The message is not inspected.
//
// It returns -1 and an error when numPartitions is not positive; without this
// guard the call would panic, because rand.Intn panics for n <= 0.
func (p *randomPartitioner) Partition(message *ProducerMessage, numPartitions int32) (int32, error) {
	if numPartitions <= 0 {
		return -1, errors.New("partitioner: number of partitions must be positive")
	}
	return int32(p.generator.Intn(int(numPartitions))), nil
}
98

99
// RequiresConsistency always reports false for the random partitioner.
func (p *randomPartitioner) RequiresConsistency() bool {
	return false
}
102

103
// roundRobinPartitioner is a Partitioner that walks through the partitions one
// at a time.
type roundRobinPartitioner struct {
	// partition is the index that the next call to Partition will return
	// (after being wrapped to 0 if it is out of range).
	partition int32
}
106

107
// NewRoundRobinPartitioner returns a Partitioner which walks through the available partitions one at a time.
108
func NewRoundRobinPartitioner(topic string) Partitioner {
109
	return &roundRobinPartitioner{}
110
}
111

112
// Partition returns the next partition in round-robin order and advances the
// internal cursor. The cursor wraps to 0 once it reaches numPartitions, which
// also covers the case where the partition count shrank between calls. The
// message is not inspected.
//
// It returns -1 and an error when numPartitions is not positive, leaving the
// cursor untouched; there is no valid partition to return in that case.
func (p *roundRobinPartitioner) Partition(message *ProducerMessage, numPartitions int32) (int32, error) {
	if numPartitions <= 0 {
		return -1, errors.New("partitioner: number of partitions must be positive")
	}
	if p.partition >= numPartitions {
		p.partition = 0
	}
	ret := p.partition
	p.partition++
	return ret, nil
}
120

121
// RequiresConsistency always reports false for the round-robin partitioner.
func (p *roundRobinPartitioner) RequiresConsistency() bool {
	return false
}
124

125
type hashPartitioner struct {
126
	random       Partitioner
127
	hasher       hash.Hash32
128
	referenceAbs bool
129
}
130

131
// NewCustomHashPartitioner is a wrapper around NewHashPartitioner, allowing the use of custom hasher.
132
// The argument is a function providing the instance, implementing the hash.Hash32 interface. This is to ensure that
133
// each partition dispatcher gets its own hasher, to avoid concurrency issues by sharing an instance.
134
func NewCustomHashPartitioner(hasher func() hash.Hash32) PartitionerConstructor {
135
	return func(topic string) Partitioner {
136
		p := new(hashPartitioner)
137
		p.random = NewRandomPartitioner(topic)
138
		p.hasher = hasher()
139
		p.referenceAbs = false
140
		return p
141
	}
142
}
143

144
// NewCustomPartitioner creates a default Partitioner but lets you specify the behavior of each component via options
145
func NewCustomPartitioner(options ...HashPartitionerOption) PartitionerConstructor {
146
	return func(topic string) Partitioner {
147
		p := new(hashPartitioner)
148
		p.random = NewRandomPartitioner(topic)
149
		p.hasher = fnv.New32a()
150
		p.referenceAbs = false
151
		for _, option := range options {
152
			option(p)
153
		}
154
		return p
155
	}
156
}
157

158
// NewHashPartitioner returns a Partitioner which behaves as follows. If the message's key is nil then a
159
// random partition is chosen. Otherwise the FNV-1a hash of the encoded bytes of the message key is used,
160
// modulus the number of partitions. This ensures that messages with the same key always end up on the
161
// same partition.
162
func NewHashPartitioner(topic string) Partitioner {
163
	p := new(hashPartitioner)
164
	p.random = NewRandomPartitioner(topic)
165
	p.hasher = fnv.New32a()
166
	p.referenceAbs = false
167
	return p
168
}
169

170
// NewReferenceHashPartitioner is like NewHashPartitioner except that it handles absolute values
171
// in the same way as the reference Java implementation. NewHashPartitioner was supposed to do
172
// that but it had a mistake and now there are people depending on both behaviors. This will
173
// all go away on the next major version bump.
174
func NewReferenceHashPartitioner(topic string) Partitioner {
175
	p := new(hashPartitioner)
176
	p.random = NewRandomPartitioner(topic)
177
	p.hasher = fnv.New32a()
178
	p.referenceAbs = true
179
	return p
180
}
181

182
// Partition maps the message to a partition by hashing its key.
//
// A message with a nil key is delegated to the fallback partitioner. Otherwise
// the key is encoded, fed to the hasher, and the 32-bit sum is reduced modulo
// numPartitions.
//
// It returns -1 and an error when the key cannot be encoded, when the hasher
// rejects the bytes, or when a keyed message is partitioned with a
// non-positive numPartitions (which would otherwise panic with an integer
// divide by zero, or yield a meaningless index for negative counts).
func (p *hashPartitioner) Partition(message *ProducerMessage, numPartitions int32) (int32, error) {
	if message.Key == nil {
		return p.random.Partition(message, numPartitions)
	}
	if numPartitions <= 0 {
		return -1, errors.New("partitioner: number of partitions must be positive")
	}

	keyBytes, err := message.Key.Encode()
	if err != nil {
		return -1, err
	}

	p.hasher.Reset()
	if _, err = p.hasher.Write(keyBytes); err != nil {
		return -1, err
	}
	sum := int32(p.hasher.Sum32())

	// Turns out we were doing our absolute value in a subtly different way
	// from the upstream implementation, but now we need to maintain backwards
	// compat for people who started using the old version; if referenceAbs is
	// set we are compatible with the reference java client but not past Sarama
	// versions.
	if p.referenceAbs {
		// Mask off the sign bit first, then take the modulus.
		return (sum & 0x7fffffff) % numPartitions, nil
	}

	// Legacy behavior: take the modulus first, then negate a negative
	// remainder.
	partition := sum % numPartitions
	if partition < 0 {
		partition = -partition
	}
	return partition, nil
}
210

211
// RequiresConsistency always reports true for the hash partitioner.
func (p *hashPartitioner) RequiresConsistency() bool {
	return true
}
214

215
// MessageRequiresConsistency reports whether this particular message needs a
// consistent partition choice: true when the message carries a key, false when
// its key is nil.
func (p *hashPartitioner) MessageRequiresConsistency(message *ProducerMessage) bool {
	hasKey := message.Key != nil
	return hasKey
}
218

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.