cubefs
/*
Package mocks provides mocks that can be used for testing applications
that use Sarama. The mock types provided by this package implement the
interfaces Sarama exports, so you can use them for dependency injection
in your tests.

All mock instances require you to set expectations on them before you
can use them. These expectations determine how the mock will behave. If an
expectation is not met, it will make your test fail.

NOTE: this package currently does not fall under the API stability
guarantee of Sarama as it is still considered experimental.
*/
14package mocks15
16import (17"errors"18"fmt"19
20"github.com/Shopify/sarama"21)
22
// ErrorReporter is the minimal subset of testing.T that the mock objects
// rely on: it is how an expectation violation gets reported back to the
// test that owns the mock.
type ErrorReporter interface {
	// Errorf reports a violation using a printf-style format and its arguments.
	Errorf(format string, args ...interface{})
}
28
// ValueChecker is a function type that can be set on an expectation of the
// producer mocks to inspect the value that was passed. It receives the value
// as bytes and returns a non-nil error to reject it.
type ValueChecker func(value []byte) error

33// MessageChecker is a function type to be set in each expectation of the producer mocks
34// to check the message passed.
35type MessageChecker func(*sarama.ProducerMessage) error36
37// messageValueChecker wraps a ValueChecker into a MessageChecker.
38// Failure to encode the message value will return an error and not call
39// the wrapped ValueChecker.
40func messageValueChecker(f ValueChecker) MessageChecker {41if f == nil {42return nil43}44return func(msg *sarama.ProducerMessage) error {45val, err := msg.Value.Encode()46if err != nil {47return fmt.Errorf("Input message encoding failed: %w", err)48}49return f(val)50}51}
52
// Sentinel error values used by the mocks in this package.
var (
	// errProduceSuccess is the nil error.
	// NOTE(review): presumably used as the result of an expectation that
	// should succeed — confirm against the producer mocks.
	errProduceSuccess error

	// errOutOfExpectations signals that a mock has no expectations left.
	errOutOfExpectations = errors.New("No more expectations set on mock")

	// errPartitionConsumerNotStarted signals that a partition consumer was never started.
	errPartitionConsumerNotStarted = errors.New("The partition consumer was never started")
)
58
// AnyOffset is a sentinel int64 offset with the value -1000.
// NOTE(review): presumably lets a mock expectation match any requested
// offset — confirm against the consumer mocks that use it.
const AnyOffset = int64(-1000)

61type producerExpectation struct {62Result error63CheckFunction MessageChecker
64}
65
// TopicConfig describes a mock topic structure for the mock producers' partitioning needs.
//
// It holds a default partition count plus optional per-topic overrides.
// NewTopicConfig returns a value with the default set to 32; a zero-value
// TopicConfig is also usable and reports 0 partitions until configured.
type TopicConfig struct {
	// overridePartitions maps a topic name to its explicitly configured partition count.
	overridePartitions map[string]int32
	// defaultPartitions is the count reported for topics without an override.
	defaultPartitions int32
}

// NewTopicConfig makes a configuration which defaults to 32 partitions for every topic.
func NewTopicConfig() *TopicConfig {
	return &TopicConfig{
		overridePartitions: make(map[string]int32),
		defaultPartitions:  32,
	}
}

// SetDefaultPartitions sets the number of partitions any topic not explicitly configured otherwise
// (by SetPartitions) will have from the perspective of created partitioners.
func (tc *TopicConfig) SetDefaultPartitions(n int32) {
	tc.defaultPartitions = n
}

// SetPartitions sets the number of partitions the partitioners will see for specific topics. This
// only applies to messages produced after setting them. Entries are merged into the existing
// overrides: topics already configured keep their value unless they appear in partitions.
func (tc *TopicConfig) SetPartitions(partitions map[string]int32) {
	// A TopicConfig that was not built by NewTopicConfig has a nil map;
	// allocate it lazily so the writes below cannot panic.
	if tc.overridePartitions == nil {
		tc.overridePartitions = make(map[string]int32, len(partitions))
	}
	for topic, n := range partitions {
		tc.overridePartitions[topic] = n
	}
}

// partitions returns the partition count for topic: the explicit override when
// one was set through SetPartitions, otherwise the configured default.
func (tc *TopicConfig) partitions(topic string) int32 {
	if n, ok := tc.overridePartitions[topic]; ok {
		return n
	}
	return tc.defaultPartitions
}
100
101// NewTestConfig returns a config meant to be used by tests.
102// Due to inconsistencies with the request versions the clients send using the default Kafka version
103// and the response versions our mocks use, we default to the minimum Kafka version in most tests
104func NewTestConfig() *sarama.Config {105config := sarama.NewConfig()106config.Version = sarama.MinVersion107return config108}
109