cubefs

Форк
0
148 строк · 4.6 Кб
1
package sarama
2

3
import "sync"
4

5
// SyncProducer publishes Kafka messages, blocking until they have been acknowledged. It routes messages to the correct
6
// broker, refreshing metadata as appropriate, and parses responses for errors. You must call Close() on a producer
7
// to avoid leaks, it may not be garbage-collected automatically when it passes out of scope.
8
//
9
// The SyncProducer comes with two caveats: it will generally be less efficient than the AsyncProducer, and the actual
10
// durability guarantee provided when a message is acknowledged depend on the configured value of `Producer.RequiredAcks`.
11
// There are configurations where a message acknowledged by the SyncProducer can still sometimes be lost.
12
//
13
// For implementation reasons, the SyncProducer requires `Producer.Return.Errors` and `Producer.Return.Successes` to
14
// be set to true in its configuration.
15
type SyncProducer interface {
16

17
	// SendMessage produces a given message, and returns only when it either has
18
	// succeeded or failed to produce. It will return the partition and the offset
19
	// of the produced message, or an error if the message failed to produce.
20
	SendMessage(msg *ProducerMessage) (partition int32, offset int64, err error)
21

22
	// SendMessages produces a given set of messages, and returns only when all
23
	// messages in the set have either succeeded or failed. Note that messages
24
	// can succeed and fail individually; if some succeed and some fail,
25
	// SendMessages will return an error.
26
	SendMessages(msgs []*ProducerMessage) error
27

28
	// Close shuts down the producer; you must call this function before a producer
29
	// object passes out of scope, as it may otherwise leak memory.
30
	// You must call this before calling Close on the underlying client.
31
	Close() error
32
}
33

34
type syncProducer struct {
35
	producer *asyncProducer
36
	wg       sync.WaitGroup
37
}
38

39
// NewSyncProducer creates a new SyncProducer using the given broker addresses and configuration.
40
func NewSyncProducer(addrs []string, config *Config) (SyncProducer, error) {
41
	if config == nil {
42
		config = NewConfig()
43
		config.Producer.Return.Successes = true
44
	}
45

46
	if err := verifyProducerConfig(config); err != nil {
47
		return nil, err
48
	}
49

50
	p, err := NewAsyncProducer(addrs, config)
51
	if err != nil {
52
		return nil, err
53
	}
54
	return newSyncProducerFromAsyncProducer(p.(*asyncProducer)), nil
55
}
56

57
// NewSyncProducerFromClient creates a new SyncProducer using the given client. It is still
58
// necessary to call Close() on the underlying client when shutting down this producer.
59
func NewSyncProducerFromClient(client Client) (SyncProducer, error) {
60
	if err := verifyProducerConfig(client.Config()); err != nil {
61
		return nil, err
62
	}
63

64
	p, err := NewAsyncProducerFromClient(client)
65
	if err != nil {
66
		return nil, err
67
	}
68
	return newSyncProducerFromAsyncProducer(p.(*asyncProducer)), nil
69
}
70

71
func newSyncProducerFromAsyncProducer(p *asyncProducer) *syncProducer {
72
	sp := &syncProducer{producer: p}
73

74
	sp.wg.Add(2)
75
	go withRecover(sp.handleSuccesses)
76
	go withRecover(sp.handleErrors)
77

78
	return sp
79
}
80

81
func verifyProducerConfig(config *Config) error {
82
	if !config.Producer.Return.Errors {
83
		return ConfigurationError("Producer.Return.Errors must be true to be used in a SyncProducer")
84
	}
85
	if !config.Producer.Return.Successes {
86
		return ConfigurationError("Producer.Return.Successes must be true to be used in a SyncProducer")
87
	}
88
	return nil
89
}
90

91
func (sp *syncProducer) SendMessage(msg *ProducerMessage) (partition int32, offset int64, err error) {
92
	expectation := make(chan *ProducerError, 1)
93
	msg.expectation = expectation
94
	sp.producer.Input() <- msg
95

96
	if pErr := <-expectation; pErr != nil {
97
		return -1, -1, pErr.Err
98
	}
99

100
	return msg.Partition, msg.Offset, nil
101
}
102

103
func (sp *syncProducer) SendMessages(msgs []*ProducerMessage) error {
104
	expectations := make(chan chan *ProducerError, len(msgs))
105
	go func() {
106
		for _, msg := range msgs {
107
			expectation := make(chan *ProducerError, 1)
108
			msg.expectation = expectation
109
			sp.producer.Input() <- msg
110
			expectations <- expectation
111
		}
112
		close(expectations)
113
	}()
114

115
	var errors ProducerErrors
116
	for expectation := range expectations {
117
		if pErr := <-expectation; pErr != nil {
118
			errors = append(errors, pErr)
119
		}
120
	}
121

122
	if len(errors) > 0 {
123
		return errors
124
	}
125
	return nil
126
}
127

128
func (sp *syncProducer) handleSuccesses() {
129
	defer sp.wg.Done()
130
	for msg := range sp.producer.Successes() {
131
		expectation := msg.expectation
132
		expectation <- nil
133
	}
134
}
135

136
func (sp *syncProducer) handleErrors() {
137
	defer sp.wg.Done()
138
	for err := range sp.producer.Errors() {
139
		expectation := err.Msg.expectation
140
		expectation <- err
141
	}
142
}
143

144
func (sp *syncProducer) Close() error {
145
	sp.producer.AsyncClose()
146
	sp.wg.Wait()
147
	return nil
148
}
149

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.