cubefs

Форк
0
295 строк · 5.9 Кб
1
package sarama
2

3
type fetchRequestBlock struct {
4
	Version            int16
5
	currentLeaderEpoch int32
6
	fetchOffset        int64
7
	logStartOffset     int64
8
	maxBytes           int32
9
}
10

11
func (b *fetchRequestBlock) encode(pe packetEncoder, version int16) error {
12
	b.Version = version
13
	if b.Version >= 9 {
14
		pe.putInt32(b.currentLeaderEpoch)
15
	}
16
	pe.putInt64(b.fetchOffset)
17
	if b.Version >= 5 {
18
		pe.putInt64(b.logStartOffset)
19
	}
20
	pe.putInt32(b.maxBytes)
21
	return nil
22
}
23

24
func (b *fetchRequestBlock) decode(pd packetDecoder, version int16) (err error) {
25
	b.Version = version
26
	if b.Version >= 9 {
27
		if b.currentLeaderEpoch, err = pd.getInt32(); err != nil {
28
			return err
29
		}
30
	}
31
	if b.fetchOffset, err = pd.getInt64(); err != nil {
32
		return err
33
	}
34
	if b.Version >= 5 {
35
		if b.logStartOffset, err = pd.getInt64(); err != nil {
36
			return err
37
		}
38
	}
39
	if b.maxBytes, err = pd.getInt32(); err != nil {
40
		return err
41
	}
42
	return nil
43
}
44

45
// FetchRequest (API key 1) will fetch Kafka messages. Version 3 introduced the MaxBytes field. See
46
// https://issues.apache.org/jira/browse/KAFKA-2063 for a discussion of the issues leading up to that.  The KIP is at
47
// https://cwiki.apache.org/confluence/display/KAFKA/KIP-74%3A+Add+Fetch+Response+Size+Limit+in+Bytes
48
type FetchRequest struct {
49
	MaxWaitTime  int32
50
	MinBytes     int32
51
	MaxBytes     int32
52
	Version      int16
53
	Isolation    IsolationLevel
54
	SessionID    int32
55
	SessionEpoch int32
56
	blocks       map[string]map[int32]*fetchRequestBlock
57
	forgotten    map[string][]int32
58
	RackID       string
59
}
60

61
type IsolationLevel int8
62

63
const (
64
	ReadUncommitted IsolationLevel = iota
65
	ReadCommitted
66
)
67

68
func (r *FetchRequest) encode(pe packetEncoder) (err error) {
69
	pe.putInt32(-1) // replica ID is always -1 for clients
70
	pe.putInt32(r.MaxWaitTime)
71
	pe.putInt32(r.MinBytes)
72
	if r.Version >= 3 {
73
		pe.putInt32(r.MaxBytes)
74
	}
75
	if r.Version >= 4 {
76
		pe.putInt8(int8(r.Isolation))
77
	}
78
	if r.Version >= 7 {
79
		pe.putInt32(r.SessionID)
80
		pe.putInt32(r.SessionEpoch)
81
	}
82
	err = pe.putArrayLength(len(r.blocks))
83
	if err != nil {
84
		return err
85
	}
86
	for topic, blocks := range r.blocks {
87
		err = pe.putString(topic)
88
		if err != nil {
89
			return err
90
		}
91
		err = pe.putArrayLength(len(blocks))
92
		if err != nil {
93
			return err
94
		}
95
		for partition, block := range blocks {
96
			pe.putInt32(partition)
97
			err = block.encode(pe, r.Version)
98
			if err != nil {
99
				return err
100
			}
101
		}
102
	}
103
	if r.Version >= 7 {
104
		err = pe.putArrayLength(len(r.forgotten))
105
		if err != nil {
106
			return err
107
		}
108
		for topic, partitions := range r.forgotten {
109
			err = pe.putString(topic)
110
			if err != nil {
111
				return err
112
			}
113
			err = pe.putArrayLength(len(partitions))
114
			if err != nil {
115
				return err
116
			}
117
			for _, partition := range partitions {
118
				pe.putInt32(partition)
119
			}
120
		}
121
	}
122
	if r.Version >= 11 {
123
		err = pe.putString(r.RackID)
124
		if err != nil {
125
			return err
126
		}
127
	}
128

129
	return nil
130
}
131

132
func (r *FetchRequest) decode(pd packetDecoder, version int16) (err error) {
133
	r.Version = version
134

135
	if _, err = pd.getInt32(); err != nil {
136
		return err
137
	}
138
	if r.MaxWaitTime, err = pd.getInt32(); err != nil {
139
		return err
140
	}
141
	if r.MinBytes, err = pd.getInt32(); err != nil {
142
		return err
143
	}
144
	if r.Version >= 3 {
145
		if r.MaxBytes, err = pd.getInt32(); err != nil {
146
			return err
147
		}
148
	}
149
	if r.Version >= 4 {
150
		isolation, err := pd.getInt8()
151
		if err != nil {
152
			return err
153
		}
154
		r.Isolation = IsolationLevel(isolation)
155
	}
156
	if r.Version >= 7 {
157
		r.SessionID, err = pd.getInt32()
158
		if err != nil {
159
			return err
160
		}
161
		r.SessionEpoch, err = pd.getInt32()
162
		if err != nil {
163
			return err
164
		}
165
	}
166
	topicCount, err := pd.getArrayLength()
167
	if err != nil {
168
		return err
169
	}
170
	if topicCount == 0 {
171
		return nil
172
	}
173
	r.blocks = make(map[string]map[int32]*fetchRequestBlock)
174
	for i := 0; i < topicCount; i++ {
175
		topic, err := pd.getString()
176
		if err != nil {
177
			return err
178
		}
179
		partitionCount, err := pd.getArrayLength()
180
		if err != nil {
181
			return err
182
		}
183
		r.blocks[topic] = make(map[int32]*fetchRequestBlock)
184
		for j := 0; j < partitionCount; j++ {
185
			partition, err := pd.getInt32()
186
			if err != nil {
187
				return err
188
			}
189
			fetchBlock := &fetchRequestBlock{}
190
			if err = fetchBlock.decode(pd, r.Version); err != nil {
191
				return err
192
			}
193
			r.blocks[topic][partition] = fetchBlock
194
		}
195
	}
196

197
	if r.Version >= 7 {
198
		forgottenCount, err := pd.getArrayLength()
199
		if err != nil {
200
			return err
201
		}
202
		r.forgotten = make(map[string][]int32)
203
		for i := 0; i < forgottenCount; i++ {
204
			topic, err := pd.getString()
205
			if err != nil {
206
				return err
207
			}
208
			partitionCount, err := pd.getArrayLength()
209
			if err != nil {
210
				return err
211
			}
212
			r.forgotten[topic] = make([]int32, partitionCount)
213

214
			for j := 0; j < partitionCount; j++ {
215
				partition, err := pd.getInt32()
216
				if err != nil {
217
					return err
218
				}
219
				r.forgotten[topic][j] = partition
220
			}
221
		}
222
	}
223

224
	if r.Version >= 11 {
225
		r.RackID, err = pd.getString()
226
		if err != nil {
227
			return err
228
		}
229
	}
230

231
	return nil
232
}
233

234
func (r *FetchRequest) key() int16 {
235
	return 1
236
}
237

238
func (r *FetchRequest) version() int16 {
239
	return r.Version
240
}
241

242
func (r *FetchRequest) headerVersion() int16 {
243
	return 1
244
}
245

246
func (r *FetchRequest) requiredVersion() KafkaVersion {
247
	switch r.Version {
248
	case 0:
249
		return MinVersion
250
	case 1:
251
		return V0_9_0_0
252
	case 2:
253
		return V0_10_0_0
254
	case 3:
255
		return V0_10_1_0
256
	case 4, 5:
257
		return V0_11_0_0
258
	case 6:
259
		return V1_0_0_0
260
	case 7:
261
		return V1_1_0_0
262
	case 8:
263
		return V2_0_0_0
264
	case 9, 10:
265
		return V2_1_0_0
266
	case 11:
267
		return V2_3_0_0
268
	default:
269
		return MaxVersion
270
	}
271
}
272

273
func (r *FetchRequest) AddBlock(topic string, partitionID int32, fetchOffset int64, maxBytes int32) {
274
	if r.blocks == nil {
275
		r.blocks = make(map[string]map[int32]*fetchRequestBlock)
276
	}
277

278
	if r.Version >= 7 && r.forgotten == nil {
279
		r.forgotten = make(map[string][]int32)
280
	}
281

282
	if r.blocks[topic] == nil {
283
		r.blocks[topic] = make(map[int32]*fetchRequestBlock)
284
	}
285

286
	tmp := new(fetchRequestBlock)
287
	tmp.Version = r.Version
288
	tmp.maxBytes = maxBytes
289
	tmp.fetchOffset = fetchOffset
290
	if r.Version >= 9 {
291
		tmp.currentLeaderEpoch = int32(-1)
292
	}
293

294
	r.blocks[topic][partitionID] = tmp
295
}
296

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.