package sarama

import (
	"errors"
	"fmt"
	"math"
	"sync"
	"sync/atomic"
	"time"

	"github.com/rcrowley/go-metrics"
)

// ConsumerMessage encapsulates a Kafka message returned by the consumer.
type ConsumerMessage struct {
	Headers        []*RecordHeader // only set if kafka is version 0.11+
	Timestamp      time.Time       // only set if kafka is version 0.10+, inner message timestamp
	BlockTimestamp time.Time       // only set if kafka is version 0.10+, outer (compressed) block timestamp

	Key, Value []byte
	Topic      string
	Partition  int32
	Offset     int64
}

// ConsumerError is what is provided to the user when an error occurs.
// It wraps an error and includes the topic and partition.
type ConsumerError struct {
	Topic     string
	Partition int32
	Err       error
}

func (ce ConsumerError) Error() string {
	return fmt.Sprintf("kafka: error while consuming %s/%d: %s", ce.Topic, ce.Partition, ce.Err)
}

func (ce ConsumerError) Unwrap() error {
	return ce.Err
}

// ConsumerErrors is a type that wraps a batch of errors and implements the error interface.
// It can be returned from the PartitionConsumer's Close methods to avoid the need to manually drain errors
// when stopping.
type ConsumerErrors []*ConsumerError

func (ce ConsumerErrors) Error() string {
	return fmt.Sprintf("kafka: %d errors while consuming", len(ce))
}

// Consumer manages PartitionConsumers which process Kafka messages from brokers. You MUST call Close()
// on a consumer to avoid leaks; it will not be garbage-collected automatically when it passes out of
// scope.
type Consumer interface {
	// Topics returns the set of available topics as retrieved from the cluster
	// metadata. This method is the same as Client.Topics(), and is provided for
	// convenience.
	Topics() ([]string, error)

	// Partitions returns the sorted list of all partition IDs for the given topic.
	// This method is the same as Client.Partitions(), and is provided for convenience.
	Partitions(topic string) ([]int32, error)

	// ConsumePartition creates a PartitionConsumer on the given topic/partition with
	// the given offset. It will return an error if this Consumer is already consuming
	// on the given topic/partition. Offset can be a literal offset, or OffsetNewest
	// or OffsetOldest.
	ConsumePartition(topic string, partition int32, offset int64) (PartitionConsumer, error)

	// HighWaterMarks returns the current high water marks for each topic and partition.
	// Consistency between partitions is not guaranteed since high water marks are updated separately.
	HighWaterMarks() map[string]map[int32]int64

	// Close shuts down the consumer. It must be called after all child
	// PartitionConsumers have already been closed.
	Close() error

	// Pause suspends fetching from the requested partitions. Future calls to the broker will not return any
	// records from these partitions until they have been resumed using Resume()/ResumeAll().
	// Note that this method does not affect partition subscription.
	// In particular, it does not cause a group rebalance when automatic assignment is used.
	Pause(topicPartitions map[string][]int32)

	// Resume resumes specified partitions which have been paused with Pause()/PauseAll().
	// New calls to the broker will return records from these partitions if there are any to be fetched.
	Resume(topicPartitions map[string][]int32)

	// PauseAll suspends fetching from all partitions. Future calls to the broker will not return any
	// records from these partitions until they have been resumed using Resume()/ResumeAll().
	// Note that this method does not affect partition subscription.
	// In particular, it does not cause a group rebalance when automatic assignment is used.
	PauseAll()

	// ResumeAll resumes all partitions which have been paused with Pause()/PauseAll().
	// New calls to the broker will return records from these partitions if there are any to be fetched.
	ResumeAll()
}
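
// A minimal usage sketch of the Consumer interface above (illustrative only, not part
// of this file's implementation). The broker address "localhost:9092", the topic
// "my_topic", and partition 0 are placeholders; adjust them for your environment.
//
//	config := NewConfig()
//	config.Consumer.Return.Errors = true
//
//	consumer, err := NewConsumer([]string{"localhost:9092"}, config)
//	if err != nil {
//		panic(err)
//	}
//	defer consumer.Close()
//
//	// Start reading partition 0 of "my_topic" from the oldest available offset.
//	pc, err := consumer.ConsumePartition("my_topic", 0, OffsetOldest)
//	if err != nil {
//		panic(err)
//	}
//	defer pc.Close()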

// max time to wait for more partition subscriptions
const partitionConsumersBatchTimeout = 100 * time.Millisecond

type consumer struct {
	conf            *Config
	children        map[string]map[int32]*partitionConsumer
	brokerConsumers map[*Broker]*brokerConsumer
	client          Client
	lock            sync.Mutex
}

// NewConsumer creates a new consumer using the given broker addresses and configuration.
func NewConsumer(addrs []string, config *Config) (Consumer, error) {
	client, err := NewClient(addrs, config)
	if err != nil {
		return nil, err
	}
	return newConsumer(client)
}

// NewConsumerFromClient creates a new consumer using the given client. It is still
// necessary to call Close() on the underlying client when shutting down this consumer.
func NewConsumerFromClient(client Client) (Consumer, error) {
	// For clients passed in by the caller, ensure we don't
	// call Close() on them.
	cli := &nopCloserClient{client}
	return newConsumer(cli)
}

func newConsumer(client Client) (Consumer, error) {
	// Check that we are not dealing with a closed Client before processing any other arguments
	if client.Closed() {
		return nil, ErrClosedClient
	}

	c := &consumer{
		client:          client,
		conf:            client.Config(),
		children:        make(map[string]map[int32]*partitionConsumer),
		brokerConsumers: make(map[*Broker]*brokerConsumer),
	}

	return c, nil
}

func (c *consumer) Close() error {
	return c.client.Close()
}

func (c *consumer) Topics() ([]string, error) {
	return c.client.Topics()
}

func (c *consumer) Partitions(topic string) ([]int32, error) {
	return c.client.Partitions(topic)
}

func (c *consumer) ConsumePartition(topic string, partition int32, offset int64) (PartitionConsumer, error) {
	child := &partitionConsumer{
		consumer:             c,
		conf:                 c.conf,
		topic:                topic,
		partition:            partition,
		messages:             make(chan *ConsumerMessage, c.conf.ChannelBufferSize),
		errors:               make(chan *ConsumerError, c.conf.ChannelBufferSize),
		feeder:               make(chan *FetchResponse, 1),
		preferredReadReplica: invalidPreferredReplicaID,
		trigger:              make(chan none, 1),
		dying:                make(chan none),
		fetchSize:            c.conf.Consumer.Fetch.Default,
	}

	if err := child.chooseStartingOffset(offset); err != nil {
		return nil, err
	}

	var leader *Broker
	var err error
	if leader, err = c.client.Leader(child.topic, child.partition); err != nil {
		return nil, err
	}

	if err := c.addChild(child); err != nil {
		return nil, err
	}

	go withRecover(child.dispatcher)
	go withRecover(child.responseFeeder)

	child.broker = c.refBrokerConsumer(leader)
	child.broker.input <- child

	return child, nil
}

func (c *consumer) HighWaterMarks() map[string]map[int32]int64 {
	c.lock.Lock()
	defer c.lock.Unlock()

	hwms := make(map[string]map[int32]int64)
	for topic, p := range c.children {
		hwm := make(map[int32]int64, len(p))
		for partition, pc := range p {
			hwm[partition] = pc.HighWaterMarkOffset()
		}
		hwms[topic] = hwm
	}

	return hwms
}

func (c *consumer) addChild(child *partitionConsumer) error {
	c.lock.Lock()
	defer c.lock.Unlock()

	topicChildren := c.children[child.topic]
	if topicChildren == nil {
		topicChildren = make(map[int32]*partitionConsumer)
		c.children[child.topic] = topicChildren
	}

	if topicChildren[child.partition] != nil {
		return ConfigurationError("That topic/partition is already being consumed")
	}

	topicChildren[child.partition] = child
	return nil
}

func (c *consumer) removeChild(child *partitionConsumer) {
	c.lock.Lock()
	defer c.lock.Unlock()

	delete(c.children[child.topic], child.partition)
}

func (c *consumer) refBrokerConsumer(broker *Broker) *brokerConsumer {
	c.lock.Lock()
	defer c.lock.Unlock()

	bc := c.brokerConsumers[broker]
	if bc == nil {
		bc = c.newBrokerConsumer(broker)
		c.brokerConsumers[broker] = bc
	}

	bc.refs++

	return bc
}

func (c *consumer) unrefBrokerConsumer(brokerWorker *brokerConsumer) {
	c.lock.Lock()
	defer c.lock.Unlock()

	brokerWorker.refs--

	if brokerWorker.refs == 0 {
		close(brokerWorker.input)
		if c.brokerConsumers[brokerWorker.broker] == brokerWorker {
			delete(c.brokerConsumers, brokerWorker.broker)
		}
	}
}

func (c *consumer) abandonBrokerConsumer(brokerWorker *brokerConsumer) {
	c.lock.Lock()
	defer c.lock.Unlock()

	delete(c.brokerConsumers, brokerWorker.broker)
}

// Pause implements Consumer.
func (c *consumer) Pause(topicPartitions map[string][]int32) {
	c.lock.Lock()
	defer c.lock.Unlock()

	for topic, partitions := range topicPartitions {
		for _, partition := range partitions {
			if topicConsumers, ok := c.children[topic]; ok {
				if partitionConsumer, ok := topicConsumers[partition]; ok {
					partitionConsumer.Pause()
				}
			}
		}
	}
}

// Resume implements Consumer.
func (c *consumer) Resume(topicPartitions map[string][]int32) {
	c.lock.Lock()
	defer c.lock.Unlock()

	for topic, partitions := range topicPartitions {
		for _, partition := range partitions {
			if topicConsumers, ok := c.children[topic]; ok {
				if partitionConsumer, ok := topicConsumers[partition]; ok {
					partitionConsumer.Resume()
				}
			}
		}
	}
}

// PauseAll implements Consumer.
func (c *consumer) PauseAll() {
	c.lock.Lock()
	defer c.lock.Unlock()

	for _, partitions := range c.children {
		for _, partitionConsumer := range partitions {
			partitionConsumer.Pause()
		}
	}
}

// ResumeAll implements Consumer.
func (c *consumer) ResumeAll() {
	c.lock.Lock()
	defer c.lock.Unlock()

	for _, partitions := range c.children {
		for _, partitionConsumer := range partitions {
			partitionConsumer.Resume()
		}
	}
}
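
// An illustrative sketch of the Pause/Resume flow implemented above (not part of this
// file). The topic name and partition numbers are placeholders; `consumer` is a value
// returned by NewConsumer or NewConsumerFromClient.
//
//	// Temporarily stop fetching partitions 0 and 1 of "my_topic" without closing
//	// their PartitionConsumers or affecting partition subscription.
//	consumer.Pause(map[string][]int32{"my_topic": {0, 1}})
//
//	// ... let downstream processing catch up ...
//
//	// Start fetching from those partitions again.
//	consumer.Resume(map[string][]int32{"my_topic": {0, 1}})
//
//	// Or suspend and resume everything this consumer owns:
//	consumer.PauseAll()
//	consumer.ResumeAll()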

// PartitionConsumer

// PartitionConsumer processes Kafka messages from a given topic and partition. You MUST call one of Close() or
// AsyncClose() on a PartitionConsumer to avoid leaks; it will not be garbage-collected automatically when it passes out
// of scope.
//
// The simplest way of using a PartitionConsumer is to loop over its Messages channel using a for/range
// loop. The PartitionConsumer will only stop itself in one case: when the offset being consumed is reported
// as out of range by the brokers. In this case you should decide what you want to do (try a different offset,
// notify a human, etc) and handle it appropriately. For all other error cases, it will just keep retrying.
// By default, it logs these errors to sarama.Logger; if you want to be notified directly of all errors, set
// your config's Consumer.Return.Errors to true and read from the Errors channel, using a select statement
// or a separate goroutine. Check out the Consumer examples to see implementations of these different approaches.
//
// To terminate such a for/range loop while the loop is executing, call AsyncClose. This will kick off the process of
// consumer tear-down & return immediately. Continue to loop, servicing the Messages channel until the teardown process
// AsyncClose initiated closes it (thus terminating the for/range loop). If you've already ceased reading Messages, call
// Close; this will signal the PartitionConsumer's goroutines to begin shutting down (just like AsyncClose), but will
// also drain the Messages channel, harvest all errors & return them once cleanup has completed.
type PartitionConsumer interface {
	// AsyncClose initiates a shutdown of the PartitionConsumer. This method will return immediately, after which you
	// should continue to service the 'Messages' and 'Errors' channels until they are empty. It is required to call this
	// function, or Close, before a consumer object passes out of scope, as it will otherwise leak memory. You must call
	// this before calling Close on the underlying client.
	AsyncClose()

	// Close stops the PartitionConsumer from fetching messages. It will initiate a shutdown just like AsyncClose, drain
	// the Messages channel, harvest any errors & return them to the caller. Note that if you are continuing to service
	// the Messages channel when this function is called, you will be competing with Close for messages; consider
	// calling AsyncClose, instead. It is required to call this function (or AsyncClose) before a consumer object passes
	// out of scope, as it will otherwise leak memory. You must call this before calling Close on the underlying client.
	Close() error

	// Messages returns the read channel for the messages that are returned by
	// the broker.
	Messages() <-chan *ConsumerMessage

	// Errors returns a read channel of errors that occurred during consuming, if
	// enabled. By default, errors are logged and not returned over this channel.
	// If you want to implement any custom error handling, set your config's
	// Consumer.Return.Errors setting to true, and read from this channel.
	Errors() <-chan *ConsumerError

	// HighWaterMarkOffset returns the high water mark offset of the partition,
	// i.e. the offset that will be used for the next message that will be produced.
	// You can use this to determine how far behind the processing is.
	HighWaterMarkOffset() int64

	// Pause suspends fetching from this partition. Future calls to the broker will not return
	// any records from this partition until it has been resumed using Resume().
	// Note that this method does not affect partition subscription.
	// In particular, it does not cause a group rebalance when automatic assignment is used.
	Pause()

	// Resume resumes this partition, which has been paused with Pause().
	// New calls to the broker will return records from this partition if there are any to be fetched.
	// If the partition was not previously paused, this method is a no-op.
	Resume()

	// IsPaused indicates if this partition consumer is paused or not.
	IsPaused() bool
}
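
// An illustrative sketch of the for/range pattern described in the PartitionConsumer
// documentation above (not part of this file). Using an interrupt signal to trigger
// AsyncClose is just one possible shutdown mechanism; pc is a PartitionConsumer
// obtained from ConsumePartition, and the error goroutine assumes
// config.Consumer.Return.Errors was set to true.
//
//	sigs := make(chan os.Signal, 1)
//	signal.Notify(sigs, os.Interrupt)
//	go func() {
//		<-sigs
//		pc.AsyncClose() // begins teardown; Messages() is closed once it completes
//	}()
//
//	go func() {
//		for err := range pc.Errors() {
//			log.Println("consume error:", err)
//		}
//	}()
//
//	for msg := range pc.Messages() {
//		fmt.Printf("%s/%d@%d: %s\n", msg.Topic, msg.Partition, msg.Offset, msg.Value)
//	}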

type partitionConsumer struct {
	highWaterMarkOffset int64 // must be at the top of the struct because https://golang.org/pkg/sync/atomic/#pkg-note-BUG

	consumer *consumer
	conf     *Config
	broker   *brokerConsumer
	messages chan *ConsumerMessage
	errors   chan *ConsumerError
	feeder   chan *FetchResponse

	preferredReadReplica int32

	trigger, dying chan none
	closeOnce      sync.Once
	topic          string
	partition      int32
	responseResult error
	fetchSize      int32
	offset         int64
	retries        int32

	paused int32
}

var errTimedOut = errors.New("timed out feeding messages to the user") // not user-facing

func (child *partitionConsumer) sendError(err error) {
	cErr := &ConsumerError{
		Topic:     child.topic,
		Partition: child.partition,
		Err:       err,
	}

	if child.conf.Consumer.Return.Errors {
		child.errors <- cErr
	} else {
		Logger.Println(cErr)
	}
}

func (child *partitionConsumer) computeBackoff() time.Duration {
	if child.conf.Consumer.Retry.BackoffFunc != nil {
		retries := atomic.AddInt32(&child.retries, 1)
		return child.conf.Consumer.Retry.BackoffFunc(int(retries))
	}
	return child.conf.Consumer.Retry.Backoff
}
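
// computeBackoff above consults Config.Consumer.Retry.BackoffFunc when it is set and
// otherwise falls back to the fixed Config.Consumer.Retry.Backoff. A hedged sketch of
// plugging in a capped exponential backoff (the chosen durations are arbitrary):
//
//	config := NewConfig()
//	config.Consumer.Retry.BackoffFunc = func(retries int) time.Duration {
//		backoff := time.Duration(1<<uint(retries)) * 100 * time.Millisecond
//		if backoff > 10*time.Second {
//			backoff = 10 * time.Second
//		}
//		return backoff
//	}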

func (child *partitionConsumer) dispatcher() {
	for range child.trigger {
		select {
		case <-child.dying:
			close(child.trigger)
		case <-time.After(child.computeBackoff()):
			if child.broker != nil {
				child.consumer.unrefBrokerConsumer(child.broker)
				child.broker = nil
			}

			if err := child.dispatch(); err != nil {
				child.sendError(err)
				child.trigger <- none{}
			}
		}
	}

	if child.broker != nil {
		child.consumer.unrefBrokerConsumer(child.broker)
	}
	child.consumer.removeChild(child)
	close(child.feeder)
}

func (child *partitionConsumer) preferredBroker() (*Broker, error) {
	if child.preferredReadReplica >= 0 {
		broker, err := child.consumer.client.Broker(child.preferredReadReplica)
		if err == nil {
			return broker, nil
		}
		Logger.Printf(
			"consumer/%s/%d failed to find active broker for preferred read replica %d - will fallback to leader",
			child.topic, child.partition, child.preferredReadReplica)

		// if we couldn't find it, discard the replica preference and trigger a
		// metadata refresh whilst falling back to consuming from the leader again
		child.preferredReadReplica = invalidPreferredReplicaID
		_ = child.consumer.client.RefreshMetadata(child.topic)
	}

	// if the preferred replica cannot be found, fall back to the leader
	return child.consumer.client.Leader(child.topic, child.partition)
}

func (child *partitionConsumer) dispatch() error {
	if err := child.consumer.client.RefreshMetadata(child.topic); err != nil {
		return err
	}

	broker, err := child.preferredBroker()
	if err != nil {
		return err
	}

	child.broker = child.consumer.refBrokerConsumer(broker)

	child.broker.input <- child

	return nil
}

func (child *partitionConsumer) chooseStartingOffset(offset int64) error {
	newestOffset, err := child.consumer.client.GetOffset(child.topic, child.partition, OffsetNewest)
	if err != nil {
		return err
	}

	child.highWaterMarkOffset = newestOffset

	oldestOffset, err := child.consumer.client.GetOffset(child.topic, child.partition, OffsetOldest)
	if err != nil {
		return err
	}

	switch {
	case offset == OffsetNewest:
		child.offset = newestOffset
	case offset == OffsetOldest:
		child.offset = oldestOffset
	case offset >= oldestOffset && offset <= newestOffset:
		child.offset = offset
	default:
		return ErrOffsetOutOfRange
	}

	return nil
}

func (child *partitionConsumer) Messages() <-chan *ConsumerMessage {
	return child.messages
}

func (child *partitionConsumer) Errors() <-chan *ConsumerError {
	return child.errors
}

func (child *partitionConsumer) AsyncClose() {
	// this triggers whatever broker owns this child to abandon it and close its trigger channel, which causes
	// the dispatcher to exit its loop, which removes it from the consumer then closes its 'messages' and
	// 'errors' channels (alternatively, if the child is already at the dispatcher for some reason, that will
	// also just close itself)
	child.closeOnce.Do(func() {
		close(child.dying)
	})
}

func (child *partitionConsumer) Close() error {
	child.AsyncClose()

	var consumerErrors ConsumerErrors
	for err := range child.errors {
		consumerErrors = append(consumerErrors, err)
	}

	if len(consumerErrors) > 0 {
		return consumerErrors
	}
	return nil
}

func (child *partitionConsumer) HighWaterMarkOffset() int64 {
	return atomic.LoadInt64(&child.highWaterMarkOffset)
}
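
// HighWaterMarkOffset is the offset that will be assigned to the next message produced
// to the partition, so a rough per-partition lag can be derived while consuming. An
// illustrative sketch (pc is a PartitionConsumer; msg is the message just received):
//
//	for msg := range pc.Messages() {
//		lag := pc.HighWaterMarkOffset() - msg.Offset - 1
//		fmt.Printf("partition %d lag: %d messages\n", msg.Partition, lag)
//	}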

func (child *partitionConsumer) responseFeeder() {
	var msgs []*ConsumerMessage
	expiryTicker := time.NewTicker(child.conf.Consumer.MaxProcessingTime)
	firstAttempt := true

feederLoop:
	for response := range child.feeder {
		msgs, child.responseResult = child.parseResponse(response)

		if child.responseResult == nil {
			atomic.StoreInt32(&child.retries, 0)
		}

		for i, msg := range msgs {
			child.interceptors(msg)
		messageSelect:
			select {
			case <-child.dying:
				child.broker.acks.Done()
				continue feederLoop
			case child.messages <- msg:
				firstAttempt = true
			case <-expiryTicker.C:
				if !firstAttempt {
					child.responseResult = errTimedOut
					child.broker.acks.Done()
				remainingLoop:
					for _, msg = range msgs[i:] {
						child.interceptors(msg)
						select {
						case child.messages <- msg:
						case <-child.dying:
							break remainingLoop
						}
					}
					child.broker.input <- child
					continue feederLoop
				} else {
					// current message has not been sent, return to select
					// statement
					firstAttempt = false
					goto messageSelect
				}
			}
		}

		child.broker.acks.Done()
	}

	expiryTicker.Stop()
	close(child.messages)
	close(child.errors)
}

func (child *partitionConsumer) parseMessages(msgSet *MessageSet) ([]*ConsumerMessage, error) {
	var messages []*ConsumerMessage
	for _, msgBlock := range msgSet.Messages {
		for _, msg := range msgBlock.Messages() {
			offset := msg.Offset
			timestamp := msg.Msg.Timestamp
			if msg.Msg.Version >= 1 {
				baseOffset := msgBlock.Offset - msgBlock.Messages()[len(msgBlock.Messages())-1].Offset
				offset += baseOffset
				if msg.Msg.LogAppendTime {
					timestamp = msgBlock.Msg.Timestamp
				}
			}
			if offset < child.offset {
				continue
			}
			messages = append(messages, &ConsumerMessage{
				Topic:          child.topic,
				Partition:      child.partition,
				Key:            msg.Msg.Key,
				Value:          msg.Msg.Value,
				Offset:         offset,
				Timestamp:      timestamp,
				BlockTimestamp: msgBlock.Msg.Timestamp,
			})
			child.offset = offset + 1
		}
	}
	if len(messages) == 0 {
		child.offset++
	}
	return messages, nil
}

func (child *partitionConsumer) parseRecords(batch *RecordBatch) ([]*ConsumerMessage, error) {
	messages := make([]*ConsumerMessage, 0, len(batch.Records))

	for _, rec := range batch.Records {
		offset := batch.FirstOffset + rec.OffsetDelta
		if offset < child.offset {
			continue
		}
		timestamp := batch.FirstTimestamp.Add(rec.TimestampDelta)
		if batch.LogAppendTime {
			timestamp = batch.MaxTimestamp
		}
		messages = append(messages, &ConsumerMessage{
			Topic:     child.topic,
			Partition: child.partition,
			Key:       rec.Key,
			Value:     rec.Value,
			Offset:    offset,
			Timestamp: timestamp,
			Headers:   rec.Headers,
		})
		child.offset = offset + 1
	}
	if len(messages) == 0 {
		child.offset++
	}
	return messages, nil
}

func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*ConsumerMessage, error) {
	var (
		metricRegistry          = child.conf.MetricRegistry
		consumerBatchSizeMetric metrics.Histogram
	)

	if metricRegistry != nil {
		consumerBatchSizeMetric = getOrRegisterHistogram("consumer-batch-size", metricRegistry)
	}

	// If the request was throttled and empty, we log and return without error
	if response.ThrottleTime != time.Duration(0) && len(response.Blocks) == 0 {
		Logger.Printf(
			"consumer/broker/%d FetchResponse throttled %v\n",
			child.broker.broker.ID(), response.ThrottleTime)
		return nil, nil
	}

	block := response.GetBlock(child.topic, child.partition)
	if block == nil {
		return nil, ErrIncompleteResponse
	}

	if !errors.Is(block.Err, ErrNoError) {
		return nil, block.Err
	}

	nRecs, err := block.numRecords()
	if err != nil {
		return nil, err
	}

	consumerBatchSizeMetric.Update(int64(nRecs))

	if block.PreferredReadReplica != invalidPreferredReplicaID {
		child.preferredReadReplica = block.PreferredReadReplica
	}

	if nRecs == 0 {
		partialTrailingMessage, err := block.isPartial()
		if err != nil {
			return nil, err
		}
		// We got no messages. If we got a partial trailing message then we need to ask for more data.
		// Otherwise we just poll again and wait for one to be produced...
		if partialTrailingMessage {
			if child.conf.Consumer.Fetch.Max > 0 && child.fetchSize == child.conf.Consumer.Fetch.Max {
				// we can't ask for more data, we've hit the configured limit
				child.sendError(ErrMessageTooLarge)
				child.offset++ // skip this one so we can keep processing future messages
			} else {
				child.fetchSize *= 2
				// check int32 overflow
				if child.fetchSize < 0 {
					child.fetchSize = math.MaxInt32
				}
				if child.conf.Consumer.Fetch.Max > 0 && child.fetchSize > child.conf.Consumer.Fetch.Max {
					child.fetchSize = child.conf.Consumer.Fetch.Max
				}
			}
		} else if block.LastRecordsBatchOffset != nil && *block.LastRecordsBatchOffset < block.HighWaterMarkOffset {
			// check the last record offset to avoid getting stuck if the high watermark was not reached
			Logger.Printf("consumer/broker/%d received batch with zero records but high watermark was not reached, topic %s, partition %d, offset %d\n", child.broker.broker.ID(), child.topic, child.partition, *block.LastRecordsBatchOffset)
			child.offset = *block.LastRecordsBatchOffset + 1
		}

		return nil, nil
	}

	// we got messages, reset our fetch size in case it was increased for a previous request
	child.fetchSize = child.conf.Consumer.Fetch.Default
	atomic.StoreInt64(&child.highWaterMarkOffset, block.HighWaterMarkOffset)

	// abortedProducerIDs contains producer IDs whose messages should be ignored as uncommitted:
	// - a producer ID is added when the partitionConsumer iterates over the offset at which an aborted transaction begins (abortedTransaction.FirstOffset)
	// - a producer ID is removed when the partitionConsumer iterates over an abort control record, meaning the aborted transaction for this producer is over
	abortedProducerIDs := make(map[int64]struct{}, len(block.AbortedTransactions))
	abortedTransactions := block.getAbortedTransactions()

	var messages []*ConsumerMessage
	for _, records := range block.RecordsSet {
		switch records.recordsType {
		case legacyRecords:
			messageSetMessages, err := child.parseMessages(records.MsgSet)
			if err != nil {
				return nil, err
			}

			messages = append(messages, messageSetMessages...)
		case defaultRecords:
			// Consume remaining abortedTransactions up to the last offset of the current batch
			for _, txn := range abortedTransactions {
				if txn.FirstOffset > records.RecordBatch.LastOffset() {
					break
				}
				abortedProducerIDs[txn.ProducerID] = struct{}{}
				// Pop abortedTransactions so that we never add it again
				abortedTransactions = abortedTransactions[1:]
			}

			recordBatchMessages, err := child.parseRecords(records.RecordBatch)
			if err != nil {
				return nil, err
			}

			// Parse and commit offset but do not expose messages that are:
			// - control records
			// - part of an aborted transaction when set to `ReadCommitted`

			// control record
			isControl, err := records.isControl()
			if err != nil {
				// It is unclear why this error was historically skipped here.
				// The safe bet is to ignore control messages if ReadUncommitted
				// and block on them in case of error and ReadCommitted
				if child.conf.Consumer.IsolationLevel == ReadCommitted {
					return nil, err
				}
				continue
			}
			if isControl {
				controlRecord, err := records.getControlRecord()
				if err != nil {
					return nil, err
				}

				if controlRecord.Type == ControlRecordAbort {
					delete(abortedProducerIDs, records.RecordBatch.ProducerID)
				}
				continue
			}

			// filter aborted transactions
			if child.conf.Consumer.IsolationLevel == ReadCommitted {
				_, isAborted := abortedProducerIDs[records.RecordBatch.ProducerID]
				if records.RecordBatch.IsTransactional && isAborted {
					continue
				}
			}

			messages = append(messages, recordBatchMessages...)
		default:
			return nil, fmt.Errorf("unknown records type: %v", records.recordsType)
		}
	}

	return messages, nil
}
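
// parseResponse above only filters out messages belonging to aborted transactions when
// the consumer runs with the ReadCommitted isolation level. A hedged configuration
// sketch (transactions and isolation levels require a Kafka version of at least
// 0.11.0.0):
//
//	config := NewConfig()
//	config.Version = V0_11_0_0
//	config.Consumer.IsolationLevel = ReadCommitted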

func (child *partitionConsumer) interceptors(msg *ConsumerMessage) {
	for _, interceptor := range child.conf.Consumer.Interceptors {
		msg.safelyApplyInterceptor(interceptor)
	}
}

// Pause implements PartitionConsumer.
func (child *partitionConsumer) Pause() {
	atomic.StoreInt32(&child.paused, 1)
}

// Resume implements PartitionConsumer.
func (child *partitionConsumer) Resume() {
	atomic.StoreInt32(&child.paused, 0)
}

// IsPaused implements PartitionConsumer.
func (child *partitionConsumer) IsPaused() bool {
	return atomic.LoadInt32(&child.paused) == 1
}

type brokerConsumer struct {
	consumer         *consumer
	broker           *Broker
	input            chan *partitionConsumer
	newSubscriptions chan []*partitionConsumer
	subscriptions    map[*partitionConsumer]none
	acks             sync.WaitGroup
	refs             int
}

func (c *consumer) newBrokerConsumer(broker *Broker) *brokerConsumer {
	bc := &brokerConsumer{
		consumer:         c,
		broker:           broker,
		input:            make(chan *partitionConsumer),
		newSubscriptions: make(chan []*partitionConsumer),
		subscriptions:    make(map[*partitionConsumer]none),
		refs:             0,
	}

	go withRecover(bc.subscriptionManager)
	go withRecover(bc.subscriptionConsumer)

	return bc
}

// The subscriptionManager constantly accepts new subscriptions on `input` (even when the main subscriptionConsumer
// goroutine is in the middle of a network request) and batches them up. The main worker goroutine picks
// up a batch of new subscriptions between every network request by reading from `newSubscriptions`, so we give
// it nil if no new subscriptions are available.
func (bc *brokerConsumer) subscriptionManager() {
	defer close(bc.newSubscriptions)

	for {
		var partitionConsumers []*partitionConsumer

		// Check for any partition consumer asking to subscribe; if there aren't
		// any, trigger the network request (to fetch Kafka messages) by sending "nil" to the
		// newSubscriptions channel
		select {
		case pc, ok := <-bc.input:
			if !ok {
				return
			}
			partitionConsumers = append(partitionConsumers, pc)
		case bc.newSubscriptions <- nil:
			continue
		}

		// drain input of any further incoming subscriptions
		timer := time.NewTimer(partitionConsumersBatchTimeout)
		for batchComplete := false; !batchComplete; {
			select {
			case pc := <-bc.input:
				partitionConsumers = append(partitionConsumers, pc)
			case <-timer.C:
				batchComplete = true
			}
		}
		timer.Stop()

		Logger.Printf(
			"consumer/broker/%d accumulated %d new subscriptions\n",
			bc.broker.ID(), len(partitionConsumers))

		bc.newSubscriptions <- partitionConsumers
	}
}

// subscriptionConsumer ensures we will get nil right away if no new subscriptions are available.
// This is the main loop that fetches Kafka messages.
func (bc *brokerConsumer) subscriptionConsumer() {
	for newSubscriptions := range bc.newSubscriptions {
		bc.updateSubscriptions(newSubscriptions)

		if len(bc.subscriptions) == 0 {
			// We're about to be shut down or we're about to receive more subscriptions.
			// Take a small nap to avoid burning the CPU.
			time.Sleep(partitionConsumersBatchTimeout)
			continue
		}

		response, err := bc.fetchNewMessages()
		if err != nil {
			Logger.Printf("consumer/broker/%d disconnecting due to error processing FetchRequest: %s\n", bc.broker.ID(), err)
			bc.abort(err)
			return
		}

		bc.acks.Add(len(bc.subscriptions))
		for child := range bc.subscriptions {
			child.feeder <- response
		}
		bc.acks.Wait()
		bc.handleResponses()
	}
}

func (bc *brokerConsumer) updateSubscriptions(newSubscriptions []*partitionConsumer) {
	for _, child := range newSubscriptions {
		bc.subscriptions[child] = none{}
		Logger.Printf("consumer/broker/%d added subscription to %s/%d\n", bc.broker.ID(), child.topic, child.partition)
	}

	for child := range bc.subscriptions {
		select {
		case <-child.dying:
			Logger.Printf("consumer/broker/%d closed dead subscription to %s/%d\n", bc.broker.ID(), child.topic, child.partition)
			close(child.trigger)
			delete(bc.subscriptions, child)
		default:
			// no-op
		}
	}
}

// handleResponses handles the response codes left for us by our subscriptions, and abandons ones that have been closed
func (bc *brokerConsumer) handleResponses() {
	for child := range bc.subscriptions {
		result := child.responseResult
		child.responseResult = nil

		if result == nil {
			if preferredBroker, err := child.preferredBroker(); err == nil {
				if bc.broker.ID() != preferredBroker.ID() {
					// not an error but needs redispatching to consume from preferred replica
					Logger.Printf(
						"consumer/broker/%d abandoned in favor of preferred replica broker/%d\n",
						bc.broker.ID(), preferredBroker.ID())
					child.trigger <- none{}
					delete(bc.subscriptions, child)
				}
			}
			continue
		}

		// Discard any replica preference.
		child.preferredReadReplica = invalidPreferredReplicaID

		if errors.Is(result, errTimedOut) {
			Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because consuming was taking too long\n",
				bc.broker.ID(), child.topic, child.partition)
			delete(bc.subscriptions, child)
		} else if errors.Is(result, ErrOffsetOutOfRange) {
			// there's no point in retrying this; it will just fail the same way again,
			// so shut it down and force the user to choose what to do
			child.sendError(result)
			Logger.Printf("consumer/%s/%d shutting down because %s\n", child.topic, child.partition, result)
			close(child.trigger)
			delete(bc.subscriptions, child)
		} else if errors.Is(result, ErrUnknownTopicOrPartition) || errors.Is(result, ErrNotLeaderForPartition) || errors.Is(result, ErrLeaderNotAvailable) || errors.Is(result, ErrReplicaNotAvailable) {
			// not an error, but does need redispatching
			Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because %s\n",
				bc.broker.ID(), child.topic, child.partition, result)
			child.trigger <- none{}
			delete(bc.subscriptions, child)
		} else {
			// unknown error; tell the user and try redispatching
			child.sendError(result)
			Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because %s\n",
				bc.broker.ID(), child.topic, child.partition, result)
			child.trigger <- none{}
			delete(bc.subscriptions, child)
		}
	}
}

func (bc *brokerConsumer) abort(err error) {
	bc.consumer.abandonBrokerConsumer(bc)
	_ = bc.broker.Close() // we don't care about the error this might return, we already have one

	for child := range bc.subscriptions {
		child.sendError(err)
		child.trigger <- none{}
	}

	for newSubscriptions := range bc.newSubscriptions {
		if len(newSubscriptions) == 0 {
			// Take a small nap to avoid burning the CPU.
			time.Sleep(partitionConsumersBatchTimeout)
			continue
		}
		for _, child := range newSubscriptions {
			child.sendError(err)
			child.trigger <- none{}
		}
	}
}

func (bc *brokerConsumer) fetchNewMessages() (*FetchResponse, error) {
	request := &FetchRequest{
		MinBytes:    bc.consumer.conf.Consumer.Fetch.Min,
		MaxWaitTime: int32(bc.consumer.conf.Consumer.MaxWaitTime / time.Millisecond),
	}
	if bc.consumer.conf.Version.IsAtLeast(V0_9_0_0) {
		request.Version = 1
	}
	if bc.consumer.conf.Version.IsAtLeast(V0_10_0_0) {
		request.Version = 2
	}
	if bc.consumer.conf.Version.IsAtLeast(V0_10_1_0) {
		request.Version = 3
		request.MaxBytes = MaxResponseSize
	}
	if bc.consumer.conf.Version.IsAtLeast(V0_11_0_0) {
		request.Version = 4
		request.Isolation = bc.consumer.conf.Consumer.IsolationLevel
	}
	if bc.consumer.conf.Version.IsAtLeast(V1_1_0_0) {
		request.Version = 7
		// We do not currently implement KIP-227 FetchSessions. Setting the id to 0
		// and the epoch to -1 tells the broker not to generate a session ID we're going
		// to just ignore anyway.
		request.SessionID = 0
		request.SessionEpoch = -1
	}
	if bc.consumer.conf.Version.IsAtLeast(V2_1_0_0) {
		request.Version = 10
	}
	if bc.consumer.conf.Version.IsAtLeast(V2_3_0_0) {
		request.Version = 11
		request.RackID = bc.consumer.conf.RackID
	}

	for child := range bc.subscriptions {
		if !child.IsPaused() {
			request.AddBlock(child.topic, child.partition, child.offset, child.fetchSize)
		}
	}

	return bc.broker.Fetch(request)
}