cubefs

Форк
0
/
offset_fetch_request.go 
207 строк · 3.6 Кб
1
package sarama
2

3
type OffsetFetchRequest struct {
4
	Version       int16
5
	ConsumerGroup string
6
	RequireStable bool // requires v7+
7
	partitions    map[string][]int32
8
}
9

10
func (r *OffsetFetchRequest) encode(pe packetEncoder) (err error) {
11
	if r.Version < 0 || r.Version > 7 {
12
		return PacketEncodingError{"invalid or unsupported OffsetFetchRequest version field"}
13
	}
14

15
	isFlexible := r.Version >= 6
16

17
	if isFlexible {
18
		err = pe.putCompactString(r.ConsumerGroup)
19
	} else {
20
		err = pe.putString(r.ConsumerGroup)
21
	}
22
	if err != nil {
23
		return err
24
	}
25

26
	if isFlexible {
27
		if r.partitions == nil {
28
			pe.putUVarint(0)
29
		} else {
30
			pe.putCompactArrayLength(len(r.partitions))
31
		}
32
	} else {
33
		if r.partitions == nil && r.Version >= 2 {
34
			pe.putInt32(-1)
35
		} else {
36
			if err = pe.putArrayLength(len(r.partitions)); err != nil {
37
				return err
38
			}
39
		}
40
	}
41

42
	for topic, partitions := range r.partitions {
43
		if isFlexible {
44
			err = pe.putCompactString(topic)
45
		} else {
46
			err = pe.putString(topic)
47
		}
48
		if err != nil {
49
			return err
50
		}
51

52
		//
53

54
		if isFlexible {
55
			err = pe.putCompactInt32Array(partitions)
56
		} else {
57
			err = pe.putInt32Array(partitions)
58
		}
59
		if err != nil {
60
			return err
61
		}
62

63
		if isFlexible {
64
			pe.putEmptyTaggedFieldArray()
65
		}
66
	}
67

68
	if r.RequireStable && r.Version < 7 {
69
		return PacketEncodingError{"requireStable is not supported. use version 7 or later"}
70
	}
71

72
	if r.Version >= 7 {
73
		pe.putBool(r.RequireStable)
74
	}
75

76
	if isFlexible {
77
		pe.putEmptyTaggedFieldArray()
78
	}
79

80
	return nil
81
}
82

83
func (r *OffsetFetchRequest) decode(pd packetDecoder, version int16) (err error) {
84
	r.Version = version
85
	isFlexible := r.Version >= 6
86
	if isFlexible {
87
		r.ConsumerGroup, err = pd.getCompactString()
88
	} else {
89
		r.ConsumerGroup, err = pd.getString()
90
	}
91
	if err != nil {
92
		return err
93
	}
94

95
	var partitionCount int
96

97
	if isFlexible {
98
		partitionCount, err = pd.getCompactArrayLength()
99
	} else {
100
		partitionCount, err = pd.getArrayLength()
101
	}
102
	if err != nil {
103
		return err
104
	}
105

106
	if (partitionCount == 0 && version < 2) || partitionCount < 0 {
107
		return nil
108
	}
109

110
	r.partitions = make(map[string][]int32, partitionCount)
111
	for i := 0; i < partitionCount; i++ {
112
		var topic string
113
		if isFlexible {
114
			topic, err = pd.getCompactString()
115
		} else {
116
			topic, err = pd.getString()
117
		}
118
		if err != nil {
119
			return err
120
		}
121

122
		var partitions []int32
123
		if isFlexible {
124
			partitions, err = pd.getCompactInt32Array()
125
		} else {
126
			partitions, err = pd.getInt32Array()
127
		}
128
		if err != nil {
129
			return err
130
		}
131
		if isFlexible {
132
			_, err = pd.getEmptyTaggedFieldArray()
133
			if err != nil {
134
				return err
135
			}
136
		}
137

138
		r.partitions[topic] = partitions
139
	}
140

141
	if r.Version >= 7 {
142
		r.RequireStable, err = pd.getBool()
143
		if err != nil {
144
			return err
145
		}
146
	}
147

148
	if isFlexible {
149
		_, err = pd.getEmptyTaggedFieldArray()
150
		if err != nil {
151
			return err
152
		}
153
	}
154

155
	return nil
156
}
157

158
func (r *OffsetFetchRequest) key() int16 {
159
	return 9
160
}
161

162
func (r *OffsetFetchRequest) version() int16 {
163
	return r.Version
164
}
165

166
func (r *OffsetFetchRequest) headerVersion() int16 {
167
	if r.Version >= 6 {
168
		return 2
169
	}
170

171
	return 1
172
}
173

174
func (r *OffsetFetchRequest) requiredVersion() KafkaVersion {
175
	switch r.Version {
176
	case 1:
177
		return V0_8_2_0
178
	case 2:
179
		return V0_10_2_0
180
	case 3:
181
		return V0_11_0_0
182
	case 4:
183
		return V2_0_0_0
184
	case 5:
185
		return V2_1_0_0
186
	case 6:
187
		return V2_4_0_0
188
	case 7:
189
		return V2_5_0_0
190
	default:
191
		return MinVersion
192
	}
193
}
194

195
func (r *OffsetFetchRequest) ZeroPartitions() {
196
	if r.partitions == nil && r.Version >= 2 {
197
		r.partitions = make(map[string][]int32)
198
	}
199
}
200

201
func (r *OffsetFetchRequest) AddPartition(topic string, partitionID int32) {
202
	if r.partitions == nil {
203
		r.partitions = make(map[string][]int32)
204
	}
205

206
	r.partitions[topic] = append(r.partitions[topic], partitionID)
207
}
208

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.