cubefs

Форк
0
1408 строк · 37.0 Кб
1
package sarama
2

3
import (
4
	"fmt"
5
	"strings"
6
	"sync"
7
)
8

9
// TestReporter declares the failure-reporting methods of go's testing.T that
// the mocks need, so that the main part of the library does not have to
// import `testing`.
type TestReporter interface {
	Error(args ...interface{})
	Errorf(format string, args ...interface{})
	Fatal(args ...interface{})
	Fatalf(format string, args ...interface{})
}
17

18
// MockResponse is a response builder interface it defines one method that
19
// allows generating a response based on a request body. MockResponses are used
20
// to program behavior of MockBroker in tests.
21
type MockResponse interface {
22
	For(reqBody versionedDecoder) (res encoderWithHeader)
23
}
24

25
// MockWrapper is a mock response builder that returns a particular concrete
26
// response regardless of the actual request passed to the `For` method.
27
type MockWrapper struct {
28
	res encoderWithHeader
29
}
30

31
func (mw *MockWrapper) For(reqBody versionedDecoder) (res encoderWithHeader) {
32
	return mw.res
33
}
34

35
func NewMockWrapper(res encoderWithHeader) *MockWrapper {
36
	return &MockWrapper{res: res}
37
}
38

39
// MockSequence is a mock response builder that is created from a sequence of
40
// concrete responses. Every time when a `MockBroker` calls its `For` method
41
// the next response from the sequence is returned. When the end of the
42
// sequence is reached the last element from the sequence is returned.
43
type MockSequence struct {
44
	responses []MockResponse
45
}
46

47
func NewMockSequence(responses ...interface{}) *MockSequence {
48
	ms := &MockSequence{}
49
	ms.responses = make([]MockResponse, len(responses))
50
	for i, res := range responses {
51
		switch res := res.(type) {
52
		case MockResponse:
53
			ms.responses[i] = res
54
		case encoderWithHeader:
55
			ms.responses[i] = NewMockWrapper(res)
56
		default:
57
			panic(fmt.Sprintf("Unexpected response type: %T", res))
58
		}
59
	}
60
	return ms
61
}
62

63
func (mc *MockSequence) For(reqBody versionedDecoder) (res encoderWithHeader) {
64
	res = mc.responses[0].For(reqBody)
65
	if len(mc.responses) > 1 {
66
		mc.responses = mc.responses[1:]
67
	}
68
	return res
69
}
70

71
type MockListGroupsResponse struct {
72
	groups map[string]string
73
	t      TestReporter
74
}
75

76
func NewMockListGroupsResponse(t TestReporter) *MockListGroupsResponse {
77
	return &MockListGroupsResponse{
78
		groups: make(map[string]string),
79
		t:      t,
80
	}
81
}
82

83
func (m *MockListGroupsResponse) For(reqBody versionedDecoder) encoderWithHeader {
84
	request := reqBody.(*ListGroupsRequest)
85
	_ = request
86
	response := &ListGroupsResponse{
87
		Groups: m.groups,
88
	}
89
	return response
90
}
91

92
func (m *MockListGroupsResponse) AddGroup(groupID, protocolType string) *MockListGroupsResponse {
93
	m.groups[groupID] = protocolType
94
	return m
95
}
96

97
type MockDescribeGroupsResponse struct {
98
	groups map[string]*GroupDescription
99
	t      TestReporter
100
}
101

102
func NewMockDescribeGroupsResponse(t TestReporter) *MockDescribeGroupsResponse {
103
	return &MockDescribeGroupsResponse{
104
		t:      t,
105
		groups: make(map[string]*GroupDescription),
106
	}
107
}
108

109
func (m *MockDescribeGroupsResponse) AddGroupDescription(groupID string, description *GroupDescription) *MockDescribeGroupsResponse {
110
	m.groups[groupID] = description
111
	return m
112
}
113

114
func (m *MockDescribeGroupsResponse) For(reqBody versionedDecoder) encoderWithHeader {
115
	request := reqBody.(*DescribeGroupsRequest)
116

117
	response := &DescribeGroupsResponse{}
118
	for _, requestedGroup := range request.Groups {
119
		if group, ok := m.groups[requestedGroup]; ok {
120
			response.Groups = append(response.Groups, group)
121
		} else {
122
			// Mimic real kafka - if a group doesn't exist, return
123
			// an entry with state "Dead"
124
			response.Groups = append(response.Groups, &GroupDescription{
125
				GroupId: requestedGroup,
126
				State:   "Dead",
127
			})
128
		}
129
	}
130

131
	return response
132
}
133

134
// MockMetadataResponse is a `MetadataResponse` builder.
135
type MockMetadataResponse struct {
136
	controllerID int32
137
	leaders      map[string]map[int32]int32
138
	brokers      map[string]int32
139
	t            TestReporter
140
}
141

142
func NewMockMetadataResponse(t TestReporter) *MockMetadataResponse {
143
	return &MockMetadataResponse{
144
		leaders: make(map[string]map[int32]int32),
145
		brokers: make(map[string]int32),
146
		t:       t,
147
	}
148
}
149

150
func (mmr *MockMetadataResponse) SetLeader(topic string, partition, brokerID int32) *MockMetadataResponse {
151
	partitions := mmr.leaders[topic]
152
	if partitions == nil {
153
		partitions = make(map[int32]int32)
154
		mmr.leaders[topic] = partitions
155
	}
156
	partitions[partition] = brokerID
157
	return mmr
158
}
159

160
func (mmr *MockMetadataResponse) SetBroker(addr string, brokerID int32) *MockMetadataResponse {
161
	mmr.brokers[addr] = brokerID
162
	return mmr
163
}
164

165
func (mmr *MockMetadataResponse) SetController(brokerID int32) *MockMetadataResponse {
166
	mmr.controllerID = brokerID
167
	return mmr
168
}
169

170
func (mmr *MockMetadataResponse) For(reqBody versionedDecoder) encoderWithHeader {
171
	metadataRequest := reqBody.(*MetadataRequest)
172
	metadataResponse := &MetadataResponse{
173
		Version:      metadataRequest.version(),
174
		ControllerID: mmr.controllerID,
175
	}
176
	for addr, brokerID := range mmr.brokers {
177
		metadataResponse.AddBroker(addr, brokerID)
178
	}
179

180
	// Generate set of replicas
181
	var replicas []int32
182
	var offlineReplicas []int32
183
	for _, brokerID := range mmr.brokers {
184
		replicas = append(replicas, brokerID)
185
	}
186

187
	if len(metadataRequest.Topics) == 0 {
188
		for topic, partitions := range mmr.leaders {
189
			for partition, brokerID := range partitions {
190
				metadataResponse.AddTopicPartition(topic, partition, brokerID, replicas, replicas, offlineReplicas, ErrNoError)
191
			}
192
		}
193
		return metadataResponse
194
	}
195
	for _, topic := range metadataRequest.Topics {
196
		for partition, brokerID := range mmr.leaders[topic] {
197
			metadataResponse.AddTopicPartition(topic, partition, brokerID, replicas, replicas, offlineReplicas, ErrNoError)
198
		}
199
	}
200
	return metadataResponse
201
}
202

203
// MockOffsetResponse is an `OffsetResponse` builder.
204
type MockOffsetResponse struct {
205
	offsets map[string]map[int32]map[int64]int64
206
	t       TestReporter
207
}
208

209
func NewMockOffsetResponse(t TestReporter) *MockOffsetResponse {
210
	return &MockOffsetResponse{
211
		offsets: make(map[string]map[int32]map[int64]int64),
212
		t:       t,
213
	}
214
}
215

216
func (mor *MockOffsetResponse) SetOffset(topic string, partition int32, time, offset int64) *MockOffsetResponse {
217
	partitions := mor.offsets[topic]
218
	if partitions == nil {
219
		partitions = make(map[int32]map[int64]int64)
220
		mor.offsets[topic] = partitions
221
	}
222
	times := partitions[partition]
223
	if times == nil {
224
		times = make(map[int64]int64)
225
		partitions[partition] = times
226
	}
227
	times[time] = offset
228
	return mor
229
}
230

231
func (mor *MockOffsetResponse) For(reqBody versionedDecoder) encoderWithHeader {
232
	offsetRequest := reqBody.(*OffsetRequest)
233
	offsetResponse := &OffsetResponse{Version: offsetRequest.Version}
234
	for topic, partitions := range offsetRequest.blocks {
235
		for partition, block := range partitions {
236
			offset := mor.getOffset(topic, partition, block.time)
237
			offsetResponse.AddTopicPartition(topic, partition, offset)
238
		}
239
	}
240
	return offsetResponse
241
}
242

243
// getOffset returns the offset registered for topic/partition at the given
// time. If the topic, the partition or the time has not been registered, it
// reports exactly one error through the TestReporter, naming the first
// missing level, and returns 0.
func (mor *MockOffsetResponse) getOffset(topic string, partition int32, time int64) int64 {
	partitions := mor.offsets[topic]
	if partitions == nil {
		mor.t.Errorf("missing topic: %s", topic)
		// Stop here: the partition and time lookups would only add
		// follow-on errors for the same root cause.
		return 0
	}
	times := partitions[partition]
	if times == nil {
		mor.t.Errorf("missing partition: %d", partition)
		return 0
	}
	offset, ok := times[time]
	if !ok {
		mor.t.Errorf("missing time: %d", time)
		return 0
	}
	return offset
}
258

259
// MockFetchResponse is a `FetchResponse` builder.
260
type MockFetchResponse struct {
261
	messages       map[string]map[int32]map[int64]Encoder
262
	messagesLock   *sync.RWMutex
263
	highWaterMarks map[string]map[int32]int64
264
	t              TestReporter
265
	batchSize      int
266
}
267

268
func NewMockFetchResponse(t TestReporter, batchSize int) *MockFetchResponse {
269
	return &MockFetchResponse{
270
		messages:       make(map[string]map[int32]map[int64]Encoder),
271
		messagesLock:   &sync.RWMutex{},
272
		highWaterMarks: make(map[string]map[int32]int64),
273
		t:              t,
274
		batchSize:      batchSize,
275
	}
276
}
277

278
func (mfr *MockFetchResponse) SetMessage(topic string, partition int32, offset int64, msg Encoder) *MockFetchResponse {
279
	mfr.messagesLock.Lock()
280
	defer mfr.messagesLock.Unlock()
281
	partitions := mfr.messages[topic]
282
	if partitions == nil {
283
		partitions = make(map[int32]map[int64]Encoder)
284
		mfr.messages[topic] = partitions
285
	}
286
	messages := partitions[partition]
287
	if messages == nil {
288
		messages = make(map[int64]Encoder)
289
		partitions[partition] = messages
290
	}
291
	messages[offset] = msg
292
	return mfr
293
}
294

295
func (mfr *MockFetchResponse) SetHighWaterMark(topic string, partition int32, offset int64) *MockFetchResponse {
296
	partitions := mfr.highWaterMarks[topic]
297
	if partitions == nil {
298
		partitions = make(map[int32]int64)
299
		mfr.highWaterMarks[topic] = partitions
300
	}
301
	partitions[partition] = offset
302
	return mfr
303
}
304

305
func (mfr *MockFetchResponse) For(reqBody versionedDecoder) encoderWithHeader {
306
	fetchRequest := reqBody.(*FetchRequest)
307
	res := &FetchResponse{
308
		Version: fetchRequest.Version,
309
	}
310
	for topic, partitions := range fetchRequest.blocks {
311
		for partition, block := range partitions {
312
			initialOffset := block.fetchOffset
313
			offset := initialOffset
314
			maxOffset := initialOffset + int64(mfr.getMessageCount(topic, partition))
315
			for i := 0; i < mfr.batchSize && offset < maxOffset; {
316
				msg := mfr.getMessage(topic, partition, offset)
317
				if msg != nil {
318
					res.AddMessage(topic, partition, nil, msg, offset)
319
					i++
320
				}
321
				offset++
322
			}
323
			fb := res.GetBlock(topic, partition)
324
			if fb == nil {
325
				res.AddError(topic, partition, ErrNoError)
326
				fb = res.GetBlock(topic, partition)
327
			}
328
			fb.HighWaterMarkOffset = mfr.getHighWaterMark(topic, partition)
329
		}
330
	}
331
	return res
332
}
333

334
func (mfr *MockFetchResponse) getMessage(topic string, partition int32, offset int64) Encoder {
335
	mfr.messagesLock.RLock()
336
	defer mfr.messagesLock.RUnlock()
337
	partitions := mfr.messages[topic]
338
	if partitions == nil {
339
		return nil
340
	}
341
	messages := partitions[partition]
342
	if messages == nil {
343
		return nil
344
	}
345
	return messages[offset]
346
}
347

348
func (mfr *MockFetchResponse) getMessageCount(topic string, partition int32) int {
349
	mfr.messagesLock.RLock()
350
	defer mfr.messagesLock.RUnlock()
351
	partitions := mfr.messages[topic]
352
	if partitions == nil {
353
		return 0
354
	}
355
	messages := partitions[partition]
356
	if messages == nil {
357
		return 0
358
	}
359
	return len(messages)
360
}
361

362
func (mfr *MockFetchResponse) getHighWaterMark(topic string, partition int32) int64 {
363
	partitions := mfr.highWaterMarks[topic]
364
	if partitions == nil {
365
		return 0
366
	}
367
	return partitions[partition]
368
}
369

370
// MockConsumerMetadataResponse is a `ConsumerMetadataResponse` builder.
371
type MockConsumerMetadataResponse struct {
372
	coordinators map[string]interface{}
373
	t            TestReporter
374
}
375

376
func NewMockConsumerMetadataResponse(t TestReporter) *MockConsumerMetadataResponse {
377
	return &MockConsumerMetadataResponse{
378
		coordinators: make(map[string]interface{}),
379
		t:            t,
380
	}
381
}
382

383
func (mr *MockConsumerMetadataResponse) SetCoordinator(group string, broker *MockBroker) *MockConsumerMetadataResponse {
384
	mr.coordinators[group] = broker
385
	return mr
386
}
387

388
func (mr *MockConsumerMetadataResponse) SetError(group string, kerror KError) *MockConsumerMetadataResponse {
389
	mr.coordinators[group] = kerror
390
	return mr
391
}
392

393
func (mr *MockConsumerMetadataResponse) For(reqBody versionedDecoder) encoderWithHeader {
394
	req := reqBody.(*ConsumerMetadataRequest)
395
	group := req.ConsumerGroup
396
	res := &ConsumerMetadataResponse{}
397
	v := mr.coordinators[group]
398
	switch v := v.(type) {
399
	case *MockBroker:
400
		res.Coordinator = &Broker{id: v.BrokerID(), addr: v.Addr()}
401
	case KError:
402
		res.Err = v
403
	}
404
	return res
405
}
406

407
// MockFindCoordinatorResponse is a `FindCoordinatorResponse` builder.
408
type MockFindCoordinatorResponse struct {
409
	groupCoordinators map[string]interface{}
410
	transCoordinators map[string]interface{}
411
	t                 TestReporter
412
}
413

414
func NewMockFindCoordinatorResponse(t TestReporter) *MockFindCoordinatorResponse {
415
	return &MockFindCoordinatorResponse{
416
		groupCoordinators: make(map[string]interface{}),
417
		transCoordinators: make(map[string]interface{}),
418
		t:                 t,
419
	}
420
}
421

422
func (mr *MockFindCoordinatorResponse) SetCoordinator(coordinatorType CoordinatorType, group string, broker *MockBroker) *MockFindCoordinatorResponse {
423
	switch coordinatorType {
424
	case CoordinatorGroup:
425
		mr.groupCoordinators[group] = broker
426
	case CoordinatorTransaction:
427
		mr.transCoordinators[group] = broker
428
	}
429
	return mr
430
}
431

432
func (mr *MockFindCoordinatorResponse) SetError(coordinatorType CoordinatorType, group string, kerror KError) *MockFindCoordinatorResponse {
433
	switch coordinatorType {
434
	case CoordinatorGroup:
435
		mr.groupCoordinators[group] = kerror
436
	case CoordinatorTransaction:
437
		mr.transCoordinators[group] = kerror
438
	}
439
	return mr
440
}
441

442
func (mr *MockFindCoordinatorResponse) For(reqBody versionedDecoder) encoderWithHeader {
443
	req := reqBody.(*FindCoordinatorRequest)
444
	res := &FindCoordinatorResponse{}
445
	var v interface{}
446
	switch req.CoordinatorType {
447
	case CoordinatorGroup:
448
		v = mr.groupCoordinators[req.CoordinatorKey]
449
	case CoordinatorTransaction:
450
		v = mr.transCoordinators[req.CoordinatorKey]
451
	}
452
	switch v := v.(type) {
453
	case *MockBroker:
454
		res.Coordinator = &Broker{id: v.BrokerID(), addr: v.Addr()}
455
	case KError:
456
		res.Err = v
457
	}
458
	return res
459
}
460

461
// MockOffsetCommitResponse is a `OffsetCommitResponse` builder.
462
type MockOffsetCommitResponse struct {
463
	errors map[string]map[string]map[int32]KError
464
	t      TestReporter
465
}
466

467
func NewMockOffsetCommitResponse(t TestReporter) *MockOffsetCommitResponse {
468
	return &MockOffsetCommitResponse{t: t}
469
}
470

471
func (mr *MockOffsetCommitResponse) SetError(group, topic string, partition int32, kerror KError) *MockOffsetCommitResponse {
472
	if mr.errors == nil {
473
		mr.errors = make(map[string]map[string]map[int32]KError)
474
	}
475
	topics := mr.errors[group]
476
	if topics == nil {
477
		topics = make(map[string]map[int32]KError)
478
		mr.errors[group] = topics
479
	}
480
	partitions := topics[topic]
481
	if partitions == nil {
482
		partitions = make(map[int32]KError)
483
		topics[topic] = partitions
484
	}
485
	partitions[partition] = kerror
486
	return mr
487
}
488

489
func (mr *MockOffsetCommitResponse) For(reqBody versionedDecoder) encoderWithHeader {
490
	req := reqBody.(*OffsetCommitRequest)
491
	group := req.ConsumerGroup
492
	res := &OffsetCommitResponse{}
493
	for topic, partitions := range req.blocks {
494
		for partition := range partitions {
495
			res.AddError(topic, partition, mr.getError(group, topic, partition))
496
		}
497
	}
498
	return res
499
}
500

501
func (mr *MockOffsetCommitResponse) getError(group, topic string, partition int32) KError {
502
	topics := mr.errors[group]
503
	if topics == nil {
504
		return ErrNoError
505
	}
506
	partitions := topics[topic]
507
	if partitions == nil {
508
		return ErrNoError
509
	}
510
	kerror, ok := partitions[partition]
511
	if !ok {
512
		return ErrNoError
513
	}
514
	return kerror
515
}
516

517
// MockProduceResponse is a `ProduceResponse` builder.
518
type MockProduceResponse struct {
519
	version int16
520
	errors  map[string]map[int32]KError
521
	t       TestReporter
522
}
523

524
func NewMockProduceResponse(t TestReporter) *MockProduceResponse {
525
	return &MockProduceResponse{t: t}
526
}
527

528
func (mr *MockProduceResponse) SetVersion(version int16) *MockProduceResponse {
529
	mr.version = version
530
	return mr
531
}
532

533
func (mr *MockProduceResponse) SetError(topic string, partition int32, kerror KError) *MockProduceResponse {
534
	if mr.errors == nil {
535
		mr.errors = make(map[string]map[int32]KError)
536
	}
537
	partitions := mr.errors[topic]
538
	if partitions == nil {
539
		partitions = make(map[int32]KError)
540
		mr.errors[topic] = partitions
541
	}
542
	partitions[partition] = kerror
543
	return mr
544
}
545

546
func (mr *MockProduceResponse) For(reqBody versionedDecoder) encoderWithHeader {
547
	req := reqBody.(*ProduceRequest)
548
	res := &ProduceResponse{
549
		Version: mr.version,
550
	}
551
	for topic, partitions := range req.records {
552
		for partition := range partitions {
553
			res.AddTopicPartition(topic, partition, mr.getError(topic, partition))
554
		}
555
	}
556
	return res
557
}
558

559
func (mr *MockProduceResponse) getError(topic string, partition int32) KError {
560
	partitions := mr.errors[topic]
561
	if partitions == nil {
562
		return ErrNoError
563
	}
564
	kerror, ok := partitions[partition]
565
	if !ok {
566
		return ErrNoError
567
	}
568
	return kerror
569
}
570

571
// MockOffsetFetchResponse is a `OffsetFetchResponse` builder.
572
type MockOffsetFetchResponse struct {
573
	offsets map[string]map[string]map[int32]*OffsetFetchResponseBlock
574
	error   KError
575
	t       TestReporter
576
}
577

578
func NewMockOffsetFetchResponse(t TestReporter) *MockOffsetFetchResponse {
579
	return &MockOffsetFetchResponse{t: t}
580
}
581

582
func (mr *MockOffsetFetchResponse) SetOffset(group, topic string, partition int32, offset int64, metadata string, kerror KError) *MockOffsetFetchResponse {
583
	if mr.offsets == nil {
584
		mr.offsets = make(map[string]map[string]map[int32]*OffsetFetchResponseBlock)
585
	}
586
	topics := mr.offsets[group]
587
	if topics == nil {
588
		topics = make(map[string]map[int32]*OffsetFetchResponseBlock)
589
		mr.offsets[group] = topics
590
	}
591
	partitions := topics[topic]
592
	if partitions == nil {
593
		partitions = make(map[int32]*OffsetFetchResponseBlock)
594
		topics[topic] = partitions
595
	}
596
	partitions[partition] = &OffsetFetchResponseBlock{offset, 0, metadata, kerror}
597
	return mr
598
}
599

600
func (mr *MockOffsetFetchResponse) SetError(kerror KError) *MockOffsetFetchResponse {
601
	mr.error = kerror
602
	return mr
603
}
604

605
func (mr *MockOffsetFetchResponse) For(reqBody versionedDecoder) encoderWithHeader {
606
	req := reqBody.(*OffsetFetchRequest)
607
	group := req.ConsumerGroup
608
	res := &OffsetFetchResponse{Version: req.Version}
609

610
	for topic, partitions := range mr.offsets[group] {
611
		for partition, block := range partitions {
612
			res.AddBlock(topic, partition, block)
613
		}
614
	}
615

616
	if res.Version >= 2 {
617
		res.Err = mr.error
618
	}
619
	return res
620
}
621

622
type MockCreateTopicsResponse struct {
623
	t TestReporter
624
}
625

626
func NewMockCreateTopicsResponse(t TestReporter) *MockCreateTopicsResponse {
627
	return &MockCreateTopicsResponse{t: t}
628
}
629

630
func (mr *MockCreateTopicsResponse) For(reqBody versionedDecoder) encoderWithHeader {
631
	req := reqBody.(*CreateTopicsRequest)
632
	res := &CreateTopicsResponse{
633
		Version: req.Version,
634
	}
635
	res.TopicErrors = make(map[string]*TopicError)
636

637
	for topic := range req.TopicDetails {
638
		if res.Version >= 1 && strings.HasPrefix(topic, "_") {
639
			msg := "insufficient permissions to create topic with reserved prefix"
640
			res.TopicErrors[topic] = &TopicError{
641
				Err:    ErrTopicAuthorizationFailed,
642
				ErrMsg: &msg,
643
			}
644
			continue
645
		}
646
		res.TopicErrors[topic] = &TopicError{Err: ErrNoError}
647
	}
648
	return res
649
}
650

651
type MockDeleteTopicsResponse struct {
652
	t TestReporter
653
}
654

655
func NewMockDeleteTopicsResponse(t TestReporter) *MockDeleteTopicsResponse {
656
	return &MockDeleteTopicsResponse{t: t}
657
}
658

659
func (mr *MockDeleteTopicsResponse) For(reqBody versionedDecoder) encoderWithHeader {
660
	req := reqBody.(*DeleteTopicsRequest)
661
	res := &DeleteTopicsResponse{}
662
	res.TopicErrorCodes = make(map[string]KError)
663

664
	for _, topic := range req.Topics {
665
		res.TopicErrorCodes[topic] = ErrNoError
666
	}
667
	res.Version = req.Version
668
	return res
669
}
670

671
type MockCreatePartitionsResponse struct {
672
	t TestReporter
673
}
674

675
func NewMockCreatePartitionsResponse(t TestReporter) *MockCreatePartitionsResponse {
676
	return &MockCreatePartitionsResponse{t: t}
677
}
678

679
func (mr *MockCreatePartitionsResponse) For(reqBody versionedDecoder) encoderWithHeader {
680
	req := reqBody.(*CreatePartitionsRequest)
681
	res := &CreatePartitionsResponse{}
682
	res.TopicPartitionErrors = make(map[string]*TopicPartitionError)
683

684
	for topic := range req.TopicPartitions {
685
		if strings.HasPrefix(topic, "_") {
686
			msg := "insufficient permissions to create partition on topic with reserved prefix"
687
			res.TopicPartitionErrors[topic] = &TopicPartitionError{
688
				Err:    ErrTopicAuthorizationFailed,
689
				ErrMsg: &msg,
690
			}
691
			continue
692
		}
693
		res.TopicPartitionErrors[topic] = &TopicPartitionError{Err: ErrNoError}
694
	}
695
	return res
696
}
697

698
type MockAlterPartitionReassignmentsResponse struct {
699
	t TestReporter
700
}
701

702
func NewMockAlterPartitionReassignmentsResponse(t TestReporter) *MockAlterPartitionReassignmentsResponse {
703
	return &MockAlterPartitionReassignmentsResponse{t: t}
704
}
705

706
func (mr *MockAlterPartitionReassignmentsResponse) For(reqBody versionedDecoder) encoderWithHeader {
707
	req := reqBody.(*AlterPartitionReassignmentsRequest)
708
	_ = req
709
	res := &AlterPartitionReassignmentsResponse{}
710
	return res
711
}
712

713
type MockListPartitionReassignmentsResponse struct {
714
	t TestReporter
715
}
716

717
func NewMockListPartitionReassignmentsResponse(t TestReporter) *MockListPartitionReassignmentsResponse {
718
	return &MockListPartitionReassignmentsResponse{t: t}
719
}
720

721
func (mr *MockListPartitionReassignmentsResponse) For(reqBody versionedDecoder) encoderWithHeader {
722
	req := reqBody.(*ListPartitionReassignmentsRequest)
723
	_ = req
724
	res := &ListPartitionReassignmentsResponse{}
725

726
	for topic, partitions := range req.blocks {
727
		for _, partition := range partitions {
728
			res.AddBlock(topic, partition, []int32{0}, []int32{1}, []int32{2})
729
		}
730
	}
731

732
	return res
733
}
734

735
type MockDeleteRecordsResponse struct {
736
	t TestReporter
737
}
738

739
func NewMockDeleteRecordsResponse(t TestReporter) *MockDeleteRecordsResponse {
740
	return &MockDeleteRecordsResponse{t: t}
741
}
742

743
func (mr *MockDeleteRecordsResponse) For(reqBody versionedDecoder) encoderWithHeader {
744
	req := reqBody.(*DeleteRecordsRequest)
745
	res := &DeleteRecordsResponse{}
746
	res.Topics = make(map[string]*DeleteRecordsResponseTopic)
747

748
	for topic, deleteRecordRequestTopic := range req.Topics {
749
		partitions := make(map[int32]*DeleteRecordsResponsePartition)
750
		for partition := range deleteRecordRequestTopic.PartitionOffsets {
751
			partitions[partition] = &DeleteRecordsResponsePartition{Err: ErrNoError}
752
		}
753
		res.Topics[topic] = &DeleteRecordsResponseTopic{Partitions: partitions}
754
	}
755
	return res
756
}
757

758
type MockDescribeConfigsResponse struct {
759
	t TestReporter
760
}
761

762
func NewMockDescribeConfigsResponse(t TestReporter) *MockDescribeConfigsResponse {
763
	return &MockDescribeConfigsResponse{t: t}
764
}
765

766
func (mr *MockDescribeConfigsResponse) For(reqBody versionedDecoder) encoderWithHeader {
767
	req := reqBody.(*DescribeConfigsRequest)
768
	res := &DescribeConfigsResponse{
769
		Version: req.Version,
770
	}
771

772
	includeSynonyms := req.Version > 0
773
	includeSource := req.Version > 0
774

775
	for _, r := range req.Resources {
776
		var configEntries []*ConfigEntry
777
		switch r.Type {
778
		case BrokerResource:
779
			configEntries = append(configEntries,
780
				&ConfigEntry{
781
					Name:     "min.insync.replicas",
782
					Value:    "2",
783
					ReadOnly: false,
784
					Default:  false,
785
				},
786
			)
787
			res.Resources = append(res.Resources, &ResourceResponse{
788
				Name:    r.Name,
789
				Configs: configEntries,
790
			})
791
		case BrokerLoggerResource:
792
			configEntries = append(configEntries,
793
				&ConfigEntry{
794
					Name:     "kafka.controller.KafkaController",
795
					Value:    "DEBUG",
796
					ReadOnly: false,
797
					Default:  false,
798
				},
799
			)
800
			res.Resources = append(res.Resources, &ResourceResponse{
801
				Name:    r.Name,
802
				Configs: configEntries,
803
			})
804
		case TopicResource:
805
			maxMessageBytes := &ConfigEntry{
806
				Name:      "max.message.bytes",
807
				Value:     "1000000",
808
				ReadOnly:  false,
809
				Default:   !includeSource,
810
				Sensitive: false,
811
			}
812
			if includeSource {
813
				maxMessageBytes.Source = SourceDefault
814
			}
815
			if includeSynonyms {
816
				maxMessageBytes.Synonyms = []*ConfigSynonym{
817
					{
818
						ConfigName:  "max.message.bytes",
819
						ConfigValue: "500000",
820
					},
821
				}
822
			}
823
			retentionMs := &ConfigEntry{
824
				Name:      "retention.ms",
825
				Value:     "5000",
826
				ReadOnly:  false,
827
				Default:   false,
828
				Sensitive: false,
829
			}
830
			if includeSynonyms {
831
				retentionMs.Synonyms = []*ConfigSynonym{
832
					{
833
						ConfigName:  "log.retention.ms",
834
						ConfigValue: "2500",
835
					},
836
				}
837
			}
838
			password := &ConfigEntry{
839
				Name:      "password",
840
				Value:     "12345",
841
				ReadOnly:  false,
842
				Default:   false,
843
				Sensitive: true,
844
			}
845
			configEntries = append(
846
				configEntries, maxMessageBytes, retentionMs, password)
847
			res.Resources = append(res.Resources, &ResourceResponse{
848
				Name:    r.Name,
849
				Configs: configEntries,
850
			})
851
		}
852
	}
853
	return res
854
}
855

856
type MockDescribeConfigsResponseWithErrorCode struct {
857
	t TestReporter
858
}
859

860
func NewMockDescribeConfigsResponseWithErrorCode(t TestReporter) *MockDescribeConfigsResponseWithErrorCode {
861
	return &MockDescribeConfigsResponseWithErrorCode{t: t}
862
}
863

864
func (mr *MockDescribeConfigsResponseWithErrorCode) For(reqBody versionedDecoder) encoderWithHeader {
865
	req := reqBody.(*DescribeConfigsRequest)
866
	res := &DescribeConfigsResponse{
867
		Version: req.Version,
868
	}
869

870
	for _, r := range req.Resources {
871
		res.Resources = append(res.Resources, &ResourceResponse{
872
			Name:      r.Name,
873
			Type:      r.Type,
874
			ErrorCode: 83,
875
			ErrorMsg:  "",
876
		})
877
	}
878
	return res
879
}
880

881
type MockAlterConfigsResponse struct {
882
	t TestReporter
883
}
884

885
func NewMockAlterConfigsResponse(t TestReporter) *MockAlterConfigsResponse {
886
	return &MockAlterConfigsResponse{t: t}
887
}
888

889
func (mr *MockAlterConfigsResponse) For(reqBody versionedDecoder) encoderWithHeader {
890
	req := reqBody.(*AlterConfigsRequest)
891
	res := &AlterConfigsResponse{}
892

893
	for _, r := range req.Resources {
894
		res.Resources = append(res.Resources, &AlterConfigsResourceResponse{
895
			Name:     r.Name,
896
			Type:     r.Type,
897
			ErrorMsg: "",
898
		})
899
	}
900
	return res
901
}
902

903
type MockAlterConfigsResponseWithErrorCode struct {
904
	t TestReporter
905
}
906

907
func NewMockAlterConfigsResponseWithErrorCode(t TestReporter) *MockAlterConfigsResponseWithErrorCode {
908
	return &MockAlterConfigsResponseWithErrorCode{t: t}
909
}
910

911
func (mr *MockAlterConfigsResponseWithErrorCode) For(reqBody versionedDecoder) encoderWithHeader {
912
	req := reqBody.(*AlterConfigsRequest)
913
	res := &AlterConfigsResponse{}
914

915
	for _, r := range req.Resources {
916
		res.Resources = append(res.Resources, &AlterConfigsResourceResponse{
917
			Name:      r.Name,
918
			Type:      r.Type,
919
			ErrorCode: 83,
920
			ErrorMsg:  "",
921
		})
922
	}
923
	return res
924
}
925

926
type MockIncrementalAlterConfigsResponse struct {
927
	t TestReporter
928
}
929

930
func NewMockIncrementalAlterConfigsResponse(t TestReporter) *MockIncrementalAlterConfigsResponse {
931
	return &MockIncrementalAlterConfigsResponse{t: t}
932
}
933

934
func (mr *MockIncrementalAlterConfigsResponse) For(reqBody versionedDecoder) encoderWithHeader {
935
	req := reqBody.(*IncrementalAlterConfigsRequest)
936
	res := &IncrementalAlterConfigsResponse{}
937

938
	for _, r := range req.Resources {
939
		res.Resources = append(res.Resources, &AlterConfigsResourceResponse{
940
			Name:     r.Name,
941
			Type:     r.Type,
942
			ErrorMsg: "",
943
		})
944
	}
945
	return res
946
}
947

948
type MockIncrementalAlterConfigsResponseWithErrorCode struct {
949
	t TestReporter
950
}
951

952
func NewMockIncrementalAlterConfigsResponseWithErrorCode(t TestReporter) *MockIncrementalAlterConfigsResponseWithErrorCode {
953
	return &MockIncrementalAlterConfigsResponseWithErrorCode{t: t}
954
}
955

956
func (mr *MockIncrementalAlterConfigsResponseWithErrorCode) For(reqBody versionedDecoder) encoderWithHeader {
957
	req := reqBody.(*IncrementalAlterConfigsRequest)
958
	res := &IncrementalAlterConfigsResponse{}
959

960
	for _, r := range req.Resources {
961
		res.Resources = append(res.Resources, &AlterConfigsResourceResponse{
962
			Name:      r.Name,
963
			Type:      r.Type,
964
			ErrorCode: 83,
965
			ErrorMsg:  "",
966
		})
967
	}
968
	return res
969
}
970

971
type MockCreateAclsResponse struct {
972
	t TestReporter
973
}
974

975
func NewMockCreateAclsResponse(t TestReporter) *MockCreateAclsResponse {
976
	return &MockCreateAclsResponse{t: t}
977
}
978

979
func (mr *MockCreateAclsResponse) For(reqBody versionedDecoder) encoderWithHeader {
980
	req := reqBody.(*CreateAclsRequest)
981
	res := &CreateAclsResponse{}
982

983
	for range req.AclCreations {
984
		res.AclCreationResponses = append(res.AclCreationResponses, &AclCreationResponse{Err: ErrNoError})
985
	}
986
	return res
987
}
988

989
type MockCreateAclsResponseError struct {
990
	t TestReporter
991
}
992

993
func NewMockCreateAclsResponseWithError(t TestReporter) *MockCreateAclsResponseError {
994
	return &MockCreateAclsResponseError{t: t}
995
}
996

997
func (mr *MockCreateAclsResponseError) For(reqBody versionedDecoder) encoderWithHeader {
998
	req := reqBody.(*CreateAclsRequest)
999
	res := &CreateAclsResponse{}
1000

1001
	for range req.AclCreations {
1002
		res.AclCreationResponses = append(res.AclCreationResponses, &AclCreationResponse{Err: ErrInvalidRequest})
1003
	}
1004
	return res
1005
}
1006

1007
type MockListAclsResponse struct {
1008
	t TestReporter
1009
}
1010

1011
func NewMockListAclsResponse(t TestReporter) *MockListAclsResponse {
1012
	return &MockListAclsResponse{t: t}
1013
}
1014

1015
// For answers a DescribeAclsRequest with exactly one ResourceAcls entry that
// holds exactly one Acl built from the request's filter:
//
//   - resource name, pattern type and resource type are copied from the
//     request (the name stays empty when the request leaves it nil);
//   - host defaults to "*" and principal to "User:test" when the request
//     leaves them nil;
//   - a permission type of AclPermissionAny is replaced by AclPermissionAllow.
//
// The response reports ErrNoError and mirrors the request version. It panics
// if reqBody is not a *DescribeAclsRequest.
func (mr *MockListAclsResponse) For(reqBody versionedDecoder) encoderWithHeader {
	req := reqBody.(*DescribeAclsRequest)

	// Fall back to fixed values for every filter field the request left unset.
	host, principal := "*", "User:test"
	if req.Host != nil {
		host = *req.Host
	}
	if req.Principal != nil {
		principal = *req.Principal
	}
	permissionType := req.PermissionType
	if permissionType == AclPermissionAny {
		permissionType = AclPermissionAllow
	}

	resourceAcls := &ResourceAcls{}
	if req.ResourceName != nil {
		resourceAcls.Resource.ResourceName = *req.ResourceName
	}
	resourceAcls.Resource.ResourcePatternType = req.ResourcePatternTypeFilter
	resourceAcls.Resource.ResourceType = req.ResourceType
	resourceAcls.Acls = append(resourceAcls.Acls, &Acl{
		Operation:      req.Operation,
		PermissionType: permissionType,
		Host:           host,
		Principal:      principal,
	})

	res := &DescribeAclsResponse{}
	res.Err = ErrNoError
	res.ResourceAcls = append(res.ResourceAcls, resourceAcls)
	res.Version = int16(req.Version)
	return res
}
1046

1047
type MockSaslAuthenticateResponse struct {
1048
	t                 TestReporter
1049
	kerror            KError
1050
	saslAuthBytes     []byte
1051
	sessionLifetimeMs int64
1052
}
1053

1054
// NewMockSaslAuthenticateResponse returns a MockSaslAuthenticateResponse that
// keeps a reference to the given test reporter and has no error, auth bytes
// or session lifetime configured.
func NewMockSaslAuthenticateResponse(t TestReporter) *MockSaslAuthenticateResponse {
	mock := new(MockSaslAuthenticateResponse)
	mock.t = t
	return mock
}
1057

1058
// For answers a SaslAuthenticateRequest with a response that mirrors the
// request version and carries the configured error, auth bytes and session
// lifetime.
//
// It panics if reqBody is not a *SaslAuthenticateRequest.
func (msar *MockSaslAuthenticateResponse) For(reqBody versionedDecoder) encoderWithHeader {
	req := reqBody.(*SaslAuthenticateRequest)
	return &SaslAuthenticateResponse{
		Version:           req.Version,
		Err:               msar.kerror,
		SaslAuthBytes:     msar.saslAuthBytes,
		SessionLifetimeMs: msar.sessionLifetimeMs,
	}
}
1067

1068
// SetError sets the error returned in subsequent responses and returns the
// receiver so that calls can be chained.
func (msar *MockSaslAuthenticateResponse) SetError(kerror KError) *MockSaslAuthenticateResponse {
	msar.kerror = kerror
	return msar
}
1072

1073
// SetAuthBytes sets the SASL auth bytes returned in subsequent responses and
// returns the receiver so that calls can be chained. The slice is stored
// without copying.
func (msar *MockSaslAuthenticateResponse) SetAuthBytes(saslAuthBytes []byte) *MockSaslAuthenticateResponse {
	msar.saslAuthBytes = saslAuthBytes
	return msar
}
1077

1078
// SetSessionLifetimeMs sets the session lifetime returned in subsequent
// responses and returns the receiver so that calls can be chained.
func (msar *MockSaslAuthenticateResponse) SetSessionLifetimeMs(sessionLifetimeMs int64) *MockSaslAuthenticateResponse {
	msar.sessionLifetimeMs = sessionLifetimeMs
	return msar
}
1082

1083
type MockDeleteAclsResponse struct {
1084
	t TestReporter
1085
}
1086

1087
type MockSaslHandshakeResponse struct {
1088
	enabledMechanisms []string
1089
	kerror            KError
1090
	t                 TestReporter
1091
}
1092

1093
// NewMockSaslHandshakeResponse returns a MockSaslHandshakeResponse that keeps
// a reference to the given test reporter and has no error or mechanisms
// configured.
func NewMockSaslHandshakeResponse(t TestReporter) *MockSaslHandshakeResponse {
	mock := new(MockSaslHandshakeResponse)
	mock.t = t
	return mock
}
1096

1097
// For returns a SaslHandshakeResponse carrying the configured error and
// enabled mechanisms. The request body is not inspected.
func (mshr *MockSaslHandshakeResponse) For(reqBody versionedDecoder) encoderWithHeader {
	return &SaslHandshakeResponse{
		Err:               mshr.kerror,
		EnabledMechanisms: mshr.enabledMechanisms,
	}
}
1103

1104
// SetError sets the error returned in subsequent responses and returns the
// receiver so that calls can be chained.
func (mshr *MockSaslHandshakeResponse) SetError(kerror KError) *MockSaslHandshakeResponse {
	mshr.kerror = kerror
	return mshr
}
1108

1109
// SetEnabledMechanisms sets the mechanism list returned in subsequent
// responses and returns the receiver so that calls can be chained. The slice
// is stored without copying.
func (mshr *MockSaslHandshakeResponse) SetEnabledMechanisms(enabledMechanisms []string) *MockSaslHandshakeResponse {
	mshr.enabledMechanisms = enabledMechanisms
	return mshr
}
1113

1114
// NewMockDeleteAclsResponse returns a MockDeleteAclsResponse that keeps a
// reference to the given test reporter.
func NewMockDeleteAclsResponse(t TestReporter) *MockDeleteAclsResponse {
	mock := new(MockDeleteAclsResponse)
	mock.t = t
	return mock
}
1117

1118
// For answers a DeleteAclsRequest with one FilterResponse per request filter.
// Every filter response reports ErrNoError and contains a single MatchingAcl
// that also reports ErrNoError. The response mirrors the request version.
//
// It panics if reqBody is not a *DeleteAclsRequest.
func (mr *MockDeleteAclsResponse) For(reqBody versionedDecoder) encoderWithHeader {
	req := reqBody.(*DeleteAclsRequest)
	res := &DeleteAclsResponse{}
	for i := 0; i < len(req.Filters); i++ {
		matched := &FilterResponse{
			Err:          ErrNoError,
			MatchingAcls: []*MatchingAcl{{Err: ErrNoError}},
		}
		res.FilterResponses = append(res.FilterResponses, matched)
	}
	res.Version = int16(req.Version)
	return res
}
1130

1131
// MockDeleteGroupsResponse is a mock response builder for delete-groups
// requests. It holds the names of the groups that are reported as deleted
// without error.
type MockDeleteGroupsResponse struct {
	deletedGroups []string
}
1134

1135
// NewMockDeleteGroupsRequest returns an empty MockDeleteGroupsResponse.
// Despite its name it builds a response mock, and the test reporter argument
// is accepted but not retained.
func NewMockDeleteGroupsRequest(t TestReporter) *MockDeleteGroupsResponse {
	return new(MockDeleteGroupsResponse)
}
1138

1139
// SetDeletedGroups replaces the list of groups reported as deleted and
// returns the receiver so that calls can be chained. The slice is stored
// without copying.
func (m *MockDeleteGroupsResponse) SetDeletedGroups(groups []string) *MockDeleteGroupsResponse {
	m.deletedGroups = groups
	return m
}
1143

1144
func (m *MockDeleteGroupsResponse) For(reqBody versionedDecoder) encoderWithHeader {
1145
	resp := &DeleteGroupsResponse{
1146
		GroupErrorCodes: map[string]KError{},
1147
	}
1148
	for _, group := range m.deletedGroups {
1149
		resp.GroupErrorCodes[group] = ErrNoError
1150
	}
1151
	return resp
1152
}
1153

1154
type MockDeleteOffsetResponse struct {
1155
	errorCode      KError
1156
	topic          string
1157
	partition      int32
1158
	errorPartition KError
1159
}
1160

1161
// NewMockDeleteOffsetRequest returns a zero-valued MockDeleteOffsetResponse.
// Despite its name it builds a response mock, and the test reporter argument
// is accepted but not retained.
func NewMockDeleteOffsetRequest(t TestReporter) *MockDeleteOffsetResponse {
	return new(MockDeleteOffsetResponse)
}
1164

1165
// SetDeletedOffset configures the top-level error code and the error reported
// for the given topic/partition pair, overwriting any previous configuration.
// It returns the receiver so that calls can be chained.
func (m *MockDeleteOffsetResponse) SetDeletedOffset(errorCode KError, topic string, partition int32, errorPartition KError) *MockDeleteOffsetResponse {
	m.errorCode, m.errorPartition = errorCode, errorPartition
	m.topic, m.partition = topic, partition
	return m
}
1172

1173
func (m *MockDeleteOffsetResponse) For(reqBody versionedDecoder) encoderWithHeader {
1174
	resp := &DeleteOffsetsResponse{
1175
		ErrorCode: m.errorCode,
1176
		Errors: map[string]map[int32]KError{
1177
			m.topic: {m.partition: m.errorPartition},
1178
		},
1179
	}
1180
	return resp
1181
}
1182

1183
type MockJoinGroupResponse struct {
1184
	t TestReporter
1185

1186
	ThrottleTime  int32
1187
	Err           KError
1188
	GenerationId  int32
1189
	GroupProtocol string
1190
	LeaderId      string
1191
	MemberId      string
1192
	Members       map[string][]byte
1193
}
1194

1195
func NewMockJoinGroupResponse(t TestReporter) *MockJoinGroupResponse {
1196
	return &MockJoinGroupResponse{
1197
		t:       t,
1198
		Members: make(map[string][]byte),
1199
	}
1200
}
1201

1202
func (m *MockJoinGroupResponse) For(reqBody versionedDecoder) encoderWithHeader {
1203
	req := reqBody.(*JoinGroupRequest)
1204
	resp := &JoinGroupResponse{
1205
		Version:       req.Version,
1206
		ThrottleTime:  m.ThrottleTime,
1207
		Err:           m.Err,
1208
		GenerationId:  m.GenerationId,
1209
		GroupProtocol: m.GroupProtocol,
1210
		LeaderId:      m.LeaderId,
1211
		MemberId:      m.MemberId,
1212
		Members:       m.Members,
1213
	}
1214
	return resp
1215
}
1216

1217
// SetThrottleTime sets the throttle time returned in subsequent responses and
// returns the receiver so that calls can be chained.
func (m *MockJoinGroupResponse) SetThrottleTime(t int32) *MockJoinGroupResponse {
	m.ThrottleTime = t
	return m
}
1221

1222
// SetError sets the error returned in subsequent responses and returns the
// receiver so that calls can be chained.
func (m *MockJoinGroupResponse) SetError(kerr KError) *MockJoinGroupResponse {
	m.Err = kerr
	return m
}
1226

1227
// SetGenerationId sets the generation id returned in subsequent responses and
// returns the receiver so that calls can be chained.
func (m *MockJoinGroupResponse) SetGenerationId(id int32) *MockJoinGroupResponse {
	m.GenerationId = id
	return m
}
1231

1232
// SetGroupProtocol sets the group protocol name returned in subsequent
// responses and returns the receiver so that calls can be chained.
func (m *MockJoinGroupResponse) SetGroupProtocol(proto string) *MockJoinGroupResponse {
	m.GroupProtocol = proto
	return m
}
1236

1237
// SetLeaderId sets the leader id returned in subsequent responses and returns
// the receiver so that calls can be chained.
func (m *MockJoinGroupResponse) SetLeaderId(id string) *MockJoinGroupResponse {
	m.LeaderId = id
	return m
}
1241

1242
// SetMemberId sets the member id returned in subsequent responses and returns
// the receiver so that calls can be chained.
func (m *MockJoinGroupResponse) SetMemberId(id string) *MockJoinGroupResponse {
	m.MemberId = id
	return m
}
1246

1247
// SetMember encodes meta and registers it under the given member id,
// replacing any metadata previously stored for that id. It returns the
// receiver so that calls can be chained.
//
// It panics if the metadata cannot be encoded.
func (m *MockJoinGroupResponse) SetMember(id string, meta *ConsumerGroupMemberMetadata) *MockJoinGroupResponse {
	bin, err := encode(meta, nil)
	if err != nil {
		panic(fmt.Sprintf("error encoding member metadata: %v", err))
	}
	// The type and its Members field are exported, so a value may have been
	// built without the constructor; writing to a nil map would panic.
	if m.Members == nil {
		m.Members = make(map[string][]byte)
	}
	m.Members[id] = bin
	return m
}
1255

1256
type MockLeaveGroupResponse struct {
1257
	t TestReporter
1258

1259
	Err KError
1260
}
1261

1262
// NewMockLeaveGroupResponse returns a MockLeaveGroupResponse that keeps a
// reference to the given test reporter and has no error configured.
func NewMockLeaveGroupResponse(t TestReporter) *MockLeaveGroupResponse {
	mock := new(MockLeaveGroupResponse)
	mock.t = t
	return mock
}
1265

1266
func (m *MockLeaveGroupResponse) For(reqBody versionedDecoder) encoderWithHeader {
1267
	resp := &LeaveGroupResponse{
1268
		Err: m.Err,
1269
	}
1270
	return resp
1271
}
1272

1273
// SetError sets the error returned in subsequent responses and returns the
// receiver so that calls can be chained.
func (m *MockLeaveGroupResponse) SetError(kerr KError) *MockLeaveGroupResponse {
	m.Err = kerr
	return m
}
1277

1278
type MockSyncGroupResponse struct {
1279
	t TestReporter
1280

1281
	Err              KError
1282
	MemberAssignment []byte
1283
}
1284

1285
// NewMockSyncGroupResponse returns a MockSyncGroupResponse that keeps a
// reference to the given test reporter and has no error or assignment
// configured.
func NewMockSyncGroupResponse(t TestReporter) *MockSyncGroupResponse {
	mock := new(MockSyncGroupResponse)
	mock.t = t
	return mock
}
1288

1289
func (m *MockSyncGroupResponse) For(reqBody versionedDecoder) encoderWithHeader {
1290
	resp := &SyncGroupResponse{
1291
		Err:              m.Err,
1292
		MemberAssignment: m.MemberAssignment,
1293
	}
1294
	return resp
1295
}
1296

1297
// SetError sets the error returned in subsequent responses and returns the
// receiver so that calls can be chained.
func (m *MockSyncGroupResponse) SetError(kerr KError) *MockSyncGroupResponse {
	m.Err = kerr
	return m
}
1301

1302
// SetMemberAssignment encodes the given assignment and stores the resulting
// bytes for use in subsequent responses. It returns the receiver so that
// calls can be chained.
//
// It panics if the assignment cannot be encoded.
func (m *MockSyncGroupResponse) SetMemberAssignment(assignment *ConsumerGroupMemberAssignment) *MockSyncGroupResponse {
	encoded, encErr := encode(assignment, nil)
	if encErr != nil {
		panic(fmt.Sprintf("error encoding member assignment: %v", encErr))
	}

	m.MemberAssignment = encoded
	return m
}
1310

1311
type MockHeartbeatResponse struct {
1312
	t TestReporter
1313

1314
	Err KError
1315
}
1316

1317
// NewMockHeartbeatResponse returns a MockHeartbeatResponse that keeps a
// reference to the given test reporter and has no error configured.
func NewMockHeartbeatResponse(t TestReporter) *MockHeartbeatResponse {
	mock := new(MockHeartbeatResponse)
	mock.t = t
	return mock
}
1320

1321
// For returns a HeartbeatResponse carrying the configured error. The request
// body is not inspected.
//
// The error set through SetError is propagated into the response, matching
// the leave-group and sync-group mocks; without this the setter would have no
// observable effect.
func (m *MockHeartbeatResponse) For(reqBody versionedDecoder) encoderWithHeader {
	return &HeartbeatResponse{Err: m.Err}
}
1325

1326
// SetError stores the given error on the mock and returns the receiver so
// that calls can be chained.
func (m *MockHeartbeatResponse) SetError(kerr KError) *MockHeartbeatResponse {
	m.Err = kerr
	return m
}
1330

1331
type MockDescribeLogDirsResponse struct {
1332
	t       TestReporter
1333
	logDirs []DescribeLogDirsResponseDirMetadata
1334
}
1335

1336
// NewMockDescribeLogDirsResponse returns a MockDescribeLogDirsResponse that
// keeps a reference to the given test reporter and has no log directories
// configured.
func NewMockDescribeLogDirsResponse(t TestReporter) *MockDescribeLogDirsResponse {
	mock := new(MockDescribeLogDirsResponse)
	mock.t = t
	return mock
}
1339

1340
func (m *MockDescribeLogDirsResponse) SetLogDirs(logDirPath string, topicPartitions map[string]int) *MockDescribeLogDirsResponse {
1341
	var topics []DescribeLogDirsResponseTopic
1342
	for topic := range topicPartitions {
1343
		var partitions []DescribeLogDirsResponsePartition
1344
		for i := 0; i < topicPartitions[topic]; i++ {
1345
			partitions = append(partitions, DescribeLogDirsResponsePartition{
1346
				PartitionID: int32(i),
1347
				IsTemporary: false,
1348
				OffsetLag:   int64(0),
1349
				Size:        int64(1234),
1350
			})
1351
		}
1352
		topics = append(topics, DescribeLogDirsResponseTopic{
1353
			Topic:      topic,
1354
			Partitions: partitions,
1355
		})
1356
	}
1357
	logDir := DescribeLogDirsResponseDirMetadata{
1358
		ErrorCode: ErrNoError,
1359
		Path:      logDirPath,
1360
		Topics:    topics,
1361
	}
1362
	m.logDirs = []DescribeLogDirsResponseDirMetadata{logDir}
1363
	return m
1364
}
1365

1366
func (m *MockDescribeLogDirsResponse) For(reqBody versionedDecoder) encoderWithHeader {
1367
	resp := &DescribeLogDirsResponse{
1368
		LogDirs: m.logDirs,
1369
	}
1370
	return resp
1371
}
1372

1373
type MockApiVersionsResponse struct {
1374
	t       TestReporter
1375
	apiKeys []ApiVersionsResponseKey
1376
}
1377

1378
func NewMockApiVersionsResponse(t TestReporter) *MockApiVersionsResponse {
1379
	return &MockApiVersionsResponse{
1380
		t: t,
1381
		apiKeys: []ApiVersionsResponseKey{
1382
			{
1383
				ApiKey:     0,
1384
				MinVersion: 5,
1385
				MaxVersion: 8,
1386
			},
1387
			{
1388
				ApiKey:     1,
1389
				MinVersion: 7,
1390
				MaxVersion: 11,
1391
			},
1392
		},
1393
	}
1394
}
1395

1396
// SetApiKeys replaces the API key list returned in subsequent responses and
// returns the receiver so that calls can be chained. The slice is stored
// without copying.
func (m *MockApiVersionsResponse) SetApiKeys(apiKeys []ApiVersionsResponseKey) *MockApiVersionsResponse {
	m.apiKeys = apiKeys
	return m
}
1400

1401
func (m *MockApiVersionsResponse) For(reqBody versionedDecoder) encoderWithHeader {
1402
	req := reqBody.(*ApiVersionsRequest)
1403
	res := &ApiVersionsResponse{
1404
		Version: req.Version,
1405
		ApiKeys: m.apiKeys,
1406
	}
1407
	return res
1408
}
1409

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.