cubefs
139 строк · 2.5 Кб
1package sarama
2
3type JoinGroupResponse struct {
4Version int16
5ThrottleTime int32
6Err KError
7GenerationId int32
8GroupProtocol string
9LeaderId string
10MemberId string
11Members map[string][]byte
12}
13
14func (r *JoinGroupResponse) GetMembers() (map[string]ConsumerGroupMemberMetadata, error) {
15members := make(map[string]ConsumerGroupMemberMetadata, len(r.Members))
16for id, bin := range r.Members {
17meta := new(ConsumerGroupMemberMetadata)
18if err := decode(bin, meta); err != nil {
19return nil, err
20}
21members[id] = *meta
22}
23return members, nil
24}
25
26func (r *JoinGroupResponse) encode(pe packetEncoder) error {
27if r.Version >= 2 {
28pe.putInt32(r.ThrottleTime)
29}
30pe.putInt16(int16(r.Err))
31pe.putInt32(r.GenerationId)
32
33if err := pe.putString(r.GroupProtocol); err != nil {
34return err
35}
36if err := pe.putString(r.LeaderId); err != nil {
37return err
38}
39if err := pe.putString(r.MemberId); err != nil {
40return err
41}
42
43if err := pe.putArrayLength(len(r.Members)); err != nil {
44return err
45}
46
47for memberId, memberMetadata := range r.Members {
48if err := pe.putString(memberId); err != nil {
49return err
50}
51
52if err := pe.putBytes(memberMetadata); err != nil {
53return err
54}
55}
56
57return nil
58}
59
60func (r *JoinGroupResponse) decode(pd packetDecoder, version int16) (err error) {
61r.Version = version
62
63if version >= 2 {
64if r.ThrottleTime, err = pd.getInt32(); err != nil {
65return
66}
67}
68
69kerr, err := pd.getInt16()
70if err != nil {
71return err
72}
73
74r.Err = KError(kerr)
75
76if r.GenerationId, err = pd.getInt32(); err != nil {
77return
78}
79
80if r.GroupProtocol, err = pd.getString(); err != nil {
81return
82}
83
84if r.LeaderId, err = pd.getString(); err != nil {
85return
86}
87
88if r.MemberId, err = pd.getString(); err != nil {
89return
90}
91
92n, err := pd.getArrayLength()
93if err != nil {
94return err
95}
96if n == 0 {
97return nil
98}
99
100r.Members = make(map[string][]byte)
101for i := 0; i < n; i++ {
102memberId, err := pd.getString()
103if err != nil {
104return err
105}
106
107memberMetadata, err := pd.getBytes()
108if err != nil {
109return err
110}
111
112r.Members[memberId] = memberMetadata
113}
114
115return nil
116}
117
118func (r *JoinGroupResponse) key() int16 {
119return 11
120}
121
122func (r *JoinGroupResponse) version() int16 {
123return r.Version
124}
125
126func (r *JoinGroupResponse) headerVersion() int16 {
127return 0
128}
129
130func (r *JoinGroupResponse) requiredVersion() KafkaVersion {
131switch r.Version {
132case 2:
133return V0_11_0_0
134case 1:
135return V0_10_1_0
136default:
137return V0_9_0_0
138}
139}
140