cubefs
1408 строк · 37.0 Кб
1package sarama2
3import (4"fmt"5"strings"6"sync"7)
8
// TestReporter has methods matching go's testing.T, which lets the mocks
// report failures without importing `testing` in the main part of the library.
type TestReporter interface {
	Error(args ...interface{})
	Errorf(format string, args ...interface{})
	Fatal(args ...interface{})
	Fatalf(format string, args ...interface{})
}
17
18// MockResponse is a response builder interface it defines one method that
19// allows generating a response based on a request body. MockResponses are used
20// to program behavior of MockBroker in tests.
21type MockResponse interface {22For(reqBody versionedDecoder) (res encoderWithHeader)23}
24
25// MockWrapper is a mock response builder that returns a particular concrete
26// response regardless of the actual request passed to the `For` method.
27type MockWrapper struct {28res encoderWithHeader
29}
30
31func (mw *MockWrapper) For(reqBody versionedDecoder) (res encoderWithHeader) {32return mw.res33}
34
35func NewMockWrapper(res encoderWithHeader) *MockWrapper {36return &MockWrapper{res: res}37}
38
39// MockSequence is a mock response builder that is created from a sequence of
40// concrete responses. Every time when a `MockBroker` calls its `For` method
41// the next response from the sequence is returned. When the end of the
42// sequence is reached the last element from the sequence is returned.
43type MockSequence struct {44responses []MockResponse45}
46
47func NewMockSequence(responses ...interface{}) *MockSequence {48ms := &MockSequence{}49ms.responses = make([]MockResponse, len(responses))50for i, res := range responses {51switch res := res.(type) {52case MockResponse:53ms.responses[i] = res54case encoderWithHeader:55ms.responses[i] = NewMockWrapper(res)56default:57panic(fmt.Sprintf("Unexpected response type: %T", res))58}59}60return ms61}
62
63func (mc *MockSequence) For(reqBody versionedDecoder) (res encoderWithHeader) {64res = mc.responses[0].For(reqBody)65if len(mc.responses) > 1 {66mc.responses = mc.responses[1:]67}68return res69}
70
71type MockListGroupsResponse struct {72groups map[string]string73t TestReporter
74}
75
76func NewMockListGroupsResponse(t TestReporter) *MockListGroupsResponse {77return &MockListGroupsResponse{78groups: make(map[string]string),79t: t,80}81}
82
83func (m *MockListGroupsResponse) For(reqBody versionedDecoder) encoderWithHeader {84request := reqBody.(*ListGroupsRequest)85_ = request86response := &ListGroupsResponse{87Groups: m.groups,88}89return response90}
91
92func (m *MockListGroupsResponse) AddGroup(groupID, protocolType string) *MockListGroupsResponse {93m.groups[groupID] = protocolType94return m95}
96
97type MockDescribeGroupsResponse struct {98groups map[string]*GroupDescription99t TestReporter
100}
101
102func NewMockDescribeGroupsResponse(t TestReporter) *MockDescribeGroupsResponse {103return &MockDescribeGroupsResponse{104t: t,105groups: make(map[string]*GroupDescription),106}107}
108
109func (m *MockDescribeGroupsResponse) AddGroupDescription(groupID string, description *GroupDescription) *MockDescribeGroupsResponse {110m.groups[groupID] = description111return m112}
113
114func (m *MockDescribeGroupsResponse) For(reqBody versionedDecoder) encoderWithHeader {115request := reqBody.(*DescribeGroupsRequest)116
117response := &DescribeGroupsResponse{}118for _, requestedGroup := range request.Groups {119if group, ok := m.groups[requestedGroup]; ok {120response.Groups = append(response.Groups, group)121} else {122// Mimic real kafka - if a group doesn't exist, return123// an entry with state "Dead"124response.Groups = append(response.Groups, &GroupDescription{125GroupId: requestedGroup,126State: "Dead",127})128}129}130
131return response132}
133
134// MockMetadataResponse is a `MetadataResponse` builder.
135type MockMetadataResponse struct {136controllerID int32137leaders map[string]map[int32]int32138brokers map[string]int32139t TestReporter
140}
141
142func NewMockMetadataResponse(t TestReporter) *MockMetadataResponse {143return &MockMetadataResponse{144leaders: make(map[string]map[int32]int32),145brokers: make(map[string]int32),146t: t,147}148}
149
150func (mmr *MockMetadataResponse) SetLeader(topic string, partition, brokerID int32) *MockMetadataResponse {151partitions := mmr.leaders[topic]152if partitions == nil {153partitions = make(map[int32]int32)154mmr.leaders[topic] = partitions155}156partitions[partition] = brokerID157return mmr158}
159
160func (mmr *MockMetadataResponse) SetBroker(addr string, brokerID int32) *MockMetadataResponse {161mmr.brokers[addr] = brokerID162return mmr163}
164
165func (mmr *MockMetadataResponse) SetController(brokerID int32) *MockMetadataResponse {166mmr.controllerID = brokerID167return mmr168}
169
170func (mmr *MockMetadataResponse) For(reqBody versionedDecoder) encoderWithHeader {171metadataRequest := reqBody.(*MetadataRequest)172metadataResponse := &MetadataResponse{173Version: metadataRequest.version(),174ControllerID: mmr.controllerID,175}176for addr, brokerID := range mmr.brokers {177metadataResponse.AddBroker(addr, brokerID)178}179
180// Generate set of replicas181var replicas []int32182var offlineReplicas []int32183for _, brokerID := range mmr.brokers {184replicas = append(replicas, brokerID)185}186
187if len(metadataRequest.Topics) == 0 {188for topic, partitions := range mmr.leaders {189for partition, brokerID := range partitions {190metadataResponse.AddTopicPartition(topic, partition, brokerID, replicas, replicas, offlineReplicas, ErrNoError)191}192}193return metadataResponse194}195for _, topic := range metadataRequest.Topics {196for partition, brokerID := range mmr.leaders[topic] {197metadataResponse.AddTopicPartition(topic, partition, brokerID, replicas, replicas, offlineReplicas, ErrNoError)198}199}200return metadataResponse201}
202
203// MockOffsetResponse is an `OffsetResponse` builder.
204type MockOffsetResponse struct {205offsets map[string]map[int32]map[int64]int64206t TestReporter
207}
208
209func NewMockOffsetResponse(t TestReporter) *MockOffsetResponse {210return &MockOffsetResponse{211offsets: make(map[string]map[int32]map[int64]int64),212t: t,213}214}
215
216func (mor *MockOffsetResponse) SetOffset(topic string, partition int32, time, offset int64) *MockOffsetResponse {217partitions := mor.offsets[topic]218if partitions == nil {219partitions = make(map[int32]map[int64]int64)220mor.offsets[topic] = partitions221}222times := partitions[partition]223if times == nil {224times = make(map[int64]int64)225partitions[partition] = times226}227times[time] = offset228return mor229}
230
231func (mor *MockOffsetResponse) For(reqBody versionedDecoder) encoderWithHeader {232offsetRequest := reqBody.(*OffsetRequest)233offsetResponse := &OffsetResponse{Version: offsetRequest.Version}234for topic, partitions := range offsetRequest.blocks {235for partition, block := range partitions {236offset := mor.getOffset(topic, partition, block.time)237offsetResponse.AddTopicPartition(topic, partition, offset)238}239}240return offsetResponse241}
242
243func (mor *MockOffsetResponse) getOffset(topic string, partition int32, time int64) int64 {244partitions := mor.offsets[topic]245if partitions == nil {246mor.t.Errorf("missing topic: %s", topic)247}248times := partitions[partition]249if times == nil {250mor.t.Errorf("missing partition: %d", partition)251}252offset, ok := times[time]253if !ok {254mor.t.Errorf("missing time: %d", time)255}256return offset257}
258
259// MockFetchResponse is a `FetchResponse` builder.
260type MockFetchResponse struct {261messages map[string]map[int32]map[int64]Encoder262messagesLock *sync.RWMutex263highWaterMarks map[string]map[int32]int64264t TestReporter
265batchSize int266}
267
268func NewMockFetchResponse(t TestReporter, batchSize int) *MockFetchResponse {269return &MockFetchResponse{270messages: make(map[string]map[int32]map[int64]Encoder),271messagesLock: &sync.RWMutex{},272highWaterMarks: make(map[string]map[int32]int64),273t: t,274batchSize: batchSize,275}276}
277
278func (mfr *MockFetchResponse) SetMessage(topic string, partition int32, offset int64, msg Encoder) *MockFetchResponse {279mfr.messagesLock.Lock()280defer mfr.messagesLock.Unlock()281partitions := mfr.messages[topic]282if partitions == nil {283partitions = make(map[int32]map[int64]Encoder)284mfr.messages[topic] = partitions285}286messages := partitions[partition]287if messages == nil {288messages = make(map[int64]Encoder)289partitions[partition] = messages290}291messages[offset] = msg292return mfr293}
294
295func (mfr *MockFetchResponse) SetHighWaterMark(topic string, partition int32, offset int64) *MockFetchResponse {296partitions := mfr.highWaterMarks[topic]297if partitions == nil {298partitions = make(map[int32]int64)299mfr.highWaterMarks[topic] = partitions300}301partitions[partition] = offset302return mfr303}
304
305func (mfr *MockFetchResponse) For(reqBody versionedDecoder) encoderWithHeader {306fetchRequest := reqBody.(*FetchRequest)307res := &FetchResponse{308Version: fetchRequest.Version,309}310for topic, partitions := range fetchRequest.blocks {311for partition, block := range partitions {312initialOffset := block.fetchOffset313offset := initialOffset314maxOffset := initialOffset + int64(mfr.getMessageCount(topic, partition))315for i := 0; i < mfr.batchSize && offset < maxOffset; {316msg := mfr.getMessage(topic, partition, offset)317if msg != nil {318res.AddMessage(topic, partition, nil, msg, offset)319i++320}321offset++322}323fb := res.GetBlock(topic, partition)324if fb == nil {325res.AddError(topic, partition, ErrNoError)326fb = res.GetBlock(topic, partition)327}328fb.HighWaterMarkOffset = mfr.getHighWaterMark(topic, partition)329}330}331return res332}
333
334func (mfr *MockFetchResponse) getMessage(topic string, partition int32, offset int64) Encoder {335mfr.messagesLock.RLock()336defer mfr.messagesLock.RUnlock()337partitions := mfr.messages[topic]338if partitions == nil {339return nil340}341messages := partitions[partition]342if messages == nil {343return nil344}345return messages[offset]346}
347
348func (mfr *MockFetchResponse) getMessageCount(topic string, partition int32) int {349mfr.messagesLock.RLock()350defer mfr.messagesLock.RUnlock()351partitions := mfr.messages[topic]352if partitions == nil {353return 0354}355messages := partitions[partition]356if messages == nil {357return 0358}359return len(messages)360}
361
362func (mfr *MockFetchResponse) getHighWaterMark(topic string, partition int32) int64 {363partitions := mfr.highWaterMarks[topic]364if partitions == nil {365return 0366}367return partitions[partition]368}
369
370// MockConsumerMetadataResponse is a `ConsumerMetadataResponse` builder.
371type MockConsumerMetadataResponse struct {372coordinators map[string]interface{}373t TestReporter
374}
375
376func NewMockConsumerMetadataResponse(t TestReporter) *MockConsumerMetadataResponse {377return &MockConsumerMetadataResponse{378coordinators: make(map[string]interface{}),379t: t,380}381}
382
383func (mr *MockConsumerMetadataResponse) SetCoordinator(group string, broker *MockBroker) *MockConsumerMetadataResponse {384mr.coordinators[group] = broker385return mr386}
387
388func (mr *MockConsumerMetadataResponse) SetError(group string, kerror KError) *MockConsumerMetadataResponse {389mr.coordinators[group] = kerror390return mr391}
392
393func (mr *MockConsumerMetadataResponse) For(reqBody versionedDecoder) encoderWithHeader {394req := reqBody.(*ConsumerMetadataRequest)395group := req.ConsumerGroup396res := &ConsumerMetadataResponse{}397v := mr.coordinators[group]398switch v := v.(type) {399case *MockBroker:400res.Coordinator = &Broker{id: v.BrokerID(), addr: v.Addr()}401case KError:402res.Err = v403}404return res405}
406
407// MockFindCoordinatorResponse is a `FindCoordinatorResponse` builder.
408type MockFindCoordinatorResponse struct {409groupCoordinators map[string]interface{}410transCoordinators map[string]interface{}411t TestReporter
412}
413
414func NewMockFindCoordinatorResponse(t TestReporter) *MockFindCoordinatorResponse {415return &MockFindCoordinatorResponse{416groupCoordinators: make(map[string]interface{}),417transCoordinators: make(map[string]interface{}),418t: t,419}420}
421
422func (mr *MockFindCoordinatorResponse) SetCoordinator(coordinatorType CoordinatorType, group string, broker *MockBroker) *MockFindCoordinatorResponse {423switch coordinatorType {424case CoordinatorGroup:425mr.groupCoordinators[group] = broker426case CoordinatorTransaction:427mr.transCoordinators[group] = broker428}429return mr430}
431
432func (mr *MockFindCoordinatorResponse) SetError(coordinatorType CoordinatorType, group string, kerror KError) *MockFindCoordinatorResponse {433switch coordinatorType {434case CoordinatorGroup:435mr.groupCoordinators[group] = kerror436case CoordinatorTransaction:437mr.transCoordinators[group] = kerror438}439return mr440}
441
442func (mr *MockFindCoordinatorResponse) For(reqBody versionedDecoder) encoderWithHeader {443req := reqBody.(*FindCoordinatorRequest)444res := &FindCoordinatorResponse{}445var v interface{}446switch req.CoordinatorType {447case CoordinatorGroup:448v = mr.groupCoordinators[req.CoordinatorKey]449case CoordinatorTransaction:450v = mr.transCoordinators[req.CoordinatorKey]451}452switch v := v.(type) {453case *MockBroker:454res.Coordinator = &Broker{id: v.BrokerID(), addr: v.Addr()}455case KError:456res.Err = v457}458return res459}
460
461// MockOffsetCommitResponse is a `OffsetCommitResponse` builder.
462type MockOffsetCommitResponse struct {463errors map[string]map[string]map[int32]KError464t TestReporter
465}
466
467func NewMockOffsetCommitResponse(t TestReporter) *MockOffsetCommitResponse {468return &MockOffsetCommitResponse{t: t}469}
470
471func (mr *MockOffsetCommitResponse) SetError(group, topic string, partition int32, kerror KError) *MockOffsetCommitResponse {472if mr.errors == nil {473mr.errors = make(map[string]map[string]map[int32]KError)474}475topics := mr.errors[group]476if topics == nil {477topics = make(map[string]map[int32]KError)478mr.errors[group] = topics479}480partitions := topics[topic]481if partitions == nil {482partitions = make(map[int32]KError)483topics[topic] = partitions484}485partitions[partition] = kerror486return mr487}
488
489func (mr *MockOffsetCommitResponse) For(reqBody versionedDecoder) encoderWithHeader {490req := reqBody.(*OffsetCommitRequest)491group := req.ConsumerGroup492res := &OffsetCommitResponse{}493for topic, partitions := range req.blocks {494for partition := range partitions {495res.AddError(topic, partition, mr.getError(group, topic, partition))496}497}498return res499}
500
501func (mr *MockOffsetCommitResponse) getError(group, topic string, partition int32) KError {502topics := mr.errors[group]503if topics == nil {504return ErrNoError505}506partitions := topics[topic]507if partitions == nil {508return ErrNoError509}510kerror, ok := partitions[partition]511if !ok {512return ErrNoError513}514return kerror515}
516
517// MockProduceResponse is a `ProduceResponse` builder.
518type MockProduceResponse struct {519version int16520errors map[string]map[int32]KError521t TestReporter
522}
523
524func NewMockProduceResponse(t TestReporter) *MockProduceResponse {525return &MockProduceResponse{t: t}526}
527
528func (mr *MockProduceResponse) SetVersion(version int16) *MockProduceResponse {529mr.version = version530return mr531}
532
533func (mr *MockProduceResponse) SetError(topic string, partition int32, kerror KError) *MockProduceResponse {534if mr.errors == nil {535mr.errors = make(map[string]map[int32]KError)536}537partitions := mr.errors[topic]538if partitions == nil {539partitions = make(map[int32]KError)540mr.errors[topic] = partitions541}542partitions[partition] = kerror543return mr544}
545
546func (mr *MockProduceResponse) For(reqBody versionedDecoder) encoderWithHeader {547req := reqBody.(*ProduceRequest)548res := &ProduceResponse{549Version: mr.version,550}551for topic, partitions := range req.records {552for partition := range partitions {553res.AddTopicPartition(topic, partition, mr.getError(topic, partition))554}555}556return res557}
558
559func (mr *MockProduceResponse) getError(topic string, partition int32) KError {560partitions := mr.errors[topic]561if partitions == nil {562return ErrNoError563}564kerror, ok := partitions[partition]565if !ok {566return ErrNoError567}568return kerror569}
570
571// MockOffsetFetchResponse is a `OffsetFetchResponse` builder.
572type MockOffsetFetchResponse struct {573offsets map[string]map[string]map[int32]*OffsetFetchResponseBlock574error KError575t TestReporter
576}
577
578func NewMockOffsetFetchResponse(t TestReporter) *MockOffsetFetchResponse {579return &MockOffsetFetchResponse{t: t}580}
581
582func (mr *MockOffsetFetchResponse) SetOffset(group, topic string, partition int32, offset int64, metadata string, kerror KError) *MockOffsetFetchResponse {583if mr.offsets == nil {584mr.offsets = make(map[string]map[string]map[int32]*OffsetFetchResponseBlock)585}586topics := mr.offsets[group]587if topics == nil {588topics = make(map[string]map[int32]*OffsetFetchResponseBlock)589mr.offsets[group] = topics590}591partitions := topics[topic]592if partitions == nil {593partitions = make(map[int32]*OffsetFetchResponseBlock)594topics[topic] = partitions595}596partitions[partition] = &OffsetFetchResponseBlock{offset, 0, metadata, kerror}597return mr598}
599
600func (mr *MockOffsetFetchResponse) SetError(kerror KError) *MockOffsetFetchResponse {601mr.error = kerror602return mr603}
604
605func (mr *MockOffsetFetchResponse) For(reqBody versionedDecoder) encoderWithHeader {606req := reqBody.(*OffsetFetchRequest)607group := req.ConsumerGroup608res := &OffsetFetchResponse{Version: req.Version}609
610for topic, partitions := range mr.offsets[group] {611for partition, block := range partitions {612res.AddBlock(topic, partition, block)613}614}615
616if res.Version >= 2 {617res.Err = mr.error618}619return res620}
621
622type MockCreateTopicsResponse struct {623t TestReporter
624}
625
626func NewMockCreateTopicsResponse(t TestReporter) *MockCreateTopicsResponse {627return &MockCreateTopicsResponse{t: t}628}
629
630func (mr *MockCreateTopicsResponse) For(reqBody versionedDecoder) encoderWithHeader {631req := reqBody.(*CreateTopicsRequest)632res := &CreateTopicsResponse{633Version: req.Version,634}635res.TopicErrors = make(map[string]*TopicError)636
637for topic := range req.TopicDetails {638if res.Version >= 1 && strings.HasPrefix(topic, "_") {639msg := "insufficient permissions to create topic with reserved prefix"640res.TopicErrors[topic] = &TopicError{641Err: ErrTopicAuthorizationFailed,642ErrMsg: &msg,643}644continue645}646res.TopicErrors[topic] = &TopicError{Err: ErrNoError}647}648return res649}
650
651type MockDeleteTopicsResponse struct {652t TestReporter
653}
654
655func NewMockDeleteTopicsResponse(t TestReporter) *MockDeleteTopicsResponse {656return &MockDeleteTopicsResponse{t: t}657}
658
659func (mr *MockDeleteTopicsResponse) For(reqBody versionedDecoder) encoderWithHeader {660req := reqBody.(*DeleteTopicsRequest)661res := &DeleteTopicsResponse{}662res.TopicErrorCodes = make(map[string]KError)663
664for _, topic := range req.Topics {665res.TopicErrorCodes[topic] = ErrNoError666}667res.Version = req.Version668return res669}
670
671type MockCreatePartitionsResponse struct {672t TestReporter
673}
674
675func NewMockCreatePartitionsResponse(t TestReporter) *MockCreatePartitionsResponse {676return &MockCreatePartitionsResponse{t: t}677}
678
679func (mr *MockCreatePartitionsResponse) For(reqBody versionedDecoder) encoderWithHeader {680req := reqBody.(*CreatePartitionsRequest)681res := &CreatePartitionsResponse{}682res.TopicPartitionErrors = make(map[string]*TopicPartitionError)683
684for topic := range req.TopicPartitions {685if strings.HasPrefix(topic, "_") {686msg := "insufficient permissions to create partition on topic with reserved prefix"687res.TopicPartitionErrors[topic] = &TopicPartitionError{688Err: ErrTopicAuthorizationFailed,689ErrMsg: &msg,690}691continue692}693res.TopicPartitionErrors[topic] = &TopicPartitionError{Err: ErrNoError}694}695return res696}
697
698type MockAlterPartitionReassignmentsResponse struct {699t TestReporter
700}
701
702func NewMockAlterPartitionReassignmentsResponse(t TestReporter) *MockAlterPartitionReassignmentsResponse {703return &MockAlterPartitionReassignmentsResponse{t: t}704}
705
706func (mr *MockAlterPartitionReassignmentsResponse) For(reqBody versionedDecoder) encoderWithHeader {707req := reqBody.(*AlterPartitionReassignmentsRequest)708_ = req709res := &AlterPartitionReassignmentsResponse{}710return res711}
712
713type MockListPartitionReassignmentsResponse struct {714t TestReporter
715}
716
717func NewMockListPartitionReassignmentsResponse(t TestReporter) *MockListPartitionReassignmentsResponse {718return &MockListPartitionReassignmentsResponse{t: t}719}
720
721func (mr *MockListPartitionReassignmentsResponse) For(reqBody versionedDecoder) encoderWithHeader {722req := reqBody.(*ListPartitionReassignmentsRequest)723_ = req724res := &ListPartitionReassignmentsResponse{}725
726for topic, partitions := range req.blocks {727for _, partition := range partitions {728res.AddBlock(topic, partition, []int32{0}, []int32{1}, []int32{2})729}730}731
732return res733}
734
735type MockDeleteRecordsResponse struct {736t TestReporter
737}
738
739func NewMockDeleteRecordsResponse(t TestReporter) *MockDeleteRecordsResponse {740return &MockDeleteRecordsResponse{t: t}741}
742
743func (mr *MockDeleteRecordsResponse) For(reqBody versionedDecoder) encoderWithHeader {744req := reqBody.(*DeleteRecordsRequest)745res := &DeleteRecordsResponse{}746res.Topics = make(map[string]*DeleteRecordsResponseTopic)747
748for topic, deleteRecordRequestTopic := range req.Topics {749partitions := make(map[int32]*DeleteRecordsResponsePartition)750for partition := range deleteRecordRequestTopic.PartitionOffsets {751partitions[partition] = &DeleteRecordsResponsePartition{Err: ErrNoError}752}753res.Topics[topic] = &DeleteRecordsResponseTopic{Partitions: partitions}754}755return res756}
757
758type MockDescribeConfigsResponse struct {759t TestReporter
760}
761
762func NewMockDescribeConfigsResponse(t TestReporter) *MockDescribeConfigsResponse {763return &MockDescribeConfigsResponse{t: t}764}
765
766func (mr *MockDescribeConfigsResponse) For(reqBody versionedDecoder) encoderWithHeader {767req := reqBody.(*DescribeConfigsRequest)768res := &DescribeConfigsResponse{769Version: req.Version,770}771
772includeSynonyms := req.Version > 0773includeSource := req.Version > 0774
775for _, r := range req.Resources {776var configEntries []*ConfigEntry777switch r.Type {778case BrokerResource:779configEntries = append(configEntries,780&ConfigEntry{781Name: "min.insync.replicas",782Value: "2",783ReadOnly: false,784Default: false,785},786)787res.Resources = append(res.Resources, &ResourceResponse{788Name: r.Name,789Configs: configEntries,790})791case BrokerLoggerResource:792configEntries = append(configEntries,793&ConfigEntry{794Name: "kafka.controller.KafkaController",795Value: "DEBUG",796ReadOnly: false,797Default: false,798},799)800res.Resources = append(res.Resources, &ResourceResponse{801Name: r.Name,802Configs: configEntries,803})804case TopicResource:805maxMessageBytes := &ConfigEntry{806Name: "max.message.bytes",807Value: "1000000",808ReadOnly: false,809Default: !includeSource,810Sensitive: false,811}812if includeSource {813maxMessageBytes.Source = SourceDefault814}815if includeSynonyms {816maxMessageBytes.Synonyms = []*ConfigSynonym{817{818ConfigName: "max.message.bytes",819ConfigValue: "500000",820},821}822}823retentionMs := &ConfigEntry{824Name: "retention.ms",825Value: "5000",826ReadOnly: false,827Default: false,828Sensitive: false,829}830if includeSynonyms {831retentionMs.Synonyms = []*ConfigSynonym{832{833ConfigName: "log.retention.ms",834ConfigValue: "2500",835},836}837}838password := &ConfigEntry{839Name: "password",840Value: "12345",841ReadOnly: false,842Default: false,843Sensitive: true,844}845configEntries = append(846configEntries, maxMessageBytes, retentionMs, password)847res.Resources = append(res.Resources, &ResourceResponse{848Name: r.Name,849Configs: configEntries,850})851}852}853return res854}
855
856type MockDescribeConfigsResponseWithErrorCode struct {857t TestReporter
858}
859
860func NewMockDescribeConfigsResponseWithErrorCode(t TestReporter) *MockDescribeConfigsResponseWithErrorCode {861return &MockDescribeConfigsResponseWithErrorCode{t: t}862}
863
864func (mr *MockDescribeConfigsResponseWithErrorCode) For(reqBody versionedDecoder) encoderWithHeader {865req := reqBody.(*DescribeConfigsRequest)866res := &DescribeConfigsResponse{867Version: req.Version,868}869
870for _, r := range req.Resources {871res.Resources = append(res.Resources, &ResourceResponse{872Name: r.Name,873Type: r.Type,874ErrorCode: 83,875ErrorMsg: "",876})877}878return res879}
880
881type MockAlterConfigsResponse struct {882t TestReporter
883}
884
885func NewMockAlterConfigsResponse(t TestReporter) *MockAlterConfigsResponse {886return &MockAlterConfigsResponse{t: t}887}
888
889func (mr *MockAlterConfigsResponse) For(reqBody versionedDecoder) encoderWithHeader {890req := reqBody.(*AlterConfigsRequest)891res := &AlterConfigsResponse{}892
893for _, r := range req.Resources {894res.Resources = append(res.Resources, &AlterConfigsResourceResponse{895Name: r.Name,896Type: r.Type,897ErrorMsg: "",898})899}900return res901}
902
903type MockAlterConfigsResponseWithErrorCode struct {904t TestReporter
905}
906
907func NewMockAlterConfigsResponseWithErrorCode(t TestReporter) *MockAlterConfigsResponseWithErrorCode {908return &MockAlterConfigsResponseWithErrorCode{t: t}909}
910
911func (mr *MockAlterConfigsResponseWithErrorCode) For(reqBody versionedDecoder) encoderWithHeader {912req := reqBody.(*AlterConfigsRequest)913res := &AlterConfigsResponse{}914
915for _, r := range req.Resources {916res.Resources = append(res.Resources, &AlterConfigsResourceResponse{917Name: r.Name,918Type: r.Type,919ErrorCode: 83,920ErrorMsg: "",921})922}923return res924}
925
926type MockIncrementalAlterConfigsResponse struct {927t TestReporter
928}
929
930func NewMockIncrementalAlterConfigsResponse(t TestReporter) *MockIncrementalAlterConfigsResponse {931return &MockIncrementalAlterConfigsResponse{t: t}932}
933
934func (mr *MockIncrementalAlterConfigsResponse) For(reqBody versionedDecoder) encoderWithHeader {935req := reqBody.(*IncrementalAlterConfigsRequest)936res := &IncrementalAlterConfigsResponse{}937
938for _, r := range req.Resources {939res.Resources = append(res.Resources, &AlterConfigsResourceResponse{940Name: r.Name,941Type: r.Type,942ErrorMsg: "",943})944}945return res946}
947
948type MockIncrementalAlterConfigsResponseWithErrorCode struct {949t TestReporter
950}
951
952func NewMockIncrementalAlterConfigsResponseWithErrorCode(t TestReporter) *MockIncrementalAlterConfigsResponseWithErrorCode {953return &MockIncrementalAlterConfigsResponseWithErrorCode{t: t}954}
955
956func (mr *MockIncrementalAlterConfigsResponseWithErrorCode) For(reqBody versionedDecoder) encoderWithHeader {957req := reqBody.(*IncrementalAlterConfigsRequest)958res := &IncrementalAlterConfigsResponse{}959
960for _, r := range req.Resources {961res.Resources = append(res.Resources, &AlterConfigsResourceResponse{962Name: r.Name,963Type: r.Type,964ErrorCode: 83,965ErrorMsg: "",966})967}968return res969}
970
971type MockCreateAclsResponse struct {972t TestReporter
973}
974
975func NewMockCreateAclsResponse(t TestReporter) *MockCreateAclsResponse {976return &MockCreateAclsResponse{t: t}977}
978
979func (mr *MockCreateAclsResponse) For(reqBody versionedDecoder) encoderWithHeader {980req := reqBody.(*CreateAclsRequest)981res := &CreateAclsResponse{}982
983for range req.AclCreations {984res.AclCreationResponses = append(res.AclCreationResponses, &AclCreationResponse{Err: ErrNoError})985}986return res987}
988
989type MockCreateAclsResponseError struct {990t TestReporter
991}
992
993func NewMockCreateAclsResponseWithError(t TestReporter) *MockCreateAclsResponseError {994return &MockCreateAclsResponseError{t: t}995}
996
997func (mr *MockCreateAclsResponseError) For(reqBody versionedDecoder) encoderWithHeader {998req := reqBody.(*CreateAclsRequest)999res := &CreateAclsResponse{}1000
1001for range req.AclCreations {1002res.AclCreationResponses = append(res.AclCreationResponses, &AclCreationResponse{Err: ErrInvalidRequest})1003}1004return res1005}
1006
1007type MockListAclsResponse struct {1008t TestReporter
1009}
1010
1011func NewMockListAclsResponse(t TestReporter) *MockListAclsResponse {1012return &MockListAclsResponse{t: t}1013}
1014
1015func (mr *MockListAclsResponse) For(reqBody versionedDecoder) encoderWithHeader {1016req := reqBody.(*DescribeAclsRequest)1017res := &DescribeAclsResponse{}1018res.Err = ErrNoError1019acl := &ResourceAcls{}1020if req.ResourceName != nil {1021acl.Resource.ResourceName = *req.ResourceName1022}1023acl.Resource.ResourcePatternType = req.ResourcePatternTypeFilter1024acl.Resource.ResourceType = req.ResourceType1025
1026host := "*"1027if req.Host != nil {1028host = *req.Host1029}1030
1031principal := "User:test"1032if req.Principal != nil {1033principal = *req.Principal1034}1035
1036permissionType := req.PermissionType1037if permissionType == AclPermissionAny {1038permissionType = AclPermissionAllow1039}1040
1041acl.Acls = append(acl.Acls, &Acl{Operation: req.Operation, PermissionType: permissionType, Host: host, Principal: principal})1042res.ResourceAcls = append(res.ResourceAcls, acl)1043res.Version = int16(req.Version)1044return res1045}
1046
1047type MockSaslAuthenticateResponse struct {1048t TestReporter
1049kerror KError
1050saslAuthBytes []byte1051sessionLifetimeMs int641052}
1053
1054func NewMockSaslAuthenticateResponse(t TestReporter) *MockSaslAuthenticateResponse {1055return &MockSaslAuthenticateResponse{t: t}1056}
1057
1058func (msar *MockSaslAuthenticateResponse) For(reqBody versionedDecoder) encoderWithHeader {1059req := reqBody.(*SaslAuthenticateRequest)1060res := &SaslAuthenticateResponse{}1061res.Version = req.Version1062res.Err = msar.kerror1063res.SaslAuthBytes = msar.saslAuthBytes1064res.SessionLifetimeMs = msar.sessionLifetimeMs1065return res1066}
1067
1068func (msar *MockSaslAuthenticateResponse) SetError(kerror KError) *MockSaslAuthenticateResponse {1069msar.kerror = kerror1070return msar1071}
1072
1073func (msar *MockSaslAuthenticateResponse) SetAuthBytes(saslAuthBytes []byte) *MockSaslAuthenticateResponse {1074msar.saslAuthBytes = saslAuthBytes1075return msar1076}
1077
1078func (msar *MockSaslAuthenticateResponse) SetSessionLifetimeMs(sessionLifetimeMs int64) *MockSaslAuthenticateResponse {1079msar.sessionLifetimeMs = sessionLifetimeMs1080return msar1081}
1082
1083type MockDeleteAclsResponse struct {1084t TestReporter
1085}
1086
1087type MockSaslHandshakeResponse struct {1088enabledMechanisms []string1089kerror KError
1090t TestReporter
1091}
1092
1093func NewMockSaslHandshakeResponse(t TestReporter) *MockSaslHandshakeResponse {1094return &MockSaslHandshakeResponse{t: t}1095}
1096
1097func (mshr *MockSaslHandshakeResponse) For(reqBody versionedDecoder) encoderWithHeader {1098res := &SaslHandshakeResponse{}1099res.Err = mshr.kerror1100res.EnabledMechanisms = mshr.enabledMechanisms1101return res1102}
1103
1104func (mshr *MockSaslHandshakeResponse) SetError(kerror KError) *MockSaslHandshakeResponse {1105mshr.kerror = kerror1106return mshr1107}
1108
1109func (mshr *MockSaslHandshakeResponse) SetEnabledMechanisms(enabledMechanisms []string) *MockSaslHandshakeResponse {1110mshr.enabledMechanisms = enabledMechanisms1111return mshr1112}
1113
1114func NewMockDeleteAclsResponse(t TestReporter) *MockDeleteAclsResponse {1115return &MockDeleteAclsResponse{t: t}1116}
1117
1118func (mr *MockDeleteAclsResponse) For(reqBody versionedDecoder) encoderWithHeader {1119req := reqBody.(*DeleteAclsRequest)1120res := &DeleteAclsResponse{}1121
1122for range req.Filters {1123response := &FilterResponse{Err: ErrNoError}1124response.MatchingAcls = append(response.MatchingAcls, &MatchingAcl{Err: ErrNoError})1125res.FilterResponses = append(res.FilterResponses, response)1126}1127res.Version = int16(req.Version)1128return res1129}
1130
// MockDeleteGroupsResponse is a mock response builder that reports a
// configurable list of groups as deleted.
type MockDeleteGroupsResponse struct {
	// deletedGroups lists the group names reported by For.
	deletedGroups []string
}
1134
1135func NewMockDeleteGroupsRequest(t TestReporter) *MockDeleteGroupsResponse {1136return &MockDeleteGroupsResponse{}1137}
1138
1139func (m *MockDeleteGroupsResponse) SetDeletedGroups(groups []string) *MockDeleteGroupsResponse {1140m.deletedGroups = groups1141return m1142}
1143
1144func (m *MockDeleteGroupsResponse) For(reqBody versionedDecoder) encoderWithHeader {1145resp := &DeleteGroupsResponse{1146GroupErrorCodes: map[string]KError{},1147}1148for _, group := range m.deletedGroups {1149resp.GroupErrorCodes[group] = ErrNoError1150}1151return resp1152}
1153
1154type MockDeleteOffsetResponse struct {1155errorCode KError
1156topic string1157partition int321158errorPartition KError
1159}
1160
1161func NewMockDeleteOffsetRequest(t TestReporter) *MockDeleteOffsetResponse {1162return &MockDeleteOffsetResponse{}1163}
1164
1165func (m *MockDeleteOffsetResponse) SetDeletedOffset(errorCode KError, topic string, partition int32, errorPartition KError) *MockDeleteOffsetResponse {1166m.errorCode = errorCode1167m.topic = topic1168m.partition = partition1169m.errorPartition = errorPartition1170return m1171}
1172
1173func (m *MockDeleteOffsetResponse) For(reqBody versionedDecoder) encoderWithHeader {1174resp := &DeleteOffsetsResponse{1175ErrorCode: m.errorCode,1176Errors: map[string]map[int32]KError{1177m.topic: {m.partition: m.errorPartition},1178},1179}1180return resp1181}
1182
1183type MockJoinGroupResponse struct {1184t TestReporter
1185
1186ThrottleTime int321187Err KError
1188GenerationId int321189GroupProtocol string1190LeaderId string1191MemberId string1192Members map[string][]byte1193}
1194
1195func NewMockJoinGroupResponse(t TestReporter) *MockJoinGroupResponse {1196return &MockJoinGroupResponse{1197t: t,1198Members: make(map[string][]byte),1199}1200}
1201
1202func (m *MockJoinGroupResponse) For(reqBody versionedDecoder) encoderWithHeader {1203req := reqBody.(*JoinGroupRequest)1204resp := &JoinGroupResponse{1205Version: req.Version,1206ThrottleTime: m.ThrottleTime,1207Err: m.Err,1208GenerationId: m.GenerationId,1209GroupProtocol: m.GroupProtocol,1210LeaderId: m.LeaderId,1211MemberId: m.MemberId,1212Members: m.Members,1213}1214return resp1215}
1216
1217func (m *MockJoinGroupResponse) SetThrottleTime(t int32) *MockJoinGroupResponse {1218m.ThrottleTime = t1219return m1220}
1221
1222func (m *MockJoinGroupResponse) SetError(kerr KError) *MockJoinGroupResponse {1223m.Err = kerr1224return m1225}
1226
1227func (m *MockJoinGroupResponse) SetGenerationId(id int32) *MockJoinGroupResponse {1228m.GenerationId = id1229return m1230}
1231
1232func (m *MockJoinGroupResponse) SetGroupProtocol(proto string) *MockJoinGroupResponse {1233m.GroupProtocol = proto1234return m1235}
1236
1237func (m *MockJoinGroupResponse) SetLeaderId(id string) *MockJoinGroupResponse {1238m.LeaderId = id1239return m1240}
1241
1242func (m *MockJoinGroupResponse) SetMemberId(id string) *MockJoinGroupResponse {1243m.MemberId = id1244return m1245}
1246
1247func (m *MockJoinGroupResponse) SetMember(id string, meta *ConsumerGroupMemberMetadata) *MockJoinGroupResponse {1248bin, err := encode(meta, nil)1249if err != nil {1250panic(fmt.Sprintf("error encoding member metadata: %v", err))1251}1252m.Members[id] = bin1253return m1254}
1255
1256type MockLeaveGroupResponse struct {1257t TestReporter
1258
1259Err KError
1260}
1261
1262func NewMockLeaveGroupResponse(t TestReporter) *MockLeaveGroupResponse {1263return &MockLeaveGroupResponse{t: t}1264}
1265
1266func (m *MockLeaveGroupResponse) For(reqBody versionedDecoder) encoderWithHeader {1267resp := &LeaveGroupResponse{1268Err: m.Err,1269}1270return resp1271}
1272
1273func (m *MockLeaveGroupResponse) SetError(kerr KError) *MockLeaveGroupResponse {1274m.Err = kerr1275return m1276}
1277
1278type MockSyncGroupResponse struct {1279t TestReporter
1280
1281Err KError
1282MemberAssignment []byte1283}
1284
1285func NewMockSyncGroupResponse(t TestReporter) *MockSyncGroupResponse {1286return &MockSyncGroupResponse{t: t}1287}
1288
1289func (m *MockSyncGroupResponse) For(reqBody versionedDecoder) encoderWithHeader {1290resp := &SyncGroupResponse{1291Err: m.Err,1292MemberAssignment: m.MemberAssignment,1293}1294return resp1295}
1296
1297func (m *MockSyncGroupResponse) SetError(kerr KError) *MockSyncGroupResponse {1298m.Err = kerr1299return m1300}
1301
1302func (m *MockSyncGroupResponse) SetMemberAssignment(assignment *ConsumerGroupMemberAssignment) *MockSyncGroupResponse {1303bin, err := encode(assignment, nil)1304if err != nil {1305panic(fmt.Sprintf("error encoding member assignment: %v", err))1306}1307m.MemberAssignment = bin1308return m1309}
1310
1311type MockHeartbeatResponse struct {1312t TestReporter
1313
1314Err KError
1315}
1316
1317func NewMockHeartbeatResponse(t TestReporter) *MockHeartbeatResponse {1318return &MockHeartbeatResponse{t: t}1319}
1320
1321func (m *MockHeartbeatResponse) For(reqBody versionedDecoder) encoderWithHeader {1322resp := &HeartbeatResponse{}1323return resp1324}
1325
1326func (m *MockHeartbeatResponse) SetError(kerr KError) *MockHeartbeatResponse {1327m.Err = kerr1328return m1329}
1330
1331type MockDescribeLogDirsResponse struct {1332t TestReporter
1333logDirs []DescribeLogDirsResponseDirMetadata1334}
1335
1336func NewMockDescribeLogDirsResponse(t TestReporter) *MockDescribeLogDirsResponse {1337return &MockDescribeLogDirsResponse{t: t}1338}
1339
1340func (m *MockDescribeLogDirsResponse) SetLogDirs(logDirPath string, topicPartitions map[string]int) *MockDescribeLogDirsResponse {1341var topics []DescribeLogDirsResponseTopic1342for topic := range topicPartitions {1343var partitions []DescribeLogDirsResponsePartition1344for i := 0; i < topicPartitions[topic]; i++ {1345partitions = append(partitions, DescribeLogDirsResponsePartition{1346PartitionID: int32(i),1347IsTemporary: false,1348OffsetLag: int64(0),1349Size: int64(1234),1350})1351}1352topics = append(topics, DescribeLogDirsResponseTopic{1353Topic: topic,1354Partitions: partitions,1355})1356}1357logDir := DescribeLogDirsResponseDirMetadata{1358ErrorCode: ErrNoError,1359Path: logDirPath,1360Topics: topics,1361}1362m.logDirs = []DescribeLogDirsResponseDirMetadata{logDir}1363return m1364}
1365
1366func (m *MockDescribeLogDirsResponse) For(reqBody versionedDecoder) encoderWithHeader {1367resp := &DescribeLogDirsResponse{1368LogDirs: m.logDirs,1369}1370return resp1371}
1372
1373type MockApiVersionsResponse struct {1374t TestReporter
1375apiKeys []ApiVersionsResponseKey1376}
1377
1378func NewMockApiVersionsResponse(t TestReporter) *MockApiVersionsResponse {1379return &MockApiVersionsResponse{1380t: t,1381apiKeys: []ApiVersionsResponseKey{1382{1383ApiKey: 0,1384MinVersion: 5,1385MaxVersion: 8,1386},1387{1388ApiKey: 1,1389MinVersion: 7,1390MaxVersion: 11,1391},1392},1393}1394}
1395
1396func (m *MockApiVersionsResponse) SetApiKeys(apiKeys []ApiVersionsResponseKey) *MockApiVersionsResponse {1397m.apiKeys = apiKeys1398return m1399}
1400
1401func (m *MockApiVersionsResponse) For(reqBody versionedDecoder) encoderWithHeader {1402req := reqBody.(*ApiVersionsRequest)1403res := &ApiVersionsResponse{1404Version: req.Version,1405ApiKeys: m.apiKeys,1406}1407return res1408}
1409