cubefs

Форк
0
/
produce_request.go 
258 строк · 7.2 Кб
1
package sarama
2

3
import "github.com/rcrowley/go-metrics"
4

5
// RequiredAcks is used in Produce Requests to tell the broker how many replica acknowledgements
6
// it must see before responding. Any of the constants defined here are valid. On broker versions
7
// prior to 0.8.2.0 any other positive int16 is also valid (the broker will wait for that many
8
// acknowledgements) but in 0.8.2.0 and later this will raise an exception (it has been replaced
9
// by setting the `min.isr` value in the brokers configuration).
10
type RequiredAcks int16
11

12
const (
13
	// NoResponse doesn't send any response, the TCP ACK is all you get.
14
	NoResponse RequiredAcks = 0
15
	// WaitForLocal waits for only the local commit to succeed before responding.
16
	WaitForLocal RequiredAcks = 1
17
	// WaitForAll waits for all in-sync replicas to commit before responding.
18
	// The minimum number of in-sync replicas is configured on the broker via
19
	// the `min.insync.replicas` configuration key.
20
	WaitForAll RequiredAcks = -1
21
)
22

23
type ProduceRequest struct {
24
	TransactionalID *string
25
	RequiredAcks    RequiredAcks
26
	Timeout         int32
27
	Version         int16 // v1 requires Kafka 0.9, v2 requires Kafka 0.10, v3 requires Kafka 0.11
28
	records         map[string]map[int32]Records
29
}
30

31
func updateMsgSetMetrics(msgSet *MessageSet, compressionRatioMetric metrics.Histogram,
32
	topicCompressionRatioMetric metrics.Histogram) int64 {
33
	var topicRecordCount int64
34
	for _, messageBlock := range msgSet.Messages {
35
		// Is this a fake "message" wrapping real messages?
36
		if messageBlock.Msg.Set != nil {
37
			topicRecordCount += int64(len(messageBlock.Msg.Set.Messages))
38
		} else {
39
			// A single uncompressed message
40
			topicRecordCount++
41
		}
42
		// Better be safe than sorry when computing the compression ratio
43
		if messageBlock.Msg.compressedSize != 0 {
44
			compressionRatio := float64(len(messageBlock.Msg.Value)) /
45
				float64(messageBlock.Msg.compressedSize)
46
			// Histogram do not support decimal values, let's multiple it by 100 for better precision
47
			intCompressionRatio := int64(100 * compressionRatio)
48
			compressionRatioMetric.Update(intCompressionRatio)
49
			topicCompressionRatioMetric.Update(intCompressionRatio)
50
		}
51
	}
52
	return topicRecordCount
53
}
54

55
func updateBatchMetrics(recordBatch *RecordBatch, compressionRatioMetric metrics.Histogram,
56
	topicCompressionRatioMetric metrics.Histogram) int64 {
57
	if recordBatch.compressedRecords != nil {
58
		compressionRatio := int64(float64(recordBatch.recordsLen) / float64(len(recordBatch.compressedRecords)) * 100)
59
		compressionRatioMetric.Update(compressionRatio)
60
		topicCompressionRatioMetric.Update(compressionRatio)
61
	}
62

63
	return int64(len(recordBatch.Records))
64
}
65

66
func (r *ProduceRequest) encode(pe packetEncoder) error {
67
	if r.Version >= 3 {
68
		if err := pe.putNullableString(r.TransactionalID); err != nil {
69
			return err
70
		}
71
	}
72
	pe.putInt16(int16(r.RequiredAcks))
73
	pe.putInt32(r.Timeout)
74
	metricRegistry := pe.metricRegistry()
75
	var batchSizeMetric metrics.Histogram
76
	var compressionRatioMetric metrics.Histogram
77
	if metricRegistry != nil {
78
		batchSizeMetric = getOrRegisterHistogram("batch-size", metricRegistry)
79
		compressionRatioMetric = getOrRegisterHistogram("compression-ratio", metricRegistry)
80
	}
81
	totalRecordCount := int64(0)
82

83
	err := pe.putArrayLength(len(r.records))
84
	if err != nil {
85
		return err
86
	}
87

88
	for topic, partitions := range r.records {
89
		err = pe.putString(topic)
90
		if err != nil {
91
			return err
92
		}
93
		err = pe.putArrayLength(len(partitions))
94
		if err != nil {
95
			return err
96
		}
97
		topicRecordCount := int64(0)
98
		var topicCompressionRatioMetric metrics.Histogram
99
		if metricRegistry != nil {
100
			topicCompressionRatioMetric = getOrRegisterTopicHistogram("compression-ratio", topic, metricRegistry)
101
		}
102
		for id, records := range partitions {
103
			startOffset := pe.offset()
104
			pe.putInt32(id)
105
			pe.push(&lengthField{})
106
			err = records.encode(pe)
107
			if err != nil {
108
				return err
109
			}
110
			err = pe.pop()
111
			if err != nil {
112
				return err
113
			}
114
			if metricRegistry != nil {
115
				if r.Version >= 3 {
116
					topicRecordCount += updateBatchMetrics(records.RecordBatch, compressionRatioMetric, topicCompressionRatioMetric)
117
				} else {
118
					topicRecordCount += updateMsgSetMetrics(records.MsgSet, compressionRatioMetric, topicCompressionRatioMetric)
119
				}
120
				batchSize := int64(pe.offset() - startOffset)
121
				batchSizeMetric.Update(batchSize)
122
				getOrRegisterTopicHistogram("batch-size", topic, metricRegistry).Update(batchSize)
123
			}
124
		}
125
		if topicRecordCount > 0 {
126
			getOrRegisterTopicMeter("record-send-rate", topic, metricRegistry).Mark(topicRecordCount)
127
			getOrRegisterTopicHistogram("records-per-request", topic, metricRegistry).Update(topicRecordCount)
128
			totalRecordCount += topicRecordCount
129
		}
130
	}
131
	if totalRecordCount > 0 {
132
		metrics.GetOrRegisterMeter("record-send-rate", metricRegistry).Mark(totalRecordCount)
133
		getOrRegisterHistogram("records-per-request", metricRegistry).Update(totalRecordCount)
134
	}
135

136
	return nil
137
}
138

139
func (r *ProduceRequest) decode(pd packetDecoder, version int16) error {
140
	r.Version = version
141

142
	if version >= 3 {
143
		id, err := pd.getNullableString()
144
		if err != nil {
145
			return err
146
		}
147
		r.TransactionalID = id
148
	}
149
	requiredAcks, err := pd.getInt16()
150
	if err != nil {
151
		return err
152
	}
153
	r.RequiredAcks = RequiredAcks(requiredAcks)
154
	if r.Timeout, err = pd.getInt32(); err != nil {
155
		return err
156
	}
157
	topicCount, err := pd.getArrayLength()
158
	if err != nil {
159
		return err
160
	}
161
	if topicCount == 0 {
162
		return nil
163
	}
164

165
	r.records = make(map[string]map[int32]Records)
166
	for i := 0; i < topicCount; i++ {
167
		topic, err := pd.getString()
168
		if err != nil {
169
			return err
170
		}
171
		partitionCount, err := pd.getArrayLength()
172
		if err != nil {
173
			return err
174
		}
175
		r.records[topic] = make(map[int32]Records)
176

177
		for j := 0; j < partitionCount; j++ {
178
			partition, err := pd.getInt32()
179
			if err != nil {
180
				return err
181
			}
182
			size, err := pd.getInt32()
183
			if err != nil {
184
				return err
185
			}
186
			recordsDecoder, err := pd.getSubset(int(size))
187
			if err != nil {
188
				return err
189
			}
190
			var records Records
191
			if err := records.decode(recordsDecoder); err != nil {
192
				return err
193
			}
194
			r.records[topic][partition] = records
195
		}
196
	}
197

198
	return nil
199
}
200

201
func (r *ProduceRequest) key() int16 {
202
	return 0
203
}
204

205
func (r *ProduceRequest) version() int16 {
206
	return r.Version
207
}
208

209
func (r *ProduceRequest) headerVersion() int16 {
210
	return 1
211
}
212

213
func (r *ProduceRequest) requiredVersion() KafkaVersion {
214
	switch r.Version {
215
	case 1:
216
		return V0_9_0_0
217
	case 2:
218
		return V0_10_0_0
219
	case 3:
220
		return V0_11_0_0
221
	case 7:
222
		return V2_1_0_0
223
	default:
224
		return MinVersion
225
	}
226
}
227

228
func (r *ProduceRequest) ensureRecords(topic string, partition int32) {
229
	if r.records == nil {
230
		r.records = make(map[string]map[int32]Records)
231
	}
232

233
	if r.records[topic] == nil {
234
		r.records[topic] = make(map[int32]Records)
235
	}
236
}
237

238
func (r *ProduceRequest) AddMessage(topic string, partition int32, msg *Message) {
239
	r.ensureRecords(topic, partition)
240
	set := r.records[topic][partition].MsgSet
241

242
	if set == nil {
243
		set = new(MessageSet)
244
		r.records[topic][partition] = newLegacyRecords(set)
245
	}
246

247
	set.addMessage(msg)
248
}
249

250
func (r *ProduceRequest) AddSet(topic string, partition int32, set *MessageSet) {
251
	r.ensureRecords(topic, partition)
252
	r.records[topic][partition] = newLegacyRecords(set)
253
}
254

255
func (r *ProduceRequest) AddBatch(topic string, partition int32, batch *RecordBatch) {
256
	r.ensureRecords(topic, partition)
257
	r.records[topic][partition] = newDefaultRecords(batch)
258
}
259

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.