cubefs
167 строк · 3.4 Кб
1package sarama2
// GroupProtocol pairs a protocol name with its metadata payload. It is the
// element type of JoinGroupRequest.OrderedGroupProtocols.
type GroupProtocol struct {
	// Name identifies the protocol; it is encoded and decoded as a string.
	Name string

	// Metadata is the payload attached to Name; it is encoded and decoded as
	// a byte array and is not interpreted by this type.
	Metadata []byte
}
7
8func (p *GroupProtocol) decode(pd packetDecoder) (err error) {9p.Name, err = pd.getString()10if err != nil {11return err12}13p.Metadata, err = pd.getBytes()14return err15}
16
17func (p *GroupProtocol) encode(pe packetEncoder) (err error) {18if err := pe.putString(p.Name); err != nil {19return err20}21if err := pe.putBytes(p.Metadata); err != nil {22return err23}24return nil25}
26
27type JoinGroupRequest struct {28Version int1629GroupId string30SessionTimeout int3231RebalanceTimeout int3232MemberId string33ProtocolType string34GroupProtocols map[string][]byte // deprecated; use OrderedGroupProtocols35OrderedGroupProtocols []*GroupProtocol36}
37
38func (r *JoinGroupRequest) encode(pe packetEncoder) error {39if err := pe.putString(r.GroupId); err != nil {40return err41}42pe.putInt32(r.SessionTimeout)43if r.Version >= 1 {44pe.putInt32(r.RebalanceTimeout)45}46if err := pe.putString(r.MemberId); err != nil {47return err48}49if err := pe.putString(r.ProtocolType); err != nil {50return err51}52
53if len(r.GroupProtocols) > 0 {54if len(r.OrderedGroupProtocols) > 0 {55return PacketDecodingError{"cannot specify both GroupProtocols and OrderedGroupProtocols on JoinGroupRequest"}56}57
58if err := pe.putArrayLength(len(r.GroupProtocols)); err != nil {59return err60}61for name, metadata := range r.GroupProtocols {62if err := pe.putString(name); err != nil {63return err64}65if err := pe.putBytes(metadata); err != nil {66return err67}68}69} else {70if err := pe.putArrayLength(len(r.OrderedGroupProtocols)); err != nil {71return err72}73for _, protocol := range r.OrderedGroupProtocols {74if err := protocol.encode(pe); err != nil {75return err76}77}78}79
80return nil81}
82
83func (r *JoinGroupRequest) decode(pd packetDecoder, version int16) (err error) {84r.Version = version85
86if r.GroupId, err = pd.getString(); err != nil {87return88}89
90if r.SessionTimeout, err = pd.getInt32(); err != nil {91return92}93
94if version >= 1 {95if r.RebalanceTimeout, err = pd.getInt32(); err != nil {96return err97}98}99
100if r.MemberId, err = pd.getString(); err != nil {101return102}103
104if r.ProtocolType, err = pd.getString(); err != nil {105return106}107
108n, err := pd.getArrayLength()109if err != nil {110return err111}112if n == 0 {113return nil114}115
116r.GroupProtocols = make(map[string][]byte)117for i := 0; i < n; i++ {118protocol := &GroupProtocol{}119if err := protocol.decode(pd); err != nil {120return err121}122r.GroupProtocols[protocol.Name] = protocol.Metadata123r.OrderedGroupProtocols = append(r.OrderedGroupProtocols, protocol)124}125
126return nil127}
128
129func (r *JoinGroupRequest) key() int16 {130return 11131}
132
133func (r *JoinGroupRequest) version() int16 {134return r.Version135}
136
137func (r *JoinGroupRequest) headerVersion() int16 {138return 1139}
140
141func (r *JoinGroupRequest) requiredVersion() KafkaVersion {142switch r.Version {143case 2:144return V0_11_0_0145case 1:146return V0_10_1_0147default:148return V0_9_0_0149}150}
151
152func (r *JoinGroupRequest) AddGroupProtocol(name string, metadata []byte) {153r.OrderedGroupProtocols = append(r.OrderedGroupProtocols, &GroupProtocol{154Name: name,155Metadata: metadata,156})157}
158
159func (r *JoinGroupRequest) AddGroupProtocolMetadata(name string, metadata *ConsumerGroupMemberMetadata) error {160bin, err := encode(metadata, nil)161if err != nil {162return err163}164
165r.AddGroupProtocol(name, bin)166return nil167}
168