// cubefs
// 327 lines · 5.7 KB
1package sarama2
3import (4"fmt"5"time"6)
7
// ConfigSource is the int8-backed enumeration stored in the Source field of
// ConfigEntry and ConfigSynonym.
type ConfigSource int8

// Known ConfigSource values, numbered from zero in declaration order.
const (
	SourceUnknown ConfigSource = iota
	SourceTopic
	SourceDynamicBroker
	SourceDynamicDefaultBroker
	SourceStaticBroker
	SourceDefault
)

// String returns the name of s. Any value outside the declared constants is
// rendered as "Source Invalid: <n>", where <n> is the numeric value.
func (s ConfigSource) String() string {
	// Indexed by the constants themselves so every name stays next to its value.
	names := [...]string{
		SourceUnknown:              "Unknown",
		SourceTopic:                "Topic",
		SourceDynamicBroker:        "DynamicBroker",
		SourceDynamicDefaultBroker: "DynamicDefaultBroker",
		SourceStaticBroker:         "StaticBroker",
		SourceDefault:              "Default",
	}
	if s < SourceUnknown || s > SourceDefault {
		return fmt.Sprintf("Source Invalid: %d", int(s))
	}
	return names[s]
}
36
37type DescribeConfigsResponse struct {38Version int1639ThrottleTime time.Duration40Resources []*ResourceResponse41}
42
43type ResourceResponse struct {44ErrorCode int1645ErrorMsg string46Type ConfigResourceType
47Name string48Configs []*ConfigEntry49}
50
51type ConfigEntry struct {52Name string53Value string54ReadOnly bool55Default bool56Source ConfigSource
57Sensitive bool58Synonyms []*ConfigSynonym59}
60
61type ConfigSynonym struct {62ConfigName string63ConfigValue string64Source ConfigSource
65}
66
67func (r *DescribeConfigsResponse) encode(pe packetEncoder) (err error) {68pe.putInt32(int32(r.ThrottleTime / time.Millisecond))69if err = pe.putArrayLength(len(r.Resources)); err != nil {70return err71}72
73for _, c := range r.Resources {74if err = c.encode(pe, r.Version); err != nil {75return err76}77}78
79return nil80}
81
82func (r *DescribeConfigsResponse) decode(pd packetDecoder, version int16) (err error) {83r.Version = version84throttleTime, err := pd.getInt32()85if err != nil {86return err87}88r.ThrottleTime = time.Duration(throttleTime) * time.Millisecond89
90n, err := pd.getArrayLength()91if err != nil {92return err93}94
95r.Resources = make([]*ResourceResponse, n)96for i := 0; i < n; i++ {97rr := &ResourceResponse{}98if err := rr.decode(pd, version); err != nil {99return err100}101r.Resources[i] = rr102}103
104return nil105}
106
107func (r *DescribeConfigsResponse) key() int16 {108return 32109}
110
111func (r *DescribeConfigsResponse) version() int16 {112return r.Version113}
114
115func (r *DescribeConfigsResponse) headerVersion() int16 {116return 0117}
118
119func (r *DescribeConfigsResponse) requiredVersion() KafkaVersion {120switch r.Version {121case 1:122return V1_0_0_0123case 2:124return V2_0_0_0125default:126return V0_11_0_0127}128}
129
130func (r *ResourceResponse) encode(pe packetEncoder, version int16) (err error) {131pe.putInt16(r.ErrorCode)132
133if err = pe.putString(r.ErrorMsg); err != nil {134return err135}136
137pe.putInt8(int8(r.Type))138
139if err = pe.putString(r.Name); err != nil {140return err141}142
143if err = pe.putArrayLength(len(r.Configs)); err != nil {144return err145}146
147for _, c := range r.Configs {148if err = c.encode(pe, version); err != nil {149return err150}151}152return nil153}
154
155func (r *ResourceResponse) decode(pd packetDecoder, version int16) (err error) {156ec, err := pd.getInt16()157if err != nil {158return err159}160r.ErrorCode = ec161
162em, err := pd.getString()163if err != nil {164return err165}166r.ErrorMsg = em167
168t, err := pd.getInt8()169if err != nil {170return err171}172r.Type = ConfigResourceType(t)173
174name, err := pd.getString()175if err != nil {176return err177}178r.Name = name179
180n, err := pd.getArrayLength()181if err != nil {182return err183}184
185r.Configs = make([]*ConfigEntry, n)186for i := 0; i < n; i++ {187c := &ConfigEntry{}188if err := c.decode(pd, version); err != nil {189return err190}191r.Configs[i] = c192}193return nil194}
195
196func (r *ConfigEntry) encode(pe packetEncoder, version int16) (err error) {197if err = pe.putString(r.Name); err != nil {198return err199}200
201if err = pe.putString(r.Value); err != nil {202return err203}204
205pe.putBool(r.ReadOnly)206
207if version <= 0 {208pe.putBool(r.Default)209pe.putBool(r.Sensitive)210} else {211pe.putInt8(int8(r.Source))212pe.putBool(r.Sensitive)213
214if err := pe.putArrayLength(len(r.Synonyms)); err != nil {215return err216}217for _, c := range r.Synonyms {218if err = c.encode(pe, version); err != nil {219return err220}221}222}223
224return nil225}
226
227// https://cwiki.apache.org/confluence/display/KAFKA/KIP-226+-+Dynamic+Broker+Configuration
228func (r *ConfigEntry) decode(pd packetDecoder, version int16) (err error) {229if version == 0 {230r.Source = SourceUnknown231}232name, err := pd.getString()233if err != nil {234return err235}236r.Name = name237
238value, err := pd.getString()239if err != nil {240return err241}242r.Value = value243
244read, err := pd.getBool()245if err != nil {246return err247}248r.ReadOnly = read249
250if version == 0 {251defaultB, err := pd.getBool()252if err != nil {253return err254}255r.Default = defaultB256if defaultB {257r.Source = SourceDefault258}259} else {260source, err := pd.getInt8()261if err != nil {262return err263}264r.Source = ConfigSource(source)265r.Default = r.Source == SourceDefault266}267
268sensitive, err := pd.getBool()269if err != nil {270return err271}272r.Sensitive = sensitive273
274if version > 0 {275n, err := pd.getArrayLength()276if err != nil {277return err278}279r.Synonyms = make([]*ConfigSynonym, n)280
281for i := 0; i < n; i++ {282s := &ConfigSynonym{}283if err := s.decode(pd, version); err != nil {284return err285}286r.Synonyms[i] = s287}288}289return nil290}
291
292func (c *ConfigSynonym) encode(pe packetEncoder, version int16) (err error) {293err = pe.putString(c.ConfigName)294if err != nil {295return err296}297
298err = pe.putString(c.ConfigValue)299if err != nil {300return err301}302
303pe.putInt8(int8(c.Source))304
305return nil306}
307
308func (c *ConfigSynonym) decode(pd packetDecoder, version int16) error {309name, err := pd.getString()310if err != nil {311return err312}313c.ConfigName = name314
315value, err := pd.getString()316if err != nil {317return err318}319c.ConfigValue = value320
321source, err := pd.getInt8()322if err != nil {323return err324}325c.Source = ConfigSource(source)326return nil327}
328