7
"github.com/Shopify/sarama"
10
// SyncProducer implements sarama's SyncProducer interface for testing purposes.
11
// Before you can use it, you have to set expectations on the mock SyncProducer
12
// to tell it how to handle calls to SendMessage, so you can easily test success
13
// and failure scenarios.
14
type SyncProducer struct {
17
expectations []*producerExpectation
21
newPartitioner sarama.PartitionerConstructor
22
partitioners map[string]sarama.Partitioner
25
// NewSyncProducer instantiates a new SyncProducer mock. The t argument should
26
// be the *testing.T instance of your test method. An error will be written to it if
27
// an expectation is violated. The config argument is used to handle partitioning.
28
func NewSyncProducer(t ErrorReporter, config *sarama.Config) *SyncProducer {
30
config = sarama.NewConfig()
34
expectations: make([]*producerExpectation, 0),
35
TopicConfig: NewTopicConfig(),
36
newPartitioner: config.Producer.Partitioner,
37
partitioners: make(map[string]sarama.Partitioner, 1),
41
////////////////////////////////////////////////
42
// Implement SyncProducer interface
43
////////////////////////////////////////////////
45
// SendMessage corresponds with the SendMessage method of sarama's SyncProducer implementation.
46
// You have to set expectations on the mock producer before calling SendMessage, so it knows
47
// how to handle them. You can set a function in each expectation so that the message value
48
// checked by this function and an error is returned if the match fails.
49
// If there is no more remaining expectation when SendMessage is called,
50
// the mock producer will write an error to the test state object.
51
func (sp *SyncProducer) SendMessage(msg *sarama.ProducerMessage) (partition int32, offset int64, err error) {
55
if len(sp.expectations) > 0 {
56
expectation := sp.expectations[0]
57
sp.expectations = sp.expectations[1:]
59
partition, err := sp.partitioner(topic).Partition(msg, sp.partitions(topic))
61
sp.t.Errorf("Partitioner returned an error: %s", err.Error())
64
msg.Partition = partition
65
if expectation.CheckFunction != nil {
66
errCheck := expectation.CheckFunction(msg)
68
sp.t.Errorf("Check function returned an error: %s", errCheck.Error())
69
return -1, -1, errCheck
72
if errors.Is(expectation.Result, errProduceSuccess) {
74
msg.Offset = sp.lastOffset
75
return 0, msg.Offset, nil
77
return -1, -1, expectation.Result
79
sp.t.Errorf("No more expectation set on this mock producer to handle the input message.")
80
return -1, -1, errOutOfExpectations
83
// SendMessages corresponds with the SendMessages method of sarama's SyncProducer implementation.
84
// You have to set expectations on the mock producer before calling SendMessages, so it knows
85
// how to handle them. If there is no more remaining expectations when SendMessages is called,
86
// the mock producer will write an error to the test state object.
87
func (sp *SyncProducer) SendMessages(msgs []*sarama.ProducerMessage) error {
91
if len(sp.expectations) >= len(msgs) {
92
expectations := sp.expectations[0:len(msgs)]
93
sp.expectations = sp.expectations[len(msgs):]
95
for i, expectation := range expectations {
96
topic := msgs[i].Topic
97
partition, err := sp.partitioner(topic).Partition(msgs[i], sp.partitions(topic))
99
sp.t.Errorf("Partitioner returned an error: %s", err.Error())
102
msgs[i].Partition = partition
103
if expectation.CheckFunction != nil {
104
errCheck := expectation.CheckFunction(msgs[i])
106
sp.t.Errorf("Check function returned an error: %s", errCheck.Error())
110
if !errors.Is(expectation.Result, errProduceSuccess) {
111
return expectation.Result
114
msgs[i].Offset = sp.lastOffset
118
sp.t.Errorf("Insufficient expectations set on this mock producer to handle the input messages.")
119
return errOutOfExpectations
122
func (sp *SyncProducer) partitioner(topic string) sarama.Partitioner {
123
partitioner := sp.partitioners[topic]
124
if partitioner == nil {
125
partitioner = sp.newPartitioner(topic)
126
sp.partitioners[topic] = partitioner
131
// Close corresponds with the Close method of sarama's SyncProducer implementation.
132
// By closing a mock syncproducer, you also tell it that no more SendMessage calls will follow,
133
// so it will write an error to the test state if there's any remaining expectations.
134
func (sp *SyncProducer) Close() error {
138
if len(sp.expectations) > 0 {
139
sp.t.Errorf("Expected to exhaust all expectations, but %d are left.", len(sp.expectations))
145
////////////////////////////////////////////////
146
// Setting expectations
147
////////////////////////////////////////////////
149
// ExpectSendMessageWithMessageCheckerFunctionAndSucceed sets an expectation on the mock producer
150
// that SendMessage will be called. The mock producer will first call the given function to check
151
// the message. It will cascade the error of the function, if any, or handle the message as if it
152
// produced successfully, i.e. by returning a valid partition, and offset, and a nil error.
153
func (sp *SyncProducer) ExpectSendMessageWithMessageCheckerFunctionAndSucceed(cf MessageChecker) *SyncProducer {
156
sp.expectations = append(sp.expectations, &producerExpectation{Result: errProduceSuccess, CheckFunction: cf})
161
// ExpectSendMessageWithMessageCheckerFunctionAndFail sets an expectation on the mock producer that
162
// SendMessage will be called. The mock producer will first call the given function to check the
163
// message. It will cascade the error of the function, if any, or handle the message as if it
164
// failed to produce successfully, i.e. by returning the provided error.
165
func (sp *SyncProducer) ExpectSendMessageWithMessageCheckerFunctionAndFail(cf MessageChecker, err error) *SyncProducer {
168
sp.expectations = append(sp.expectations, &producerExpectation{Result: err, CheckFunction: cf})
173
// ExpectSendMessageWithCheckerFunctionAndSucceed sets an expectation on the mock producer that SendMessage
174
// will be called. The mock producer will first call the given function to check the message value.
175
// It will cascade the error of the function, if any, or handle the message as if it produced
176
// successfully, i.e. by returning a valid partition, and offset, and a nil error.
177
func (sp *SyncProducer) ExpectSendMessageWithCheckerFunctionAndSucceed(cf ValueChecker) *SyncProducer {
178
sp.ExpectSendMessageWithMessageCheckerFunctionAndSucceed(messageValueChecker(cf))
183
// ExpectSendMessageWithCheckerFunctionAndFail sets an expectation on the mock producer that SendMessage will be
184
// called. The mock producer will first call the given function to check the message value.
185
// It will cascade the error of the function, if any, or handle the message as if it failed
186
// to produce successfully, i.e. by returning the provided error.
187
func (sp *SyncProducer) ExpectSendMessageWithCheckerFunctionAndFail(cf ValueChecker, err error) *SyncProducer {
188
sp.ExpectSendMessageWithMessageCheckerFunctionAndFail(messageValueChecker(cf), err)
193
// ExpectSendMessageAndSucceed sets an expectation on the mock producer that SendMessage will be
194
// called. The mock producer will handle the message as if it produced successfully, i.e. by
195
// returning a valid partition, and offset, and a nil error.
196
func (sp *SyncProducer) ExpectSendMessageAndSucceed() *SyncProducer {
197
sp.ExpectSendMessageWithMessageCheckerFunctionAndSucceed(nil)
202
// ExpectSendMessageAndFail sets an expectation on the mock producer that SendMessage will be
203
// called. The mock producer will handle the message as if it failed to produce
204
// successfully, i.e. by returning the provided error.
205
func (sp *SyncProducer) ExpectSendMessageAndFail(err error) *SyncProducer {
206
sp.ExpectSendMessageWithMessageCheckerFunctionAndFail(nil, err)