package sarama

import (
	"encoding/binary"
	"errors"
	"time"
)

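// partitionSet holds the messages buffered for a single topic/partition,
// together with their wire-format encoding and a running count of buffered bytes.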
type partitionSet struct {
	msgs          []*ProducerMessage
	recordsToSend Records
	bufferBytes   int
}

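// produceSet accumulates messages destined for a single ProduceRequest,
// grouped by topic and partition, and tracks total buffered bytes and messages.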
type produceSet struct {
	parent        *asyncProducer
	msgs          map[string]map[int32]*partitionSet
	producerID    int64
	producerEpoch int16

	bufferBytes int
	bufferCount int
}

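// newProduceSet returns an empty produceSet, capturing the current producer ID
// and epoch from the parent's transaction manager for use in record batches.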
func newProduceSet(parent *asyncProducer) *produceSet {
	pid, epoch := parent.txnmgr.getProducerID()
	return &produceSet{
		msgs:          make(map[string]map[int32]*partitionSet),
		parent:        parent,
		producerID:    pid,
		producerEpoch: epoch,
	}
}

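// add encodes the message's key and value and appends it to the buffered batch
// for its topic/partition; any error is returned before the message is added.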
func (ps *produceSet) add(msg *ProducerMessage) error {
	var err error
	var key, val []byte

	if msg.Key != nil {
		if key, err = msg.Key.Encode(); err != nil {
			return err
		}
	}

	if msg.Value != nil {
		if val, err = msg.Value.Encode(); err != nil {
			return err
		}
	}

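	// Default a zero timestamp to the current time, and truncate to the
	// millisecond precision that the Kafka wire format carries.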
	timestamp := msg.Timestamp
	if timestamp.IsZero() {
		timestamp = time.Now()
	}
	timestamp = timestamp.Truncate(time.Millisecond)

	partitions := ps.msgs[msg.Topic]
	if partitions == nil {
		partitions = make(map[int32]*partitionSet)
		ps.msgs[msg.Topic] = partitions
	}

	var size int

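	// On first use of this topic/partition, create its buffer: a version-2
	// RecordBatch for brokers at Kafka 0.11+, or a legacy MessageSet otherwise.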
	set := partitions[msg.Partition]
	if set == nil {
		if ps.parent.conf.Version.IsAtLeast(V0_11_0_0) {
			batch := &RecordBatch{
				FirstTimestamp:   timestamp,
				Version:          2,
				Codec:            ps.parent.conf.Producer.Compression,
				CompressionLevel: ps.parent.conf.Producer.CompressionLevel,
				ProducerID:       ps.producerID,
				ProducerEpoch:    ps.producerEpoch,
			}
			if ps.parent.conf.Producer.Idempotent {
				batch.FirstSequence = msg.sequenceNumber
			}
			set = &partitionSet{recordsToSend: newDefaultRecords(batch)}
			size = recordBatchOverhead
		} else {
			set = &partitionSet{recordsToSend: newLegacyRecords(new(MessageSet))}
		}
		partitions[msg.Partition] = set
	}

	if ps.parent.conf.Version.IsAtLeast(V0_11_0_0) {
		if ps.parent.conf.Producer.Idempotent && msg.sequenceNumber < set.recordsToSend.RecordBatch.FirstSequence {
			return errors.New("assertion failed: message out of sequence added to a batch")
		}
	}

	// Past this point we can't return an error, because we've already added the message to the set.
	set.msgs = append(set.msgs, msg)

	if ps.parent.conf.Version.IsAtLeast(V0_11_0_0) {
		// We are being conservative here to avoid having to pre-encode the record
		size += maximumRecordOverhead
		rec := &Record{
			Key:            key,
			Value:          val,
			TimestampDelta: timestamp.Sub(set.recordsToSend.RecordBatch.FirstTimestamp),
		}
		size += len(key) + len(val)
		if len(msg.Headers) > 0 {
			rec.Headers = make([]*RecordHeader, len(msg.Headers))
			for i := range msg.Headers {
				rec.Headers[i] = &msg.Headers[i]
				size += len(rec.Headers[i].Key) + len(rec.Headers[i].Value) + 2*binary.MaxVarintLen32
			}
		}
		set.recordsToSend.RecordBatch.addRecord(rec)
	} else {
		msgToSend := &Message{Codec: CompressionNone, Key: key, Value: val}
		if ps.parent.conf.Version.IsAtLeast(V0_10_0_0) {
			msgToSend.Timestamp = timestamp
			msgToSend.Version = 1
		}
		set.recordsToSend.MsgSet.addMessage(msgToSend)
		size = producerMessageOverhead + len(key) + len(val)
	}

	set.bufferBytes += size
	ps.bufferBytes += size
	ps.bufferCount++

	return nil
}

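// buildRequest assembles a ProduceRequest from everything buffered so far,
// choosing the request version that matches the configured Kafka version.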
func (ps *produceSet) buildRequest() *ProduceRequest {
	req := &ProduceRequest{
		RequiredAcks: ps.parent.conf.Producer.RequiredAcks,
		Timeout:      int32(ps.parent.conf.Producer.Timeout / time.Millisecond),
	}
	if ps.parent.conf.Version.IsAtLeast(V0_10_0_0) {
		req.Version = 2
	}
	if ps.parent.conf.Version.IsAtLeast(V0_11_0_0) {
		req.Version = 3
	}

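	// ZSTD compression requires produce request version 7, available on Kafka 2.1.0+.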
	if ps.parent.conf.Producer.Compression == CompressionZSTD && ps.parent.conf.Version.IsAtLeast(V2_1_0_0) {
		req.Version = 7
	}

	for topic, partitionSets := range ps.msgs {
		for partition, set := range partitionSets {
			if req.Version >= 3 {
				// If the API version we're hitting is 3 or greater, we need to calculate
				// offsets for each record in the batch relative to FirstOffset.
				// Additionally, we must set LastOffsetDelta to the value of the last offset
				// in the batch. Since the OffsetDelta of the first record is 0, we know that the
				// final record of any batch will have an offset of (# of records in batch) - 1.
				// (See https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-Messagesets
				//  under the RecordBatch section for details.)
				rb := set.recordsToSend.RecordBatch
				if len(rb.Records) > 0 {
					rb.LastOffsetDelta = int32(len(rb.Records) - 1)
					for i, record := range rb.Records {
						record.OffsetDelta = int64(i)
					}
				}
				req.AddBatch(topic, partition, rb)
				continue
			}
			if ps.parent.conf.Producer.Compression == CompressionNone {
				req.AddSet(topic, partition, set.recordsToSend.MsgSet)
			} else {
				// When compression is enabled, the entire set for each partition is compressed
				// and sent as the payload of a single fake "message" with the appropriate codec
				// set and no key. When the server sees a message with a compression codec, it
				// decompresses the payload and treats the result as its message set.

				if ps.parent.conf.Version.IsAtLeast(V0_10_0_0) {
					// If our version is 0.10 or later, assign relative offsets
					// to the inner messages. This lets the broker avoid
					// recompressing the message set.
					// (See https://cwiki.apache.org/confluence/display/KAFKA/KIP-31+-+Move+to+relative+offsets+in+compressed+message+sets
					// for details on relative offsets.)
					for i, msg := range set.recordsToSend.MsgSet.Messages {
						msg.Offset = int64(i)
					}
				}
				payload, err := encode(set.recordsToSend.MsgSet, ps.parent.conf.MetricRegistry)
				if err != nil {
					Logger.Println(err) // if this happens, it's basically our fault.
					panic(err)
				}
				compMsg := &Message{
					Codec:            ps.parent.conf.Producer.Compression,
					CompressionLevel: ps.parent.conf.Producer.CompressionLevel,
					Key:              nil,
					Value:            payload,
					Set:              set.recordsToSend.MsgSet, // Provide the underlying message set for accurate metrics
				}
				if ps.parent.conf.Version.IsAtLeast(V0_10_0_0) {
					compMsg.Version = 1
					compMsg.Timestamp = set.recordsToSend.MsgSet.Messages[0].Msg.Timestamp
				}
				req.AddMessage(topic, partition, compMsg)
			}
		}
	}

	return req
}

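// eachPartition invokes cb once for every topic/partition buffered in the set.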
func (ps *produceSet) eachPartition(cb func(topic string, partition int32, pSet *partitionSet)) {
	for topic, partitionSet := range ps.msgs {
		for partition, set := range partitionSet {
			cb(topic, partition, set)
		}
	}
}

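// dropPartition removes a topic/partition from the set, adjusts the buffer
// accounting, and returns the messages that had been queued for it.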
func (ps *produceSet) dropPartition(topic string, partition int32) []*ProducerMessage {
	if ps.msgs[topic] == nil {
		return nil
	}
	set := ps.msgs[topic][partition]
	if set == nil {
		return nil
	}
	ps.bufferBytes -= set.bufferBytes
	ps.bufferCount -= len(set.msgs)
	delete(ps.msgs[topic], partition)
	return set.msgs
}

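// wouldOverflow reports whether adding msg would exceed the maximum request
// size, the per-partition batch size limit, or the configured message count.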
func (ps *produceSet) wouldOverflow(msg *ProducerMessage) bool {
	version := 1
	if ps.parent.conf.Version.IsAtLeast(V0_11_0_0) {
		version = 2
	}

	switch {
	// Would we overflow our maximum possible size-on-the-wire? 10KiB is arbitrary overhead for safety.
	case ps.bufferBytes+msg.byteSize(version) >= int(MaxRequestSize-(10*1024)):
		return true
	// Would we overflow the size-limit of a message-batch for this partition?
	case ps.msgs[msg.Topic] != nil && ps.msgs[msg.Topic][msg.Partition] != nil &&
		ps.msgs[msg.Topic][msg.Partition].bufferBytes+msg.byteSize(version) >= ps.parent.conf.Producer.MaxMessageBytes:
		return true
	// Would we overflow simply in number of messages?
	case ps.parent.conf.Producer.Flush.MaxMessages > 0 && ps.bufferCount >= ps.parent.conf.Producer.Flush.MaxMessages:
		return true
	default:
		return false
	}
}

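// readyToFlush reports whether the buffered messages should be sent now,
// based on the configured flush thresholds.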
func (ps *produceSet) readyToFlush() bool {
	switch {
	// If we don't have any messages, nothing else matters
	case ps.empty():
		return false
	// If all three config values are 0, we always flush as-fast-as-possible
	case ps.parent.conf.Producer.Flush.Frequency == 0 && ps.parent.conf.Producer.Flush.Bytes == 0 && ps.parent.conf.Producer.Flush.Messages == 0:
		return true
	// If we've passed the message trigger-point
	case ps.parent.conf.Producer.Flush.Messages > 0 && ps.bufferCount >= ps.parent.conf.Producer.Flush.Messages:
		return true
	// If we've passed the byte trigger-point
	case ps.parent.conf.Producer.Flush.Bytes > 0 && ps.bufferBytes >= ps.parent.conf.Producer.Flush.Bytes:
		return true
	default:
		return false
	}
}

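// empty reports whether the set contains no buffered messages.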
func (ps *produceSet) empty() bool {
	return ps.bufferCount == 0
}