cubefs

Форк
0
435 строк · 13.0 Кб
1
package mocks
2

3
import (
4
	"sync"
5
	"sync/atomic"
6

7
	"github.com/Shopify/sarama"
8
)
9

10
// Consumer implements sarama's Consumer interface for testing purposes.
11
// Before you can start consuming from this consumer, you have to register
12
// topic/partitions using ExpectConsumePartition, and set expectations on them.
13
type Consumer struct {
14
	l                  sync.Mutex
15
	t                  ErrorReporter
16
	config             *sarama.Config
17
	partitionConsumers map[string]map[int32]*PartitionConsumer
18
	metadata           map[string][]int32
19
}
20

21
// NewConsumer returns a new mock Consumer instance. The t argument should
22
// be the *testing.T instance of your test method. An error will be written to it if
23
// an expectation is violated. The config argument can be set to nil.
24
func NewConsumer(t ErrorReporter, config *sarama.Config) *Consumer {
25
	if config == nil {
26
		config = sarama.NewConfig()
27
	}
28

29
	c := &Consumer{
30
		t:                  t,
31
		config:             config,
32
		partitionConsumers: make(map[string]map[int32]*PartitionConsumer),
33
	}
34
	return c
35
}
36

37
///////////////////////////////////////////////////
38
// Consumer interface implementation
39
///////////////////////////////////////////////////
40

41
// ConsumePartition implements the ConsumePartition method from the sarama.Consumer interface.
42
// Before you can start consuming a partition, you have to set expectations on it using
43
// ExpectConsumePartition. You can only consume a partition once per consumer.
44
func (c *Consumer) ConsumePartition(topic string, partition int32, offset int64) (sarama.PartitionConsumer, error) {
45
	c.l.Lock()
46
	defer c.l.Unlock()
47

48
	if c.partitionConsumers[topic] == nil || c.partitionConsumers[topic][partition] == nil {
49
		c.t.Errorf("No expectations set for %s/%d", topic, partition)
50
		return nil, errOutOfExpectations
51
	}
52

53
	pc := c.partitionConsumers[topic][partition]
54
	if pc.consumed {
55
		return nil, sarama.ConfigurationError("The topic/partition is already being consumed")
56
	}
57

58
	if pc.offset != AnyOffset && pc.offset != offset {
59
		c.t.Errorf("Unexpected offset when calling ConsumePartition for %s/%d. Expected %d, got %d.", topic, partition, pc.offset, offset)
60
	}
61

62
	pc.consumed = true
63
	return pc, nil
64
}
65

66
// Topics returns a list of topics, as registered with SetTopicMetadata
67
func (c *Consumer) Topics() ([]string, error) {
68
	c.l.Lock()
69
	defer c.l.Unlock()
70

71
	if c.metadata == nil {
72
		c.t.Errorf("Unexpected call to Topics. Initialize the mock's topic metadata with SetTopicMetadata.")
73
		return nil, sarama.ErrOutOfBrokers
74
	}
75

76
	var result []string
77
	for topic := range c.metadata {
78
		result = append(result, topic)
79
	}
80
	return result, nil
81
}
82

83
// Partitions returns the list of parititons for the given topic, as registered with SetTopicMetadata
84
func (c *Consumer) Partitions(topic string) ([]int32, error) {
85
	c.l.Lock()
86
	defer c.l.Unlock()
87

88
	if c.metadata == nil {
89
		c.t.Errorf("Unexpected call to Partitions. Initialize the mock's topic metadata with SetTopicMetadata.")
90
		return nil, sarama.ErrOutOfBrokers
91
	}
92
	if c.metadata[topic] == nil {
93
		return nil, sarama.ErrUnknownTopicOrPartition
94
	}
95

96
	return c.metadata[topic], nil
97
}
98

99
func (c *Consumer) HighWaterMarks() map[string]map[int32]int64 {
100
	c.l.Lock()
101
	defer c.l.Unlock()
102

103
	hwms := make(map[string]map[int32]int64, len(c.partitionConsumers))
104
	for topic, partitionConsumers := range c.partitionConsumers {
105
		hwm := make(map[int32]int64, len(partitionConsumers))
106
		for partition, pc := range partitionConsumers {
107
			hwm[partition] = pc.HighWaterMarkOffset()
108
		}
109
		hwms[topic] = hwm
110
	}
111

112
	return hwms
113
}
114

115
// Close implements the Close method from the sarama.Consumer interface. It will close
116
// all registered PartitionConsumer instances.
117
func (c *Consumer) Close() error {
118
	c.l.Lock()
119
	defer c.l.Unlock()
120

121
	for _, partitions := range c.partitionConsumers {
122
		for _, partitionConsumer := range partitions {
123
			_ = partitionConsumer.Close()
124
		}
125
	}
126

127
	return nil
128
}
129

130
// Pause implements Consumer.
131
func (c *Consumer) Pause(topicPartitions map[string][]int32) {
132
	c.l.Lock()
133
	defer c.l.Unlock()
134

135
	for topic, partitions := range topicPartitions {
136
		for _, partition := range partitions {
137
			if topicConsumers, ok := c.partitionConsumers[topic]; ok {
138
				if partitionConsumer, ok := topicConsumers[partition]; ok {
139
					partitionConsumer.Pause()
140
				}
141
			}
142
		}
143
	}
144
}
145

146
// Resume implements Consumer.
147
func (c *Consumer) Resume(topicPartitions map[string][]int32) {
148
	c.l.Lock()
149
	defer c.l.Unlock()
150

151
	for topic, partitions := range topicPartitions {
152
		for _, partition := range partitions {
153
			if topicConsumers, ok := c.partitionConsumers[topic]; ok {
154
				if partitionConsumer, ok := topicConsumers[partition]; ok {
155
					partitionConsumer.Resume()
156
				}
157
			}
158
		}
159
	}
160
}
161

162
// PauseAll implements Consumer.
163
func (c *Consumer) PauseAll() {
164
	c.l.Lock()
165
	defer c.l.Unlock()
166

167
	for _, partitions := range c.partitionConsumers {
168
		for _, partitionConsumer := range partitions {
169
			partitionConsumer.Pause()
170
		}
171
	}
172
}
173

174
// ResumeAll implements Consumer.
175
func (c *Consumer) ResumeAll() {
176
	c.l.Lock()
177
	defer c.l.Unlock()
178

179
	for _, partitions := range c.partitionConsumers {
180
		for _, partitionConsumer := range partitions {
181
			partitionConsumer.Resume()
182
		}
183
	}
184
}
185

186
///////////////////////////////////////////////////
187
// Expectation API
188
///////////////////////////////////////////////////
189

190
// SetTopicMetadata sets the clusters topic/partition metadata,
191
// which will be returned by Topics() and Partitions().
192
func (c *Consumer) SetTopicMetadata(metadata map[string][]int32) {
193
	c.l.Lock()
194
	defer c.l.Unlock()
195

196
	c.metadata = metadata
197
}
198

199
// ExpectConsumePartition will register a topic/partition, so you can set expectations on it.
200
// The registered PartitionConsumer will be returned, so you can set expectations
201
// on it using method chaining. Once a topic/partition is registered, you are
202
// expected to start consuming it using ConsumePartition. If that doesn't happen,
203
// an error will be written to the error reporter once the mock consumer is closed. It will
204
// also expect that the
205
func (c *Consumer) ExpectConsumePartition(topic string, partition int32, offset int64) *PartitionConsumer {
206
	c.l.Lock()
207
	defer c.l.Unlock()
208

209
	if c.partitionConsumers[topic] == nil {
210
		c.partitionConsumers[topic] = make(map[int32]*PartitionConsumer)
211
	}
212

213
	if c.partitionConsumers[topic][partition] == nil {
214
		highWatermarkOffset := offset
215
		if offset == sarama.OffsetOldest {
216
			highWatermarkOffset = 0
217
		}
218

219
		c.partitionConsumers[topic][partition] = &PartitionConsumer{
220
			highWaterMarkOffset: highWatermarkOffset,
221
			t:                   c.t,
222
			topic:               topic,
223
			partition:           partition,
224
			offset:              offset,
225
			messages:            make(chan *sarama.ConsumerMessage, c.config.ChannelBufferSize),
226
			suppressedMessages:  make(chan *sarama.ConsumerMessage, c.config.ChannelBufferSize),
227
			errors:              make(chan *sarama.ConsumerError, c.config.ChannelBufferSize),
228
		}
229
	}
230

231
	return c.partitionConsumers[topic][partition]
232
}
233

234
///////////////////////////////////////////////////
235
// PartitionConsumer mock type
236
///////////////////////////////////////////////////
237

238
// PartitionConsumer implements sarama's PartitionConsumer interface for testing purposes.
239
// It is returned by the mock Consumers ConsumePartitionMethod, but only if it is
240
// registered first using the Consumer's ExpectConsumePartition method. Before consuming the
241
// Errors and Messages channel, you should specify what values will be provided on these
242
// channels using YieldMessage and YieldError.
243
type PartitionConsumer struct {
244
	highWaterMarkOffset           int64 // must be at the top of the struct because https://golang.org/pkg/sync/atomic/#pkg-note-BUG
245
	l                             sync.Mutex
246
	t                             ErrorReporter
247
	topic                         string
248
	partition                     int32
249
	offset                        int64
250
	messages                      chan *sarama.ConsumerMessage
251
	suppressedMessages            chan *sarama.ConsumerMessage
252
	suppressedHighWaterMarkOffset int64
253
	errors                        chan *sarama.ConsumerError
254
	singleClose                   sync.Once
255
	consumed                      bool
256
	errorsShouldBeDrained         bool
257
	messagesShouldBeDrained       bool
258
	paused                        bool
259
}
260

261
///////////////////////////////////////////////////
262
// PartitionConsumer interface implementation
263
///////////////////////////////////////////////////
264

265
// AsyncClose implements the AsyncClose method from the sarama.PartitionConsumer interface.
266
func (pc *PartitionConsumer) AsyncClose() {
267
	pc.singleClose.Do(func() {
268
		close(pc.suppressedMessages)
269
		close(pc.messages)
270
		close(pc.errors)
271
	})
272
}
273

274
// Close implements the Close method from the sarama.PartitionConsumer interface. It will
275
// verify whether the partition consumer was actually started.
276
func (pc *PartitionConsumer) Close() error {
277
	if !pc.consumed {
278
		pc.t.Errorf("Expectations set on %s/%d, but no partition consumer was started.", pc.topic, pc.partition)
279
		return errPartitionConsumerNotStarted
280
	}
281

282
	if pc.errorsShouldBeDrained && len(pc.errors) > 0 {
283
		pc.t.Errorf("Expected the errors channel for %s/%d to be drained on close, but found %d errors.", pc.topic, pc.partition, len(pc.errors))
284
	}
285

286
	if pc.messagesShouldBeDrained && len(pc.messages) > 0 {
287
		pc.t.Errorf("Expected the messages channel for %s/%d to be drained on close, but found %d messages.", pc.topic, pc.partition, len(pc.messages))
288
	}
289

290
	pc.AsyncClose()
291

292
	var (
293
		closeErr error
294
		wg       sync.WaitGroup
295
	)
296

297
	wg.Add(1)
298
	go func() {
299
		defer wg.Done()
300

301
		errs := make(sarama.ConsumerErrors, 0)
302
		for err := range pc.errors {
303
			errs = append(errs, err)
304
		}
305

306
		if len(errs) > 0 {
307
			closeErr = errs
308
		}
309
	}()
310

311
	wg.Add(1)
312
	go func() {
313
		defer wg.Done()
314
		for range pc.messages {
315
			// drain
316
		}
317
	}()
318

319
	wg.Add(1)
320
	go func() {
321
		defer wg.Done()
322
		for range pc.suppressedMessages {
323
			// drain
324
		}
325
	}()
326

327
	wg.Wait()
328
	return closeErr
329
}
330

331
// Errors implements the Errors method from the sarama.PartitionConsumer interface.
332
func (pc *PartitionConsumer) Errors() <-chan *sarama.ConsumerError {
333
	return pc.errors
334
}
335

336
// Messages implements the Messages method from the sarama.PartitionConsumer interface.
337
func (pc *PartitionConsumer) Messages() <-chan *sarama.ConsumerMessage {
338
	return pc.messages
339
}
340

341
func (pc *PartitionConsumer) HighWaterMarkOffset() int64 {
342
	return atomic.LoadInt64(&pc.highWaterMarkOffset) + 1
343
}
344

345
// Pause implements the Pause method from the sarama.PartitionConsumer interface.
346
func (pc *PartitionConsumer) Pause() {
347
	pc.l.Lock()
348
	defer pc.l.Unlock()
349

350
	pc.suppressedHighWaterMarkOffset = atomic.LoadInt64(&pc.highWaterMarkOffset)
351

352
	pc.paused = true
353
}
354

355
// Resume implements the Resume method from the sarama.PartitionConsumer interface.
356
func (pc *PartitionConsumer) Resume() {
357
	pc.l.Lock()
358
	defer pc.l.Unlock()
359

360
	pc.highWaterMarkOffset = atomic.LoadInt64(&pc.suppressedHighWaterMarkOffset)
361
	for len(pc.suppressedMessages) > 0 {
362
		msg := <-pc.suppressedMessages
363
		pc.messages <- msg
364
	}
365

366
	pc.paused = false
367
}
368

369
// IsPaused implements the IsPaused method from the sarama.PartitionConsumer interface.
370
func (pc *PartitionConsumer) IsPaused() bool {
371
	pc.l.Lock()
372
	defer pc.l.Unlock()
373

374
	return pc.paused
375
}
376

377
///////////////////////////////////////////////////
378
// Expectation API
379
///////////////////////////////////////////////////
380

381
// YieldMessage will yield a messages Messages channel of this partition consumer
382
// when it is consumed. By default, the mock consumer will not verify whether this
383
// message was consumed from the Messages channel, because there are legitimate
384
// reasons forthis not to happen. ou can call ExpectMessagesDrainedOnClose so it will
385
// verify that the channel is empty on close.
386
func (pc *PartitionConsumer) YieldMessage(msg *sarama.ConsumerMessage) *PartitionConsumer {
387
	pc.l.Lock()
388
	defer pc.l.Unlock()
389

390
	msg.Topic = pc.topic
391
	msg.Partition = pc.partition
392

393
	if pc.paused {
394
		msg.Offset = atomic.AddInt64(&pc.suppressedHighWaterMarkOffset, 1) - 1
395
		pc.suppressedMessages <- msg
396
	} else {
397
		msg.Offset = atomic.AddInt64(&pc.highWaterMarkOffset, 1) - 1
398
		pc.messages <- msg
399
	}
400

401
	return pc
402
}
403

404
// YieldError will yield an error on the Errors channel of this partition consumer
405
// when it is consumed. By default, the mock consumer will not verify whether this error was
406
// consumed from the Errors channel, because there are legitimate reasons for this
407
// not to happen. You can call ExpectErrorsDrainedOnClose so it will verify that
408
// the channel is empty on close.
409
func (pc *PartitionConsumer) YieldError(err error) *PartitionConsumer {
410
	pc.errors <- &sarama.ConsumerError{
411
		Topic:     pc.topic,
412
		Partition: pc.partition,
413
		Err:       err,
414
	}
415

416
	return pc
417
}
418

419
// ExpectMessagesDrainedOnClose sets an expectation on the partition consumer
420
// that the messages channel will be fully drained when Close is called. If this
421
// expectation is not met, an error is reported to the error reporter.
422
func (pc *PartitionConsumer) ExpectMessagesDrainedOnClose() *PartitionConsumer {
423
	pc.messagesShouldBeDrained = true
424

425
	return pc
426
}
427

428
// ExpectErrorsDrainedOnClose sets an expectation on the partition consumer
429
// that the errors channel will be fully drained when Close is called. If this
430
// expectation is not met, an error is reported to the error reporter.
431
func (pc *PartitionConsumer) ExpectErrorsDrainedOnClose() *PartitionConsumer {
432
	pc.errorsShouldBeDrained = true
433

434
	return pc
435
}
436

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.