cubefs
114 строк · 2.2 Кб
1package sarama2
3type OffsetCommitResponse struct {4Version int165ThrottleTimeMs int326Errors map[string]map[int32]KError7}
8
9func (r *OffsetCommitResponse) AddError(topic string, partition int32, kerror KError) {10if r.Errors == nil {11r.Errors = make(map[string]map[int32]KError)12}13partitions := r.Errors[topic]14if partitions == nil {15partitions = make(map[int32]KError)16r.Errors[topic] = partitions17}18partitions[partition] = kerror19}
20
21func (r *OffsetCommitResponse) encode(pe packetEncoder) error {22if r.Version >= 3 {23pe.putInt32(r.ThrottleTimeMs)24}25if err := pe.putArrayLength(len(r.Errors)); err != nil {26return err27}28for topic, partitions := range r.Errors {29if err := pe.putString(topic); err != nil {30return err31}32if err := pe.putArrayLength(len(partitions)); err != nil {33return err34}35for partition, kerror := range partitions {36pe.putInt32(partition)37pe.putInt16(int16(kerror))38}39}40return nil41}
42
43func (r *OffsetCommitResponse) decode(pd packetDecoder, version int16) (err error) {44r.Version = version45
46if version >= 3 {47r.ThrottleTimeMs, err = pd.getInt32()48if err != nil {49return err50}51}52
53numTopics, err := pd.getArrayLength()54if err != nil || numTopics == 0 {55return err56}57
58r.Errors = make(map[string]map[int32]KError, numTopics)59for i := 0; i < numTopics; i++ {60name, err := pd.getString()61if err != nil {62return err63}64
65numErrors, err := pd.getArrayLength()66if err != nil {67return err68}69
70r.Errors[name] = make(map[int32]KError, numErrors)71
72for j := 0; j < numErrors; j++ {73id, err := pd.getInt32()74if err != nil {75return err76}77
78tmp, err := pd.getInt16()79if err != nil {80return err81}82r.Errors[name][id] = KError(tmp)83}84}85
86return nil87}
88
89func (r *OffsetCommitResponse) key() int16 {90return 891}
92
93func (r *OffsetCommitResponse) version() int16 {94return r.Version95}
96
97func (r *OffsetCommitResponse) headerVersion() int16 {98return 099}
100
101func (r *OffsetCommitResponse) requiredVersion() KafkaVersion {102switch r.Version {103case 1:104return V0_8_2_0105case 2:106return V0_9_0_0107case 3:108return V0_11_0_0109case 4:110return V2_0_0_0111default:112return MinVersion113}114}
115