cubefs

Форк
0
/
fetch_response.go 
557 строк · 12.8 Кб
1
package sarama
2

3
import (
4
	"errors"
5
	"sort"
6
	"time"
7
)
8

9
// invalidPreferredReplicaID marks the absence of a preferred read replica:
// it is the value assigned when decoding responses older than v11, which
// have no preferred-replica field on the wire.
const invalidPreferredReplicaID = -1
10

11
// AbortedTransaction identifies one aborted transaction in a fetch response
// partition block (present for fetch versions >= 4).
type AbortedTransaction struct {
	// ProducerID is the id of the producer that owned the aborted transaction.
	ProducerID  int64
	// FirstOffset is the first offset of the aborted transaction in the partition.
	FirstOffset int64
}
15

16
// decode reads an aborted-transaction entry from the wire: the producer id
// followed by the transaction's first offset.
func (t *AbortedTransaction) decode(pd packetDecoder) (err error) {
	t.ProducerID, err = pd.getInt64()
	if err != nil {
		return err
	}

	t.FirstOffset, err = pd.getInt64()
	if err != nil {
		return err
	}

	return nil
}
27

28
// encode writes the producer id followed by the first offset, mirroring the
// wire order expected by decode.
func (t *AbortedTransaction) encode(pe packetEncoder) (err error) {
	pe.putInt64(t.ProducerID)
	pe.putInt64(t.FirstOffset)
	return
}
34

35
// FetchResponseBlock holds the per-partition payload of a fetch response:
// the partition error, the various offsets, the aborted transactions and the
// fetched record sets.
type FetchResponseBlock struct {
	// Err is the partition-level error code reported by the broker.
	Err                    KError
	// HighWaterMarkOffset is the partition high watermark reported by the broker.
	HighWaterMarkOffset    int64
	// LastStableOffset is decoded for fetch versions >= 4.
	LastStableOffset       int64
	// LastRecordsBatchOffset, when non-nil, is the offset of the last record
	// batch decoded from this block.
	LastRecordsBatchOffset *int64
	// LogStartOffset is decoded for fetch versions >= 5.
	LogStartOffset         int64
	// AbortedTransactions lists aborted transactions (versions >= 4); nil when
	// the broker sent a null array.
	AbortedTransactions    []*AbortedTransaction
	// PreferredReadReplica is the replica to fetch from (versions >= 11);
	// invalidPreferredReplicaID when the field is absent.
	PreferredReadReplica   int32
	Records                *Records // deprecated: use FetchResponseBlock.RecordsSet
	// RecordsSet contains every record set successfully decoded from this block.
	RecordsSet             []*Records
	// Partial is true when the block was truncated before even one record set
	// could be decoded.
	Partial                bool
}
47

48
// decode reads a single partition block of a fetch response. The wire layout
// depends on the version: v4 adds the last stable offset and the aborted
// transactions array, v5 the log start offset, v11 the preferred read replica.
// The records payload is a length-prefixed blob that may contain several
// record sets, the last of which may be truncated by the broker.
func (b *FetchResponseBlock) decode(pd packetDecoder, version int16) (err error) {
	tmp, err := pd.getInt16()
	if err != nil {
		return err
	}
	b.Err = KError(tmp)

	b.HighWaterMarkOffset, err = pd.getInt64()
	if err != nil {
		return err
	}

	if version >= 4 {
		b.LastStableOffset, err = pd.getInt64()
		if err != nil {
			return err
		}

		if version >= 5 {
			b.LogStartOffset, err = pd.getInt64()
			if err != nil {
				return err
			}
		}

		numTransact, err := pd.getArrayLength()
		if err != nil {
			return err
		}

		// A negative length encodes a null array: leave AbortedTransactions nil.
		if numTransact >= 0 {
			b.AbortedTransactions = make([]*AbortedTransaction, numTransact)
		}

		for i := 0; i < numTransact; i++ {
			transact := new(AbortedTransaction)
			if err = transact.decode(pd); err != nil {
				return err
			}
			b.AbortedTransactions[i] = transact
		}
	}

	if version >= 11 {
		b.PreferredReadReplica, err = pd.getInt32()
		if err != nil {
			return err
		}
	} else {
		// Older versions carry no preferred-replica field; mark it explicitly
		// invalid rather than leaving the zero value.
		b.PreferredReadReplica = invalidPreferredReplicaID
	}

	recordsSize, err := pd.getInt32()
	if err != nil {
		return err
	}

	recordsDecoder, err := pd.getSubset(int(recordsSize))
	if err != nil {
		return err
	}

	b.RecordsSet = []*Records{}

	for recordsDecoder.remaining() > 0 {
		records := &Records{}
		if err := records.decode(recordsDecoder); err != nil {
			// If we have at least one decoded records, this is not an error
			if errors.Is(err, ErrInsufficientData) {
				if len(b.RecordsSet) == 0 {
					b.Partial = true
				}
				break
			}
			return err
		}

		b.LastRecordsBatchOffset, err = records.recordsOffset()
		if err != nil {
			return err
		}

		partial, err := records.isPartial()
		if err != nil {
			return err
		}

		n, err := records.numRecords()
		if err != nil {
			return err
		}

		// Keep non-empty sets; keep an empty partial set only when it is the
		// first one, so callers can detect the truncation.
		if n > 0 || (partial && len(b.RecordsSet) == 0) {
			b.RecordsSet = append(b.RecordsSet, records)

			if b.Records == nil {
				b.Records = records
			}
		}

		overflow, err := records.isOverflow()
		if err != nil {
			return err
		}

		if partial || overflow {
			break
		}
	}

	return nil
}
160

161
// numRecords reports the total number of records across all record sets of
// this block, or the first error encountered while counting.
func (b *FetchResponseBlock) numRecords() (int, error) {
	total := 0
	for _, rs := range b.RecordsSet {
		n, err := rs.numRecords()
		if err != nil {
			return 0, err
		}
		total += n
	}
	return total, nil
}
175

176
// isPartial reports whether this block holds an incomplete record set: either
// the block itself was flagged partial during decoding, or its single record
// set is partial.
func (b *FetchResponseBlock) isPartial() (bool, error) {
	switch {
	case b.Partial:
		return true, nil
	case len(b.RecordsSet) == 1:
		return b.RecordsSet[0].isPartial()
	default:
		return false, nil
	}
}
187

188
// encode writes this partition block in the wire format of the given fetch
// response version (see decode for the per-version layout).
func (b *FetchResponseBlock) encode(pe packetEncoder, version int16) (err error) {
	pe.putInt16(int16(b.Err))
	pe.putInt64(b.HighWaterMarkOffset)

	if version >= 4 {
		pe.putInt64(b.LastStableOffset)

		if version >= 5 {
			pe.putInt64(b.LogStartOffset)
		}

		if err = pe.putArrayLength(len(b.AbortedTransactions)); err != nil {
			return err
		}
		for _, txn := range b.AbortedTransactions {
			if err = txn.encode(pe); err != nil {
				return err
			}
		}
	}

	if version >= 11 {
		pe.putInt32(b.PreferredReadReplica)
	}

	// The record sets are length-prefixed: push a length field, write every
	// set, then pop to backfill the size.
	pe.push(&lengthField{})
	for _, rs := range b.RecordsSet {
		if err = rs.encode(pe); err != nil {
			return err
		}
	}
	return pe.pop()
}
223

224
func (b *FetchResponseBlock) getAbortedTransactions() []*AbortedTransaction {
225
	// I can't find any doc that guarantee the field `fetchResponse.AbortedTransactions` is ordered
226
	// plus Java implementation use a PriorityQueue based on `FirstOffset`. I guess we have to order it ourself
227
	at := b.AbortedTransactions
228
	sort.Slice(
229
		at,
230
		func(i, j int) bool { return at[i].FirstOffset < at[j].FirstOffset },
231
	)
232
	return at
233
}
234

235
// FetchResponse represents a Kafka fetch response (API key 1) across all
// supported protocol versions.
type FetchResponse struct {
	// Blocks maps topic name -> partition id -> per-partition response block.
	Blocks        map[string]map[int32]*FetchResponseBlock
	// ThrottleTime is the quota throttle duration (decoded for versions >= 1).
	ThrottleTime  time.Duration
	// ErrorCode is the top-level response error (versions >= 7).
	ErrorCode     int16
	// SessionID is the fetch session id (versions >= 7).
	SessionID     int32
	// Version is the protocol version used to encode/decode this response.
	Version       int16
	// LogAppendTime, when true, makes the Add* test helpers stamp records with
	// Timestamp instead of the caller-provided time.
	LogAppendTime bool
	// Timestamp is the log-append time applied when LogAppendTime is set.
	Timestamp     time.Time
}
244

245
// decode parses a fetch response of the given version from pd, populating
// r.Blocks with one FetchResponseBlock per topic/partition pair.
func (r *FetchResponse) decode(pd packetDecoder, version int16) (err error) {
	r.Version = version

	// Versions >= 1 carry the broker throttle time in milliseconds.
	if r.Version >= 1 {
		throttle, err := pd.getInt32()
		if err != nil {
			return err
		}
		r.ThrottleTime = time.Duration(throttle) * time.Millisecond
	}

	// Versions >= 7 add a top-level error code and a fetch session id.
	if r.Version >= 7 {
		r.ErrorCode, err = pd.getInt16()
		if err != nil {
			return err
		}
		r.SessionID, err = pd.getInt32()
		if err != nil {
			return err
		}
	}

	numTopics, err := pd.getArrayLength()
	if err != nil {
		return err
	}

	r.Blocks = make(map[string]map[int32]*FetchResponseBlock, numTopics)
	for i := 0; i < numTopics; i++ {
		name, err := pd.getString()
		if err != nil {
			return err
		}

		numBlocks, err := pd.getArrayLength()
		if err != nil {
			return err
		}

		r.Blocks[name] = make(map[int32]*FetchResponseBlock, numBlocks)

		// Each block is a partition id followed by the partition payload.
		for j := 0; j < numBlocks; j++ {
			id, err := pd.getInt32()
			if err != nil {
				return err
			}

			block := new(FetchResponseBlock)
			err = block.decode(pd, version)
			if err != nil {
				return err
			}
			r.Blocks[name][id] = block
		}
	}

	return nil
}
303

304
// encode writes the fetch response in the wire format selected by r.Version
// (throttle time for v1+, error code and session id for v7+, then the
// per-topic/per-partition blocks).
func (r *FetchResponse) encode(pe packetEncoder) error {
	if r.Version >= 1 {
		pe.putInt32(int32(r.ThrottleTime / time.Millisecond))
	}

	if r.Version >= 7 {
		pe.putInt16(r.ErrorCode)
		pe.putInt32(r.SessionID)
	}

	if err := pe.putArrayLength(len(r.Blocks)); err != nil {
		return err
	}

	for topic, partitions := range r.Blocks {
		if err := pe.putString(topic); err != nil {
			return err
		}

		if err := pe.putArrayLength(len(partitions)); err != nil {
			return err
		}

		for id, block := range partitions {
			pe.putInt32(id)
			if err := block.encode(pe, r.Version); err != nil {
				return err
			}
		}
	}
	return nil
}
340

341
// key returns the Kafka API key for the Fetch API (1).
func (r *FetchResponse) key() int16 {
	return 1
}
344

345
// version returns the protocol version carried by this response.
func (r *FetchResponse) version() int16 {
	return r.Version
}
348

349
// headerVersion returns the response header version used by this message (0).
func (r *FetchResponse) headerVersion() int16 {
	return 0
}
352

353
func (r *FetchResponse) requiredVersion() KafkaVersion {
354
	switch r.Version {
355
	case 0:
356
		return MinVersion
357
	case 1:
358
		return V0_9_0_0
359
	case 2:
360
		return V0_10_0_0
361
	case 3:
362
		return V0_10_1_0
363
	case 4, 5:
364
		return V0_11_0_0
365
	case 6:
366
		return V1_0_0_0
367
	case 7:
368
		return V1_1_0_0
369
	case 8:
370
		return V2_0_0_0
371
	case 9, 10:
372
		return V2_1_0_0
373
	case 11:
374
		return V2_3_0_0
375
	default:
376
		return MaxVersion
377
	}
378
}
379

380
// GetBlock returns the response block for the given topic/partition, or nil
// when the response contains no such block.
func (r *FetchResponse) GetBlock(topic string, partition int32) *FetchResponseBlock {
	if r.Blocks == nil {
		return nil
	}

	// Single lookup instead of re-indexing r.Blocks[topic] three times; an
	// indexing a nil inner map below still safely yields nil.
	partitions, ok := r.Blocks[topic]
	if !ok {
		return nil
	}

	return partitions[partition]
}
391

392
// AddError sets err as the error code of the given topic/partition block,
// lazily creating the topic map and the partition block on first use.
func (r *FetchResponse) AddError(topic string, partition int32, err KError) {
	if r.Blocks == nil {
		r.Blocks = make(map[string]map[int32]*FetchResponseBlock)
	}

	parts, ok := r.Blocks[topic]
	if !ok {
		parts = make(map[int32]*FetchResponseBlock)
		r.Blocks[topic] = parts
	}

	block, ok := parts[partition]
	if !ok {
		block = new(FetchResponseBlock)
		parts[partition] = block
	}

	block.Err = err
}
408

409
// getOrCreateBlock returns the block for the given topic/partition, creating
// the outer map, the topic map and the block itself as needed.
func (r *FetchResponse) getOrCreateBlock(topic string, partition int32) *FetchResponseBlock {
	if r.Blocks == nil {
		r.Blocks = make(map[string]map[int32]*FetchResponseBlock)
	}

	parts, ok := r.Blocks[topic]
	if !ok {
		parts = make(map[int32]*FetchResponseBlock)
		r.Blocks[topic] = parts
	}

	block, ok := parts[partition]
	if !ok {
		block = new(FetchResponseBlock)
		parts[partition] = block
	}

	return block
}
426

427
// encodeKV encodes an optional key and value, returning nil for whichever is
// absent. Encode errors are deliberately ignored: this is a test helper and
// the callers have no error path for it.
func encodeKV(key, value Encoder) ([]byte, []byte) {
	var kb, vb []byte

	if key != nil {
		kb, _ = key.Encode()
	}
	if value != nil {
		vb, _ = value.Encode()
	}

	return kb, vb
}
439

440
// AddMessageWithTimestamp appends a legacy message (MessageSet) with the given
// key/value/offset to the topic/partition block, creating the block and a
// legacy record set on first use. When r.LogAppendTime is set, the message is
// stamped with r.Timestamp instead of the supplied timestamp. version is the
// legacy message format version (presumably 0 or 1 — not validated here).
func (r *FetchResponse) AddMessageWithTimestamp(topic string, partition int32, key, value Encoder, offset int64, timestamp time.Time, version int8) {
	frb := r.getOrCreateBlock(topic, partition)
	kb, vb := encodeKV(key, value)
	if r.LogAppendTime {
		timestamp = r.Timestamp
	}
	msg := &Message{Key: kb, Value: vb, LogAppendTime: r.LogAppendTime, Timestamp: timestamp, Version: version}
	msgBlock := &MessageBlock{Msg: msg, Offset: offset}
	if len(frb.RecordsSet) == 0 {
		records := newLegacyRecords(&MessageSet{})
		frb.RecordsSet = []*Records{&records}
	}
	// Every message goes into the first (legacy) record set.
	set := frb.RecordsSet[0].MsgSet
	set.Messages = append(set.Messages, msgBlock)
}
455

456
// AddRecordWithTimestamp appends a single record to the first record batch of
// the topic/partition block, creating the block and a v2 batch (whose
// FirstTimestamp is the given timestamp) on first use. The record's
// TimestampDelta is computed against the batch's FirstTimestamp.
func (r *FetchResponse) AddRecordWithTimestamp(topic string, partition int32, key, value Encoder, offset int64, timestamp time.Time) {
	frb := r.getOrCreateBlock(topic, partition)
	kb, vb := encodeKV(key, value)
	if len(frb.RecordsSet) == 0 {
		records := newDefaultRecords(&RecordBatch{Version: 2, LogAppendTime: r.LogAppendTime, FirstTimestamp: timestamp, MaxTimestamp: r.Timestamp})
		frb.RecordsSet = []*Records{&records}
	}
	batch := frb.RecordsSet[0].RecordBatch
	// NOTE(review): the absolute offset is stored as OffsetDelta; this looks
	// correct only while the batch's FirstOffset stays 0 — confirm with callers.
	rec := &Record{Key: kb, Value: vb, OffsetDelta: offset, TimestampDelta: timestamp.Sub(batch.FirstTimestamp)}
	batch.addRecord(rec)
}
467

468
// AddRecordBatchWithTimestamp is similar to AddRecordWithTimestamp
469
// But instead of appending 1 record to a batch, it append a new batch containing 1 record to the fetchResponse
470
// Since transaction are handled on batch level (the whole batch is either committed or aborted), use this to test transactions
471
func (r *FetchResponse) AddRecordBatchWithTimestamp(topic string, partition int32, key, value Encoder, offset int64, producerID int64, isTransactional bool, timestamp time.Time) {
472
	frb := r.getOrCreateBlock(topic, partition)
473
	kb, vb := encodeKV(key, value)
474

475
	records := newDefaultRecords(&RecordBatch{Version: 2, LogAppendTime: r.LogAppendTime, FirstTimestamp: timestamp, MaxTimestamp: r.Timestamp})
476
	batch := &RecordBatch{
477
		Version:         2,
478
		LogAppendTime:   r.LogAppendTime,
479
		FirstTimestamp:  timestamp,
480
		MaxTimestamp:    r.Timestamp,
481
		FirstOffset:     offset,
482
		LastOffsetDelta: 0,
483
		ProducerID:      producerID,
484
		IsTransactional: isTransactional,
485
	}
486
	rec := &Record{Key: kb, Value: vb, OffsetDelta: 0, TimestampDelta: timestamp.Sub(batch.FirstTimestamp)}
487
	batch.addRecord(rec)
488
	records.RecordBatch = batch
489

490
	frb.RecordsSet = append(frb.RecordsSet, &records)
491
}
492

493
// AddControlRecordWithTimestamp appends a new transactional control batch to
// the topic/partition block, containing a single control record of the given
// type (e.g. a commit or abort marker) for the given producer.
func (r *FetchResponse) AddControlRecordWithTimestamp(topic string, partition int32, offset int64, producerID int64, recordType ControlRecordType, timestamp time.Time) {
	frb := r.getOrCreateBlock(topic, partition)

	// batch: a v2, transactional control batch starting at the given offset
	batch := &RecordBatch{
		Version:         2,
		LogAppendTime:   r.LogAppendTime,
		FirstTimestamp:  timestamp,
		MaxTimestamp:    r.Timestamp,
		FirstOffset:     offset,
		LastOffsetDelta: 0,
		ProducerID:      producerID,
		IsTransactional: true,
		Control:         true,
	}

	// records wrapper around the control batch
	records := newDefaultRecords(nil)
	records.RecordBatch = batch

	// record: serialize the control record into fixed-size key (4 bytes) and
	// value (6 bytes) buffers — sizes dictated by ControlRecord.encode.
	crAbort := ControlRecord{
		Version: 0,
		Type:    recordType,
	}
	crKey := &realEncoder{raw: make([]byte, 4)}
	crValue := &realEncoder{raw: make([]byte, 6)}
	crAbort.encode(crKey, crValue)
	rec := &Record{Key: ByteEncoder(crKey.raw), Value: ByteEncoder(crValue.raw), OffsetDelta: 0, TimestampDelta: timestamp.Sub(batch.FirstTimestamp)}
	batch.addRecord(rec)

	frb.RecordsSet = append(frb.RecordsSet, &records)
}
526

527
// AddMessage appends a legacy message (format version 0) with a zero
// timestamp to the given topic/partition block.
func (r *FetchResponse) AddMessage(topic string, partition int32, key, value Encoder, offset int64) {
	r.AddMessageWithTimestamp(topic, partition, key, value, offset, time.Time{}, 0)
}
530

531
// AddRecord appends a record with a zero timestamp to the first batch of the
// given topic/partition block.
func (r *FetchResponse) AddRecord(topic string, partition int32, key, value Encoder, offset int64) {
	r.AddRecordWithTimestamp(topic, partition, key, value, offset, time.Time{})
}
534

535
// AddRecordBatch appends a new single-record batch with a zero timestamp to
// the given topic/partition block.
func (r *FetchResponse) AddRecordBatch(topic string, partition int32, key, value Encoder, offset int64, producerID int64, isTransactional bool) {
	r.AddRecordBatchWithTimestamp(topic, partition, key, value, offset, producerID, isTransactional, time.Time{})
}
538

539
// AddControlRecord appends a transactional control record of the given type
// with a zero timestamp to the given topic/partition block.
func (r *FetchResponse) AddControlRecord(topic string, partition int32, offset int64, producerID int64, recordType ControlRecordType) {
	// The control record's key and value are built inside the delegate.
	r.AddControlRecordWithTimestamp(topic, partition, offset, producerID, recordType, time.Time{})
}
543

544
// SetLastOffsetDelta sets LastOffsetDelta on the first record batch of the
// given topic/partition block, creating the block and an empty v2 batch if
// none exists yet.
func (r *FetchResponse) SetLastOffsetDelta(topic string, partition int32, offset int32) {
	frb := r.getOrCreateBlock(topic, partition)
	if len(frb.RecordsSet) == 0 {
		records := newDefaultRecords(&RecordBatch{Version: 2})
		frb.RecordsSet = []*Records{&records}
	}
	batch := frb.RecordsSet[0].RecordBatch
	batch.LastOffsetDelta = offset
}
553

554
// SetLastStableOffset sets the last stable offset of the given topic/partition
// block, creating the block if it does not exist yet.
func (r *FetchResponse) SetLastStableOffset(topic string, partition int32, offset int64) {
	frb := r.getOrCreateBlock(topic, partition)
	frb.LastStableOffset = offset
}
558

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.