cubefs
1package mocks2
3import (4"errors"5"sync"6
7"github.com/Shopify/sarama"8)
9
10// AsyncProducer implements sarama's Producer interface for testing purposes.
11// Before you can send messages to it's Input channel, you have to set expectations
12// so it knows how to handle the input; it returns an error if the number of messages
13// received is bigger then the number of expectations set. You can also set a
14// function in each expectation so that the message is checked by this function and
15// an error is returned if the match fails.
16type AsyncProducer struct {17l sync.Mutex18t ErrorReporter
19expectations []*producerExpectation20closed chan struct{}21input chan *sarama.ProducerMessage22successes chan *sarama.ProducerMessage23errors chan *sarama.ProducerError24lastOffset int6425*TopicConfig26}
27
28// NewAsyncProducer instantiates a new Producer mock. The t argument should
29// be the *testing.T instance of your test method. An error will be written to it if
30// an expectation is violated. The config argument is used to determine whether it
31// should ack successes on the Successes channel and to handle partitioning.
32func NewAsyncProducer(t ErrorReporter, config *sarama.Config) *AsyncProducer {33if config == nil {34config = sarama.NewConfig()35}36mp := &AsyncProducer{37t: t,38closed: make(chan struct{}),39expectations: make([]*producerExpectation, 0),40input: make(chan *sarama.ProducerMessage, config.ChannelBufferSize),41successes: make(chan *sarama.ProducerMessage, config.ChannelBufferSize),42errors: make(chan *sarama.ProducerError, config.ChannelBufferSize),43TopicConfig: NewTopicConfig(),44}45
46go func() {47defer func() {48close(mp.successes)49close(mp.errors)50close(mp.closed)51}()52
53partitioners := make(map[string]sarama.Partitioner, 1)54
55for msg := range mp.input {56partitioner := partitioners[msg.Topic]57if partitioner == nil {58partitioner = config.Producer.Partitioner(msg.Topic)59partitioners[msg.Topic] = partitioner60}61mp.l.Lock()62if mp.expectations == nil || len(mp.expectations) == 0 {63mp.expectations = nil64mp.t.Errorf("No more expectation set on this mock producer to handle the input message.")65} else {66expectation := mp.expectations[0]67mp.expectations = mp.expectations[1:]68
69partition, err := partitioner.Partition(msg, mp.partitions(msg.Topic))70if err != nil {71mp.t.Errorf("Partitioner returned an error: %s", err.Error())72mp.errors <- &sarama.ProducerError{Err: err, Msg: msg}73} else {74msg.Partition = partition75if expectation.CheckFunction != nil {76err := expectation.CheckFunction(msg)77if err != nil {78mp.t.Errorf("Check function returned an error: %s", err.Error())79mp.errors <- &sarama.ProducerError{Err: err, Msg: msg}80}81}82if errors.Is(expectation.Result, errProduceSuccess) {83mp.lastOffset++84if config.Producer.Return.Successes {85msg.Offset = mp.lastOffset86mp.successes <- msg87}88} else {89if config.Producer.Return.Errors {90mp.errors <- &sarama.ProducerError{Err: expectation.Result, Msg: msg}91}92}93}94}95mp.l.Unlock()96}97
98mp.l.Lock()99if len(mp.expectations) > 0 {100mp.t.Errorf("Expected to exhaust all expectations, but %d are left.", len(mp.expectations))101}102mp.l.Unlock()103}()104
105return mp106}
107
108////////////////////////////////////////////////
109// Implement Producer interface
110////////////////////////////////////////////////
111
112// AsyncClose corresponds with the AsyncClose method of sarama's Producer implementation.
113// By closing a mock producer, you also tell it that no more input will be provided, so it will
114// write an error to the test state if there's any remaining expectations.
115func (mp *AsyncProducer) AsyncClose() {116close(mp.input)117}
118
119// Close corresponds with the Close method of sarama's Producer implementation.
120// By closing a mock producer, you also tell it that no more input will be provided, so it will
121// write an error to the test state if there's any remaining expectations.
122func (mp *AsyncProducer) Close() error {123mp.AsyncClose()124<-mp.closed125return nil126}
127
128// Input corresponds with the Input method of sarama's Producer implementation.
129// You have to set expectations on the mock producer before writing messages to the Input
130// channel, so it knows how to handle them. If there is no more remaining expectations and
131// a messages is written to the Input channel, the mock producer will write an error to the test
132// state object.
133func (mp *AsyncProducer) Input() chan<- *sarama.ProducerMessage {134return mp.input135}
136
137// Successes corresponds with the Successes method of sarama's Producer implementation.
138func (mp *AsyncProducer) Successes() <-chan *sarama.ProducerMessage {139return mp.successes140}
141
142// Errors corresponds with the Errors method of sarama's Producer implementation.
143func (mp *AsyncProducer) Errors() <-chan *sarama.ProducerError {144return mp.errors145}
146
147////////////////////////////////////////////////
148// Setting expectations
149////////////////////////////////////////////////
150
151// ExpectInputWithMessageCheckerFunctionAndSucceed sets an expectation on the mock producer that a
152// message will be provided on the input channel. The mock producer will call the given function to
153// check the message. If an error is returned it will be made available on the Errors channel
154// otherwise the mock will handle the message as if it produced successfully, i.e. it will make it
155// available on the Successes channel if the Producer.Return.Successes setting is set to true.
156func (mp *AsyncProducer) ExpectInputWithMessageCheckerFunctionAndSucceed(cf MessageChecker) *AsyncProducer {157mp.l.Lock()158defer mp.l.Unlock()159mp.expectations = append(mp.expectations, &producerExpectation{Result: errProduceSuccess, CheckFunction: cf})160
161return mp162}
163
164// ExpectInputWithMessageCheckerFunctionAndFail sets an expectation on the mock producer that a
165// message will be provided on the input channel. The mock producer will first call the given
166// function to check the message. If an error is returned it will be made available on the Errors
167// channel otherwise the mock will handle the message as if it failed to produce successfully. This
168// means it will make a ProducerError available on the Errors channel.
169func (mp *AsyncProducer) ExpectInputWithMessageCheckerFunctionAndFail(cf MessageChecker, err error) *AsyncProducer {170mp.l.Lock()171defer mp.l.Unlock()172mp.expectations = append(mp.expectations, &producerExpectation{Result: err, CheckFunction: cf})173
174return mp175}
176
177// ExpectInputWithCheckerFunctionAndSucceed sets an expectation on the mock producer that a message
178// will be provided on the input channel. The mock producer will call the given function to check
179// the message value. If an error is returned it will be made available on the Errors channel
180// otherwise the mock will handle the message as if it produced successfully, i.e. it will make
181// it available on the Successes channel if the Producer.Return.Successes setting is set to true.
182func (mp *AsyncProducer) ExpectInputWithCheckerFunctionAndSucceed(cf ValueChecker) *AsyncProducer {183mp.ExpectInputWithMessageCheckerFunctionAndSucceed(messageValueChecker(cf))184
185return mp186}
187
188// ExpectInputWithCheckerFunctionAndFail sets an expectation on the mock producer that a message
189// will be provided on the input channel. The mock producer will first call the given function to
190// check the message value. If an error is returned it will be made available on the Errors channel
191// otherwise the mock will handle the message as if it failed to produce successfully. This means
192// it will make a ProducerError available on the Errors channel.
193func (mp *AsyncProducer) ExpectInputWithCheckerFunctionAndFail(cf ValueChecker, err error) *AsyncProducer {194mp.ExpectInputWithMessageCheckerFunctionAndFail(messageValueChecker(cf), err)195
196return mp197}
198
199// ExpectInputAndSucceed sets an expectation on the mock producer that a message will be provided
200// on the input channel. The mock producer will handle the message as if it is produced successfully,
201// i.e. it will make it available on the Successes channel if the Producer.Return.Successes setting
202// is set to true.
203func (mp *AsyncProducer) ExpectInputAndSucceed() *AsyncProducer {204mp.ExpectInputWithMessageCheckerFunctionAndSucceed(nil)205
206return mp207}
208
209// ExpectInputAndFail sets an expectation on the mock producer that a message will be provided
210// on the input channel. The mock producer will handle the message as if it failed to produce
211// successfully. This means it will make a ProducerError available on the Errors channel.
212func (mp *AsyncProducer) ExpectInputAndFail(err error) *AsyncProducer {213mp.ExpectInputWithMessageCheckerFunctionAndFail(nil, err)214
215return mp216}
217