cubefs

Форк
0
/
offset_response.go 
192 строки · 3.5 Кб
1
package sarama
2

3
type OffsetResponseBlock struct {
4
	Err       KError
5
	Offsets   []int64 // Version 0
6
	Offset    int64   // Version 1
7
	Timestamp int64   // Version 1
8
}
9

10
func (b *OffsetResponseBlock) decode(pd packetDecoder, version int16) (err error) {
11
	tmp, err := pd.getInt16()
12
	if err != nil {
13
		return err
14
	}
15
	b.Err = KError(tmp)
16

17
	if version == 0 {
18
		b.Offsets, err = pd.getInt64Array()
19

20
		return err
21
	}
22

23
	b.Timestamp, err = pd.getInt64()
24
	if err != nil {
25
		return err
26
	}
27

28
	b.Offset, err = pd.getInt64()
29
	if err != nil {
30
		return err
31
	}
32

33
	// For backwards compatibility put the offset in the offsets array too
34
	b.Offsets = []int64{b.Offset}
35

36
	return nil
37
}
38

39
func (b *OffsetResponseBlock) encode(pe packetEncoder, version int16) (err error) {
40
	pe.putInt16(int16(b.Err))
41

42
	if version == 0 {
43
		return pe.putInt64Array(b.Offsets)
44
	}
45

46
	pe.putInt64(b.Timestamp)
47
	pe.putInt64(b.Offset)
48

49
	return nil
50
}
51

52
type OffsetResponse struct {
53
	Version        int16
54
	ThrottleTimeMs int32
55
	Blocks         map[string]map[int32]*OffsetResponseBlock
56
}
57

58
func (r *OffsetResponse) decode(pd packetDecoder, version int16) (err error) {
59
	if version >= 2 {
60
		r.ThrottleTimeMs, err = pd.getInt32()
61
		if err != nil {
62
			return err
63
		}
64
	}
65

66
	numTopics, err := pd.getArrayLength()
67
	if err != nil {
68
		return err
69
	}
70

71
	r.Blocks = make(map[string]map[int32]*OffsetResponseBlock, numTopics)
72
	for i := 0; i < numTopics; i++ {
73
		name, err := pd.getString()
74
		if err != nil {
75
			return err
76
		}
77

78
		numBlocks, err := pd.getArrayLength()
79
		if err != nil {
80
			return err
81
		}
82

83
		r.Blocks[name] = make(map[int32]*OffsetResponseBlock, numBlocks)
84

85
		for j := 0; j < numBlocks; j++ {
86
			id, err := pd.getInt32()
87
			if err != nil {
88
				return err
89
			}
90

91
			block := new(OffsetResponseBlock)
92
			err = block.decode(pd, version)
93
			if err != nil {
94
				return err
95
			}
96
			r.Blocks[name][id] = block
97
		}
98
	}
99

100
	return nil
101
}
102

103
func (r *OffsetResponse) GetBlock(topic string, partition int32) *OffsetResponseBlock {
104
	if r.Blocks == nil {
105
		return nil
106
	}
107

108
	if r.Blocks[topic] == nil {
109
		return nil
110
	}
111

112
	return r.Blocks[topic][partition]
113
}
114

115
/*
116
// [0 0 0 1 ntopics
117
0 8 109 121 95 116 111 112 105 99 topic
118
0 0 0 1 npartitions
119
0 0 0 0 id
120
0 0
121

122
0 0 0 1 0 0 0 0
123
0 1 1 1 0 0 0 1
124
0 8 109 121 95 116 111 112
125
105 99 0 0 0 1 0 0
126
0 0 0 0 0 0 0 1
127
0 0 0 0 0 1 1 1] <nil>
128

129
*/
130
func (r *OffsetResponse) encode(pe packetEncoder) (err error) {
131
	if r.Version >= 2 {
132
		pe.putInt32(r.ThrottleTimeMs)
133
	}
134

135
	if err = pe.putArrayLength(len(r.Blocks)); err != nil {
136
		return err
137
	}
138

139
	for topic, partitions := range r.Blocks {
140
		if err = pe.putString(topic); err != nil {
141
			return err
142
		}
143
		if err = pe.putArrayLength(len(partitions)); err != nil {
144
			return err
145
		}
146
		for partition, block := range partitions {
147
			pe.putInt32(partition)
148
			if err = block.encode(pe, r.version()); err != nil {
149
				return err
150
			}
151
		}
152
	}
153

154
	return nil
155
}
156

157
func (r *OffsetResponse) key() int16 {
158
	return 2
159
}
160

161
func (r *OffsetResponse) version() int16 {
162
	return r.Version
163
}
164

165
func (r *OffsetResponse) headerVersion() int16 {
166
	return 0
167
}
168

169
func (r *OffsetResponse) requiredVersion() KafkaVersion {
170
	switch r.Version {
171
	case 1:
172
		return V0_10_1_0
173
	case 2:
174
		return V0_11_0_0
175
	default:
176
		return MinVersion
177
	}
178
}
179

180
// testing API
181

182
func (r *OffsetResponse) AddTopicPartition(topic string, partition int32, offset int64) {
183
	if r.Blocks == nil {
184
		r.Blocks = make(map[string]map[int32]*OffsetResponseBlock)
185
	}
186
	byTopic, ok := r.Blocks[topic]
187
	if !ok {
188
		byTopic = make(map[int32]*OffsetResponseBlock)
189
		r.Blocks[topic] = byTopic
190
	}
191
	byTopic[partition] = &OffsetResponseBlock{Offsets: []int64{offset}, Offset: offset}
192
}
193

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.