13
"mosn.io/mosn/pkg/log"
14
"mosn.io/mosn/pkg/network"
15
"mosn.io/mosn/pkg/protocol/xprotocol/bolt" // register bolt
16
"mosn.io/mosn/pkg/stream"
17
_ "mosn.io/mosn/pkg/stream/xprotocol" // register xprotocol
18
mtypes "mosn.io/mosn/pkg/types"
19
"mosn.io/mosn/test/lib"
20
"mosn.io/mosn/test/lib/types"
24
// Registers NewBoltClient with the test lib under the name "bolt".
lib.RegisterCreateClient("bolt", NewBoltClient)
27
// MockBoltClient is a mock client that uses the mosn xprotocol bolt protocol
// and stream.
28
type MockBoltClient struct {
30
// config supplies the request, the verifier and the target address used by
// the client's methods.
config *BoltClientConfig
32
// stats collects request/response records and connection counters.
stats *types.ClientStats
36
// connPool is the channel connections are taken from and handed back to.
connPool chan *BoltConn
39
// NewBoltClient builds a MockBoltClient from config, which is passed to
// NewBoltClientConfig. cfg.MaxConn is used both as maxConnNum and as the
// buffer size of the connection pool channel.
func NewBoltClient(config interface{}) types.MockClient {
40
cfg, err := NewBoltClientConfig(config)
42
// The configuration error is reported through the default logger.
log.DefaultLogger.Errorf("new bolt client config error: %v", err)
48
return &MockBoltClient{
50
stats: types.NewClientStats(),
51
maxConnNum: cfg.MaxConn,
52
connPool: make(chan *BoltConn, cfg.MaxConn),
56
// SyncCall obtains a connection, sends c.config.Request on it synchronously
// and records the request and its outcome in c.stats.
// NOTE(review): presumably the returned bool is the verification status — confirm.
func (c *MockBoltClient) SyncCall() bool {
57
conn, err := c.getOrCreateConnection()
59
log.DefaultLogger.Errorf("get connection from pool error: %v", err)
63
// NOTE(review): presumably deferred so the connection is handed back after
// the call completes — confirm.
c.releaseConnection(conn)
65
c.stats.Records().RecordRequest()
66
resp, err := conn.SyncSendRequest(c.config.Request)
69
// A send on a closed connection is recorded with response code 2.
case ErrClosedConnection:
70
c.stats.Records().RecordResponse(2)
71
// A timed-out request is recorded with response code 3.
case ErrRequestTimeout:
72
// TODO: support timeout verify
73
c.stats.Records().RecordResponse(3)
75
// The response is checked by the configured verifier and its own response
// status is recorded.
status = c.config.Verify.Verify(resp)
76
c.stats.Records().RecordResponse(resp.GetResponseStatus())
78
log.DefaultLogger.Errorf("unexpected error got: %v", err)
80
c.stats.Response(status)
85
// AsyncCall takes no arguments and returns no value.
// NOTE(review): presumably the asynchronous counterpart of SyncCall — confirm.
func (c *MockBoltClient) AsyncCall() {
88
// Stats returns the client's statistics as a types.ClientStatsReadOnly.
// NOTE(review): presumably backed by c.stats — confirm.
func (c *MockBoltClient) Stats() types.ClientStatsReadOnly {
92
// Close will close all the connections.
// Each connection received from c.connPool is passed to releaseConnection;
// the method returns once the pool yields no more connections.
93
func (c *MockBoltClient) Close() {
96
case conn := <-c.connPool:
98
c.releaseConnection(conn)
100
return // no more connections
106
// Connection pool implementation.
107
// getOrCreateConnection returns a connection for the next request. It first
// tries to take one from c.connPool; otherwise it either waits on the pool
// (when curConnNum has reached maxConnNum) or dials c.config.TargetAddr.
// NOTE(review): the limit check and the increment of curConnNum are separate
// atomic operations, so concurrent callers may both pass the check — confirm
// this is acceptable.
func (c *MockBoltClient) getOrCreateConnection() (*BoltConn, error) {
109
case conn := <-c.connPool:
110
if !conn.IsClosed() {
113
// got a closed connection, try to make a new one
114
// Adding ^uint32(0) atomically decrements curConnNum by one.
atomic.AddUint32(&c.curConnNum, ^uint32(0))
116
// try to make a new connection
118
// connection is full, wait connection
120
if atomic.LoadUint32(&c.curConnNum) >= c.maxConnNum {
121
// NOTE(review): this receive returns whatever the pool yields without an
// IsClosed check — confirm callers tolerate a closed connection.
return <-c.connPool, nil
123
// The callback passed to NewConn updates the connection-close statistic.
conn, err := NewConn(c.config.TargetAddr, func() {
124
c.stats.CloseConnection()
129
// Count the new connection and record it as active.
atomic.AddUint32(&c.curConnNum, 1)
130
c.stats.ActiveConnection()
135
// releaseConnection hands conn back to the pool by sending it on c.connPool.
// NOTE(review): presumably the decrement below applies only when conn is
// already closed — confirm.
func (c *MockBoltClient) releaseConnection(conn *BoltConn) {
137
// Adding ^uint32(0) atomically decrements curConnNum by one.
atomic.AddUint32(&c.curConnNum, ^uint32(0))
141
case c.connPool <- conn:
146
// BoltConn wraps a mosn client connection used to send bolt requests.
type BoltConn struct {
147
// conn is the underlying mosn client connection.
conn mtypes.ClientConnection
154
// NewConn resolves addr as a TCP address, connects a mosn client connection
// to it and creates a bolt stream client on top of that connection.
// NOTE(review): cb is presumably stored as the close callback checked in
// OnEvent — confirm.
func NewConn(addr string, cb func()) (*BoltConn, error) {
155
remoteAddr, err := net.ResolveTCPAddr("tcp", addr)
160
stop: make(chan struct{}),
163
// NOTE(review): time.Second is presumably the connect timeout — confirm
// against network.NewClientConnection.
conn := network.NewClientConnection(time.Second, nil, remoteAddr, make(chan struct{}))
164
if err := conn.Connect(); err != nil {
167
// hconn is registered to receive connection events from conn.
conn.AddConnectionEventListener(hconn)
169
ctx := context.Background()
170
s := stream.NewStreamClient(ctx, bolt.ProtocolName, conn, nil)
172
// NOTE(review): conn has already been connected at this point; confirm it is
// closed before returning this error.
return nil, errors.New("protocol not registered")
178
// OnEvent receives connection events for this connection.
func (c *BoltConn) OnEvent(event api.ConnectionEvent) {
181
// closeCallback is optional, so it is nil-checked before use.
if c.closeCallback != nil {
187
// Close closes the underlying connection with api.NoFlush and api.LocalClose.
func (c *BoltConn) Close() {
188
c.conn.Close(api.NoFlush, api.LocalClose)
191
// IsClosed reports whether the connection is closed; getOrCreateConnection
// uses it to reject closed connections taken from the pool.
func (c *BoltConn) IsClosed() bool {
200
// ReqID atomically increments c.reqId and returns the new value.
func (c *BoltConn) ReqID() uint32 {
201
return atomic.AddUint32(&c.reqId, 1)
204
// AsyncSendRequest builds the request headers and body from req using
// receiver.requestId, opens a new stream on c.stream with receiver attached,
// and appends the headers and the body to that stream.
func (c *BoltConn) AsyncSendRequest(receiver *receiver, req *RequestConfig) {
205
headers, body := req.BuildRequest(receiver.requestId)
206
ctx := context.Background()
207
encoder := c.stream.NewStream(ctx, receiver)
208
// NOTE(review): the last argument is presumably the end-of-stream flag, set
// here when there is no body — confirm.
encoder.AppendHeaders(ctx, headers, body == nil)
210
encoder.AppendData(ctx, body, true)
215
// ErrClosedConnection is returned by SyncSendRequest when the request is
// attempted on a closed connection.
ErrClosedConnection = errors.New("send request on closed connection")
216
// ErrRequestTimeout is returned by SyncSendRequest when the wait for a
// response times out.
ErrRequestTimeout = errors.New("sync call timeout")
219
// SyncSendRequest sends req through AsyncSendRequest and waits for the
// response, bounded by a timeout of 5 seconds unless req.Timeout is positive.
// It returns ErrClosedConnection or ErrRequestTimeout on the corresponding
// failures.
func (c *BoltConn) SyncSendRequest(req *RequestConfig) (*Response, error) {
222
return nil, ErrClosedConnection
224
// NOTE(review): ch is unbuffered; confirm the receiver's send cannot block
// forever when the response arrives after the timeout.
ch := make(chan *Response)
225
r := newReceiver(c.ReqID(), ch)
226
// NOTE(review): req is used here before the req != nil check below — confirm
// a nil req is safe for BuildRequest.
c.AsyncSendRequest(r, req)
227
// set default timeout, if a timeout is configured, use it
228
timeout := 5 * time.Second
229
if req != nil && req.Timeout > 0 {
230
timeout = req.Timeout
232
timer := time.NewTimer(timeout)
238
return nil, ErrRequestTimeout
243
// receiver is attached to a stream by AsyncSendRequest and collects the
// response for one request.
type receiver struct {
247
// ch is the send-only channel passed to newReceiver.
ch chan<- *Response // write only
250
// newReceiver builds a receiver from a request id and a send-only response
// channel.
func newReceiver(id uint32, ch chan<- *Response) *receiver {
259
// OnReceive fills r.data from a received response: it records the time
// elapsed since r.start as the cost, copies the bolt response header,
// overwrites the header's RequestId with r.requestId and stores data as the
// content.
func (r *receiver) OnReceive(ctx context.Context, headers api.HeaderMap, data buffer.IoBuffer, _ api.HeaderMap) {
260
r.data.Cost = time.Now().Sub(r.start)
261
// Both assertions are single-value, so they panic if headers is not an
// api.XRespFrame holding a *bolt.Response.
cmd := headers.(api.XRespFrame)
262
resp := cmd.(*bolt.Response)
263
r.data.Header = resp.ResponseHeader
264
r.data.Header.RequestId = r.requestId
265
r.data.Content = data
269
// OnDecodeError fills r.data with a synthetic bolt response for a decode
// failure: the header carries r.requestId and bolt.ResponseStatusError, and
// the content is the text of err.
func (r *receiver) OnDecodeError(context context.Context, err error, _ api.HeaderMap) {
271
r.data.Cost = time.Now().Sub(r.start)
272
r.data.Header = bolt.ResponseHeader{
273
Protocol: bolt.ProtocolCode,
274
CmdType: bolt.CmdTypeResponse,
275
CmdCode: bolt.CmdCodeRpcResponse,
276
Version: bolt.ProtocolVersion,
277
Codec: bolt.Hessian2Serialize,
278
RequestId: r.requestId,
279
ResponseStatus: bolt.ResponseStatusError,
281
r.data.Content = buffer.NewIoBufferString(err.Error())
285
// Response is the result of one request as collected by a receiver.
type Response struct {
286
// Header is the bolt response header.
Header bolt.ResponseHeader
287
// Content is the response body buffer.
Content buffer.IoBuffer
291
// GetResponseStatus returns the header's ResponseStatus converted to int16.
func (r *Response) GetResponseStatus() int16 {
292
return int16(r.Header.ResponseStatus)