// offset_commit_request.go — vendored in cubefs (214 lines, 5.3 KB)
1package sarama
2
3import "errors"
4
// ReceiveTime is a special value for the timestamp field of Offset Commit Requests which
// tells the broker to set the timestamp to the time at which the request was received.
// The timestamp is only used if message version 1 is used, which requires kafka 0.8.2.
const ReceiveTime int64 = -1

// GroupGenerationUndefined is a special value for the group generation field of
// Offset Commit Requests that should be used when a consumer group does not rely
// on Kafka for partition management.
const GroupGenerationUndefined = -1
14
// offsetCommitRequestBlock is the per-partition payload of an offset commit:
// the offset to record, an optional timestamp, and free-form metadata the
// broker stores alongside the offset.
type offsetCommitRequestBlock struct {
	offset    int64
	timestamp int64 // only on the wire for message version 1; ReceiveTime (-1) asks the broker to stamp receipt time
	metadata  string
}
20
21func (b *offsetCommitRequestBlock) encode(pe packetEncoder, version int16) error {
22pe.putInt64(b.offset)
23if version == 1 {
24pe.putInt64(b.timestamp)
25} else if b.timestamp != 0 {
26Logger.Println("Non-zero timestamp specified for OffsetCommitRequest not v1, it will be ignored")
27}
28
29return pe.putString(b.metadata)
30}
31
32func (b *offsetCommitRequestBlock) decode(pd packetDecoder, version int16) (err error) {
33if b.offset, err = pd.getInt64(); err != nil {
34return err
35}
36if version == 1 {
37if b.timestamp, err = pd.getInt64(); err != nil {
38return err
39}
40}
41b.metadata, err = pd.getString()
42return err
43}
44
// OffsetCommitRequest asks the broker to durably record consumed offsets
// for a consumer group, keyed by topic and partition.
type OffsetCommitRequest struct {
	ConsumerGroup           string
	ConsumerGroupGeneration int32  // v1 or later
	ConsumerID              string // v1 or later
	RetentionTime           int64  // v2 or later

	// Version can be:
	//  - 0 (kafka 0.8.1 and later)
	//  - 1 (kafka 0.8.2 and later)
	//  - 2 (kafka 0.9.0 and later)
	//  - 3 (kafka 0.11.0 and later)
	//  - 4 (kafka 2.0.0 and later)
	Version int16
	// blocks holds the per-topic, per-partition commit entries; nil until
	// AddBlock or decode populates it.
	blocks map[string]map[int32]*offsetCommitRequestBlock
}
60
61func (r *OffsetCommitRequest) encode(pe packetEncoder) error {
62if r.Version < 0 || r.Version > 4 {
63return PacketEncodingError{"invalid or unsupported OffsetCommitRequest version field"}
64}
65
66if err := pe.putString(r.ConsumerGroup); err != nil {
67return err
68}
69
70if r.Version >= 1 {
71pe.putInt32(r.ConsumerGroupGeneration)
72if err := pe.putString(r.ConsumerID); err != nil {
73return err
74}
75} else {
76if r.ConsumerGroupGeneration != 0 {
77Logger.Println("Non-zero ConsumerGroupGeneration specified for OffsetCommitRequest v0, it will be ignored")
78}
79if r.ConsumerID != "" {
80Logger.Println("Non-empty ConsumerID specified for OffsetCommitRequest v0, it will be ignored")
81}
82}
83
84if r.Version >= 2 {
85pe.putInt64(r.RetentionTime)
86} else if r.RetentionTime != 0 {
87Logger.Println("Non-zero RetentionTime specified for OffsetCommitRequest version <2, it will be ignored")
88}
89
90if err := pe.putArrayLength(len(r.blocks)); err != nil {
91return err
92}
93for topic, partitions := range r.blocks {
94if err := pe.putString(topic); err != nil {
95return err
96}
97if err := pe.putArrayLength(len(partitions)); err != nil {
98return err
99}
100for partition, block := range partitions {
101pe.putInt32(partition)
102if err := block.encode(pe, r.Version); err != nil {
103return err
104}
105}
106}
107return nil
108}
109
110func (r *OffsetCommitRequest) decode(pd packetDecoder, version int16) (err error) {
111r.Version = version
112
113if r.ConsumerGroup, err = pd.getString(); err != nil {
114return err
115}
116
117if r.Version >= 1 {
118if r.ConsumerGroupGeneration, err = pd.getInt32(); err != nil {
119return err
120}
121if r.ConsumerID, err = pd.getString(); err != nil {
122return err
123}
124}
125
126if r.Version >= 2 {
127if r.RetentionTime, err = pd.getInt64(); err != nil {
128return err
129}
130}
131
132topicCount, err := pd.getArrayLength()
133if err != nil {
134return err
135}
136if topicCount == 0 {
137return nil
138}
139r.blocks = make(map[string]map[int32]*offsetCommitRequestBlock)
140for i := 0; i < topicCount; i++ {
141topic, err := pd.getString()
142if err != nil {
143return err
144}
145partitionCount, err := pd.getArrayLength()
146if err != nil {
147return err
148}
149r.blocks[topic] = make(map[int32]*offsetCommitRequestBlock)
150for j := 0; j < partitionCount; j++ {
151partition, err := pd.getInt32()
152if err != nil {
153return err
154}
155block := &offsetCommitRequestBlock{}
156if err := block.decode(pd, r.Version); err != nil {
157return err
158}
159r.blocks[topic][partition] = block
160}
161}
162return nil
163}
164
// key returns the Kafka protocol API key for OffsetCommit requests.
func (r *OffsetCommitRequest) key() int16 {
	return 8
}
168
// version returns the wire-format version this request will be encoded with.
func (r *OffsetCommitRequest) version() int16 {
	return r.Version
}
172
// headerVersion returns the request-header version used for all supported
// OffsetCommit versions (0-4).
func (r *OffsetCommitRequest) headerVersion() int16 {
	return 1
}
176
177func (r *OffsetCommitRequest) requiredVersion() KafkaVersion {
178switch r.Version {
179case 1:
180return V0_8_2_0
181case 2:
182return V0_9_0_0
183case 3:
184return V0_11_0_0
185case 4:
186return V2_0_0_0
187default:
188return MinVersion
189}
190}
191
192func (r *OffsetCommitRequest) AddBlock(topic string, partitionID int32, offset int64, timestamp int64, metadata string) {
193if r.blocks == nil {
194r.blocks = make(map[string]map[int32]*offsetCommitRequestBlock)
195}
196
197if r.blocks[topic] == nil {
198r.blocks[topic] = make(map[int32]*offsetCommitRequestBlock)
199}
200
201r.blocks[topic][partitionID] = &offsetCommitRequestBlock{offset, timestamp, metadata}
202}
203
204func (r *OffsetCommitRequest) Offset(topic string, partitionID int32) (int64, string, error) {
205partitions := r.blocks[topic]
206if partitions == nil {
207return 0, "", errors.New("no such offset")
208}
209block := partitions[partitionID]
210if block == nil {
211return 0, "", errors.New("no such offset")
212}
213return block.offset, block.metadata, nil
214}
215