cubefs
428 строк · 11.9 Кб
1package sarama2
3import (4"bytes"5"encoding/binary"6"errors"7"fmt"8"io"9"net"10"reflect"11"strconv"12"sync"13"time"14
15"github.com/davecgh/go-spew/spew"16)
17
18const (19expectationTimeout = 500 * time.Millisecond20)
21
22type GSSApiHandlerFunc func([]byte) []byte23
24type requestHandlerFunc func(req *request) (res encoderWithHeader)25
26// RequestNotifierFunc is invoked when a mock broker processes a request successfully
27// and will provides the number of bytes read and written.
28type RequestNotifierFunc func(bytesRead, bytesWritten int)29
30// MockBroker is a mock Kafka broker that is used in unit tests. It is exposed
31// to facilitate testing of higher level or specialized consumers and producers
32// built on top of Sarama. Note that it does not 'mimic' the Kafka API protocol,
33// but rather provides a facility to do that. It takes care of the TCP
34// transport, request unmarshalling, response marshaling, and makes it the test
35// writer responsibility to program correct according to the Kafka API protocol
36// MockBroker behavior.
37//
38// MockBroker is implemented as a TCP server listening on a kernel-selected
39// localhost port that can accept many connections. It reads Kafka requests
40// from that connection and returns responses programmed by the SetHandlerByMap
41// function. If a MockBroker receives a request that it has no programmed
42// response for, then it returns nothing and the request times out.
43//
44// A set of MockRequest builders to define mappings used by MockBroker is
45// provided by Sarama. But users can develop MockRequests of their own and use
46// them along with or instead of the standard ones.
47//
48// When running tests with MockBroker it is strongly recommended to specify
49// a timeout to `go test` so that if the broker hangs waiting for a response,
50// the test panics.
51//
52// It is not necessary to prefix message length or correlation ID to your
53// response bytes, the server does that automatically as a convenience.
54type MockBroker struct {55brokerID int3256port int3257closing chan none58stopper chan none59expectations chan encoderWithHeader60listener net.Listener61t TestReporter
62latency time.Duration63handler requestHandlerFunc
64notifier RequestNotifierFunc
65history []RequestResponse66lock sync.Mutex67gssApiHandler GSSApiHandlerFunc
68}
69
70// RequestResponse represents a Request/Response pair processed by MockBroker.
71type RequestResponse struct {72Request protocolBody
73Response encoder
74}
75
76// SetLatency makes broker pause for the specified period every time before
77// replying.
78func (b *MockBroker) SetLatency(latency time.Duration) {79b.latency = latency80}
81
82// SetHandlerByMap defines mapping of Request types to MockResponses. When a
83// request is received by the broker, it looks up the request type in the map
84// and uses the found MockResponse instance to generate an appropriate reply.
85// If the request type is not found in the map then nothing is sent.
86func (b *MockBroker) SetHandlerByMap(handlerMap map[string]MockResponse) {87fnMap := make(map[string]MockResponse)88for k, v := range handlerMap {89fnMap[k] = v90}91b.setHandler(func(req *request) (res encoderWithHeader) {92reqTypeName := reflect.TypeOf(req.body).Elem().Name()93mockResponse := fnMap[reqTypeName]94if mockResponse == nil {95return nil96}97return mockResponse.For(req.body)98})99}
100
101// SetNotifier set a function that will get invoked whenever a request has been
102// processed successfully and will provide the number of bytes read and written
103func (b *MockBroker) SetNotifier(notifier RequestNotifierFunc) {104b.lock.Lock()105b.notifier = notifier106b.lock.Unlock()107}
108
109// BrokerID returns broker ID assigned to the broker.
110func (b *MockBroker) BrokerID() int32 {111return b.brokerID112}
113
114// History returns a slice of RequestResponse pairs in the order they were
115// processed by the broker. Note that in case of multiple connections to the
116// broker the order expected by a test can be different from the order recorded
117// in the history, unless some synchronization is implemented in the test.
118func (b *MockBroker) History() []RequestResponse {119b.lock.Lock()120history := make([]RequestResponse, len(b.history))121copy(history, b.history)122b.lock.Unlock()123return history124}
125
126// Port returns the TCP port number the broker is listening for requests on.
127func (b *MockBroker) Port() int32 {128return b.port129}
130
131// Addr returns the broker connection string in the form "<address>:<port>".
132func (b *MockBroker) Addr() string {133return b.listener.Addr().String()134}
135
136// Close terminates the broker blocking until it stops internal goroutines and
137// releases all resources.
138func (b *MockBroker) Close() {139close(b.expectations)140if len(b.expectations) > 0 {141buf := bytes.NewBufferString(fmt.Sprintf("mockbroker/%d: not all expectations were satisfied! Still waiting on:\n", b.BrokerID()))142for e := range b.expectations {143_, _ = buf.WriteString(spew.Sdump(e))144}145b.t.Error(buf.String())146}147close(b.closing)148<-b.stopper149}
150
151// setHandler sets the specified function as the request handler. Whenever
152// a mock broker reads a request from the wire it passes the request to the
153// function and sends back whatever the handler function returns.
154func (b *MockBroker) setHandler(handler requestHandlerFunc) {155b.lock.Lock()156b.handler = handler157b.lock.Unlock()158}
159
160func (b *MockBroker) serverLoop() {161defer close(b.stopper)162var err error163var conn net.Conn164
165go func() {166<-b.closing167err := b.listener.Close()168if err != nil {169b.t.Error(err)170}171}()172
173wg := &sync.WaitGroup{}174i := 0175for conn, err = b.listener.Accept(); err == nil; conn, err = b.listener.Accept() {176wg.Add(1)177go b.handleRequests(conn, i, wg)178i++179}180wg.Wait()181Logger.Printf("*** mockbroker/%d: listener closed, err=%v", b.BrokerID(), err)182}
183
184func (b *MockBroker) SetGSSAPIHandler(handler GSSApiHandlerFunc) {185b.gssApiHandler = handler186}
187
188func (b *MockBroker) readToBytes(r io.Reader) ([]byte, error) {189var (190bytesRead int191lengthBytes = make([]byte, 4)192)193
194if _, err := io.ReadFull(r, lengthBytes); err != nil {195return nil, err196}197
198bytesRead += len(lengthBytes)199length := int32(binary.BigEndian.Uint32(lengthBytes))200
201if length <= 4 || length > MaxRequestSize {202return nil, PacketDecodingError{fmt.Sprintf("message of length %d too large or too small", length)}203}204
205encodedReq := make([]byte, length)206if _, err := io.ReadFull(r, encodedReq); err != nil {207return nil, err208}209
210bytesRead += len(encodedReq)211
212fullBytes := append(lengthBytes, encodedReq...)213
214return fullBytes, nil215}
216
217func (b *MockBroker) isGSSAPI(buffer []byte) bool {218return buffer[4] == 0x60 || bytes.Equal(buffer[4:6], []byte{0x05, 0x04})219}
220
221func (b *MockBroker) handleRequests(conn io.ReadWriteCloser, idx int, wg *sync.WaitGroup) {222defer wg.Done()223defer func() {224_ = conn.Close()225}()226s := spew.NewDefaultConfig()227s.MaxDepth = 1228Logger.Printf("*** mockbroker/%d/%d: connection opened", b.BrokerID(), idx)229var err error230
231abort := make(chan none)232defer close(abort)233go func() {234select {235case <-b.closing:236_ = conn.Close()237case <-abort:238}239}()240
241var bytesWritten int242var bytesRead int243for {244buffer, err := b.readToBytes(conn)245if err != nil {246Logger.Printf("*** mockbroker/%d/%d: invalid request: err=%+v, %+v", b.brokerID, idx, err, spew.Sdump(buffer))247b.serverError(err)248break249}250
251bytesWritten = 0252if !b.isGSSAPI(buffer) {253req, br, err := decodeRequest(bytes.NewReader(buffer))254bytesRead = br255if err != nil {256Logger.Printf("*** mockbroker/%d/%d: invalid request: err=%+v, %+v", b.brokerID, idx, err, spew.Sdump(req))257b.serverError(err)258break259}260
261if b.latency > 0 {262time.Sleep(b.latency)263}264
265b.lock.Lock()266res := b.handler(req)267b.history = append(b.history, RequestResponse{req.body, res})268b.lock.Unlock()269
270if res == nil {271Logger.Printf("*** mockbroker/%d/%d: ignored %v", b.brokerID, idx, spew.Sdump(req))272continue273}274Logger.Printf(275"*** mockbroker/%d/%d: replied to %T with %T\n-> %s\n-> %s",276b.brokerID, idx, req.body, res,277s.Sprintf("%#v", req.body),278s.Sprintf("%#v", res),279)280
281encodedRes, err := encode(res, nil)282if err != nil {283b.serverError(err)284break285}286if len(encodedRes) == 0 {287b.lock.Lock()288if b.notifier != nil {289b.notifier(bytesRead, 0)290}291b.lock.Unlock()292continue293}294
295resHeader := b.encodeHeader(res.headerVersion(), req.correlationID, uint32(len(encodedRes)))296if _, err = conn.Write(resHeader); err != nil {297b.serverError(err)298break299}300if _, err = conn.Write(encodedRes); err != nil {301b.serverError(err)302break303}304bytesWritten = len(resHeader) + len(encodedRes)305} else {306// GSSAPI is not part of kafka protocol, but is supported for authentication proposes.307// Don't support history for this kind of request as is only used for test GSSAPI authentication mechanism308b.lock.Lock()309res := b.gssApiHandler(buffer)310b.lock.Unlock()311if res == nil {312Logger.Printf("*** mockbroker/%d/%d: ignored %v", b.brokerID, idx, spew.Sdump(buffer))313continue314}315if _, err = conn.Write(res); err != nil {316b.serverError(err)317break318}319bytesWritten = len(res)320}321
322b.lock.Lock()323if b.notifier != nil {324b.notifier(bytesRead, bytesWritten)325}326b.lock.Unlock()327}328Logger.Printf("*** mockbroker/%d/%d: connection closed, err=%v", b.BrokerID(), idx, err)329}
330
331func (b *MockBroker) encodeHeader(headerVersion int16, correlationId int32, payloadLength uint32) []byte {332headerLength := uint32(8)333
334if headerVersion >= 1 {335headerLength = 9336}337
338resHeader := make([]byte, headerLength)339binary.BigEndian.PutUint32(resHeader, payloadLength+headerLength-4)340binary.BigEndian.PutUint32(resHeader[4:], uint32(correlationId))341
342if headerVersion >= 1 {343binary.PutUvarint(resHeader[8:], 0)344}345
346return resHeader347}
348
349func (b *MockBroker) defaultRequestHandler(req *request) (res encoderWithHeader) {350select {351case res, ok := <-b.expectations:352if !ok {353return nil354}355return res356case <-time.After(expectationTimeout):357return nil358}359}
360
361func (b *MockBroker) serverError(err error) {362isConnectionClosedError := false363opError := &net.OpError{}364if errors.As(err, &opError) {365isConnectionClosedError = true366} else if errors.Is(err, io.EOF) {367isConnectionClosedError = true368} else if err.Error() == "use of closed network connection" {369isConnectionClosedError = true370}371
372if isConnectionClosedError {373return374}375
376b.t.Errorf(err.Error())377}
378
379// NewMockBroker launches a fake Kafka broker. It takes a TestReporter as provided by the
380// test framework and a channel of responses to use. If an error occurs it is
381// simply logged to the TestReporter and the broker exits.
382func NewMockBroker(t TestReporter, brokerID int32) *MockBroker {383return NewMockBrokerAddr(t, brokerID, "localhost:0")384}
385
386// NewMockBrokerAddr behaves like newMockBroker but listens on the address you give
387// it rather than just some ephemeral port.
388func NewMockBrokerAddr(t TestReporter, brokerID int32, addr string) *MockBroker {389listener, err := net.Listen("tcp", addr)390if err != nil {391t.Fatal(err)392}393return NewMockBrokerListener(t, brokerID, listener)394}
395
396// NewMockBrokerListener behaves like newMockBrokerAddr but accepts connections on the listener specified.
397func NewMockBrokerListener(t TestReporter, brokerID int32, listener net.Listener) *MockBroker {398var err error399
400broker := &MockBroker{401closing: make(chan none),402stopper: make(chan none),403t: t,404brokerID: brokerID,405expectations: make(chan encoderWithHeader, 512),406listener: listener,407}408broker.handler = broker.defaultRequestHandler409
410Logger.Printf("*** mockbroker/%d listening on %s\n", brokerID, broker.listener.Addr().String())411_, portStr, err := net.SplitHostPort(broker.listener.Addr().String())412if err != nil {413t.Fatal(err)414}415tmp, err := strconv.ParseInt(portStr, 10, 32)416if err != nil {417t.Fatal(err)418}419broker.port = int32(tmp)420
421go broker.serverLoop()422
423return broker424}
425
426func (b *MockBroker) Returns(e encoderWithHeader) {427b.expectations <- e428}
429