cubefs

Форк
0
/
describe_configs_response.go 
327 строк · 5.7 Кб
1
package sarama
2

3
import (
4
	"fmt"
5
	"time"
6
)
7

8
// ConfigSource is the one-byte code that tells where a configuration value
// reported in a DescribeConfigs response came from.
type ConfigSource int8

// Known ConfigSource values, numbered consecutively starting at zero.
const (
	SourceUnknown ConfigSource = iota
	SourceTopic
	SourceDynamicBroker
	SourceDynamicDefaultBroker
	SourceStaticBroker
	SourceDefault
)

// String returns the human-readable name of s. Values outside the known
// range are rendered as "Source Invalid: <n>".
func (s ConfigSource) String() string {
	names := [...]string{
		SourceUnknown:              "Unknown",
		SourceTopic:                "Topic",
		SourceDynamicBroker:        "DynamicBroker",
		SourceDynamicDefaultBroker: "DynamicDefaultBroker",
		SourceStaticBroker:         "StaticBroker",
		SourceDefault:              "Default",
	}
	if s >= 0 && int(s) < len(names) {
		return names[s]
	}
	return fmt.Sprintf("Source Invalid: %d", int(s))
}
36

37
type DescribeConfigsResponse struct {
38
	Version      int16
39
	ThrottleTime time.Duration
40
	Resources    []*ResourceResponse
41
}
42

43
type ResourceResponse struct {
44
	ErrorCode int16
45
	ErrorMsg  string
46
	Type      ConfigResourceType
47
	Name      string
48
	Configs   []*ConfigEntry
49
}
50

51
type ConfigEntry struct {
52
	Name      string
53
	Value     string
54
	ReadOnly  bool
55
	Default   bool
56
	Source    ConfigSource
57
	Sensitive bool
58
	Synonyms  []*ConfigSynonym
59
}
60

61
type ConfigSynonym struct {
62
	ConfigName  string
63
	ConfigValue string
64
	Source      ConfigSource
65
}
66

67
// encode writes the response to pe: the throttle time in milliseconds as an
// int32, the number of resources, and then every resource encoded with
// r.Version. The first error reported by pe or by a resource is returned.
func (r *DescribeConfigsResponse) encode(pe packetEncoder) (err error) {
	throttleMs := int32(r.ThrottleTime / time.Millisecond)
	pe.putInt32(throttleMs)

	err = pe.putArrayLength(len(r.Resources))
	if err != nil {
		return err
	}

	for _, resource := range r.Resources {
		err = resource.encode(pe, r.Version)
		if err != nil {
			return err
		}
	}

	return nil
}
81

82
// decode fills r from pd. It records version in r.Version, reads the
// throttle time (an int32 count of milliseconds) and then reads the resource
// array, decoding each element with the same version. The first error
// returned by pd or by a resource is returned unchanged.
func (r *DescribeConfigsResponse) decode(pd packetDecoder, version int16) (err error) {
	r.Version = version

	throttleTime, err := pd.getInt32()
	if err != nil {
		return err
	}
	r.ThrottleTime = time.Duration(throttleTime) * time.Millisecond

	n, err := pd.getArrayLength()
	if err != nil {
		return err
	}
	// make panics at run time when given a negative length, so a negative
	// count is treated as an empty array instead of crashing the decoder.
	// NOTE(review): presumably getArrayLength can hand back a negative count
	// (e.g. -1 for a null array) with a nil error; confirm against the
	// decoder implementation.
	if n < 0 {
		n = 0
	}

	r.Resources = make([]*ResourceResponse, n)
	for i := range r.Resources {
		rr := &ResourceResponse{}
		if err = rr.decode(pd, version); err != nil {
			return err
		}
		r.Resources[i] = rr
	}

	return nil
}
106

107
// key returns 32, the protocol API key of DescribeConfigs.
func (r *DescribeConfigsResponse) key() int16 {
	const describeConfigsKey = 32
	return describeConfigsKey
}
110

111
// version reports the protocol version stored in r.Version.
func (r *DescribeConfigsResponse) version() int16 {
	v := r.Version
	return v
}
114

115
// headerVersion always reports 0, regardless of r.Version.
func (r *DescribeConfigsResponse) headerVersion() int16 {
	const fixedHeaderVersion = 0
	return fixedHeaderVersion
}
118

119
func (r *DescribeConfigsResponse) requiredVersion() KafkaVersion {
120
	switch r.Version {
121
	case 1:
122
		return V1_0_0_0
123
	case 2:
124
		return V2_0_0_0
125
	default:
126
		return V0_11_0_0
127
	}
128
}
129

130
// encode writes the resource to pe in wire order: error code, error message,
// resource type (one byte), resource name, the number of config entries and
// then each entry encoded with version. The first error encountered is
// returned.
func (r *ResourceResponse) encode(pe packetEncoder, version int16) (err error) {
	pe.putInt16(r.ErrorCode)

	err = pe.putString(r.ErrorMsg)
	if err != nil {
		return err
	}

	pe.putInt8(int8(r.Type))

	err = pe.putString(r.Name)
	if err != nil {
		return err
	}

	err = pe.putArrayLength(len(r.Configs))
	if err != nil {
		return err
	}

	for _, entry := range r.Configs {
		err = entry.encode(pe, version)
		if err != nil {
			return err
		}
	}
	return nil
}
154

155
// decode fills r from pd in wire order: error code, error message, resource
// type (one byte), resource name and the array of config entries, each of
// which is decoded with version. Fields read before a failure keep their
// decoded values; the first error is returned unchanged.
func (r *ResourceResponse) decode(pd packetDecoder, version int16) (err error) {
	ec, err := pd.getInt16()
	if err != nil {
		return err
	}
	r.ErrorCode = ec

	em, err := pd.getString()
	if err != nil {
		return err
	}
	r.ErrorMsg = em

	t, err := pd.getInt8()
	if err != nil {
		return err
	}
	r.Type = ConfigResourceType(t)

	name, err := pd.getString()
	if err != nil {
		return err
	}
	r.Name = name

	n, err := pd.getArrayLength()
	if err != nil {
		return err
	}
	// make panics at run time when given a negative length, so a negative
	// count is treated as an empty array instead of crashing the decoder.
	// NOTE(review): presumably getArrayLength can hand back a negative count
	// (e.g. -1 for a null array) with a nil error; confirm against the
	// decoder implementation.
	if n < 0 {
		n = 0
	}

	r.Configs = make([]*ConfigEntry, n)
	for i := range r.Configs {
		c := &ConfigEntry{}
		if err = c.decode(pd, version); err != nil {
			return err
		}
		r.Configs[i] = c
	}
	return nil
}
195

196
// encode writes the entry to pe. Name, value and the read-only flag are
// always written. For version <= 0 they are followed by the default and
// sensitive flags; for later versions by the source byte, the sensitive flag
// and the list of synonyms.
func (r *ConfigEntry) encode(pe packetEncoder, version int16) (err error) {
	err = pe.putString(r.Name)
	if err != nil {
		return err
	}

	err = pe.putString(r.Value)
	if err != nil {
		return err
	}

	pe.putBool(r.ReadOnly)

	// Oldest layout: an explicit default flag, no source and no synonyms.
	if version <= 0 {
		pe.putBool(r.Default)
		pe.putBool(r.Sensitive)
		return nil
	}

	pe.putInt8(int8(r.Source))
	pe.putBool(r.Sensitive)

	err = pe.putArrayLength(len(r.Synonyms))
	if err != nil {
		return err
	}
	for _, synonym := range r.Synonyms {
		err = synonym.encode(pe, version)
		if err != nil {
			return err
		}
	}

	return nil
}
226

227
// https://cwiki.apache.org/confluence/display/KAFKA/KIP-226+-+Dynamic+Broker+Configuration
228
func (r *ConfigEntry) decode(pd packetDecoder, version int16) (err error) {
229
	if version == 0 {
230
		r.Source = SourceUnknown
231
	}
232
	name, err := pd.getString()
233
	if err != nil {
234
		return err
235
	}
236
	r.Name = name
237

238
	value, err := pd.getString()
239
	if err != nil {
240
		return err
241
	}
242
	r.Value = value
243

244
	read, err := pd.getBool()
245
	if err != nil {
246
		return err
247
	}
248
	r.ReadOnly = read
249

250
	if version == 0 {
251
		defaultB, err := pd.getBool()
252
		if err != nil {
253
			return err
254
		}
255
		r.Default = defaultB
256
		if defaultB {
257
			r.Source = SourceDefault
258
		}
259
	} else {
260
		source, err := pd.getInt8()
261
		if err != nil {
262
			return err
263
		}
264
		r.Source = ConfigSource(source)
265
		r.Default = r.Source == SourceDefault
266
	}
267

268
	sensitive, err := pd.getBool()
269
	if err != nil {
270
		return err
271
	}
272
	r.Sensitive = sensitive
273

274
	if version > 0 {
275
		n, err := pd.getArrayLength()
276
		if err != nil {
277
			return err
278
		}
279
		r.Synonyms = make([]*ConfigSynonym, n)
280

281
		for i := 0; i < n; i++ {
282
			s := &ConfigSynonym{}
283
			if err := s.decode(pd, version); err != nil {
284
				return err
285
			}
286
			r.Synonyms[i] = s
287
		}
288
	}
289
	return nil
290
}
291

292
// encode writes the synonym to pe as name, value and a one-byte source.
// The version argument is not consulted.
func (c *ConfigSynonym) encode(pe packetEncoder, version int16) (err error) {
	if err = pe.putString(c.ConfigName); err != nil {
		return err
	}

	if err = pe.putString(c.ConfigValue); err != nil {
		return err
	}

	pe.putInt8(int8(c.Source))

	return nil
}
307

308
// decode fills c from pd by reading the name, the value and the one-byte
// source, in that order. Each field is stored as soon as it has been read,
// so fields decoded before a failure keep their new values. The version
// argument is not consulted.
func (c *ConfigSynonym) decode(pd packetDecoder, version int16) error {
	synonymName, err := pd.getString()
	if err != nil {
		return err
	}
	c.ConfigName = synonymName

	synonymValue, err := pd.getString()
	if err != nil {
		return err
	}
	c.ConfigValue = synonymValue

	rawSource, err := pd.getInt8()
	if err != nil {
		return err
	}
	c.Source = ConfigSource(rawSource)

	return nil
}
328

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.