7
"github.com/Shopify/sarama"
10
// Consumer implements sarama's Consumer interface for testing purposes.
11
// Before you can start consuming from this consumer, you have to register
12
// topic/partitions using ExpectConsumePartition, and set expectations on them.
17
partitionConsumers map[string]map[int32]*PartitionConsumer
18
metadata map[string][]int32
21
// NewConsumer returns a new mock Consumer instance. The t argument should
22
// be the *testing.T instance of your test method. An error will be written to it if
23
// an expectation is violated. The config argument can be set to nil.
24
func NewConsumer(t ErrorReporter, config *sarama.Config) *Consumer {
26
config = sarama.NewConfig()
32
partitionConsumers: make(map[string]map[int32]*PartitionConsumer),
37
///////////////////////////////////////////////////
38
// Consumer interface implementation
39
///////////////////////////////////////////////////
41
// ConsumePartition implements the ConsumePartition method from the sarama.Consumer interface.
42
// Before you can start consuming a partition, you have to set expectations on it using
43
// ExpectConsumePartition. You can only consume a partition once per consumer.
44
func (c *Consumer) ConsumePartition(topic string, partition int32, offset int64) (sarama.PartitionConsumer, error) {
48
if c.partitionConsumers[topic] == nil || c.partitionConsumers[topic][partition] == nil {
49
c.t.Errorf("No expectations set for %s/%d", topic, partition)
50
return nil, errOutOfExpectations
53
pc := c.partitionConsumers[topic][partition]
55
return nil, sarama.ConfigurationError("The topic/partition is already being consumed")
58
if pc.offset != AnyOffset && pc.offset != offset {
59
c.t.Errorf("Unexpected offset when calling ConsumePartition for %s/%d. Expected %d, got %d.", topic, partition, pc.offset, offset)
66
// Topics returns a list of topics, as registered with SetTopicMetadata
67
func (c *Consumer) Topics() ([]string, error) {
71
if c.metadata == nil {
72
c.t.Errorf("Unexpected call to Topics. Initialize the mock's topic metadata with SetTopicMetadata.")
73
return nil, sarama.ErrOutOfBrokers
77
for topic := range c.metadata {
78
result = append(result, topic)
83
// Partitions returns the list of partitions for the given topic, as registered with SetTopicMetadata
84
func (c *Consumer) Partitions(topic string) ([]int32, error) {
88
if c.metadata == nil {
89
c.t.Errorf("Unexpected call to Partitions. Initialize the mock's topic metadata with SetTopicMetadata.")
90
return nil, sarama.ErrOutOfBrokers
92
if c.metadata[topic] == nil {
93
return nil, sarama.ErrUnknownTopicOrPartition
96
return c.metadata[topic], nil
99
func (c *Consumer) HighWaterMarks() map[string]map[int32]int64 {
103
hwms := make(map[string]map[int32]int64, len(c.partitionConsumers))
104
for topic, partitionConsumers := range c.partitionConsumers {
105
hwm := make(map[int32]int64, len(partitionConsumers))
106
for partition, pc := range partitionConsumers {
107
hwm[partition] = pc.HighWaterMarkOffset()
115
// Close implements the Close method from the sarama.Consumer interface. It will close
116
// all registered PartitionConsumer instances.
117
func (c *Consumer) Close() error {
121
for _, partitions := range c.partitionConsumers {
122
for _, partitionConsumer := range partitions {
123
_ = partitionConsumer.Close()
130
// Pause implements Consumer.
131
func (c *Consumer) Pause(topicPartitions map[string][]int32) {
135
for topic, partitions := range topicPartitions {
136
for _, partition := range partitions {
137
if topicConsumers, ok := c.partitionConsumers[topic]; ok {
138
if partitionConsumer, ok := topicConsumers[partition]; ok {
139
partitionConsumer.Pause()
146
// Resume implements Consumer.
147
func (c *Consumer) Resume(topicPartitions map[string][]int32) {
151
for topic, partitions := range topicPartitions {
152
for _, partition := range partitions {
153
if topicConsumers, ok := c.partitionConsumers[topic]; ok {
154
if partitionConsumer, ok := topicConsumers[partition]; ok {
155
partitionConsumer.Resume()
162
// PauseAll implements Consumer.
163
func (c *Consumer) PauseAll() {
167
for _, partitions := range c.partitionConsumers {
168
for _, partitionConsumer := range partitions {
169
partitionConsumer.Pause()
174
// ResumeAll implements Consumer.
175
func (c *Consumer) ResumeAll() {
179
for _, partitions := range c.partitionConsumers {
180
for _, partitionConsumer := range partitions {
181
partitionConsumer.Resume()
186
///////////////////////////////////////////////////
188
///////////////////////////////////////////////////
190
// SetTopicMetadata sets the cluster's topic/partition metadata,
191
// which will be returned by Topics() and Partitions().
192
func (c *Consumer) SetTopicMetadata(metadata map[string][]int32) {
196
c.metadata = metadata
199
// ExpectConsumePartition will register a topic/partition, so you can set expectations on it.
200
// The registered PartitionConsumer will be returned, so you can set expectations
201
// on it using method chaining. Once a topic/partition is registered, you are
202
// expected to start consuming it using ConsumePartition. If that doesn't happen,
203
// an error will be written to the error reporter once the mock consumer is closed. It will
204
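// also expect that the offset passed to ConsumePartition matches the expected offset,
// unless the expected offset is AnyOffset.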
205
func (c *Consumer) ExpectConsumePartition(topic string, partition int32, offset int64) *PartitionConsumer {
209
if c.partitionConsumers[topic] == nil {
210
c.partitionConsumers[topic] = make(map[int32]*PartitionConsumer)
213
if c.partitionConsumers[topic][partition] == nil {
214
highWatermarkOffset := offset
215
if offset == sarama.OffsetOldest {
216
highWatermarkOffset = 0
219
c.partitionConsumers[topic][partition] = &PartitionConsumer{
220
highWaterMarkOffset: highWatermarkOffset,
223
partition: partition,
225
messages: make(chan *sarama.ConsumerMessage, c.config.ChannelBufferSize),
226
suppressedMessages: make(chan *sarama.ConsumerMessage, c.config.ChannelBufferSize),
227
errors: make(chan *sarama.ConsumerError, c.config.ChannelBufferSize),
231
return c.partitionConsumers[topic][partition]
234
///////////////////////////////////////////////////
235
// PartitionConsumer mock type
236
///////////////////////////////////////////////////
238
// PartitionConsumer implements sarama's PartitionConsumer interface for testing purposes.
239
// It is returned by the mock Consumer's ConsumePartition method, but only if it is
240
// registered first using the Consumer's ExpectConsumePartition method. Before consuming the
241
// Errors and Messages channel, you should specify what values will be provided on these
242
// channels using YieldMessage and YieldError.
243
type PartitionConsumer struct {
244
highWaterMarkOffset int64 // must be at the top of the struct because https://golang.org/pkg/sync/atomic/#pkg-note-BUG
250
messages chan *sarama.ConsumerMessage
251
suppressedMessages chan *sarama.ConsumerMessage
252
suppressedHighWaterMarkOffset int64
253
errors chan *sarama.ConsumerError
254
singleClose sync.Once
256
errorsShouldBeDrained bool
257
messagesShouldBeDrained bool
261
///////////////////////////////////////////////////
262
// PartitionConsumer interface implementation
263
///////////////////////////////////////////////////
265
// AsyncClose implements the AsyncClose method from the sarama.PartitionConsumer interface.
266
func (pc *PartitionConsumer) AsyncClose() {
267
pc.singleClose.Do(func() {
268
close(pc.suppressedMessages)
274
// Close implements the Close method from the sarama.PartitionConsumer interface. It will
275
// verify whether the partition consumer was actually started.
276
func (pc *PartitionConsumer) Close() error {
278
pc.t.Errorf("Expectations set on %s/%d, but no partition consumer was started.", pc.topic, pc.partition)
279
return errPartitionConsumerNotStarted
282
if pc.errorsShouldBeDrained && len(pc.errors) > 0 {
283
pc.t.Errorf("Expected the errors channel for %s/%d to be drained on close, but found %d errors.", pc.topic, pc.partition, len(pc.errors))
286
if pc.messagesShouldBeDrained && len(pc.messages) > 0 {
287
pc.t.Errorf("Expected the messages channel for %s/%d to be drained on close, but found %d messages.", pc.topic, pc.partition, len(pc.messages))
301
errs := make(sarama.ConsumerErrors, 0)
302
for err := range pc.errors {
303
errs = append(errs, err)
314
for range pc.messages {
322
for range pc.suppressedMessages {
331
// Errors implements the Errors method from the sarama.PartitionConsumer interface.
332
func (pc *PartitionConsumer) Errors() <-chan *sarama.ConsumerError {
336
// Messages implements the Messages method from the sarama.PartitionConsumer interface.
337
func (pc *PartitionConsumer) Messages() <-chan *sarama.ConsumerMessage {
341
func (pc *PartitionConsumer) HighWaterMarkOffset() int64 {
342
return atomic.LoadInt64(&pc.highWaterMarkOffset) + 1
345
// Pause implements the Pause method from the sarama.PartitionConsumer interface.
346
func (pc *PartitionConsumer) Pause() {
350
pc.suppressedHighWaterMarkOffset = atomic.LoadInt64(&pc.highWaterMarkOffset)
355
// Resume implements the Resume method from the sarama.PartitionConsumer interface.
356
func (pc *PartitionConsumer) Resume() {
360
pc.highWaterMarkOffset = atomic.LoadInt64(&pc.suppressedHighWaterMarkOffset)
361
for len(pc.suppressedMessages) > 0 {
362
msg := <-pc.suppressedMessages
369
// IsPaused implements the IsPaused method from the sarama.PartitionConsumer interface.
370
func (pc *PartitionConsumer) IsPaused() bool {
377
///////////////////////////////////////////////////
379
///////////////////////////////////////////////////
381
// YieldMessage will yield a message on the Messages channel of this partition consumer
382
// when it is consumed. By default, the mock consumer will not verify whether this
383
// message was consumed from the Messages channel, because there are legitimate
384
// reasons for this not to happen. You can call ExpectMessagesDrainedOnClose so it will
385
// verify that the channel is empty on close.
386
func (pc *PartitionConsumer) YieldMessage(msg *sarama.ConsumerMessage) *PartitionConsumer {
391
msg.Partition = pc.partition
394
msg.Offset = atomic.AddInt64(&pc.suppressedHighWaterMarkOffset, 1) - 1
395
pc.suppressedMessages <- msg
397
msg.Offset = atomic.AddInt64(&pc.highWaterMarkOffset, 1) - 1
404
// YieldError will yield an error on the Errors channel of this partition consumer
405
// when it is consumed. By default, the mock consumer will not verify whether this error was
406
// consumed from the Errors channel, because there are legitimate reasons for this
407
// not to happen. You can call ExpectErrorsDrainedOnClose so it will verify that
408
// the channel is empty on close.
409
func (pc *PartitionConsumer) YieldError(err error) *PartitionConsumer {
410
pc.errors <- &sarama.ConsumerError{
412
Partition: pc.partition,
419
// ExpectMessagesDrainedOnClose sets an expectation on the partition consumer
420
// that the messages channel will be fully drained when Close is called. If this
421
// expectation is not met, an error is reported to the error reporter.
422
func (pc *PartitionConsumer) ExpectMessagesDrainedOnClose() *PartitionConsumer {
423
pc.messagesShouldBeDrained = true
428
// ExpectErrorsDrainedOnClose sets an expectation on the partition consumer
429
// that the errors channel will be fully drained when Close is called. If this
430
// expectation is not met, an error is reported to the error reporter.
431
func (pc *PartitionConsumer) ExpectErrorsDrainedOnClose() *PartitionConsumer {
432
pc.errorsShouldBeDrained = true