cubefs

Форк
0
209 строк · 8.2 Кб
1
package mocks
2

3
import (
4
	"errors"
5
	"sync"
6

7
	"github.com/Shopify/sarama"
8
)
9

10
// SyncProducer implements sarama's SyncProducer interface for testing purposes.
11
// Before you can use it, you have to set expectations on the mock SyncProducer
12
// to tell it how to handle calls to SendMessage, so you can easily test success
13
// and failure scenarios.
14
type SyncProducer struct {
15
	l            sync.Mutex
16
	t            ErrorReporter
17
	expectations []*producerExpectation
18
	lastOffset   int64
19

20
	*TopicConfig
21
	newPartitioner sarama.PartitionerConstructor
22
	partitioners   map[string]sarama.Partitioner
23
}
24

25
// NewSyncProducer instantiates a new SyncProducer mock. The t argument should
26
// be the *testing.T instance of your test method. An error will be written to it if
27
// an expectation is violated. The config argument is used to handle partitioning.
28
func NewSyncProducer(t ErrorReporter, config *sarama.Config) *SyncProducer {
29
	if config == nil {
30
		config = sarama.NewConfig()
31
	}
32
	return &SyncProducer{
33
		t:              t,
34
		expectations:   make([]*producerExpectation, 0),
35
		TopicConfig:    NewTopicConfig(),
36
		newPartitioner: config.Producer.Partitioner,
37
		partitioners:   make(map[string]sarama.Partitioner, 1),
38
	}
39
}
40

41
////////////////////////////////////////////////
42
// Implement SyncProducer interface
43
////////////////////////////////////////////////
44

45
// SendMessage corresponds with the SendMessage method of sarama's SyncProducer implementation.
46
// You have to set expectations on the mock producer before calling SendMessage, so it knows
47
// how to handle them. You can set a function in each expectation so that the message value
48
// checked by this function and an error is returned if the match fails.
49
// If there is no more remaining expectation when SendMessage is called,
50
// the mock producer will write an error to the test state object.
51
func (sp *SyncProducer) SendMessage(msg *sarama.ProducerMessage) (partition int32, offset int64, err error) {
52
	sp.l.Lock()
53
	defer sp.l.Unlock()
54

55
	if len(sp.expectations) > 0 {
56
		expectation := sp.expectations[0]
57
		sp.expectations = sp.expectations[1:]
58
		topic := msg.Topic
59
		partition, err := sp.partitioner(topic).Partition(msg, sp.partitions(topic))
60
		if err != nil {
61
			sp.t.Errorf("Partitioner returned an error: %s", err.Error())
62
			return -1, -1, err
63
		}
64
		msg.Partition = partition
65
		if expectation.CheckFunction != nil {
66
			errCheck := expectation.CheckFunction(msg)
67
			if errCheck != nil {
68
				sp.t.Errorf("Check function returned an error: %s", errCheck.Error())
69
				return -1, -1, errCheck
70
			}
71
		}
72
		if errors.Is(expectation.Result, errProduceSuccess) {
73
			sp.lastOffset++
74
			msg.Offset = sp.lastOffset
75
			return 0, msg.Offset, nil
76
		}
77
		return -1, -1, expectation.Result
78
	}
79
	sp.t.Errorf("No more expectation set on this mock producer to handle the input message.")
80
	return -1, -1, errOutOfExpectations
81
}
82

83
// SendMessages corresponds with the SendMessages method of sarama's SyncProducer implementation.
84
// You have to set expectations on the mock producer before calling SendMessages, so it knows
85
// how to handle them. If there is no more remaining expectations when SendMessages is called,
86
// the mock producer will write an error to the test state object.
87
func (sp *SyncProducer) SendMessages(msgs []*sarama.ProducerMessage) error {
88
	sp.l.Lock()
89
	defer sp.l.Unlock()
90

91
	if len(sp.expectations) >= len(msgs) {
92
		expectations := sp.expectations[0:len(msgs)]
93
		sp.expectations = sp.expectations[len(msgs):]
94

95
		for i, expectation := range expectations {
96
			topic := msgs[i].Topic
97
			partition, err := sp.partitioner(topic).Partition(msgs[i], sp.partitions(topic))
98
			if err != nil {
99
				sp.t.Errorf("Partitioner returned an error: %s", err.Error())
100
				return err
101
			}
102
			msgs[i].Partition = partition
103
			if expectation.CheckFunction != nil {
104
				errCheck := expectation.CheckFunction(msgs[i])
105
				if errCheck != nil {
106
					sp.t.Errorf("Check function returned an error: %s", errCheck.Error())
107
					return errCheck
108
				}
109
			}
110
			if !errors.Is(expectation.Result, errProduceSuccess) {
111
				return expectation.Result
112
			}
113
			sp.lastOffset++
114
			msgs[i].Offset = sp.lastOffset
115
		}
116
		return nil
117
	}
118
	sp.t.Errorf("Insufficient expectations set on this mock producer to handle the input messages.")
119
	return errOutOfExpectations
120
}
121

122
func (sp *SyncProducer) partitioner(topic string) sarama.Partitioner {
123
	partitioner := sp.partitioners[topic]
124
	if partitioner == nil {
125
		partitioner = sp.newPartitioner(topic)
126
		sp.partitioners[topic] = partitioner
127
	}
128
	return partitioner
129
}
130

131
// Close corresponds with the Close method of sarama's SyncProducer implementation.
132
// By closing a mock syncproducer, you also tell it that no more SendMessage calls will follow,
133
// so it will write an error to the test state if there's any remaining expectations.
134
func (sp *SyncProducer) Close() error {
135
	sp.l.Lock()
136
	defer sp.l.Unlock()
137

138
	if len(sp.expectations) > 0 {
139
		sp.t.Errorf("Expected to exhaust all expectations, but %d are left.", len(sp.expectations))
140
	}
141

142
	return nil
143
}
144

145
////////////////////////////////////////////////
146
// Setting expectations
147
////////////////////////////////////////////////
148

149
// ExpectSendMessageWithMessageCheckerFunctionAndSucceed sets an expectation on the mock producer
150
// that SendMessage will be called. The mock producer will first call the given function to check
151
// the message. It will cascade the error of the function, if any, or handle the message as if it
152
// produced successfully, i.e. by returning a valid partition, and offset, and a nil error.
153
func (sp *SyncProducer) ExpectSendMessageWithMessageCheckerFunctionAndSucceed(cf MessageChecker) *SyncProducer {
154
	sp.l.Lock()
155
	defer sp.l.Unlock()
156
	sp.expectations = append(sp.expectations, &producerExpectation{Result: errProduceSuccess, CheckFunction: cf})
157

158
	return sp
159
}
160

161
// ExpectSendMessageWithMessageCheckerFunctionAndFail sets an expectation on the mock producer that
162
// SendMessage will be called. The mock producer will first call the given function to check the
163
// message. It will cascade the error of the function, if any, or handle the message as if it
164
// failed to produce successfully, i.e. by returning the provided error.
165
func (sp *SyncProducer) ExpectSendMessageWithMessageCheckerFunctionAndFail(cf MessageChecker, err error) *SyncProducer {
166
	sp.l.Lock()
167
	defer sp.l.Unlock()
168
	sp.expectations = append(sp.expectations, &producerExpectation{Result: err, CheckFunction: cf})
169

170
	return sp
171
}
172

173
// ExpectSendMessageWithCheckerFunctionAndSucceed sets an expectation on the mock producer that SendMessage
174
// will be called. The mock producer will first call the given function to check the message value.
175
// It will cascade the error of the function, if any, or handle the message as if it produced
176
// successfully, i.e. by returning a valid partition, and offset, and a nil error.
177
func (sp *SyncProducer) ExpectSendMessageWithCheckerFunctionAndSucceed(cf ValueChecker) *SyncProducer {
178
	sp.ExpectSendMessageWithMessageCheckerFunctionAndSucceed(messageValueChecker(cf))
179

180
	return sp
181
}
182

183
// ExpectSendMessageWithCheckerFunctionAndFail sets an expectation on the mock producer that SendMessage will be
184
// called. The mock producer will first call the given function to check the message value.
185
// It will cascade the error of the function, if any, or handle the message as if it failed
186
// to produce successfully, i.e. by returning the provided error.
187
func (sp *SyncProducer) ExpectSendMessageWithCheckerFunctionAndFail(cf ValueChecker, err error) *SyncProducer {
188
	sp.ExpectSendMessageWithMessageCheckerFunctionAndFail(messageValueChecker(cf), err)
189

190
	return sp
191
}
192

193
// ExpectSendMessageAndSucceed sets an expectation on the mock producer that SendMessage will be
194
// called. The mock producer will handle the message as if it produced successfully, i.e. by
195
// returning a valid partition, and offset, and a nil error.
196
func (sp *SyncProducer) ExpectSendMessageAndSucceed() *SyncProducer {
197
	sp.ExpectSendMessageWithMessageCheckerFunctionAndSucceed(nil)
198

199
	return sp
200
}
201

202
// ExpectSendMessageAndFail sets an expectation on the mock producer that SendMessage will be
203
// called. The mock producer will handle the message as if it failed to produce
204
// successfully, i.e. by returning the provided error.
205
func (sp *SyncProducer) ExpectSendMessageAndFail(err error) *SyncProducer {
206
	sp.ExpectSendMessageWithMessageCheckerFunctionAndFail(nil, err)
207

208
	return sp
209
}
210

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.