cubefs

Форк
0
/
metadata_response.go 
325 строк · 5.4 Кб
1
package sarama
2

3
type PartitionMetadata struct {
4
	Err             KError
5
	ID              int32
6
	Leader          int32
7
	Replicas        []int32
8
	Isr             []int32
9
	OfflineReplicas []int32
10
}
11

12
// decode fills pm from pd. It reads, in order: the error code (int16), the
// partition ID, the leader, the replica array, the ISR array and, for
// version >= 5 only, the offline-replica array. The first decoder error
// aborts decoding and is returned unchanged.
func (pm *PartitionMetadata) decode(pd packetDecoder, version int16) (err error) {
	code, err := pd.getInt16()
	if err != nil {
		return err
	}
	pm.Err = KError(code)

	if pm.ID, err = pd.getInt32(); err != nil {
		return err
	}
	if pm.Leader, err = pd.getInt32(); err != nil {
		return err
	}
	if pm.Replicas, err = pd.getInt32Array(); err != nil {
		return err
	}
	if pm.Isr, err = pd.getInt32Array(); err != nil {
		return err
	}

	// Nothing more to read before version 5.
	if version < 5 {
		return nil
	}

	pm.OfflineReplicas, err = pd.getInt32Array()
	return err
}
48

49
// encode writes pm to pe in the same order decode reads it: error code,
// partition ID, leader, replicas, ISR and, for version >= 5 only, the
// offline replicas. The first error from an array write is returned.
func (pm *PartitionMetadata) encode(pe packetEncoder, version int16) (err error) {
	pe.putInt16(int16(pm.Err))
	pe.putInt32(pm.ID)
	pe.putInt32(pm.Leader)

	if err = pe.putInt32Array(pm.Replicas); err != nil {
		return err
	}
	if err = pe.putInt32Array(pm.Isr); err != nil {
		return err
	}

	// Nothing more to write before version 5.
	if version < 5 {
		return nil
	}

	if err = pe.putInt32Array(pm.OfflineReplicas); err != nil {
		return err
	}
	return nil
}
73

74
type TopicMetadata struct {
75
	Err        KError
76
	Name       string
77
	IsInternal bool // Only valid for Version >= 1
78
	Partitions []*PartitionMetadata
79
}
80

81
// decode fills tm from pd. It reads the error code, the topic name, the
// IsInternal flag (version >= 1 only) and then a length-prefixed list of
// partitions, each decoded with the same version. The first decoder error
// aborts decoding and is returned unchanged.
func (tm *TopicMetadata) decode(pd packetDecoder, version int16) (err error) {
	code, err := pd.getInt16()
	if err != nil {
		return err
	}
	tm.Err = KError(code)

	if tm.Name, err = pd.getString(); err != nil {
		return err
	}

	if version >= 1 {
		if tm.IsInternal, err = pd.getBool(); err != nil {
			return err
		}
	}

	count, err := pd.getArrayLength()
	if err != nil {
		return err
	}

	tm.Partitions = make([]*PartitionMetadata, count)
	for i := range tm.Partitions {
		// Store the entry before decoding so the slot is populated even if
		// decoding of this partition fails part-way.
		partition := new(PartitionMetadata)
		tm.Partitions[i] = partition
		if err = partition.decode(pd, version); err != nil {
			return err
		}
	}

	return nil
}
115

116
// encode writes tm to pe: error code, topic name, the IsInternal flag
// (version >= 1 only), the partition count and then every partition in
// slice order. The first encoder error is returned.
func (tm *TopicMetadata) encode(pe packetEncoder, version int16) (err error) {
	pe.putInt16(int16(tm.Err))

	if err = pe.putString(tm.Name); err != nil {
		return err
	}

	if version >= 1 {
		pe.putBool(tm.IsInternal)
	}

	if err = pe.putArrayLength(len(tm.Partitions)); err != nil {
		return err
	}

	for i := range tm.Partitions {
		if err = tm.Partitions[i].encode(pe, version); err != nil {
			return err
		}
	}

	return nil
}
142

143
type MetadataResponse struct {
144
	Version        int16
145
	ThrottleTimeMs int32
146
	Brokers        []*Broker
147
	ClusterID      *string
148
	ControllerID   int32
149
	Topics         []*TopicMetadata
150
}
151

152
// decode fills r from pd and records version in r.Version. It reads, in
// order: the throttle time (version >= 3), the broker list, the cluster ID
// (version >= 2), the controller ID (version >= 1) and the topic list. The
// first decoder error aborts decoding and is returned unchanged.
func (r *MetadataResponse) decode(pd packetDecoder, version int16) (err error) {
	r.Version = version

	if version >= 3 {
		if r.ThrottleTimeMs, err = pd.getInt32(); err != nil {
			return err
		}
	}

	brokerCount, err := pd.getArrayLength()
	if err != nil {
		return err
	}

	r.Brokers = make([]*Broker, brokerCount)
	for i := range r.Brokers {
		broker := new(Broker)
		r.Brokers[i] = broker
		if err = broker.decode(pd, version); err != nil {
			return err
		}
	}

	if version >= 2 {
		if r.ClusterID, err = pd.getNullableString(); err != nil {
			return err
		}
	}

	// Version 0 has no controller ID to read, so -1 is stored instead.
	if version < 1 {
		r.ControllerID = -1
	} else if r.ControllerID, err = pd.getInt32(); err != nil {
		return err
	}

	topicCount, err := pd.getArrayLength()
	if err != nil {
		return err
	}

	r.Topics = make([]*TopicMetadata, topicCount)
	for i := range r.Topics {
		topic := new(TopicMetadata)
		r.Topics[i] = topic
		if err = topic.decode(pd, version); err != nil {
			return err
		}
	}

	return nil
}
208

209
// encode writes r to pe, using r.Version to decide which optional fields
// are emitted: throttle time (>= 3), cluster ID (>= 2) and controller ID
// (>= 1). Brokers and topics are written as length-prefixed lists in slice
// order. The first encoder error is returned.
func (r *MetadataResponse) encode(pe packetEncoder) error {
	var err error

	if r.Version >= 3 {
		pe.putInt32(r.ThrottleTimeMs)
	}

	if err = pe.putArrayLength(len(r.Brokers)); err != nil {
		return err
	}
	for i := range r.Brokers {
		if err = r.Brokers[i].encode(pe, r.Version); err != nil {
			return err
		}
	}

	if r.Version >= 2 {
		if err = pe.putNullableString(r.ClusterID); err != nil {
			return err
		}
	}

	if r.Version >= 1 {
		pe.putInt32(r.ControllerID)
	}

	if err = pe.putArrayLength(len(r.Topics)); err != nil {
		return err
	}
	for i := range r.Topics {
		if err = r.Topics[i].encode(pe, r.Version); err != nil {
			return err
		}
	}

	return nil
}
249

250
// key returns the API key reported for this response type, which is always 3
// (the Metadata API key in the Kafka protocol).
func (r *MetadataResponse) key() int16 {
	const metadataAPIKey int16 = 3
	return metadataAPIKey
}
253

254
// version returns the value stored in r.Version.
func (r *MetadataResponse) version() int16 {
	v := r.Version
	return v
}
257

258
// headerVersion returns 0 regardless of r.Version.
func (r *MetadataResponse) headerVersion() int16 {
	const responseHeaderVersion int16 = 0
	return responseHeaderVersion
}
261

262
func (r *MetadataResponse) requiredVersion() KafkaVersion {
263
	switch r.Version {
264
	case 1:
265
		return V0_10_0_0
266
	case 2:
267
		return V0_10_1_0
268
	case 3, 4:
269
		return V0_11_0_0
270
	case 5:
271
		return V1_0_0_0
272
	default:
273
		return MinVersion
274
	}
275
}
276

277
// testing API
278

279
// AddBroker appends a new Broker with the given address and ID to r.Brokers.
// It does not check whether a broker with that ID is already present.
func (r *MetadataResponse) AddBroker(addr string, id int32) {
	broker := &Broker{addr: addr, id: id}
	r.Brokers = append(r.Brokers, broker)
}
282

283
// AddTopic returns the entry in r.Topics whose Name equals topic, appending
// a new one if none exists. In both cases the entry's Err is set to err
// before it is returned.
func (r *MetadataResponse) AddTopic(topic string, err KError) *TopicMetadata {
	for _, existing := range r.Topics {
		if existing.Name == topic {
			existing.Err = err
			return existing
		}
	}

	created := &TopicMetadata{Name: topic, Err: err}
	r.Topics = append(r.Topics, created)
	return created
}
302

303
// AddTopicPartition records metadata for one partition of topic. The topic
// entry is found or created through AddTopic, which also sets the topic's
// Err to ErrNoError. The partition entry with the given ID is then found or
// appended, and its Leader, Replicas, Isr, OfflineReplicas and Err are
// overwritten with brokerID, replicas, isr, offline and err respectively.
// The slices are stored as passed, not copied.
func (r *MetadataResponse) AddTopicPartition(topic string, partition, brokerID int32, replicas, isr []int32, offline []int32, err KError) {
	tm := r.AddTopic(topic, ErrNoError)

	var pm *PartitionMetadata
	for _, candidate := range tm.Partitions {
		if candidate.ID == partition {
			pm = candidate
			break
		}
	}
	if pm == nil {
		pm = &PartitionMetadata{ID: partition}
		tm.Partitions = append(tm.Partitions, pm)
	}

	pm.Leader = brokerID
	pm.Replicas = replicas
	pm.Isr = isr
	pm.OfflineReplicas = offline
	pm.Err = err
}
326

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.