cubefs
280 строк · 4.9 Кб
1package sarama
2
3type OffsetFetchResponseBlock struct {
4Offset int64
5LeaderEpoch int32
6Metadata string
7Err KError
8}
9
10func (b *OffsetFetchResponseBlock) decode(pd packetDecoder, version int16) (err error) {
11isFlexible := version >= 6
12
13b.Offset, err = pd.getInt64()
14if err != nil {
15return err
16}
17
18if version >= 5 {
19b.LeaderEpoch, err = pd.getInt32()
20if err != nil {
21return err
22}
23}
24
25if isFlexible {
26b.Metadata, err = pd.getCompactString()
27} else {
28b.Metadata, err = pd.getString()
29}
30if err != nil {
31return err
32}
33
34tmp, err := pd.getInt16()
35if err != nil {
36return err
37}
38b.Err = KError(tmp)
39
40if isFlexible {
41if _, err := pd.getEmptyTaggedFieldArray(); err != nil {
42return err
43}
44}
45
46return nil
47}
48
49func (b *OffsetFetchResponseBlock) encode(pe packetEncoder, version int16) (err error) {
50isFlexible := version >= 6
51pe.putInt64(b.Offset)
52
53if version >= 5 {
54pe.putInt32(b.LeaderEpoch)
55}
56if isFlexible {
57err = pe.putCompactString(b.Metadata)
58} else {
59err = pe.putString(b.Metadata)
60}
61if err != nil {
62return err
63}
64
65pe.putInt16(int16(b.Err))
66
67if isFlexible {
68pe.putEmptyTaggedFieldArray()
69}
70
71return nil
72}
73
74type OffsetFetchResponse struct {
75Version int16
76ThrottleTimeMs int32
77Blocks map[string]map[int32]*OffsetFetchResponseBlock
78Err KError
79}
80
81func (r *OffsetFetchResponse) encode(pe packetEncoder) (err error) {
82isFlexible := r.Version >= 6
83
84if r.Version >= 3 {
85pe.putInt32(r.ThrottleTimeMs)
86}
87if isFlexible {
88pe.putCompactArrayLength(len(r.Blocks))
89} else {
90err = pe.putArrayLength(len(r.Blocks))
91}
92if err != nil {
93return err
94}
95
96for topic, partitions := range r.Blocks {
97if isFlexible {
98err = pe.putCompactString(topic)
99} else {
100err = pe.putString(topic)
101}
102if err != nil {
103return err
104}
105
106if isFlexible {
107pe.putCompactArrayLength(len(partitions))
108} else {
109err = pe.putArrayLength(len(partitions))
110}
111if err != nil {
112return err
113}
114for partition, block := range partitions {
115pe.putInt32(partition)
116if err := block.encode(pe, r.Version); err != nil {
117return err
118}
119}
120if isFlexible {
121pe.putEmptyTaggedFieldArray()
122}
123}
124if r.Version >= 2 {
125pe.putInt16(int16(r.Err))
126}
127if isFlexible {
128pe.putEmptyTaggedFieldArray()
129}
130return nil
131}
132
133func (r *OffsetFetchResponse) decode(pd packetDecoder, version int16) (err error) {
134r.Version = version
135isFlexible := version >= 6
136
137if version >= 3 {
138r.ThrottleTimeMs, err = pd.getInt32()
139if err != nil {
140return err
141}
142}
143
144var numTopics int
145if isFlexible {
146numTopics, err = pd.getCompactArrayLength()
147} else {
148numTopics, err = pd.getArrayLength()
149}
150if err != nil {
151return err
152}
153
154if numTopics > 0 {
155r.Blocks = make(map[string]map[int32]*OffsetFetchResponseBlock, numTopics)
156for i := 0; i < numTopics; i++ {
157var name string
158if isFlexible {
159name, err = pd.getCompactString()
160} else {
161name, err = pd.getString()
162}
163if err != nil {
164return err
165}
166
167var numBlocks int
168if isFlexible {
169numBlocks, err = pd.getCompactArrayLength()
170} else {
171numBlocks, err = pd.getArrayLength()
172}
173if err != nil {
174return err
175}
176
177r.Blocks[name] = nil
178if numBlocks > 0 {
179r.Blocks[name] = make(map[int32]*OffsetFetchResponseBlock, numBlocks)
180}
181for j := 0; j < numBlocks; j++ {
182id, err := pd.getInt32()
183if err != nil {
184return err
185}
186
187block := new(OffsetFetchResponseBlock)
188err = block.decode(pd, version)
189if err != nil {
190return err
191}
192
193r.Blocks[name][id] = block
194}
195
196if isFlexible {
197if _, err := pd.getEmptyTaggedFieldArray(); err != nil {
198return err
199}
200}
201}
202}
203
204if version >= 2 {
205kerr, err := pd.getInt16()
206if err != nil {
207return err
208}
209r.Err = KError(kerr)
210}
211
212if isFlexible {
213if _, err := pd.getEmptyTaggedFieldArray(); err != nil {
214return err
215}
216}
217
218return nil
219}
220
221func (r *OffsetFetchResponse) key() int16 {
222return 9
223}
224
225func (r *OffsetFetchResponse) version() int16 {
226return r.Version
227}
228
229func (r *OffsetFetchResponse) headerVersion() int16 {
230if r.Version >= 6 {
231return 1
232}
233
234return 0
235}
236
237func (r *OffsetFetchResponse) requiredVersion() KafkaVersion {
238switch r.Version {
239case 1:
240return V0_8_2_0
241case 2:
242return V0_10_2_0
243case 3:
244return V0_11_0_0
245case 4:
246return V2_0_0_0
247case 5:
248return V2_1_0_0
249case 6:
250return V2_4_0_0
251case 7:
252return V2_5_0_0
253default:
254return MinVersion
255}
256}
257
258func (r *OffsetFetchResponse) GetBlock(topic string, partition int32) *OffsetFetchResponseBlock {
259if r.Blocks == nil {
260return nil
261}
262
263if r.Blocks[topic] == nil {
264return nil
265}
266
267return r.Blocks[topic][partition]
268}
269
270func (r *OffsetFetchResponse) AddBlock(topic string, partition int32, block *OffsetFetchResponseBlock) {
271if r.Blocks == nil {
272r.Blocks = make(map[string]map[int32]*OffsetFetchResponseBlock)
273}
274partitions := r.Blocks[topic]
275if partitions == nil {
276partitions = make(map[int32]*OffsetFetchResponseBlock)
277r.Blocks[topic] = partitions
278}
279partitions[partition] = block
280}
281