cubefs
192 строки · 3.5 Кб
1package sarama
2
3type OffsetResponseBlock struct {
4Err KError
5Offsets []int64 // Version 0
6Offset int64 // Version 1
7Timestamp int64 // Version 1
8}
9
10func (b *OffsetResponseBlock) decode(pd packetDecoder, version int16) (err error) {
11tmp, err := pd.getInt16()
12if err != nil {
13return err
14}
15b.Err = KError(tmp)
16
17if version == 0 {
18b.Offsets, err = pd.getInt64Array()
19
20return err
21}
22
23b.Timestamp, err = pd.getInt64()
24if err != nil {
25return err
26}
27
28b.Offset, err = pd.getInt64()
29if err != nil {
30return err
31}
32
33// For backwards compatibility put the offset in the offsets array too
34b.Offsets = []int64{b.Offset}
35
36return nil
37}
38
39func (b *OffsetResponseBlock) encode(pe packetEncoder, version int16) (err error) {
40pe.putInt16(int16(b.Err))
41
42if version == 0 {
43return pe.putInt64Array(b.Offsets)
44}
45
46pe.putInt64(b.Timestamp)
47pe.putInt64(b.Offset)
48
49return nil
50}
51
52type OffsetResponse struct {
53Version int16
54ThrottleTimeMs int32
55Blocks map[string]map[int32]*OffsetResponseBlock
56}
57
58func (r *OffsetResponse) decode(pd packetDecoder, version int16) (err error) {
59if version >= 2 {
60r.ThrottleTimeMs, err = pd.getInt32()
61if err != nil {
62return err
63}
64}
65
66numTopics, err := pd.getArrayLength()
67if err != nil {
68return err
69}
70
71r.Blocks = make(map[string]map[int32]*OffsetResponseBlock, numTopics)
72for i := 0; i < numTopics; i++ {
73name, err := pd.getString()
74if err != nil {
75return err
76}
77
78numBlocks, err := pd.getArrayLength()
79if err != nil {
80return err
81}
82
83r.Blocks[name] = make(map[int32]*OffsetResponseBlock, numBlocks)
84
85for j := 0; j < numBlocks; j++ {
86id, err := pd.getInt32()
87if err != nil {
88return err
89}
90
91block := new(OffsetResponseBlock)
92err = block.decode(pd, version)
93if err != nil {
94return err
95}
96r.Blocks[name][id] = block
97}
98}
99
100return nil
101}
102
103func (r *OffsetResponse) GetBlock(topic string, partition int32) *OffsetResponseBlock {
104if r.Blocks == nil {
105return nil
106}
107
108if r.Blocks[topic] == nil {
109return nil
110}
111
112return r.Blocks[topic][partition]
113}
114
115/*
116// [0 0 0 1 ntopics
1170 8 109 121 95 116 111 112 105 99 topic
1180 0 0 1 npartitions
1190 0 0 0 id
1200 0
121
1220 0 0 1 0 0 0 0
1230 1 1 1 0 0 0 1
1240 8 109 121 95 116 111 112
125105 99 0 0 0 1 0 0
1260 0 0 0 0 0 0 1
1270 0 0 0 0 1 1 1] <nil>
128
129*/
130func (r *OffsetResponse) encode(pe packetEncoder) (err error) {
131if r.Version >= 2 {
132pe.putInt32(r.ThrottleTimeMs)
133}
134
135if err = pe.putArrayLength(len(r.Blocks)); err != nil {
136return err
137}
138
139for topic, partitions := range r.Blocks {
140if err = pe.putString(topic); err != nil {
141return err
142}
143if err = pe.putArrayLength(len(partitions)); err != nil {
144return err
145}
146for partition, block := range partitions {
147pe.putInt32(partition)
148if err = block.encode(pe, r.version()); err != nil {
149return err
150}
151}
152}
153
154return nil
155}
156
157func (r *OffsetResponse) key() int16 {
158return 2
159}
160
161func (r *OffsetResponse) version() int16 {
162return r.Version
163}
164
165func (r *OffsetResponse) headerVersion() int16 {
166return 0
167}
168
169func (r *OffsetResponse) requiredVersion() KafkaVersion {
170switch r.Version {
171case 1:
172return V0_10_1_0
173case 2:
174return V0_11_0_0
175default:
176return MinVersion
177}
178}
179
180// testing API
181
182func (r *OffsetResponse) AddTopicPartition(topic string, partition int32, offset int64) {
183if r.Blocks == nil {
184r.Blocks = make(map[string]map[int32]*OffsetResponseBlock)
185}
186byTopic, ok := r.Blocks[topic]
187if !ok {
188byTopic = make(map[int32]*OffsetResponseBlock)
189r.Blocks[topic] = byTopic
190}
191byTopic[partition] = &OffsetResponseBlock{Offsets: []int64{offset}, Offset: offset}
192}
193