cubefs

Форк
0
428 строк · 11.9 Кб
1
package sarama
2

3
import (
4
	"bytes"
5
	"encoding/binary"
6
	"errors"
7
	"fmt"
8
	"io"
9
	"net"
10
	"reflect"
11
	"strconv"
12
	"sync"
13
	"time"
14

15
	"github.com/davecgh/go-spew/spew"
16
)
17

18
// expectationTimeout bounds how long the default request handler waits for a
// queued expectation before giving up on a request.
const expectationTimeout = 500 * time.Millisecond
21

22
// GSSApiHandlerFunc handles a raw GSSAPI packet received by a MockBroker. It
// is given the complete packet (4-byte length prefix included) and returns the
// bytes to write back to the client verbatim; a nil result sends nothing.
type GSSApiHandlerFunc func([]byte) []byte
23

24
// requestHandlerFunc maps a decoded request to the response that should be
// sent back for it. A nil result means the request gets no reply.
type requestHandlerFunc func(req *request) (res encoderWithHeader)
25

26
// RequestNotifierFunc is invoked when a mock broker has processed a request
// successfully. It receives the number of bytes read for the request and the
// number of bytes written in reply.
type RequestNotifierFunc func(bytesRead, bytesWritten int)
29

30
// MockBroker is a mock Kafka broker that is used in unit tests. It is exposed
31
// to facilitate testing of higher level or specialized consumers and producers
32
// built on top of Sarama. Note that it does not 'mimic' the Kafka API protocol,
33
// but rather provides a facility to do that. It takes care of the TCP
34
// transport, request unmarshalling, response marshaling, and makes it the test
35
// writer responsibility to program correct according to the Kafka API protocol
36
// MockBroker behavior.
37
//
38
// MockBroker is implemented as a TCP server listening on a kernel-selected
39
// localhost port that can accept many connections. It reads Kafka requests
40
// from that connection and returns responses programmed by the SetHandlerByMap
41
// function. If a MockBroker receives a request that it has no programmed
42
// response for, then it returns nothing and the request times out.
43
//
44
// A set of MockRequest builders to define mappings used by MockBroker is
45
// provided by Sarama. But users can develop MockRequests of their own and use
46
// them along with or instead of the standard ones.
47
//
48
// When running tests with MockBroker it is strongly recommended to specify
49
// a timeout to `go test` so that if the broker hangs waiting for a response,
50
// the test panics.
51
//
52
// It is not necessary to prefix message length or correlation ID to your
53
// response bytes, the server does that automatically as a convenience.
54
type MockBroker struct {
55
	brokerID      int32
56
	port          int32
57
	closing       chan none
58
	stopper       chan none
59
	expectations  chan encoderWithHeader
60
	listener      net.Listener
61
	t             TestReporter
62
	latency       time.Duration
63
	handler       requestHandlerFunc
64
	notifier      RequestNotifierFunc
65
	history       []RequestResponse
66
	lock          sync.Mutex
67
	gssApiHandler GSSApiHandlerFunc
68
}
69

70
// RequestResponse represents a Request/Response pair processed by MockBroker.
71
type RequestResponse struct {
72
	Request  protocolBody
73
	Response encoder
74
}
75

76
// SetLatency makes broker pause for the specified period every time before
77
// replying.
78
func (b *MockBroker) SetLatency(latency time.Duration) {
79
	b.latency = latency
80
}
81

82
// SetHandlerByMap defines mapping of Request types to MockResponses. When a
83
// request is received by the broker, it looks up the request type in the map
84
// and uses the found MockResponse instance to generate an appropriate reply.
85
// If the request type is not found in the map then nothing is sent.
86
func (b *MockBroker) SetHandlerByMap(handlerMap map[string]MockResponse) {
87
	fnMap := make(map[string]MockResponse)
88
	for k, v := range handlerMap {
89
		fnMap[k] = v
90
	}
91
	b.setHandler(func(req *request) (res encoderWithHeader) {
92
		reqTypeName := reflect.TypeOf(req.body).Elem().Name()
93
		mockResponse := fnMap[reqTypeName]
94
		if mockResponse == nil {
95
			return nil
96
		}
97
		return mockResponse.For(req.body)
98
	})
99
}
100

101
// SetNotifier set a function that will get invoked whenever a request has been
102
// processed successfully and will provide the number of bytes read and written
103
func (b *MockBroker) SetNotifier(notifier RequestNotifierFunc) {
104
	b.lock.Lock()
105
	b.notifier = notifier
106
	b.lock.Unlock()
107
}
108

109
// BrokerID returns broker ID assigned to the broker.
110
func (b *MockBroker) BrokerID() int32 {
111
	return b.brokerID
112
}
113

114
// History returns a slice of RequestResponse pairs in the order they were
115
// processed by the broker. Note that in case of multiple connections to the
116
// broker the order expected by a test can be different from the order recorded
117
// in the history, unless some synchronization is implemented in the test.
118
func (b *MockBroker) History() []RequestResponse {
119
	b.lock.Lock()
120
	history := make([]RequestResponse, len(b.history))
121
	copy(history, b.history)
122
	b.lock.Unlock()
123
	return history
124
}
125

126
// Port returns the TCP port number the broker is listening for requests on.
127
func (b *MockBroker) Port() int32 {
128
	return b.port
129
}
130

131
// Addr returns the broker connection string in the form "<address>:<port>".
132
func (b *MockBroker) Addr() string {
133
	return b.listener.Addr().String()
134
}
135

136
// Close terminates the broker blocking until it stops internal goroutines and
137
// releases all resources.
138
func (b *MockBroker) Close() {
139
	close(b.expectations)
140
	if len(b.expectations) > 0 {
141
		buf := bytes.NewBufferString(fmt.Sprintf("mockbroker/%d: not all expectations were satisfied! Still waiting on:\n", b.BrokerID()))
142
		for e := range b.expectations {
143
			_, _ = buf.WriteString(spew.Sdump(e))
144
		}
145
		b.t.Error(buf.String())
146
	}
147
	close(b.closing)
148
	<-b.stopper
149
}
150

151
// setHandler sets the specified function as the request handler. Whenever
152
// a mock broker reads a request from the wire it passes the request to the
153
// function and sends back whatever the handler function returns.
154
func (b *MockBroker) setHandler(handler requestHandlerFunc) {
155
	b.lock.Lock()
156
	b.handler = handler
157
	b.lock.Unlock()
158
}
159

160
// serverLoop accepts connections until the listener fails or is closed, serving
// each connection on its own goroutine. It closes b.stopper on return, after
// every connection handler has finished.
func (b *MockBroker) serverLoop() {
	defer close(b.stopper)

	// Closing the listener on shutdown makes the Accept call below fail, which
	// ends the accept loop.
	go func() {
		<-b.closing
		if closeErr := b.listener.Close(); closeErr != nil {
			b.t.Error(closeErr)
		}
	}()

	var (
		handlers  sync.WaitGroup
		acceptErr error
	)
	for connIdx := 0; ; connIdx++ {
		conn, err := b.listener.Accept()
		if err != nil {
			acceptErr = err
			break
		}
		handlers.Add(1)
		go b.handleRequests(conn, connIdx, &handlers)
	}

	handlers.Wait()
	Logger.Printf("*** mockbroker/%d: listener closed, err=%v", b.BrokerID(), acceptErr)
}
183

184
// SetGSSAPIHandler sets the function the broker calls for packets it
// classifies as GSSAPI traffic instead of decoding them as Kafka requests.
func (b *MockBroker) SetGSSAPIHandler(handler GSSApiHandlerFunc) {
	b.gssApiHandler = handler
}
187

188
func (b *MockBroker) readToBytes(r io.Reader) ([]byte, error) {
189
	var (
190
		bytesRead   int
191
		lengthBytes = make([]byte, 4)
192
	)
193

194
	if _, err := io.ReadFull(r, lengthBytes); err != nil {
195
		return nil, err
196
	}
197

198
	bytesRead += len(lengthBytes)
199
	length := int32(binary.BigEndian.Uint32(lengthBytes))
200

201
	if length <= 4 || length > MaxRequestSize {
202
		return nil, PacketDecodingError{fmt.Sprintf("message of length %d too large or too small", length)}
203
	}
204

205
	encodedReq := make([]byte, length)
206
	if _, err := io.ReadFull(r, encodedReq); err != nil {
207
		return nil, err
208
	}
209

210
	bytesRead += len(encodedReq)
211

212
	fullBytes := append(lengthBytes, encodedReq...)
213

214
	return fullBytes, nil
215
}
216

217
// isGSSAPI reports whether buffer, a packet that still carries its 4-byte
// length prefix, looks like GSSAPI traffic: the payload either starts with the
// byte 0x60 or with the two bytes 0x05 0x04.
//
// Buffers too short to hold the inspected bytes are reported as non-GSSAPI
// rather than causing an out-of-range panic.
//
// NOTE(review): 0x60 is presumably the ASN.1 tag of a GSS-API initial context
// token and 0x05 0x04 the ID of an RFC 4121 wrap token - confirm.
func (b *MockBroker) isGSSAPI(buffer []byte) bool {
	const payloadStart = 4

	if len(buffer) <= payloadStart {
		return false
	}
	payload := buffer[payloadStart:]
	if payload[0] == 0x60 {
		return true
	}
	return len(payload) >= 2 && payload[0] == 0x05 && payload[1] == 0x04
}
220

221
func (b *MockBroker) handleRequests(conn io.ReadWriteCloser, idx int, wg *sync.WaitGroup) {
222
	defer wg.Done()
223
	defer func() {
224
		_ = conn.Close()
225
	}()
226
	s := spew.NewDefaultConfig()
227
	s.MaxDepth = 1
228
	Logger.Printf("*** mockbroker/%d/%d: connection opened", b.BrokerID(), idx)
229
	var err error
230

231
	abort := make(chan none)
232
	defer close(abort)
233
	go func() {
234
		select {
235
		case <-b.closing:
236
			_ = conn.Close()
237
		case <-abort:
238
		}
239
	}()
240

241
	var bytesWritten int
242
	var bytesRead int
243
	for {
244
		buffer, err := b.readToBytes(conn)
245
		if err != nil {
246
			Logger.Printf("*** mockbroker/%d/%d: invalid request: err=%+v, %+v", b.brokerID, idx, err, spew.Sdump(buffer))
247
			b.serverError(err)
248
			break
249
		}
250

251
		bytesWritten = 0
252
		if !b.isGSSAPI(buffer) {
253
			req, br, err := decodeRequest(bytes.NewReader(buffer))
254
			bytesRead = br
255
			if err != nil {
256
				Logger.Printf("*** mockbroker/%d/%d: invalid request: err=%+v, %+v", b.brokerID, idx, err, spew.Sdump(req))
257
				b.serverError(err)
258
				break
259
			}
260

261
			if b.latency > 0 {
262
				time.Sleep(b.latency)
263
			}
264

265
			b.lock.Lock()
266
			res := b.handler(req)
267
			b.history = append(b.history, RequestResponse{req.body, res})
268
			b.lock.Unlock()
269

270
			if res == nil {
271
				Logger.Printf("*** mockbroker/%d/%d: ignored %v", b.brokerID, idx, spew.Sdump(req))
272
				continue
273
			}
274
			Logger.Printf(
275
				"*** mockbroker/%d/%d: replied to %T with %T\n-> %s\n-> %s",
276
				b.brokerID, idx, req.body, res,
277
				s.Sprintf("%#v", req.body),
278
				s.Sprintf("%#v", res),
279
			)
280

281
			encodedRes, err := encode(res, nil)
282
			if err != nil {
283
				b.serverError(err)
284
				break
285
			}
286
			if len(encodedRes) == 0 {
287
				b.lock.Lock()
288
				if b.notifier != nil {
289
					b.notifier(bytesRead, 0)
290
				}
291
				b.lock.Unlock()
292
				continue
293
			}
294

295
			resHeader := b.encodeHeader(res.headerVersion(), req.correlationID, uint32(len(encodedRes)))
296
			if _, err = conn.Write(resHeader); err != nil {
297
				b.serverError(err)
298
				break
299
			}
300
			if _, err = conn.Write(encodedRes); err != nil {
301
				b.serverError(err)
302
				break
303
			}
304
			bytesWritten = len(resHeader) + len(encodedRes)
305
		} else {
306
			// GSSAPI is not part of kafka protocol, but is supported for authentication proposes.
307
			// Don't support history for this kind of request as is only used for test GSSAPI authentication mechanism
308
			b.lock.Lock()
309
			res := b.gssApiHandler(buffer)
310
			b.lock.Unlock()
311
			if res == nil {
312
				Logger.Printf("*** mockbroker/%d/%d: ignored %v", b.brokerID, idx, spew.Sdump(buffer))
313
				continue
314
			}
315
			if _, err = conn.Write(res); err != nil {
316
				b.serverError(err)
317
				break
318
			}
319
			bytesWritten = len(res)
320
		}
321

322
		b.lock.Lock()
323
		if b.notifier != nil {
324
			b.notifier(bytesRead, bytesWritten)
325
		}
326
		b.lock.Unlock()
327
	}
328
	Logger.Printf("*** mockbroker/%d/%d: connection closed, err=%v", b.BrokerID(), idx, err)
329
}
330

331
// encodeHeader builds the response header that precedes an encoded response of
// payloadLength bytes.
//
// The header consists of a 4-byte big-endian length, which counts everything
// that follows the length field itself, and the 4-byte big-endian correlation
// ID. For headerVersion >= 1 one more byte holding the uvarint 0 is appended
// (presumably the empty tagged-fields section of flexible headers).
func (b *MockBroker) encodeHeader(headerVersion int16, correlationId int32, payloadLength uint32) []byte {
	extended := headerVersion >= 1

	size := uint32(8)
	if extended {
		size = 9
	}

	header := make([]byte, size)
	binary.BigEndian.PutUint32(header[0:], payloadLength+size-4)
	binary.BigEndian.PutUint32(header[4:], uint32(correlationId))
	if extended {
		binary.PutUvarint(header[8:], 0)
	}

	return header
}
348

349
func (b *MockBroker) defaultRequestHandler(req *request) (res encoderWithHeader) {
350
	select {
351
	case res, ok := <-b.expectations:
352
		if !ok {
353
			return nil
354
		}
355
		return res
356
	case <-time.After(expectationTimeout):
357
		return nil
358
	}
359
}
360

361
// serverError reports err to the TestReporter unless it merely signals that the
// connection went away: any *net.OpError, io.EOF, or an error whose message is
// exactly "use of closed network connection". A nil err is ignored.
func (b *MockBroker) serverError(err error) {
	if err == nil {
		return
	}

	var opErr *net.OpError
	switch {
	case errors.As(err, &opErr),
		errors.Is(err, io.EOF),
		err.Error() == "use of closed network connection":
		return
	}

	// Pass the error as an argument rather than as the format string, so that
	// '%' characters in the message are reported verbatim.
	b.t.Errorf("%s", err)
}
378

379
// NewMockBroker launches a fake Kafka broker. It takes a TestReporter as provided by the
380
// test framework and a channel of responses to use.  If an error occurs it is
381
// simply logged to the TestReporter and the broker exits.
382
func NewMockBroker(t TestReporter, brokerID int32) *MockBroker {
383
	return NewMockBrokerAddr(t, brokerID, "localhost:0")
384
}
385

386
// NewMockBrokerAddr behaves like newMockBroker but listens on the address you give
387
// it rather than just some ephemeral port.
388
func NewMockBrokerAddr(t TestReporter, brokerID int32, addr string) *MockBroker {
389
	listener, err := net.Listen("tcp", addr)
390
	if err != nil {
391
		t.Fatal(err)
392
	}
393
	return NewMockBrokerListener(t, brokerID, listener)
394
}
395

396
// NewMockBrokerListener behaves like newMockBrokerAddr but accepts connections on the listener specified.
397
func NewMockBrokerListener(t TestReporter, brokerID int32, listener net.Listener) *MockBroker {
398
	var err error
399

400
	broker := &MockBroker{
401
		closing:      make(chan none),
402
		stopper:      make(chan none),
403
		t:            t,
404
		brokerID:     brokerID,
405
		expectations: make(chan encoderWithHeader, 512),
406
		listener:     listener,
407
	}
408
	broker.handler = broker.defaultRequestHandler
409

410
	Logger.Printf("*** mockbroker/%d listening on %s\n", brokerID, broker.listener.Addr().String())
411
	_, portStr, err := net.SplitHostPort(broker.listener.Addr().String())
412
	if err != nil {
413
		t.Fatal(err)
414
	}
415
	tmp, err := strconv.ParseInt(portStr, 10, 32)
416
	if err != nil {
417
		t.Fatal(err)
418
	}
419
	broker.port = int32(tmp)
420

421
	go broker.serverLoop()
422

423
	return broker
424
}
425

426
// Returns queues e as an expectation, i.e. as the response the default request
// handler gives to a future request. It blocks while the expectation queue is
// full and must not be called after Close, which closes the queue.
func (b *MockBroker) Returns(e encoderWithHeader) {
	b.expectations <- e
}
429

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.