cubefs

Форк
0
216 строк · 8.6 Кб
1
package mocks
2

3
import (
4
	"errors"
5
	"sync"
6

7
	"github.com/Shopify/sarama"
8
)
9

10
// AsyncProducer implements sarama's Producer interface for testing purposes.
11
// Before you can send messages to it's Input channel, you have to set expectations
12
// so it knows how to handle the input; it returns an error if the number of messages
13
// received is bigger then the number of expectations set. You can also set a
14
// function in each expectation so that the message is checked by this function and
15
// an error is returned if the match fails.
16
type AsyncProducer struct {
17
	l            sync.Mutex
18
	t            ErrorReporter
19
	expectations []*producerExpectation
20
	closed       chan struct{}
21
	input        chan *sarama.ProducerMessage
22
	successes    chan *sarama.ProducerMessage
23
	errors       chan *sarama.ProducerError
24
	lastOffset   int64
25
	*TopicConfig
26
}
27

28
// NewAsyncProducer instantiates a new Producer mock. The t argument should
29
// be the *testing.T instance of your test method. An error will be written to it if
30
// an expectation is violated. The config argument is used to determine whether it
31
// should ack successes on the Successes channel and to handle partitioning.
32
func NewAsyncProducer(t ErrorReporter, config *sarama.Config) *AsyncProducer {
33
	if config == nil {
34
		config = sarama.NewConfig()
35
	}
36
	mp := &AsyncProducer{
37
		t:            t,
38
		closed:       make(chan struct{}),
39
		expectations: make([]*producerExpectation, 0),
40
		input:        make(chan *sarama.ProducerMessage, config.ChannelBufferSize),
41
		successes:    make(chan *sarama.ProducerMessage, config.ChannelBufferSize),
42
		errors:       make(chan *sarama.ProducerError, config.ChannelBufferSize),
43
		TopicConfig:  NewTopicConfig(),
44
	}
45

46
	go func() {
47
		defer func() {
48
			close(mp.successes)
49
			close(mp.errors)
50
			close(mp.closed)
51
		}()
52

53
		partitioners := make(map[string]sarama.Partitioner, 1)
54

55
		for msg := range mp.input {
56
			partitioner := partitioners[msg.Topic]
57
			if partitioner == nil {
58
				partitioner = config.Producer.Partitioner(msg.Topic)
59
				partitioners[msg.Topic] = partitioner
60
			}
61
			mp.l.Lock()
62
			if mp.expectations == nil || len(mp.expectations) == 0 {
63
				mp.expectations = nil
64
				mp.t.Errorf("No more expectation set on this mock producer to handle the input message.")
65
			} else {
66
				expectation := mp.expectations[0]
67
				mp.expectations = mp.expectations[1:]
68

69
				partition, err := partitioner.Partition(msg, mp.partitions(msg.Topic))
70
				if err != nil {
71
					mp.t.Errorf("Partitioner returned an error: %s", err.Error())
72
					mp.errors <- &sarama.ProducerError{Err: err, Msg: msg}
73
				} else {
74
					msg.Partition = partition
75
					if expectation.CheckFunction != nil {
76
						err := expectation.CheckFunction(msg)
77
						if err != nil {
78
							mp.t.Errorf("Check function returned an error: %s", err.Error())
79
							mp.errors <- &sarama.ProducerError{Err: err, Msg: msg}
80
						}
81
					}
82
					if errors.Is(expectation.Result, errProduceSuccess) {
83
						mp.lastOffset++
84
						if config.Producer.Return.Successes {
85
							msg.Offset = mp.lastOffset
86
							mp.successes <- msg
87
						}
88
					} else {
89
						if config.Producer.Return.Errors {
90
							mp.errors <- &sarama.ProducerError{Err: expectation.Result, Msg: msg}
91
						}
92
					}
93
				}
94
			}
95
			mp.l.Unlock()
96
		}
97

98
		mp.l.Lock()
99
		if len(mp.expectations) > 0 {
100
			mp.t.Errorf("Expected to exhaust all expectations, but %d are left.", len(mp.expectations))
101
		}
102
		mp.l.Unlock()
103
	}()
104

105
	return mp
106
}
107

108
////////////////////////////////////////////////
109
// Implement Producer interface
110
////////////////////////////////////////////////
111

112
// AsyncClose corresponds with the AsyncClose method of sarama's Producer implementation.
113
// By closing a mock producer, you also tell it that no more input will be provided, so it will
114
// write an error to the test state if there's any remaining expectations.
115
func (mp *AsyncProducer) AsyncClose() {
116
	close(mp.input)
117
}
118

119
// Close corresponds with the Close method of sarama's Producer implementation.
120
// By closing a mock producer, you also tell it that no more input will be provided, so it will
121
// write an error to the test state if there's any remaining expectations.
122
func (mp *AsyncProducer) Close() error {
123
	mp.AsyncClose()
124
	<-mp.closed
125
	return nil
126
}
127

128
// Input corresponds with the Input method of sarama's Producer implementation.
129
// You have to set expectations on the mock producer before writing messages to the Input
130
// channel, so it knows how to handle them. If there is no more remaining expectations and
131
// a messages is written to the Input channel, the mock producer will write an error to the test
132
// state object.
133
func (mp *AsyncProducer) Input() chan<- *sarama.ProducerMessage {
134
	return mp.input
135
}
136

137
// Successes corresponds with the Successes method of sarama's Producer implementation.
138
func (mp *AsyncProducer) Successes() <-chan *sarama.ProducerMessage {
139
	return mp.successes
140
}
141

142
// Errors corresponds with the Errors method of sarama's Producer implementation.
143
func (mp *AsyncProducer) Errors() <-chan *sarama.ProducerError {
144
	return mp.errors
145
}
146

147
////////////////////////////////////////////////
148
// Setting expectations
149
////////////////////////////////////////////////
150

151
// ExpectInputWithMessageCheckerFunctionAndSucceed sets an expectation on the mock producer that a
152
// message will be provided on the input channel. The mock producer will call the given function to
153
// check the message. If an error is returned it will be made available on the Errors channel
154
// otherwise the mock will handle the message as if it produced successfully, i.e. it will make it
155
// available on the Successes channel if the Producer.Return.Successes setting is set to true.
156
func (mp *AsyncProducer) ExpectInputWithMessageCheckerFunctionAndSucceed(cf MessageChecker) *AsyncProducer {
157
	mp.l.Lock()
158
	defer mp.l.Unlock()
159
	mp.expectations = append(mp.expectations, &producerExpectation{Result: errProduceSuccess, CheckFunction: cf})
160

161
	return mp
162
}
163

164
// ExpectInputWithMessageCheckerFunctionAndFail sets an expectation on the mock producer that a
165
// message will be provided on the input channel. The mock producer will first call the given
166
// function to check the message. If an error is returned it will be made available on the Errors
167
// channel otherwise the mock will handle the message as if it failed to produce successfully. This
168
// means it will make a ProducerError available on the Errors channel.
169
func (mp *AsyncProducer) ExpectInputWithMessageCheckerFunctionAndFail(cf MessageChecker, err error) *AsyncProducer {
170
	mp.l.Lock()
171
	defer mp.l.Unlock()
172
	mp.expectations = append(mp.expectations, &producerExpectation{Result: err, CheckFunction: cf})
173

174
	return mp
175
}
176

177
// ExpectInputWithCheckerFunctionAndSucceed sets an expectation on the mock producer that a message
178
// will be provided on the input channel. The mock producer will call the given function to check
179
// the message value. If an error is returned it will be made available on the Errors channel
180
// otherwise the mock will handle the message as if it produced successfully, i.e. it will make
181
// it available on the Successes channel if the Producer.Return.Successes setting is set to true.
182
func (mp *AsyncProducer) ExpectInputWithCheckerFunctionAndSucceed(cf ValueChecker) *AsyncProducer {
183
	mp.ExpectInputWithMessageCheckerFunctionAndSucceed(messageValueChecker(cf))
184

185
	return mp
186
}
187

188
// ExpectInputWithCheckerFunctionAndFail sets an expectation on the mock producer that a message
189
// will be provided on the input channel. The mock producer will first call the given function to
190
// check the message value. If an error is returned it will be made available on the Errors channel
191
// otherwise the mock will handle the message as if it failed to produce successfully. This means
192
// it will make a ProducerError available on the Errors channel.
193
func (mp *AsyncProducer) ExpectInputWithCheckerFunctionAndFail(cf ValueChecker, err error) *AsyncProducer {
194
	mp.ExpectInputWithMessageCheckerFunctionAndFail(messageValueChecker(cf), err)
195

196
	return mp
197
}
198

199
// ExpectInputAndSucceed sets an expectation on the mock producer that a message will be provided
200
// on the input channel. The mock producer will handle the message as if it is produced successfully,
201
// i.e. it will make it available on the Successes channel if the Producer.Return.Successes setting
202
// is set to true.
203
func (mp *AsyncProducer) ExpectInputAndSucceed() *AsyncProducer {
204
	mp.ExpectInputWithMessageCheckerFunctionAndSucceed(nil)
205

206
	return mp
207
}
208

209
// ExpectInputAndFail sets an expectation on the mock producer that a message will be provided
210
// on the input channel. The mock producer will handle the message as if it failed to produce
211
// successfully. This means it will make a ProducerError available on the Errors channel.
212
func (mp *AsyncProducer) ExpectInputAndFail(err error) *AsyncProducer {
213
	mp.ExpectInputWithMessageCheckerFunctionAndFail(nil, err)
214

215
	return mp
216
}
217

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.