package sarama

import (
	"errors"
	"sort"
	"time"
)

const invalidPreferredReplicaID = -1

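// AbortedTransaction identifies a transaction that was aborted on a partition,
// by producer ID and the first offset the transaction covers.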
type AbortedTransaction struct {
	ProducerID  int64
	FirstOffset int64
}

func (t *AbortedTransaction) decode(pd packetDecoder) (err error) {
	if t.ProducerID, err = pd.getInt64(); err != nil {
		return err
	}

	if t.FirstOffset, err = pd.getInt64(); err != nil {
		return err
	}

	return nil
}

func (t *AbortedTransaction) encode(pe packetEncoder) (err error) {
	pe.putInt64(t.ProducerID)
	pe.putInt64(t.FirstOffset)

	return nil
}

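// FetchResponseBlock holds the fetch result for a single partition: the error
// code, offset metadata, any aborted transactions, and the raw record sets.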
type FetchResponseBlock struct {
	Err                    KError
	HighWaterMarkOffset    int64
	LastStableOffset       int64
	LastRecordsBatchOffset *int64
	LogStartOffset         int64
	AbortedTransactions    []*AbortedTransaction
	PreferredReadReplica   int32
	Records                *Records // deprecated: use FetchResponseBlock.RecordsSet
	RecordsSet             []*Records
	Partial                bool
}

func (b *FetchResponseBlock) decode(pd packetDecoder, version int16) (err error) {
	tmp, err := pd.getInt16()
	if err != nil {
		return err
	}
	b.Err = KError(tmp)

	b.HighWaterMarkOffset, err = pd.getInt64()
	if err != nil {
		return err
	}

	if version >= 4 {
		b.LastStableOffset, err = pd.getInt64()
		if err != nil {
			return err
		}

		if version >= 5 {
			b.LogStartOffset, err = pd.getInt64()
			if err != nil {
				return err
			}
		}

		numTransact, err := pd.getArrayLength()
		if err != nil {
			return err
		}

		if numTransact >= 0 {
			b.AbortedTransactions = make([]*AbortedTransaction, numTransact)
		}

		for i := 0; i < numTransact; i++ {
			transact := new(AbortedTransaction)
			if err = transact.decode(pd); err != nil {
				return err
			}
			b.AbortedTransactions[i] = transact
		}
	}

	if version >= 11 {
		b.PreferredReadReplica, err = pd.getInt32()
		if err != nil {
			return err
		}
	} else {
		b.PreferredReadReplica = invalidPreferredReplicaID
	}

	recordsSize, err := pd.getInt32()
	if err != nil {
		return err
	}

	recordsDecoder, err := pd.getSubset(int(recordsSize))
	if err != nil {
		return err
	}

	b.RecordsSet = []*Records{}

	for recordsDecoder.remaining() > 0 {
		records := &Records{}
		if err := records.decode(recordsDecoder); err != nil {
			// If we have already decoded at least one records set, this is not an error
			if errors.Is(err, ErrInsufficientData) {
				if len(b.RecordsSet) == 0 {
					b.Partial = true
				}
				break
			}
			return err
		}

		b.LastRecordsBatchOffset, err = records.recordsOffset()
		if err != nil {
			return err
		}

		partial, err := records.isPartial()
		if err != nil {
			return err
		}

		n, err := records.numRecords()
		if err != nil {
			return err
		}

		if n > 0 || (partial && len(b.RecordsSet) == 0) {
			b.RecordsSet = append(b.RecordsSet, records)

			if b.Records == nil {
				b.Records = records
			}
		}

		overflow, err := records.isOverflow()
		if err != nil {
			return err
		}

		if partial || overflow {
			break
		}
	}

	return nil
}

func (b *FetchResponseBlock) numRecords() (int, error) {
	sum := 0

	for _, records := range b.RecordsSet {
		count, err := records.numRecords()
		if err != nil {
			return 0, err
		}

		sum += count
	}

	return sum, nil
}

func (b *FetchResponseBlock) isPartial() (bool, error) {
	if b.Partial {
		return true, nil
	}

	if len(b.RecordsSet) == 1 {
		return b.RecordsSet[0].isPartial()
	}

	return false, nil
}

func (b *FetchResponseBlock) encode(pe packetEncoder, version int16) (err error) {
	pe.putInt16(int16(b.Err))

	pe.putInt64(b.HighWaterMarkOffset)

	if version >= 4 {
		pe.putInt64(b.LastStableOffset)

		if version >= 5 {
			pe.putInt64(b.LogStartOffset)
		}

		if err = pe.putArrayLength(len(b.AbortedTransactions)); err != nil {
			return err
		}
		for _, transact := range b.AbortedTransactions {
			if err = transact.encode(pe); err != nil {
				return err
			}
		}
	}

	if version >= 11 {
		pe.putInt32(b.PreferredReadReplica)
	}

	pe.push(&lengthField{})
	for _, records := range b.RecordsSet {
		err = records.encode(pe)
		if err != nil {
			return err
		}
	}
	return pe.pop()
}

func (b *FetchResponseBlock) getAbortedTransactions() []*AbortedTransaction {
	// There is no documentation guaranteeing that the field `FetchResponse.AbortedTransactions` is ordered,
	// and the Java implementation uses a PriorityQueue keyed on `FirstOffset`, so we sort it ourselves.
	at := b.AbortedTransactions
	sort.Slice(
		at,
		func(i, j int) bool { return at[i].FirstOffset < at[j].FirstOffset },
	)
	return at
}

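// FetchResponse contains the broker's reply to a fetch request, organized as a
// map of topic name to partition ID to FetchResponseBlock.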
type FetchResponse struct {
	Blocks        map[string]map[int32]*FetchResponseBlock
	ThrottleTime  time.Duration
	ErrorCode     int16
	SessionID     int32
	Version       int16
	LogAppendTime bool
	Timestamp     time.Time
}

func (r *FetchResponse) decode(pd packetDecoder, version int16) (err error) {
	r.Version = version

	if r.Version >= 1 {
		throttle, err := pd.getInt32()
		if err != nil {
			return err
		}
		r.ThrottleTime = time.Duration(throttle) * time.Millisecond
	}

	if r.Version >= 7 {
		r.ErrorCode, err = pd.getInt16()
		if err != nil {
			return err
		}
		r.SessionID, err = pd.getInt32()
		if err != nil {
			return err
		}
	}

	numTopics, err := pd.getArrayLength()
	if err != nil {
		return err
	}

	r.Blocks = make(map[string]map[int32]*FetchResponseBlock, numTopics)
	for i := 0; i < numTopics; i++ {
		name, err := pd.getString()
		if err != nil {
			return err
		}

		numBlocks, err := pd.getArrayLength()
		if err != nil {
			return err
		}

		r.Blocks[name] = make(map[int32]*FetchResponseBlock, numBlocks)

		for j := 0; j < numBlocks; j++ {
			id, err := pd.getInt32()
			if err != nil {
				return err
			}

			block := new(FetchResponseBlock)
			err = block.decode(pd, version)
			if err != nil {
				return err
			}
			r.Blocks[name][id] = block
		}
	}

	return nil
}

func (r *FetchResponse) encode(pe packetEncoder) (err error) {
	if r.Version >= 1 {
		pe.putInt32(int32(r.ThrottleTime / time.Millisecond))
	}

	if r.Version >= 7 {
		pe.putInt16(r.ErrorCode)
		pe.putInt32(r.SessionID)
	}

	err = pe.putArrayLength(len(r.Blocks))
	if err != nil {
		return err
	}

	for topic, partitions := range r.Blocks {
		err = pe.putString(topic)
		if err != nil {
			return err
		}

		err = pe.putArrayLength(len(partitions))
		if err != nil {
			return err
		}

		for id, block := range partitions {
			pe.putInt32(id)
			err = block.encode(pe, r.Version)
			if err != nil {
				return err
			}
		}
	}
	return nil
}

func (r *FetchResponse) key() int16 {
	return 1
}

func (r *FetchResponse) version() int16 {
	return r.Version
}

func (r *FetchResponse) headerVersion() int16 {
	return 0
}

func (r *FetchResponse) requiredVersion() KafkaVersion {
	switch r.Version {
	case 0:
		return MinVersion
	case 1:
		return V0_9_0_0
	case 2:
		return V0_10_0_0
	case 3:
		return V0_10_1_0
	case 4, 5:
		return V0_11_0_0
	case 6:
		return V1_0_0_0
	case 7:
		return V1_1_0_0
	case 8:
		return V2_0_0_0
	case 9, 10:
		return V2_1_0_0
	case 11:
		return V2_3_0_0
	default:
		return MaxVersion
	}
}

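// GetBlock returns the FetchResponseBlock for the given topic and partition,
// or nil if the response contains no data for that partition.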
func (r *FetchResponse) GetBlock(topic string, partition int32) *FetchResponseBlock {
	if r.Blocks == nil {
		return nil
	}

	if r.Blocks[topic] == nil {
		return nil
	}

	return r.Blocks[topic][partition]
}

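// AddError sets the error code on the block for the given topic and partition,
// creating the block (and any intermediate maps) if it does not exist yet.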
func (r *FetchResponse) AddError(topic string, partition int32, err KError) {
	if r.Blocks == nil {
		r.Blocks = make(map[string]map[int32]*FetchResponseBlock)
	}
	partitions, ok := r.Blocks[topic]
	if !ok {
		partitions = make(map[int32]*FetchResponseBlock)
		r.Blocks[topic] = partitions
	}
	frb, ok := partitions[partition]
	if !ok {
		frb = new(FetchResponseBlock)
		partitions[partition] = frb
	}
	frb.Err = err
}

func (r *FetchResponse) getOrCreateBlock(topic string, partition int32) *FetchResponseBlock {
	if r.Blocks == nil {
		r.Blocks = make(map[string]map[int32]*FetchResponseBlock)
	}
	partitions, ok := r.Blocks[topic]
	if !ok {
		partitions = make(map[int32]*FetchResponseBlock)
		r.Blocks[topic] = partitions
	}
	frb, ok := partitions[partition]
	if !ok {
		frb = new(FetchResponseBlock)
		partitions[partition] = frb
	}

	return frb
}

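// encodeKV encodes the optional key and value Encoders into raw byte slices,
// returning nil for either one that is not provided; encoding errors are ignored.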
func encodeKV(key, value Encoder) ([]byte, []byte) {
	var kb []byte
	var vb []byte
	if key != nil {
		kb, _ = key.Encode()
	}
	if value != nil {
		vb, _ = value.Encode()
	}

	return kb, vb
}

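// AddMessageWithTimestamp appends a single legacy message with the given offset and
// timestamp to the partition's first message set, creating that set if needed. If
// LogAppendTime is set on the response, the response's Timestamp overrides the supplied one.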
func (r *FetchResponse) AddMessageWithTimestamp(topic string, partition int32, key, value Encoder, offset int64, timestamp time.Time, version int8) {
	frb := r.getOrCreateBlock(topic, partition)
	kb, vb := encodeKV(key, value)
	if r.LogAppendTime {
		timestamp = r.Timestamp
	}
	msg := &Message{Key: kb, Value: vb, LogAppendTime: r.LogAppendTime, Timestamp: timestamp, Version: version}
	msgBlock := &MessageBlock{Msg: msg, Offset: offset}
	if len(frb.RecordsSet) == 0 {
		records := newLegacyRecords(&MessageSet{})
		frb.RecordsSet = []*Records{&records}
	}
	set := frb.RecordsSet[0].MsgSet
	set.Messages = append(set.Messages, msgBlock)
}

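// AddRecordWithTimestamp appends a single record to the partition's first record batch
// (a version-2 default batch), creating the batch if needed. The offset is stored as the
// record's OffsetDelta and the timestamp as a delta from the batch's FirstTimestamp.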
func (r *FetchResponse) AddRecordWithTimestamp(topic string, partition int32, key, value Encoder, offset int64, timestamp time.Time) {
	frb := r.getOrCreateBlock(topic, partition)
	kb, vb := encodeKV(key, value)
	if len(frb.RecordsSet) == 0 {
		records := newDefaultRecords(&RecordBatch{Version: 2, LogAppendTime: r.LogAppendTime, FirstTimestamp: timestamp, MaxTimestamp: r.Timestamp})
		frb.RecordsSet = []*Records{&records}
	}
	batch := frb.RecordsSet[0].RecordBatch
	rec := &Record{Key: kb, Value: vb, OffsetDelta: offset, TimestampDelta: timestamp.Sub(batch.FirstTimestamp)}
	batch.addRecord(rec)
}

// AddRecordBatchWithTimestamp is similar to AddRecordWithTimestamp, but instead of appending
// one record to an existing batch, it appends a new batch containing a single record to the response.
// Since transactions are handled at the batch level (the whole batch is either committed or aborted),
// use this method to test transactions.
func (r *FetchResponse) AddRecordBatchWithTimestamp(topic string, partition int32, key, value Encoder, offset int64, producerID int64, isTransactional bool, timestamp time.Time) {
	frb := r.getOrCreateBlock(topic, partition)
	kb, vb := encodeKV(key, value)

	records := newDefaultRecords(&RecordBatch{Version: 2, LogAppendTime: r.LogAppendTime, FirstTimestamp: timestamp, MaxTimestamp: r.Timestamp})
	batch := &RecordBatch{
		Version:         2,
		LogAppendTime:   r.LogAppendTime,
		FirstTimestamp:  timestamp,
		MaxTimestamp:    r.Timestamp,
		FirstOffset:     offset,
		LastOffsetDelta: 0,
		ProducerID:      producerID,
		IsTransactional: isTransactional,
	}
	rec := &Record{Key: kb, Value: vb, OffsetDelta: 0, TimestampDelta: timestamp.Sub(batch.FirstTimestamp)}
	batch.addRecord(rec)
	records.RecordBatch = batch

	frb.RecordsSet = append(frb.RecordsSet, &records)
}

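// AddControlRecordWithTimestamp appends a new transactional control batch containing a
// single control record of the given type (for example a commit or abort marker) for the
// given producer ID.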
func (r *FetchResponse) AddControlRecordWithTimestamp(topic string, partition int32, offset int64, producerID int64, recordType ControlRecordType, timestamp time.Time) {
	frb := r.getOrCreateBlock(topic, partition)

	// batch
	batch := &RecordBatch{
		Version:         2,
		LogAppendTime:   r.LogAppendTime,
		FirstTimestamp:  timestamp,
		MaxTimestamp:    r.Timestamp,
		FirstOffset:     offset,
		LastOffsetDelta: 0,
		ProducerID:      producerID,
		IsTransactional: true,
		Control:         true,
	}

	// records
	records := newDefaultRecords(nil)
	records.RecordBatch = batch

	// record
	crAbort := ControlRecord{
		Version: 0,
		Type:    recordType,
	}
	crKey := &realEncoder{raw: make([]byte, 4)}
	crValue := &realEncoder{raw: make([]byte, 6)}
	crAbort.encode(crKey, crValue)
	rec := &Record{Key: ByteEncoder(crKey.raw), Value: ByteEncoder(crValue.raw), OffsetDelta: 0, TimestampDelta: timestamp.Sub(batch.FirstTimestamp)}
	batch.addRecord(rec)

	frb.RecordsSet = append(frb.RecordsSet, &records)
}

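// AddMessage is AddMessageWithTimestamp with a zero timestamp and message version 0.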
func (r *FetchResponse) AddMessage(topic string, partition int32, key, value Encoder, offset int64) {
	r.AddMessageWithTimestamp(topic, partition, key, value, offset, time.Time{}, 0)
}

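// AddRecord is AddRecordWithTimestamp with a zero timestamp.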
func (r *FetchResponse) AddRecord(topic string, partition int32, key, value Encoder, offset int64) {
	r.AddRecordWithTimestamp(topic, partition, key, value, offset, time.Time{})
}

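// AddRecordBatch is AddRecordBatchWithTimestamp with a zero timestamp.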
func (r *FetchResponse) AddRecordBatch(topic string, partition int32, key, value Encoder, offset int64, producerID int64, isTransactional bool) {
	r.AddRecordBatchWithTimestamp(topic, partition, key, value, offset, producerID, isTransactional, time.Time{})
}

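// AddControlRecord is AddControlRecordWithTimestamp with a zero timestamp.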
func (r *FetchResponse) AddControlRecord(topic string, partition int32, offset int64, producerID int64, recordType ControlRecordType) {
	// define controlRecord key and value
	r.AddControlRecordWithTimestamp(topic, partition, offset, producerID, recordType, time.Time{})
}

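// SetLastOffsetDelta sets LastOffsetDelta on the partition's first record batch,
// creating an empty version-2 batch if the partition has no records yet.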
func (r *FetchResponse) SetLastOffsetDelta(topic string, partition int32, offset int32) {
	frb := r.getOrCreateBlock(topic, partition)
	if len(frb.RecordsSet) == 0 {
		records := newDefaultRecords(&RecordBatch{Version: 2})
		frb.RecordsSet = []*Records{&records}
	}
	batch := frb.RecordsSet[0].RecordBatch
	batch.LastOffsetDelta = offset
}

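// SetLastStableOffset sets the LastStableOffset field on the block for the given
// topic and partition, creating the block if it does not exist yet.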
func (r *FetchResponse) SetLastStableOffset(topic string, partition int32, offset int64) {
	frb := r.getOrCreateBlock(topic, partition)
	frb.LastStableOffset = offset
}