// cubefs
// 197 lines · 4.0 KB
1package sarama
2
3type DescribeGroupsResponse struct {
4Groups []*GroupDescription
5}
6
7func (r *DescribeGroupsResponse) encode(pe packetEncoder) error {
8if err := pe.putArrayLength(len(r.Groups)); err != nil {
9return err
10}
11
12for _, groupDescription := range r.Groups {
13if err := groupDescription.encode(pe); err != nil {
14return err
15}
16}
17
18return nil
19}
20
21func (r *DescribeGroupsResponse) decode(pd packetDecoder, version int16) (err error) {
22n, err := pd.getArrayLength()
23if err != nil {
24return err
25}
26
27r.Groups = make([]*GroupDescription, n)
28for i := 0; i < n; i++ {
29r.Groups[i] = new(GroupDescription)
30if err := r.Groups[i].decode(pd); err != nil {
31return err
32}
33}
34
35return nil
36}
37
38func (r *DescribeGroupsResponse) key() int16 {
39return 15
40}
41
42func (r *DescribeGroupsResponse) version() int16 {
43return 0
44}
45
46func (r *DescribeGroupsResponse) headerVersion() int16 {
47return 0
48}
49
50func (r *DescribeGroupsResponse) requiredVersion() KafkaVersion {
51return V0_9_0_0
52}
53
54type GroupDescription struct {
55Err KError
56GroupId string
57State string
58ProtocolType string
59Protocol string
60Members map[string]*GroupMemberDescription
61}
62
63func (gd *GroupDescription) encode(pe packetEncoder) error {
64pe.putInt16(int16(gd.Err))
65
66if err := pe.putString(gd.GroupId); err != nil {
67return err
68}
69if err := pe.putString(gd.State); err != nil {
70return err
71}
72if err := pe.putString(gd.ProtocolType); err != nil {
73return err
74}
75if err := pe.putString(gd.Protocol); err != nil {
76return err
77}
78
79if err := pe.putArrayLength(len(gd.Members)); err != nil {
80return err
81}
82
83for memberId, groupMemberDescription := range gd.Members {
84if err := pe.putString(memberId); err != nil {
85return err
86}
87if err := groupMemberDescription.encode(pe); err != nil {
88return err
89}
90}
91
92return nil
93}
94
95func (gd *GroupDescription) decode(pd packetDecoder) (err error) {
96kerr, err := pd.getInt16()
97if err != nil {
98return err
99}
100
101gd.Err = KError(kerr)
102
103if gd.GroupId, err = pd.getString(); err != nil {
104return
105}
106if gd.State, err = pd.getString(); err != nil {
107return
108}
109if gd.ProtocolType, err = pd.getString(); err != nil {
110return
111}
112if gd.Protocol, err = pd.getString(); err != nil {
113return
114}
115
116n, err := pd.getArrayLength()
117if err != nil {
118return err
119}
120if n == 0 {
121return nil
122}
123
124gd.Members = make(map[string]*GroupMemberDescription)
125for i := 0; i < n; i++ {
126memberId, err := pd.getString()
127if err != nil {
128return err
129}
130
131gd.Members[memberId] = new(GroupMemberDescription)
132if err := gd.Members[memberId].decode(pd); err != nil {
133return err
134}
135}
136
137return nil
138}
139
// GroupMemberDescription describes one member of a group. Fields are listed
// in the order they are written to and read from the wire.
type GroupMemberDescription struct {
	// ClientId is the first string field of the member description.
	ClientId string
	// ClientHost is the second string field of the member description.
	ClientHost string
	// MemberMetadata is the raw, still-encoded member metadata; see
	// GetMemberMetadata for the decoded form.
	MemberMetadata []byte
	// MemberAssignment is the raw, still-encoded member assignment; see
	// GetMemberAssignment for the decoded form.
	MemberAssignment []byte
}
146
147func (gmd *GroupMemberDescription) encode(pe packetEncoder) error {
148if err := pe.putString(gmd.ClientId); err != nil {
149return err
150}
151if err := pe.putString(gmd.ClientHost); err != nil {
152return err
153}
154if err := pe.putBytes(gmd.MemberMetadata); err != nil {
155return err
156}
157if err := pe.putBytes(gmd.MemberAssignment); err != nil {
158return err
159}
160
161return nil
162}
163
164func (gmd *GroupMemberDescription) decode(pd packetDecoder) (err error) {
165if gmd.ClientId, err = pd.getString(); err != nil {
166return
167}
168if gmd.ClientHost, err = pd.getString(); err != nil {
169return
170}
171if gmd.MemberMetadata, err = pd.getBytes(); err != nil {
172return
173}
174if gmd.MemberAssignment, err = pd.getBytes(); err != nil {
175return
176}
177
178return nil
179}
180
181func (gmd *GroupMemberDescription) GetMemberAssignment() (*ConsumerGroupMemberAssignment, error) {
182if len(gmd.MemberAssignment) == 0 {
183return nil, nil
184}
185assignment := new(ConsumerGroupMemberAssignment)
186err := decode(gmd.MemberAssignment, assignment)
187return assignment, err
188}
189
190func (gmd *GroupMemberDescription) GetMemberMetadata() (*ConsumerGroupMemberMetadata, error) {
191if len(gmd.MemberMetadata) == 0 {
192return nil, nil
193}
194metadata := new(ConsumerGroupMemberMetadata)
195err := decode(gmd.MemberMetadata, metadata)
196return metadata, err
197}
198