cubefs

Форк
0
/
offset_commit_request.go 
214 строк · 5.3 Кб
1
package sarama
2

3
import "errors"
4

5
const (
	// ReceiveTime is a special value for the timestamp field of Offset Commit
	// Requests which tells the broker to set the timestamp to the time at which
	// the request was received. The timestamp is only used if message version 1
	// is used, which requires kafka 0.8.2.
	ReceiveTime int64 = -1

	// GroupGenerationUndefined is a special value for the group generation
	// field of Offset Commit Requests that should be used when a consumer group
	// does not rely on Kafka for partition management.
	GroupGenerationUndefined = -1
)
14

15
// offsetCommitRequestBlock is the per-partition payload stored by an
// OffsetCommitRequest. Field order matters: the struct is built with a
// positional composite literal elsewhere in this file.
type offsetCommitRequestBlock struct {
	// offset is always written; timestamp is only put on the wire when the
	// request version is 1.
	offset, timestamp int64
	// metadata is written as a string after the numeric fields.
	metadata string
}
20

21
// encode writes the block to pe: the offset first, then the timestamp (only
// when version is 1), then the metadata string. For any other version a
// non-zero timestamp is not written and a message is logged instead. The
// returned error is whatever writing the metadata string reports.
func (b *offsetCommitRequestBlock) encode(pe packetEncoder, version int16) error {
	pe.putInt64(b.offset)

	switch {
	case version == 1:
		pe.putInt64(b.timestamp)
	case b.timestamp != 0:
		Logger.Println("Non-zero timestamp specified for OffsetCommitRequest not v1, it will be ignored")
	}

	return pe.putString(b.metadata)
}
31

32
// decode reads the block from pd in the same order encode writes it: the
// offset, the timestamp (only when version is 1), and finally the metadata
// string. It stops at, and returns, the first error reported by pd.
func (b *offsetCommitRequestBlock) decode(pd packetDecoder, version int16) (err error) {
	b.offset, err = pd.getInt64()
	if err != nil {
		return err
	}

	if version == 1 {
		b.timestamp, err = pd.getInt64()
		if err != nil {
			return err
		}
	}

	b.metadata, err = pd.getString()
	return err
}
44

45
type OffsetCommitRequest struct {
46
	ConsumerGroup           string
47
	ConsumerGroupGeneration int32  // v1 or later
48
	ConsumerID              string // v1 or later
49
	RetentionTime           int64  // v2 or later
50

51
	// Version can be:
52
	// - 0 (kafka 0.8.1 and later)
53
	// - 1 (kafka 0.8.2 and later)
54
	// - 2 (kafka 0.9.0 and later)
55
	// - 3 (kafka 0.11.0 and later)
56
	// - 4 (kafka 2.0.0 and later)
57
	Version int16
58
	blocks  map[string]map[int32]*offsetCommitRequestBlock
59
}
60

61
// encode writes the request to pe. Versions outside 0..4 are rejected with a
// PacketEncodingError before anything is written. The consumer group is always
// written; the generation and consumer ID follow for v1+, and the retention
// time for v2+. Fields set on a version that does not carry them are skipped
// and a message is logged. Finally every topic is written with its partition
// blocks, in map iteration order.
func (r *OffsetCommitRequest) encode(pe packetEncoder) error {
	if v := r.Version; v < 0 || v > 4 {
		return PacketEncodingError{"invalid or unsupported OffsetCommitRequest version field"}
	}

	err := pe.putString(r.ConsumerGroup)
	if err != nil {
		return err
	}

	if r.Version < 1 {
		// v0 has no room for these fields; tell the user they are dropped.
		if r.ConsumerGroupGeneration != 0 {
			Logger.Println("Non-zero ConsumerGroupGeneration specified for OffsetCommitRequest v0, it will be ignored")
		}
		if r.ConsumerID != "" {
			Logger.Println("Non-empty ConsumerID specified for OffsetCommitRequest v0, it will be ignored")
		}
	} else {
		pe.putInt32(r.ConsumerGroupGeneration)
		if err = pe.putString(r.ConsumerID); err != nil {
			return err
		}
	}

	switch {
	case r.Version >= 2:
		pe.putInt64(r.RetentionTime)
	case r.RetentionTime != 0:
		Logger.Println("Non-zero RetentionTime specified for OffsetCommitRequest version <2, it will be ignored")
	}

	if err = pe.putArrayLength(len(r.blocks)); err != nil {
		return err
	}
	for name, byPartition := range r.blocks {
		if err = pe.putString(name); err != nil {
			return err
		}
		if err = pe.putArrayLength(len(byPartition)); err != nil {
			return err
		}
		for id, blk := range byPartition {
			pe.putInt32(id)
			if err = blk.encode(pe, r.Version); err != nil {
				return err
			}
		}
	}
	return nil
}
109

110
// decode populates the request from pd using the given version, which is also
// stored in r.Version. Fields are read in the order encode writes them: the
// consumer group, then generation and consumer ID for v1+, then the retention
// time for v2+, then the topic/partition blocks. When the topic count is zero
// the method returns early and leaves r.blocks untouched. The first error
// reported by pd aborts decoding and is returned.
func (r *OffsetCommitRequest) decode(pd packetDecoder, version int16) (err error) {
	r.Version = version

	r.ConsumerGroup, err = pd.getString()
	if err != nil {
		return err
	}

	if version >= 1 {
		r.ConsumerGroupGeneration, err = pd.getInt32()
		if err != nil {
			return err
		}
		r.ConsumerID, err = pd.getString()
		if err != nil {
			return err
		}
	}

	if version >= 2 {
		r.RetentionTime, err = pd.getInt64()
		if err != nil {
			return err
		}
	}

	var numTopics int
	if numTopics, err = pd.getArrayLength(); err != nil {
		return err
	}
	if numTopics == 0 {
		return nil
	}

	r.blocks = make(map[string]map[int32]*offsetCommitRequestBlock)
	for topicsLeft := numTopics; topicsLeft > 0; topicsLeft-- {
		var name string
		if name, err = pd.getString(); err != nil {
			return err
		}
		var numPartitions int
		if numPartitions, err = pd.getArrayLength(); err != nil {
			return err
		}

		// Register the topic before reading its partitions so that a topic
		// with no partitions still gets an (empty) entry.
		byPartition := make(map[int32]*offsetCommitRequestBlock)
		r.blocks[name] = byPartition

		for partsLeft := numPartitions; partsLeft > 0; partsLeft-- {
			var id int32
			if id, err = pd.getInt32(); err != nil {
				return err
			}
			blk := new(offsetCommitRequestBlock)
			if err = blk.decode(pd, version); err != nil {
				return err
			}
			byPartition[id] = blk
		}
	}
	return nil
}
164

165
// key returns 8, the fixed key value reported by this request type.
func (r *OffsetCommitRequest) key() int16 {
	const requestKey int16 = 8
	return requestKey
}
168

169
// version reports the request's Version field unchanged.
func (r *OffsetCommitRequest) version() int16 {
	v := r.Version
	return v
}
172

173
// headerVersion returns 1 regardless of the request's Version.
func (r *OffsetCommitRequest) headerVersion() int16 {
	const hdrVersion int16 = 1
	return hdrVersion
}
176

177
func (r *OffsetCommitRequest) requiredVersion() KafkaVersion {
178
	switch r.Version {
179
	case 1:
180
		return V0_8_2_0
181
	case 2:
182
		return V0_9_0_0
183
	case 3:
184
		return V0_11_0_0
185
	case 4:
186
		return V2_0_0_0
187
	default:
188
		return MinVersion
189
	}
190
}
191

192
// AddBlock records the offset, timestamp and metadata to commit for the given
// topic and partition, replacing any block previously stored for that pair.
// The outer and per-topic maps are allocated on first use, so it is safe to
// call on a zero-value request.
func (r *OffsetCommitRequest) AddBlock(topic string, partitionID int32, offset int64, timestamp int64, metadata string) {
	if r.blocks == nil {
		r.blocks = make(map[string]map[int32]*offsetCommitRequestBlock)
	}

	byPartition := r.blocks[topic]
	if byPartition == nil {
		byPartition = make(map[int32]*offsetCommitRequestBlock)
		r.blocks[topic] = byPartition
	}

	byPartition[partitionID] = &offsetCommitRequestBlock{
		offset:    offset,
		timestamp: timestamp,
		metadata:  metadata,
	}
}
203

204
// Offset returns the offset and metadata stored for the given topic and
// partition. If no block is stored for that pair it returns zero values and a
// "no such offset" error.
func (r *OffsetCommitRequest) Offset(topic string, partitionID int32) (int64, string, error) {
	// Indexing a nil or missing inner map yields a nil block, so one chained
	// lookup covers both the unknown-topic and unknown-partition cases.
	if blk := r.blocks[topic][partitionID]; blk != nil {
		return blk.offset, blk.metadata, nil
	}
	return 0, "", errors.New("no such offset")
}
215

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.