cubefs

Форк
0
/
async_producer.go 
1246 строк · 35.9 Кб
1
package sarama
2

3
import (
4
	"encoding/binary"
5
	"errors"
6
	"fmt"
7
	"math"
8
	"sync"
9
	"time"
10

11
	"github.com/eapache/go-resiliency/breaker"
12
	"github.com/eapache/queue"
13
)
14

15
// AsyncProducer publishes Kafka messages using a non-blocking API. It routes messages
16
// to the correct broker for the provided topic-partition, refreshing metadata as appropriate,
17
// and parses responses for errors. You must read from the Errors() channel or the
18
// producer will deadlock. You must call Close() or AsyncClose() on a producer to avoid
19
// leaks and message lost: it will not be garbage-collected automatically when it passes
20
// out of scope and buffered messages may not be flushed.
21
type AsyncProducer interface {
22

23
	// AsyncClose triggers a shutdown of the producer. The shutdown has completed
24
	// when both the Errors and Successes channels have been closed. When calling
25
	// AsyncClose, you *must* continue to read from those channels in order to
26
	// drain the results of any messages in flight.
27
	AsyncClose()
28

29
	// Close shuts down the producer and waits for any buffered messages to be
30
	// flushed. You must call this function before a producer object passes out of
31
	// scope, as it may otherwise leak memory. You must call this before process
32
	// shutting down, or you may lose messages. You must call this before calling
33
	// Close on the underlying client.
34
	Close() error
35

36
	// Input is the input channel for the user to write messages to that they
37
	// wish to send.
38
	Input() chan<- *ProducerMessage
39

40
	// Successes is the success output channel back to the user when Return.Successes is
41
	// enabled. If Return.Successes is true, you MUST read from this channel or the
42
	// Producer will deadlock. It is suggested that you send and read messages
43
	// together in a single select statement.
44
	Successes() <-chan *ProducerMessage
45

46
	// Errors is the error output channel back to the user. You MUST read from this
47
	// channel or the Producer will deadlock when the channel is full. Alternatively,
48
	// you can set Producer.Return.Errors in your config to false, which prevents
49
	// errors to be returned.
50
	Errors() <-chan *ProducerError
51
}
52

53
// transactionManager keeps the state necessary to ensure idempotent production
54
type transactionManager struct {
55
	producerID      int64
56
	producerEpoch   int16
57
	sequenceNumbers map[string]int32
58
	mutex           sync.Mutex
59
}
60

61
const (
62
	noProducerID    = -1
63
	noProducerEpoch = -1
64
)
65

66
func (t *transactionManager) getAndIncrementSequenceNumber(topic string, partition int32) (int32, int16) {
67
	key := fmt.Sprintf("%s-%d", topic, partition)
68
	t.mutex.Lock()
69
	defer t.mutex.Unlock()
70
	sequence := t.sequenceNumbers[key]
71
	t.sequenceNumbers[key] = sequence + 1
72
	return sequence, t.producerEpoch
73
}
74

75
func (t *transactionManager) bumpEpoch() {
76
	t.mutex.Lock()
77
	defer t.mutex.Unlock()
78
	t.producerEpoch++
79
	for k := range t.sequenceNumbers {
80
		t.sequenceNumbers[k] = 0
81
	}
82
}
83

84
func (t *transactionManager) getProducerID() (int64, int16) {
85
	t.mutex.Lock()
86
	defer t.mutex.Unlock()
87
	return t.producerID, t.producerEpoch
88
}
89

90
func newTransactionManager(conf *Config, client Client) (*transactionManager, error) {
91
	txnmgr := &transactionManager{
92
		producerID:    noProducerID,
93
		producerEpoch: noProducerEpoch,
94
	}
95

96
	if conf.Producer.Idempotent {
97
		initProducerIDResponse, err := client.InitProducerID()
98
		if err != nil {
99
			return nil, err
100
		}
101
		txnmgr.producerID = initProducerIDResponse.ProducerID
102
		txnmgr.producerEpoch = initProducerIDResponse.ProducerEpoch
103
		txnmgr.sequenceNumbers = make(map[string]int32)
104
		txnmgr.mutex = sync.Mutex{}
105

106
		Logger.Printf("Obtained a ProducerId: %d and ProducerEpoch: %d\n", txnmgr.producerID, txnmgr.producerEpoch)
107
	}
108

109
	return txnmgr, nil
110
}
111

112
type asyncProducer struct {
113
	client Client
114
	conf   *Config
115

116
	errors                    chan *ProducerError
117
	input, successes, retries chan *ProducerMessage
118
	inFlight                  sync.WaitGroup
119

120
	brokers    map[*Broker]*brokerProducer
121
	brokerRefs map[*brokerProducer]int
122
	brokerLock sync.Mutex
123

124
	txnmgr *transactionManager
125
}
126

127
// NewAsyncProducer creates a new AsyncProducer using the given broker addresses and configuration.
128
func NewAsyncProducer(addrs []string, conf *Config) (AsyncProducer, error) {
129
	client, err := NewClient(addrs, conf)
130
	if err != nil {
131
		return nil, err
132
	}
133
	return newAsyncProducer(client)
134
}
135

136
// NewAsyncProducerFromClient creates a new Producer using the given client. It is still
137
// necessary to call Close() on the underlying client when shutting down this producer.
138
func NewAsyncProducerFromClient(client Client) (AsyncProducer, error) {
139
	// For clients passed in by the client, ensure we don't
140
	// call Close() on it.
141
	cli := &nopCloserClient{client}
142
	return newAsyncProducer(cli)
143
}
144

145
func newAsyncProducer(client Client) (AsyncProducer, error) {
146
	// Check that we are not dealing with a closed Client before processing any other arguments
147
	if client.Closed() {
148
		return nil, ErrClosedClient
149
	}
150

151
	txnmgr, err := newTransactionManager(client.Config(), client)
152
	if err != nil {
153
		return nil, err
154
	}
155

156
	p := &asyncProducer{
157
		client:     client,
158
		conf:       client.Config(),
159
		errors:     make(chan *ProducerError),
160
		input:      make(chan *ProducerMessage),
161
		successes:  make(chan *ProducerMessage),
162
		retries:    make(chan *ProducerMessage),
163
		brokers:    make(map[*Broker]*brokerProducer),
164
		brokerRefs: make(map[*brokerProducer]int),
165
		txnmgr:     txnmgr,
166
	}
167

168
	// launch our singleton dispatchers
169
	go withRecover(p.dispatcher)
170
	go withRecover(p.retryHandler)
171

172
	return p, nil
173
}
174

175
type flagSet int8
176

177
const (
178
	syn      flagSet = 1 << iota // first message from partitionProducer to brokerProducer
179
	fin                          // final message from partitionProducer to brokerProducer and back
180
	shutdown                     // start the shutdown process
181
)
182

183
// ProducerMessage is the collection of elements passed to the Producer in order to send a message.
184
type ProducerMessage struct {
185
	Topic string // The Kafka topic for this message.
186
	// The partitioning key for this message. Pre-existing Encoders include
187
	// StringEncoder and ByteEncoder.
188
	Key Encoder
189
	// The actual message to store in Kafka. Pre-existing Encoders include
190
	// StringEncoder and ByteEncoder.
191
	Value Encoder
192

193
	// The headers are key-value pairs that are transparently passed
194
	// by Kafka between producers and consumers.
195
	Headers []RecordHeader
196

197
	// This field is used to hold arbitrary data you wish to include so it
198
	// will be available when receiving on the Successes and Errors channels.
199
	// Sarama completely ignores this field and is only to be used for
200
	// pass-through data.
201
	Metadata interface{}
202

203
	// Below this point are filled in by the producer as the message is processed
204

205
	// Offset is the offset of the message stored on the broker. This is only
206
	// guaranteed to be defined if the message was successfully delivered and
207
	// RequiredAcks is not NoResponse.
208
	Offset int64
209
	// Partition is the partition that the message was sent to. This is only
210
	// guaranteed to be defined if the message was successfully delivered.
211
	Partition int32
212
	// Timestamp can vary in behavior depending on broker configuration, being
213
	// in either one of the CreateTime or LogAppendTime modes (default CreateTime),
214
	// and requiring version at least 0.10.0.
215
	//
216
	// When configured to CreateTime, the timestamp is specified by the producer
217
	// either by explicitly setting this field, or when the message is added
218
	// to a produce set.
219
	//
220
	// When configured to LogAppendTime, the timestamp assigned to the message
221
	// by the broker. This is only guaranteed to be defined if the message was
222
	// successfully delivered and RequiredAcks is not NoResponse.
223
	Timestamp time.Time
224

225
	retries        int
226
	flags          flagSet
227
	expectation    chan *ProducerError
228
	sequenceNumber int32
229
	producerEpoch  int16
230
	hasSequence    bool
231
}
232

233
const producerMessageOverhead = 26 // the metadata overhead of CRC, flags, etc.
234

235
func (m *ProducerMessage) byteSize(version int) int {
236
	var size int
237
	if version >= 2 {
238
		size = maximumRecordOverhead
239
		for _, h := range m.Headers {
240
			size += len(h.Key) + len(h.Value) + 2*binary.MaxVarintLen32
241
		}
242
	} else {
243
		size = producerMessageOverhead
244
	}
245
	if m.Key != nil {
246
		size += m.Key.Length()
247
	}
248
	if m.Value != nil {
249
		size += m.Value.Length()
250
	}
251
	return size
252
}
253

254
func (m *ProducerMessage) clear() {
255
	m.flags = 0
256
	m.retries = 0
257
	m.sequenceNumber = 0
258
	m.producerEpoch = 0
259
	m.hasSequence = false
260
}
261

262
// ProducerError is the type of error generated when the producer fails to deliver a message.
263
// It contains the original ProducerMessage as well as the actual error value.
264
type ProducerError struct {
265
	Msg *ProducerMessage
266
	Err error
267
}
268

269
func (pe ProducerError) Error() string {
270
	return fmt.Sprintf("kafka: Failed to produce message to topic %s: %s", pe.Msg.Topic, pe.Err)
271
}
272

273
func (pe ProducerError) Unwrap() error {
274
	return pe.Err
275
}
276

277
// ProducerErrors is a type that wraps a batch of "ProducerError"s and implements the Error interface.
278
// It can be returned from the Producer's Close method to avoid the need to manually drain the Errors channel
279
// when closing a producer.
280
type ProducerErrors []*ProducerError
281

282
func (pe ProducerErrors) Error() string {
283
	return fmt.Sprintf("kafka: Failed to deliver %d messages.", len(pe))
284
}
285

286
func (p *asyncProducer) Errors() <-chan *ProducerError {
287
	return p.errors
288
}
289

290
func (p *asyncProducer) Successes() <-chan *ProducerMessage {
291
	return p.successes
292
}
293

294
func (p *asyncProducer) Input() chan<- *ProducerMessage {
295
	return p.input
296
}
297

298
func (p *asyncProducer) Close() error {
299
	p.AsyncClose()
300

301
	if p.conf.Producer.Return.Successes {
302
		go withRecover(func() {
303
			for range p.successes {
304
			}
305
		})
306
	}
307

308
	var errors ProducerErrors
309
	if p.conf.Producer.Return.Errors {
310
		for event := range p.errors {
311
			errors = append(errors, event)
312
		}
313
	} else {
314
		<-p.errors
315
	}
316

317
	if len(errors) > 0 {
318
		return errors
319
	}
320
	return nil
321
}
322

323
func (p *asyncProducer) AsyncClose() {
324
	go withRecover(p.shutdown)
325
}
326

327
// singleton
328
// dispatches messages by topic
329
func (p *asyncProducer) dispatcher() {
330
	handlers := make(map[string]chan<- *ProducerMessage)
331
	shuttingDown := false
332

333
	for msg := range p.input {
334
		if msg == nil {
335
			Logger.Println("Something tried to send a nil message, it was ignored.")
336
			continue
337
		}
338

339
		if msg.flags&shutdown != 0 {
340
			shuttingDown = true
341
			p.inFlight.Done()
342
			continue
343
		} else if msg.retries == 0 {
344
			if shuttingDown {
345
				// we can't just call returnError here because that decrements the wait group,
346
				// which hasn't been incremented yet for this message, and shouldn't be
347
				pErr := &ProducerError{Msg: msg, Err: ErrShuttingDown}
348
				if p.conf.Producer.Return.Errors {
349
					p.errors <- pErr
350
				} else {
351
					Logger.Println(pErr)
352
				}
353
				continue
354
			}
355
			p.inFlight.Add(1)
356
		}
357

358
		for _, interceptor := range p.conf.Producer.Interceptors {
359
			msg.safelyApplyInterceptor(interceptor)
360
		}
361

362
		version := 1
363
		if p.conf.Version.IsAtLeast(V0_11_0_0) {
364
			version = 2
365
		} else if msg.Headers != nil {
366
			p.returnError(msg, ConfigurationError("Producing headers requires Kafka at least v0.11"))
367
			continue
368
		}
369
		if msg.byteSize(version) > p.conf.Producer.MaxMessageBytes {
370
			p.returnError(msg, ErrMessageSizeTooLarge)
371
			continue
372
		}
373

374
		handler := handlers[msg.Topic]
375
		if handler == nil {
376
			handler = p.newTopicProducer(msg.Topic)
377
			handlers[msg.Topic] = handler
378
		}
379

380
		handler <- msg
381
	}
382

383
	for _, handler := range handlers {
384
		close(handler)
385
	}
386
}
387

388
// one per topic
389
// partitions messages, then dispatches them by partition
390
type topicProducer struct {
391
	parent *asyncProducer
392
	topic  string
393
	input  <-chan *ProducerMessage
394

395
	breaker     *breaker.Breaker
396
	handlers    map[int32]chan<- *ProducerMessage
397
	partitioner Partitioner
398
}
399

400
func (p *asyncProducer) newTopicProducer(topic string) chan<- *ProducerMessage {
401
	input := make(chan *ProducerMessage, p.conf.ChannelBufferSize)
402
	tp := &topicProducer{
403
		parent:      p,
404
		topic:       topic,
405
		input:       input,
406
		breaker:     breaker.New(3, 1, 10*time.Second),
407
		handlers:    make(map[int32]chan<- *ProducerMessage),
408
		partitioner: p.conf.Producer.Partitioner(topic),
409
	}
410
	go withRecover(tp.dispatch)
411
	return input
412
}
413

414
func (tp *topicProducer) dispatch() {
415
	for msg := range tp.input {
416
		if msg.retries == 0 {
417
			if err := tp.partitionMessage(msg); err != nil {
418
				tp.parent.returnError(msg, err)
419
				continue
420
			}
421
		}
422

423
		handler := tp.handlers[msg.Partition]
424
		if handler == nil {
425
			handler = tp.parent.newPartitionProducer(msg.Topic, msg.Partition)
426
			tp.handlers[msg.Partition] = handler
427
		}
428

429
		handler <- msg
430
	}
431

432
	for _, handler := range tp.handlers {
433
		close(handler)
434
	}
435
}
436

437
func (tp *topicProducer) partitionMessage(msg *ProducerMessage) error {
438
	var partitions []int32
439

440
	err := tp.breaker.Run(func() (err error) {
441
		requiresConsistency := false
442
		if ep, ok := tp.partitioner.(DynamicConsistencyPartitioner); ok {
443
			requiresConsistency = ep.MessageRequiresConsistency(msg)
444
		} else {
445
			requiresConsistency = tp.partitioner.RequiresConsistency()
446
		}
447

448
		if requiresConsistency {
449
			partitions, err = tp.parent.client.Partitions(msg.Topic)
450
		} else {
451
			partitions, err = tp.parent.client.WritablePartitions(msg.Topic)
452
		}
453
		return
454
	})
455
	if err != nil {
456
		return err
457
	}
458

459
	numPartitions := int32(len(partitions))
460

461
	if numPartitions == 0 {
462
		return ErrLeaderNotAvailable
463
	}
464

465
	choice, err := tp.partitioner.Partition(msg, numPartitions)
466

467
	if err != nil {
468
		return err
469
	} else if choice < 0 || choice >= numPartitions {
470
		return ErrInvalidPartition
471
	}
472

473
	msg.Partition = partitions[choice]
474

475
	return nil
476
}
477

478
// one per partition per topic
479
// dispatches messages to the appropriate broker
480
// also responsible for maintaining message order during retries
481
type partitionProducer struct {
482
	parent    *asyncProducer
483
	topic     string
484
	partition int32
485
	input     <-chan *ProducerMessage
486

487
	leader         *Broker
488
	breaker        *breaker.Breaker
489
	brokerProducer *brokerProducer
490

491
	// highWatermark tracks the "current" retry level, which is the only one where we actually let messages through,
492
	// all other messages get buffered in retryState[msg.retries].buf to preserve ordering
493
	// retryState[msg.retries].expectChaser simply tracks whether we've seen a fin message for a given level (and
494
	// therefore whether our buffer is complete and safe to flush)
495
	highWatermark int
496
	retryState    []partitionRetryState
497
}
498

499
type partitionRetryState struct {
500
	buf          []*ProducerMessage
501
	expectChaser bool
502
}
503

504
func (p *asyncProducer) newPartitionProducer(topic string, partition int32) chan<- *ProducerMessage {
505
	input := make(chan *ProducerMessage, p.conf.ChannelBufferSize)
506
	pp := &partitionProducer{
507
		parent:    p,
508
		topic:     topic,
509
		partition: partition,
510
		input:     input,
511

512
		breaker:    breaker.New(3, 1, 10*time.Second),
513
		retryState: make([]partitionRetryState, p.conf.Producer.Retry.Max+1),
514
	}
515
	go withRecover(pp.dispatch)
516
	return input
517
}
518

519
func (pp *partitionProducer) backoff(retries int) {
520
	var backoff time.Duration
521
	if pp.parent.conf.Producer.Retry.BackoffFunc != nil {
522
		maxRetries := pp.parent.conf.Producer.Retry.Max
523
		backoff = pp.parent.conf.Producer.Retry.BackoffFunc(retries, maxRetries)
524
	} else {
525
		backoff = pp.parent.conf.Producer.Retry.Backoff
526
	}
527
	if backoff > 0 {
528
		time.Sleep(backoff)
529
	}
530
}
531

532
func (pp *partitionProducer) dispatch() {
533
	// try to prefetch the leader; if this doesn't work, we'll do a proper call to `updateLeader`
534
	// on the first message
535
	pp.leader, _ = pp.parent.client.Leader(pp.topic, pp.partition)
536
	if pp.leader != nil {
537
		pp.brokerProducer = pp.parent.getBrokerProducer(pp.leader)
538
		pp.parent.inFlight.Add(1) // we're generating a syn message; track it so we don't shut down while it's still inflight
539
		pp.brokerProducer.input <- &ProducerMessage{Topic: pp.topic, Partition: pp.partition, flags: syn}
540
	}
541

542
	defer func() {
543
		if pp.brokerProducer != nil {
544
			pp.parent.unrefBrokerProducer(pp.leader, pp.brokerProducer)
545
		}
546
	}()
547

548
	for msg := range pp.input {
549
		if pp.brokerProducer != nil && pp.brokerProducer.abandoned != nil {
550
			select {
551
			case <-pp.brokerProducer.abandoned:
552
				// a message on the abandoned channel means that our current broker selection is out of date
553
				Logger.Printf("producer/leader/%s/%d abandoning broker %d\n", pp.topic, pp.partition, pp.leader.ID())
554
				pp.parent.unrefBrokerProducer(pp.leader, pp.brokerProducer)
555
				pp.brokerProducer = nil
556
				time.Sleep(pp.parent.conf.Producer.Retry.Backoff)
557
			default:
558
				// producer connection is still open.
559
			}
560
		}
561

562
		if msg.retries > pp.highWatermark {
563
			// a new, higher, retry level; handle it and then back off
564
			pp.newHighWatermark(msg.retries)
565
			pp.backoff(msg.retries)
566
		} else if pp.highWatermark > 0 {
567
			// we are retrying something (else highWatermark would be 0) but this message is not a *new* retry level
568
			if msg.retries < pp.highWatermark {
569
				// in fact this message is not even the current retry level, so buffer it for now (unless it's a just a fin)
570
				if msg.flags&fin == fin {
571
					pp.retryState[msg.retries].expectChaser = false
572
					pp.parent.inFlight.Done() // this fin is now handled and will be garbage collected
573
				} else {
574
					pp.retryState[msg.retries].buf = append(pp.retryState[msg.retries].buf, msg)
575
				}
576
				continue
577
			} else if msg.flags&fin == fin {
578
				// this message is of the current retry level (msg.retries == highWatermark) and the fin flag is set,
579
				// meaning this retry level is done and we can go down (at least) one level and flush that
580
				pp.retryState[pp.highWatermark].expectChaser = false
581
				pp.flushRetryBuffers()
582
				pp.parent.inFlight.Done() // this fin is now handled and will be garbage collected
583
				continue
584
			}
585
		}
586

587
		// if we made it this far then the current msg contains real data, and can be sent to the next goroutine
588
		// without breaking any of our ordering guarantees
589

590
		if pp.brokerProducer == nil {
591
			if err := pp.updateLeader(); err != nil {
592
				pp.parent.returnError(msg, err)
593
				pp.backoff(msg.retries)
594
				continue
595
			}
596
			Logger.Printf("producer/leader/%s/%d selected broker %d\n", pp.topic, pp.partition, pp.leader.ID())
597
		}
598

599
		// Now that we know we have a broker to actually try and send this message to, generate the sequence
600
		// number for it.
601
		// All messages being retried (sent or not) have already had their retry count updated
602
		// Also, ignore "special" syn/fin messages used to sync the brokerProducer and the topicProducer.
603
		if pp.parent.conf.Producer.Idempotent && msg.retries == 0 && msg.flags == 0 {
604
			msg.sequenceNumber, msg.producerEpoch = pp.parent.txnmgr.getAndIncrementSequenceNumber(msg.Topic, msg.Partition)
605
			msg.hasSequence = true
606
		}
607

608
		pp.brokerProducer.input <- msg
609
	}
610
}
611

612
func (pp *partitionProducer) newHighWatermark(hwm int) {
613
	Logger.Printf("producer/leader/%s/%d state change to [retrying-%d]\n", pp.topic, pp.partition, hwm)
614
	pp.highWatermark = hwm
615

616
	// send off a fin so that we know when everything "in between" has made it
617
	// back to us and we can safely flush the backlog (otherwise we risk re-ordering messages)
618
	pp.retryState[pp.highWatermark].expectChaser = true
619
	pp.parent.inFlight.Add(1) // we're generating a fin message; track it so we don't shut down while it's still inflight
620
	pp.brokerProducer.input <- &ProducerMessage{Topic: pp.topic, Partition: pp.partition, flags: fin, retries: pp.highWatermark - 1}
621

622
	// a new HWM means that our current broker selection is out of date
623
	Logger.Printf("producer/leader/%s/%d abandoning broker %d\n", pp.topic, pp.partition, pp.leader.ID())
624
	pp.parent.unrefBrokerProducer(pp.leader, pp.brokerProducer)
625
	pp.brokerProducer = nil
626
}
627

628
func (pp *partitionProducer) flushRetryBuffers() {
629
	Logger.Printf("producer/leader/%s/%d state change to [flushing-%d]\n", pp.topic, pp.partition, pp.highWatermark)
630
	for {
631
		pp.highWatermark--
632

633
		if pp.brokerProducer == nil {
634
			if err := pp.updateLeader(); err != nil {
635
				pp.parent.returnErrors(pp.retryState[pp.highWatermark].buf, err)
636
				goto flushDone
637
			}
638
			Logger.Printf("producer/leader/%s/%d selected broker %d\n", pp.topic, pp.partition, pp.leader.ID())
639
		}
640

641
		for _, msg := range pp.retryState[pp.highWatermark].buf {
642
			pp.brokerProducer.input <- msg
643
		}
644

645
	flushDone:
646
		pp.retryState[pp.highWatermark].buf = nil
647
		if pp.retryState[pp.highWatermark].expectChaser {
648
			Logger.Printf("producer/leader/%s/%d state change to [retrying-%d]\n", pp.topic, pp.partition, pp.highWatermark)
649
			break
650
		} else if pp.highWatermark == 0 {
651
			Logger.Printf("producer/leader/%s/%d state change to [normal]\n", pp.topic, pp.partition)
652
			break
653
		}
654
	}
655
}
656

657
func (pp *partitionProducer) updateLeader() error {
658
	return pp.breaker.Run(func() (err error) {
659
		if err = pp.parent.client.RefreshMetadata(pp.topic); err != nil {
660
			return err
661
		}
662

663
		if pp.leader, err = pp.parent.client.Leader(pp.topic, pp.partition); err != nil {
664
			return err
665
		}
666

667
		pp.brokerProducer = pp.parent.getBrokerProducer(pp.leader)
668
		pp.parent.inFlight.Add(1) // we're generating a syn message; track it so we don't shut down while it's still inflight
669
		pp.brokerProducer.input <- &ProducerMessage{Topic: pp.topic, Partition: pp.partition, flags: syn}
670

671
		return nil
672
	})
673
}
674

675
// one per broker; also constructs an associated flusher
676
func (p *asyncProducer) newBrokerProducer(broker *Broker) *brokerProducer {
677
	var (
678
		input     = make(chan *ProducerMessage)
679
		bridge    = make(chan *produceSet)
680
		pending   = make(chan *brokerProducerResponse)
681
		responses = make(chan *brokerProducerResponse)
682
	)
683

684
	bp := &brokerProducer{
685
		parent:         p,
686
		broker:         broker,
687
		input:          input,
688
		output:         bridge,
689
		responses:      responses,
690
		buffer:         newProduceSet(p),
691
		currentRetries: make(map[string]map[int32]error),
692
	}
693
	go withRecover(bp.run)
694

695
	// minimal bridge to make the network response `select`able
696
	go withRecover(func() {
697
		// Use a wait group to know if we still have in flight requests
698
		var wg sync.WaitGroup
699

700
		for set := range bridge {
701
			request := set.buildRequest()
702

703
			// Count the in flight requests to know when we can close the pending channel safely
704
			wg.Add(1)
705
			// Capture the current set to forward in the callback
706
			sendResponse := func(set *produceSet) ProduceCallback {
707
				return func(response *ProduceResponse, err error) {
708
					// Forward the response to make sure we do not block the responseReceiver
709
					pending <- &brokerProducerResponse{
710
						set: set,
711
						err: err,
712
						res: response,
713
					}
714
					wg.Done()
715
				}
716
			}(set)
717

718
			// Use AsyncProduce vs Produce to not block waiting for the response
719
			// so that we can pipeline multiple produce requests and achieve higher throughput, see:
720
			// https://kafka.apache.org/protocol#protocol_network
721
			err := broker.AsyncProduce(request, sendResponse)
722
			if err != nil {
723
				// Request failed to be sent
724
				sendResponse(nil, err)
725
				continue
726
			}
727
			// Callback is not called when using NoResponse
728
			if p.conf.Producer.RequiredAcks == NoResponse {
729
				// Provide the expected nil response
730
				sendResponse(nil, nil)
731
			}
732
		}
733
		// Wait for all in flight requests to close the pending channel safely
734
		wg.Wait()
735
		close(pending)
736
	})
737

738
	// In order to avoid a deadlock when closing the broker on network or malformed response error
739
	// we use an intermediate channel to buffer and send pending responses in order
740
	// This is because the AsyncProduce callback inside the bridge is invoked from the broker
741
	// responseReceiver goroutine and closing the broker requires such goroutine to be finished
742
	go withRecover(func() {
743
		buf := queue.New()
744
		for {
745
			if buf.Length() == 0 {
746
				res, ok := <-pending
747
				if !ok {
748
					// We are done forwarding the last pending response
749
					close(responses)
750
					return
751
				}
752
				buf.Add(res)
753
			}
754
			// Send the head pending response or buffer another one
755
			// so that we never block the callback
756
			headRes := buf.Peek().(*brokerProducerResponse)
757
			select {
758
			case res, ok := <-pending:
759
				if !ok {
760
					continue
761
				}
762
				buf.Add(res)
763
				continue
764
			case responses <- headRes:
765
				buf.Remove()
766
				continue
767
			}
768
		}
769
	})
770

771
	if p.conf.Producer.Retry.Max <= 0 {
772
		bp.abandoned = make(chan struct{})
773
	}
774

775
	return bp
776
}
777

778
type brokerProducerResponse struct {
779
	set *produceSet
780
	err error
781
	res *ProduceResponse
782
}
783

784
// groups messages together into appropriately-sized batches for sending to the broker
785
// handles state related to retries etc
786
type brokerProducer struct {
787
	parent *asyncProducer
788
	broker *Broker
789

790
	input     chan *ProducerMessage
791
	output    chan<- *produceSet
792
	responses <-chan *brokerProducerResponse
793
	abandoned chan struct{}
794

795
	buffer     *produceSet
796
	timer      <-chan time.Time
797
	timerFired bool
798

799
	closing        error
800
	currentRetries map[string]map[int32]error
801
}
802

803
func (bp *brokerProducer) run() {
804
	var output chan<- *produceSet
805
	Logger.Printf("producer/broker/%d starting up\n", bp.broker.ID())
806

807
	for {
808
		select {
809
		case msg, ok := <-bp.input:
810
			if !ok {
811
				Logger.Printf("producer/broker/%d input chan closed\n", bp.broker.ID())
812
				bp.shutdown()
813
				return
814
			}
815

816
			if msg == nil {
817
				continue
818
			}
819

820
			if msg.flags&syn == syn {
821
				Logger.Printf("producer/broker/%d state change to [open] on %s/%d\n",
822
					bp.broker.ID(), msg.Topic, msg.Partition)
823
				if bp.currentRetries[msg.Topic] == nil {
824
					bp.currentRetries[msg.Topic] = make(map[int32]error)
825
				}
826
				bp.currentRetries[msg.Topic][msg.Partition] = nil
827
				bp.parent.inFlight.Done()
828
				continue
829
			}
830

831
			if reason := bp.needsRetry(msg); reason != nil {
832
				bp.parent.retryMessage(msg, reason)
833

834
				if bp.closing == nil && msg.flags&fin == fin {
835
					// we were retrying this partition but we can start processing again
836
					delete(bp.currentRetries[msg.Topic], msg.Partition)
837
					Logger.Printf("producer/broker/%d state change to [closed] on %s/%d\n",
838
						bp.broker.ID(), msg.Topic, msg.Partition)
839
				}
840

841
				continue
842
			}
843

844
			if msg.flags&fin == fin {
845
				// New broker producer that was caught up by the retry loop
846
				bp.parent.retryMessage(msg, ErrShuttingDown)
847
				DebugLogger.Printf("producer/broker/%d state change to [dying-%d] on %s/%d\n",
848
					bp.broker.ID(), msg.retries, msg.Topic, msg.Partition)
849
				continue
850
			}
851

852
			if bp.buffer.wouldOverflow(msg) {
853
				Logger.Printf("producer/broker/%d maximum request accumulated, waiting for space\n", bp.broker.ID())
854
				if err := bp.waitForSpace(msg, false); err != nil {
855
					bp.parent.retryMessage(msg, err)
856
					continue
857
				}
858
			}
859

860
			if bp.parent.txnmgr.producerID != noProducerID && bp.buffer.producerEpoch != msg.producerEpoch {
861
				// The epoch was reset, need to roll the buffer over
862
				Logger.Printf("producer/broker/%d detected epoch rollover, waiting for new buffer\n", bp.broker.ID())
863
				if err := bp.waitForSpace(msg, true); err != nil {
864
					bp.parent.retryMessage(msg, err)
865
					continue
866
				}
867
			}
868
			if err := bp.buffer.add(msg); err != nil {
869
				bp.parent.returnError(msg, err)
870
				continue
871
			}
872

873
			if bp.parent.conf.Producer.Flush.Frequency > 0 && bp.timer == nil {
874
				bp.timer = time.After(bp.parent.conf.Producer.Flush.Frequency)
875
			}
876
		case <-bp.timer:
877
			bp.timerFired = true
878
		case output <- bp.buffer:
879
			bp.rollOver()
880
		case response, ok := <-bp.responses:
881
			if ok {
882
				bp.handleResponse(response)
883
			}
884
		}
885

886
		if bp.timerFired || bp.buffer.readyToFlush() {
887
			output = bp.output
888
		} else {
889
			output = nil
890
		}
891
	}
892
}
893

894
func (bp *brokerProducer) shutdown() {
895
	for !bp.buffer.empty() {
896
		select {
897
		case response := <-bp.responses:
898
			bp.handleResponse(response)
899
		case bp.output <- bp.buffer:
900
			bp.rollOver()
901
		}
902
	}
903
	close(bp.output)
904
	// Drain responses from the bridge goroutine
905
	for response := range bp.responses {
906
		bp.handleResponse(response)
907
	}
908
	// No more brokerProducer related goroutine should be running
909
	Logger.Printf("producer/broker/%d shut down\n", bp.broker.ID())
910
}
911

912
func (bp *brokerProducer) needsRetry(msg *ProducerMessage) error {
913
	if bp.closing != nil {
914
		return bp.closing
915
	}
916

917
	return bp.currentRetries[msg.Topic][msg.Partition]
918
}
919

920
func (bp *brokerProducer) waitForSpace(msg *ProducerMessage, forceRollover bool) error {
921
	for {
922
		select {
923
		case response := <-bp.responses:
924
			bp.handleResponse(response)
925
			// handling a response can change our state, so re-check some things
926
			if reason := bp.needsRetry(msg); reason != nil {
927
				return reason
928
			} else if !bp.buffer.wouldOverflow(msg) && !forceRollover {
929
				return nil
930
			}
931
		case bp.output <- bp.buffer:
932
			bp.rollOver()
933
			return nil
934
		}
935
	}
936
}
937

938
func (bp *brokerProducer) rollOver() {
939
	bp.timer = nil
940
	bp.timerFired = false
941
	bp.buffer = newProduceSet(bp.parent)
942
}
943

944
func (bp *brokerProducer) handleResponse(response *brokerProducerResponse) {
945
	if response.err != nil {
946
		bp.handleError(response.set, response.err)
947
	} else {
948
		bp.handleSuccess(response.set, response.res)
949
	}
950

951
	if bp.buffer.empty() {
952
		bp.rollOver() // this can happen if the response invalidated our buffer
953
	}
954
}
955

956
func (bp *brokerProducer) handleSuccess(sent *produceSet, response *ProduceResponse) {
957
	// we iterate through the blocks in the request set, not the response, so that we notice
958
	// if the response is missing a block completely
959
	var retryTopics []string
960
	sent.eachPartition(func(topic string, partition int32, pSet *partitionSet) {
961
		if response == nil {
962
			// this only happens when RequiredAcks is NoResponse, so we have to assume success
963
			bp.parent.returnSuccesses(pSet.msgs)
964
			return
965
		}
966

967
		block := response.GetBlock(topic, partition)
968
		if block == nil {
969
			bp.parent.returnErrors(pSet.msgs, ErrIncompleteResponse)
970
			return
971
		}
972

973
		switch block.Err {
974
		// Success
975
		case ErrNoError:
976
			if bp.parent.conf.Version.IsAtLeast(V0_10_0_0) && !block.Timestamp.IsZero() {
977
				for _, msg := range pSet.msgs {
978
					msg.Timestamp = block.Timestamp
979
				}
980
			}
981
			for i, msg := range pSet.msgs {
982
				msg.Offset = block.Offset + int64(i)
983
			}
984
			bp.parent.returnSuccesses(pSet.msgs)
985
		// Duplicate
986
		case ErrDuplicateSequenceNumber:
987
			bp.parent.returnSuccesses(pSet.msgs)
988
		// Retriable errors
989
		case ErrInvalidMessage, ErrUnknownTopicOrPartition, ErrLeaderNotAvailable, ErrNotLeaderForPartition,
990
			ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend:
991
			if bp.parent.conf.Producer.Retry.Max <= 0 {
992
				bp.parent.abandonBrokerConnection(bp.broker)
993
				bp.parent.returnErrors(pSet.msgs, block.Err)
994
			} else {
995
				retryTopics = append(retryTopics, topic)
996
			}
997
		// Other non-retriable errors
998
		default:
999
			if bp.parent.conf.Producer.Retry.Max <= 0 {
1000
				bp.parent.abandonBrokerConnection(bp.broker)
1001
			}
1002
			bp.parent.returnErrors(pSet.msgs, block.Err)
1003
		}
1004
	})
1005

1006
	if len(retryTopics) > 0 {
1007
		if bp.parent.conf.Producer.Idempotent {
1008
			err := bp.parent.client.RefreshMetadata(retryTopics...)
1009
			if err != nil {
1010
				Logger.Printf("Failed refreshing metadata because of %v\n", err)
1011
			}
1012
		}
1013

1014
		sent.eachPartition(func(topic string, partition int32, pSet *partitionSet) {
1015
			block := response.GetBlock(topic, partition)
1016
			if block == nil {
1017
				// handled in the previous "eachPartition" loop
1018
				return
1019
			}
1020

1021
			switch block.Err {
1022
			case ErrInvalidMessage, ErrUnknownTopicOrPartition, ErrLeaderNotAvailable, ErrNotLeaderForPartition,
1023
				ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend:
1024
				Logger.Printf("producer/broker/%d state change to [retrying] on %s/%d because %v\n",
1025
					bp.broker.ID(), topic, partition, block.Err)
1026
				if bp.currentRetries[topic] == nil {
1027
					bp.currentRetries[topic] = make(map[int32]error)
1028
				}
1029
				bp.currentRetries[topic][partition] = block.Err
1030
				if bp.parent.conf.Producer.Idempotent {
1031
					go bp.parent.retryBatch(topic, partition, pSet, block.Err)
1032
				} else {
1033
					bp.parent.retryMessages(pSet.msgs, block.Err)
1034
				}
1035
				// dropping the following messages has the side effect of incrementing their retry count
1036
				bp.parent.retryMessages(bp.buffer.dropPartition(topic, partition), block.Err)
1037
			}
1038
		})
1039
	}
1040
}
1041

1042
func (p *asyncProducer) retryBatch(topic string, partition int32, pSet *partitionSet, kerr KError) {
1043
	Logger.Printf("Retrying batch for %v-%d because of %s\n", topic, partition, kerr)
1044
	produceSet := newProduceSet(p)
1045
	produceSet.msgs[topic] = make(map[int32]*partitionSet)
1046
	produceSet.msgs[topic][partition] = pSet
1047
	produceSet.bufferBytes += pSet.bufferBytes
1048
	produceSet.bufferCount += len(pSet.msgs)
1049
	for _, msg := range pSet.msgs {
1050
		if msg.retries >= p.conf.Producer.Retry.Max {
1051
			p.returnError(msg, kerr)
1052
			return
1053
		}
1054
		msg.retries++
1055
	}
1056

1057
	// it's expected that a metadata refresh has been requested prior to calling retryBatch
1058
	leader, err := p.client.Leader(topic, partition)
1059
	if err != nil {
1060
		Logger.Printf("Failed retrying batch for %v-%d because of %v while looking up for new leader\n", topic, partition, err)
1061
		for _, msg := range pSet.msgs {
1062
			p.returnError(msg, kerr)
1063
		}
1064
		return
1065
	}
1066
	bp := p.getBrokerProducer(leader)
1067
	bp.output <- produceSet
1068
	p.unrefBrokerProducer(leader, bp)
1069
}
1070

1071
func (bp *brokerProducer) handleError(sent *produceSet, err error) {
1072
	var target PacketEncodingError
1073
	if errors.As(err, &target) {
1074
		sent.eachPartition(func(topic string, partition int32, pSet *partitionSet) {
1075
			bp.parent.returnErrors(pSet.msgs, err)
1076
		})
1077
	} else {
1078
		Logger.Printf("producer/broker/%d state change to [closing] because %s\n", bp.broker.ID(), err)
1079
		bp.parent.abandonBrokerConnection(bp.broker)
1080
		_ = bp.broker.Close()
1081
		bp.closing = err
1082
		sent.eachPartition(func(topic string, partition int32, pSet *partitionSet) {
1083
			bp.parent.retryMessages(pSet.msgs, err)
1084
		})
1085
		bp.buffer.eachPartition(func(topic string, partition int32, pSet *partitionSet) {
1086
			bp.parent.retryMessages(pSet.msgs, err)
1087
		})
1088
		bp.rollOver()
1089
	}
1090
}
1091

1092
// singleton
1093
// effectively a "bridge" between the flushers and the dispatcher in order to avoid deadlock
1094
// based on https://godoc.org/github.com/eapache/channels#InfiniteChannel
1095
func (p *asyncProducer) retryHandler() {
1096
	var msg *ProducerMessage
1097
	buf := queue.New()
1098

1099
	for {
1100
		if buf.Length() == 0 {
1101
			msg = <-p.retries
1102
		} else {
1103
			select {
1104
			case msg = <-p.retries:
1105
			case p.input <- buf.Peek().(*ProducerMessage):
1106
				buf.Remove()
1107
				continue
1108
			}
1109
		}
1110

1111
		if msg == nil {
1112
			return
1113
		}
1114

1115
		buf.Add(msg)
1116
	}
1117
}
1118

1119
// utility functions
1120

1121
func (p *asyncProducer) shutdown() {
1122
	Logger.Println("Producer shutting down.")
1123
	p.inFlight.Add(1)
1124
	p.input <- &ProducerMessage{flags: shutdown}
1125

1126
	p.inFlight.Wait()
1127

1128
	err := p.client.Close()
1129
	if err != nil {
1130
		Logger.Println("producer/shutdown failed to close the embedded client:", err)
1131
	}
1132

1133
	close(p.input)
1134
	close(p.retries)
1135
	close(p.errors)
1136
	close(p.successes)
1137
}
1138

1139
func (p *asyncProducer) bumpIdempotentProducerEpoch() {
1140
	_, epoch := p.txnmgr.getProducerID()
1141
	if epoch == math.MaxInt16 {
1142
		Logger.Println("producer/txnmanager epoch exhausted, requesting new producer ID")
1143
		txnmgr, err := newTransactionManager(p.conf, p.client)
1144
		if err != nil {
1145
			Logger.Println(err)
1146
			return
1147
		}
1148

1149
		p.txnmgr = txnmgr
1150
	} else {
1151
		p.txnmgr.bumpEpoch()
1152
	}
1153
}
1154

1155
func (p *asyncProducer) returnError(msg *ProducerMessage, err error) {
1156
	// We need to reset the producer ID epoch if we set a sequence number on it, because the broker
1157
	// will never see a message with this number, so we can never continue the sequence.
1158
	if msg.hasSequence {
1159
		Logger.Printf("producer/txnmanager rolling over epoch due to publish failure on %s/%d", msg.Topic, msg.Partition)
1160
		p.bumpIdempotentProducerEpoch()
1161
	}
1162

1163
	msg.clear()
1164
	pErr := &ProducerError{Msg: msg, Err: err}
1165
	if p.conf.Producer.Return.Errors {
1166
		p.errors <- pErr
1167
	} else {
1168
		Logger.Println(pErr)
1169
	}
1170
	p.inFlight.Done()
1171
}
1172

1173
func (p *asyncProducer) returnErrors(batch []*ProducerMessage, err error) {
1174
	for _, msg := range batch {
1175
		p.returnError(msg, err)
1176
	}
1177
}
1178

1179
func (p *asyncProducer) returnSuccesses(batch []*ProducerMessage) {
1180
	for _, msg := range batch {
1181
		if p.conf.Producer.Return.Successes {
1182
			msg.clear()
1183
			p.successes <- msg
1184
		}
1185
		p.inFlight.Done()
1186
	}
1187
}
1188

1189
func (p *asyncProducer) retryMessage(msg *ProducerMessage, err error) {
1190
	if msg.retries >= p.conf.Producer.Retry.Max {
1191
		p.returnError(msg, err)
1192
	} else {
1193
		msg.retries++
1194
		p.retries <- msg
1195
	}
1196
}
1197

1198
func (p *asyncProducer) retryMessages(batch []*ProducerMessage, err error) {
1199
	for _, msg := range batch {
1200
		p.retryMessage(msg, err)
1201
	}
1202
}
1203

1204
func (p *asyncProducer) getBrokerProducer(broker *Broker) *brokerProducer {
1205
	p.brokerLock.Lock()
1206
	defer p.brokerLock.Unlock()
1207

1208
	bp := p.brokers[broker]
1209

1210
	if bp == nil {
1211
		bp = p.newBrokerProducer(broker)
1212
		p.brokers[broker] = bp
1213
		p.brokerRefs[bp] = 0
1214
	}
1215

1216
	p.brokerRefs[bp]++
1217

1218
	return bp
1219
}
1220

1221
func (p *asyncProducer) unrefBrokerProducer(broker *Broker, bp *brokerProducer) {
1222
	p.brokerLock.Lock()
1223
	defer p.brokerLock.Unlock()
1224

1225
	p.brokerRefs[bp]--
1226
	if p.brokerRefs[bp] == 0 {
1227
		close(bp.input)
1228
		delete(p.brokerRefs, bp)
1229

1230
		if p.brokers[broker] == bp {
1231
			delete(p.brokers, broker)
1232
		}
1233
	}
1234
}
1235

1236
func (p *asyncProducer) abandonBrokerConnection(broker *Broker) {
1237
	p.brokerLock.Lock()
1238
	defer p.brokerLock.Unlock()
1239

1240
	bc, ok := p.brokers[broker]
1241
	if ok && bc.abandoned != nil {
1242
		close(bc.abandoned)
1243
	}
1244

1245
	delete(p.brokers, broker)
1246
}
1247

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.