13
"mosn.io/mosn/pkg/log"
14
"mosn.io/mosn/pkg/network"
15
"mosn.io/mosn/pkg/protocol"
16
mosnhttp "mosn.io/mosn/pkg/protocol/http"
17
mosnhttp2 "mosn.io/mosn/pkg/protocol/http2"
18
"mosn.io/mosn/pkg/stream"
19
_ "mosn.io/mosn/pkg/stream/http" // register http1
20
_ "mosn.io/mosn/pkg/stream/http2" // register http2
21
mtypes "mosn.io/mosn/pkg/types"
22
"mosn.io/mosn/test/lib"
23
"mosn.io/mosn/test/lib/types"
24
"mosn.io/mosn/test/lib/utils"
26
"mosn.io/pkg/variable"
30
lib.RegisterCreateClient("Http1", NewHttpClient)
33
// MockHttpClient use mosn http protocol and stream
34
// control metrics and connection
35
type MockHttpClient struct {
37
config *HttpClientConfig
39
stats *types.ClientStats
41
protocolName api.ProtocolName
44
connPool chan *HttpConn
47
func NewHttpClient(config interface{}) types.MockClient {
48
cfg, err := NewHttpClientConfig(config)
50
log.DefaultLogger.Errorf("new http client config error: %v", err)
56
if cfg.ProtocolName == "" {
57
cfg.ProtocolName = "Http1"
59
return &MockHttpClient{
61
stats: types.NewClientStats(),
62
protocolName: api.ProtocolName(cfg.ProtocolName),
63
maxConnNum: cfg.MaxConn,
64
connPool: make(chan *HttpConn, cfg.MaxConn),
68
func (c *MockHttpClient) SyncCall() bool {
69
conn, err := c.getOrCreateConnection()
71
log.DefaultLogger.Errorf("get connection from pool error: %v", err)
75
c.releaseConnection(conn)
77
c.stats.Records().RecordRequest()
78
resp, err := conn.SyncSendRequest(c.config.Request, c.protocolName)
81
case ErrClosedConnection:
82
case ErrRequestTimeout:
83
// TODO: support timeout verify
85
status = c.config.Verify.Verify(resp)
86
c.stats.Records().RecordResponse(int16(resp.StatusCode))
88
log.DefaultLogger.Errorf("unexpected error got: %v", err)
90
c.stats.Response(status)
95
func (c *MockHttpClient) AsyncCall() {
98
func (c *MockHttpClient) Stats() types.ClientStatsReadOnly {
102
// Close will close all the connections
103
func (c *MockHttpClient) Close() {
106
case conn := <-c.connPool:
108
c.releaseConnection(conn)
110
return // no more connections
116
// connpool implementation
117
func (c *MockHttpClient) getOrCreateConnection() (*HttpConn, error) {
119
case conn := <-c.connPool:
120
if !conn.IsClosed() {
123
// got a closed connection, try to make a new one
124
atomic.AddUint32(&c.curConnNum, ^uint32(0))
126
// try to make a new connection
128
// connection is full, wait connection
130
if atomic.LoadUint32(&c.curConnNum) >= c.maxConnNum {
131
return <-c.connPool, nil
133
conn, err := NewConn(c.config.TargetAddr, c.protocolName, func() {
134
c.stats.CloseConnection()
139
atomic.AddUint32(&c.curConnNum, 1)
140
c.stats.ActiveConnection()
145
func (c *MockHttpClient) releaseConnection(conn *HttpConn) {
147
atomic.AddUint32(&c.curConnNum, ^uint32(0))
151
case c.connPool <- conn:
156
type Response struct {
158
Header map[string][]string
163
type HttpConn struct {
164
conn mtypes.ClientConnection
170
func NewConn(addr string, pname api.ProtocolName, cb func()) (*HttpConn, error) {
172
var remoteAddr net.Addr
174
if remoteAddr, err = net.ResolveTCPAddr("tcp", addr); err != nil {
175
remoteAddr, err = net.ResolveUnixAddr("unix", addr)
182
stop: make(chan struct{}),
185
conn := network.NewClientConnection(time.Second, nil, remoteAddr, make(chan struct{}))
186
conn.AddConnectionEventListener(hconn)
188
s := stream.NewStreamClient(context.Background(), pname, conn, nil)
190
return nil, fmt.Errorf("protocol %s not registered", pname)
192
if err := conn.Connect(); err != nil {
199
func (c *HttpConn) OnEvent(event api.ConnectionEvent) {
202
if c.closeCallback != nil {
208
func (c *HttpConn) Close() {
209
c.conn.Close(api.NoFlush, api.LocalClose)
212
func (c *HttpConn) IsClosed() bool {
221
func (c *HttpConn) AsyncSendRequest(receiver mtypes.StreamReceiveListener, req *RequestConfig) {
222
ctx := variable.NewVariableContext(context.Background())
223
headers, body := req.BuildRequest(ctx)
224
encoder := c.stream.NewStream(ctx, receiver)
225
encoder.AppendHeaders(ctx, headers, body == nil)
228
encoder.AppendData(ctx, body, true)
233
ErrClosedConnection = errors.New("send request on closed connection")
234
ErrRequestTimeout = errors.New("sync call timeout")
237
func (c *HttpConn) SyncSendRequest(req *RequestConfig, proto api.ProtocolName) (*Response, error) {
240
return nil, ErrClosedConnection
242
ch := make(chan *Response)
243
r := newReceiver(ch, proto)
244
c.AsyncSendRequest(r, req)
245
// set default timeout, if a timeout is configured, use it
246
timeout := 5 * time.Second
247
if req != nil && req.Timeout > 0 {
248
timeout = req.Timeout
250
timer := time.NewTimer(timeout)
256
return nil, ErrRequestTimeout
261
type receiver struct {
262
proto api.ProtocolName
265
ch chan<- *Response // write only
268
func newReceiver(ch chan<- *Response, proto api.ProtocolName) *receiver {
277
func (r *receiver) OnReceive(ctx context.Context, headers api.HeaderMap, data buffer.IoBuffer, _ api.HeaderMap) {
278
r.data.Cost = time.Now().Sub(r.start)
281
cmd := headers.(mosnhttp.ResponseHeader).ResponseHeader
282
r.data.Header = utils.ReadFasthttpResponseHeaders(cmd)
283
r.data.StatusCode = cmd.StatusCode()
285
cmd := headers.(*mosnhttp2.RspHeader)
286
r.data.Header = mosnhttp2.EncodeHeader(cmd.HeaderMap)
287
r.data.StatusCode = cmd.Rsp.StatusCode
290
r.data.Body = data.Bytes()
295
func (r *receiver) OnDecodeError(context context.Context, err error, _ api.HeaderMap) {
296
r.data.Cost = time.Now().Sub(r.start)
297
r.data.Cost = time.Now().Sub(r.start)
298
r.data.StatusCode = http.StatusInternalServerError
299
r.data.Header = map[string][]string{
300
"X-Mosn-Error": []string{