mosn

Форк
0
/
client.go 
305 строк · 6.9 Кб
1
package http
2

3
import (
4
	"context"
5
	"errors"
6
	"fmt"
7
	"net"
8
	"net/http"
9
	"sync/atomic"
10
	"time"
11

12
	"mosn.io/api"
13
	"mosn.io/mosn/pkg/log"
14
	"mosn.io/mosn/pkg/network"
15
	"mosn.io/mosn/pkg/protocol"
16
	mosnhttp "mosn.io/mosn/pkg/protocol/http"
17
	mosnhttp2 "mosn.io/mosn/pkg/protocol/http2"
18
	"mosn.io/mosn/pkg/stream"
19
	_ "mosn.io/mosn/pkg/stream/http"  // register http1
20
	_ "mosn.io/mosn/pkg/stream/http2" // register http2
21
	mtypes "mosn.io/mosn/pkg/types"
22
	"mosn.io/mosn/test/lib"
23
	"mosn.io/mosn/test/lib/types"
24
	"mosn.io/mosn/test/lib/utils"
25
	"mosn.io/pkg/buffer"
26
	"mosn.io/pkg/variable"
27
)
28

29
func init() {
30
	lib.RegisterCreateClient("Http1", NewHttpClient)
31
}
32

33
// MockHttpClient use mosn http protocol and stream
34
// control metrics and connection
35
type MockHttpClient struct {
36
	// config
37
	config *HttpClientConfig
38
	// stats
39
	stats *types.ClientStats
40
	// connection pool
41
	protocolName api.ProtocolName
42
	curConnNum   uint32
43
	maxConnNum   uint32
44
	connPool     chan *HttpConn
45
}
46

47
func NewHttpClient(config interface{}) types.MockClient {
48
	cfg, err := NewHttpClientConfig(config)
49
	if err != nil {
50
		log.DefaultLogger.Errorf("new http client config error: %v", err)
51
		return nil
52
	}
53
	if cfg.MaxConn == 0 {
54
		cfg.MaxConn = 1
55
	}
56
	if cfg.ProtocolName == "" {
57
		cfg.ProtocolName = "Http1"
58
	}
59
	return &MockHttpClient{
60
		config:       cfg,
61
		stats:        types.NewClientStats(),
62
		protocolName: api.ProtocolName(cfg.ProtocolName),
63
		maxConnNum:   cfg.MaxConn,
64
		connPool:     make(chan *HttpConn, cfg.MaxConn),
65
	}
66
}
67

68
func (c *MockHttpClient) SyncCall() bool {
69
	conn, err := c.getOrCreateConnection()
70
	if err != nil {
71
		log.DefaultLogger.Errorf("get connection from pool error: %v", err)
72
		return false
73
	}
74
	defer func() {
75
		c.releaseConnection(conn)
76
	}()
77
	c.stats.Records().RecordRequest()
78
	resp, err := conn.SyncSendRequest(c.config.Request, c.protocolName)
79
	status := false
80
	switch err {
81
	case ErrClosedConnection:
82
	case ErrRequestTimeout:
83
		// TODO: support timeout verify
84
	case nil:
85
		status = c.config.Verify.Verify(resp)
86
		c.stats.Records().RecordResponse(int16(resp.StatusCode))
87
	default:
88
		log.DefaultLogger.Errorf("unexpected error got: %v", err)
89
	}
90
	c.stats.Response(status)
91
	return status
92
}
93

94
// TODO: implement it
95
func (c *MockHttpClient) AsyncCall() {
96
}
97

98
// Stats returns the client's statistics as a read-only view.
func (c *MockHttpClient) Stats() types.ClientStatsReadOnly {
	var view types.ClientStatsReadOnly = c.stats
	return view
}
101

102
// Close will close all the connections
103
func (c *MockHttpClient) Close() {
104
	for {
105
		select {
106
		case conn := <-c.connPool:
107
			conn.Close()
108
			c.releaseConnection(conn)
109
		default:
110
			return // no more connections
111
		}
112
	}
113

114
}
115

116
// connpool implementation
117
func (c *MockHttpClient) getOrCreateConnection() (*HttpConn, error) {
118
	select {
119
	case conn := <-c.connPool:
120
		if !conn.IsClosed() {
121
			return conn, nil
122
		}
123
		// got a closed connection, try to make a new one
124
		atomic.AddUint32(&c.curConnNum, ^uint32(0))
125
	default:
126
		// try to make a new connection
127
	}
128
	// connection is full, wait connection
129
	// TODO: add timeout
130
	if atomic.LoadUint32(&c.curConnNum) >= c.maxConnNum {
131
		return <-c.connPool, nil
132
	}
133
	conn, err := NewConn(c.config.TargetAddr, c.protocolName, func() {
134
		c.stats.CloseConnection()
135
	})
136
	if err != nil {
137
		return nil, err
138
	}
139
	atomic.AddUint32(&c.curConnNum, 1)
140
	c.stats.ActiveConnection()
141
	return conn, nil
142

143
}
144

145
func (c *MockHttpClient) releaseConnection(conn *HttpConn) {
146
	if conn.IsClosed() {
147
		atomic.AddUint32(&c.curConnNum, ^uint32(0))
148
		return
149
	}
150
	select {
151
	case c.connPool <- conn:
152
	default:
153
	}
154
}
155

156
// Response is the decoded result of a request as seen by the mock client.
type Response struct {
	// StatusCode is the HTTP status code of the response.
	StatusCode int
	// Header maps each header name to its list of values.
	Header map[string][]string
	// Body is the response payload.
	Body []byte
	// Cost is the time elapsed between creating the receiver for the request
	// and receiving the response or decode error.
	Cost time.Duration
}
162

163
type HttpConn struct {
164
	conn          mtypes.ClientConnection
165
	stream        stream.Client
166
	stop          chan struct{}
167
	closeCallback func()
168
}
169

170
func NewConn(addr string, pname api.ProtocolName, cb func()) (*HttpConn, error) {
171

172
	var remoteAddr net.Addr
173
	var err error
174
	if remoteAddr, err = net.ResolveTCPAddr("tcp", addr); err != nil {
175
		remoteAddr, err = net.ResolveUnixAddr("unix", addr)
176
	}
177

178
	if err != nil {
179
		return nil, err
180
	}
181
	hconn := &HttpConn{
182
		stop:          make(chan struct{}),
183
		closeCallback: cb,
184
	}
185
	conn := network.NewClientConnection(time.Second, nil, remoteAddr, make(chan struct{}))
186
	conn.AddConnectionEventListener(hconn)
187
	hconn.conn = conn
188
	s := stream.NewStreamClient(context.Background(), pname, conn, nil)
189
	if s == nil {
190
		return nil, fmt.Errorf("protocol %s not registered", pname)
191
	}
192
	if err := conn.Connect(); err != nil {
193
		return nil, err
194
	}
195
	hconn.stream = s
196
	return hconn, nil
197
}
198

199
// OnEvent handles connection events. On a close event it closes the stop
// channel and then runs the close callback, if one was supplied; all other
// events are ignored.
func (c *HttpConn) OnEvent(event api.ConnectionEvent) {
	if !event.IsClose() {
		return
	}
	close(c.stop)
	if cb := c.closeCallback; cb != nil {
		cb()
	}
}
207

208
// Close closes the underlying connection, passing api.NoFlush and
// api.LocalClose as the close type and event.
func (c *HttpConn) Close() {
	underlying := c.conn
	underlying.Close(api.NoFlush, api.LocalClose)
}
211

212
func (c *HttpConn) IsClosed() bool {
213
	select {
214
	case <-c.stop:
215
		return true
216
	default:
217
		return false
218
	}
219
}
220

221
// AsyncSendRequest builds the request described by req and writes it on a new
// stream without waiting for the response; the outcome is delivered to
// receiver.
//
// The headers end the stream when the request has no body; otherwise the body
// is appended afterwards and ends the stream.
func (c *HttpConn) AsyncSendRequest(receiver mtypes.StreamReceiveListener, req *RequestConfig) {
	ctx := variable.NewVariableContext(context.Background())
	headers, body := req.BuildRequest(ctx)
	sender := c.stream.NewStream(ctx, receiver)

	hasBody := body != nil
	sender.AppendHeaders(ctx, headers, !hasBody)
	if hasBody {
		sender.AppendData(ctx, body, true)
	}
}
231

232
// ErrClosedConnection is returned by SyncSendRequest when the connection has
// already been closed.
var ErrClosedConnection = errors.New("send request on closed connection")

// ErrRequestTimeout is returned by SyncSendRequest when no response arrives
// before the timeout expires.
var ErrRequestTimeout = errors.New("sync call timeout")
236

237
func (c *HttpConn) SyncSendRequest(req *RequestConfig, proto api.ProtocolName) (*Response, error) {
238
	select {
239
	case <-c.stop:
240
		return nil, ErrClosedConnection
241
	default:
242
		ch := make(chan *Response)
243
		r := newReceiver(ch, proto)
244
		c.AsyncSendRequest(r, req)
245
		// set default timeout, if a timeout is configured, use it
246
		timeout := 5 * time.Second
247
		if req != nil && req.Timeout > 0 {
248
			timeout = req.Timeout
249
		}
250
		timer := time.NewTimer(timeout)
251
		select {
252
		case resp := <-ch:
253
			timer.Stop()
254
			return resp, nil
255
		case <-timer.C:
256
			return nil, ErrRequestTimeout
257
		}
258
	}
259
}
260

261
type receiver struct {
262
	proto api.ProtocolName
263
	data  *Response
264
	start time.Time
265
	ch    chan<- *Response // write only
266
}
267

268
func newReceiver(ch chan<- *Response, proto api.ProtocolName) *receiver {
269
	return &receiver{
270
		proto: proto,
271
		data:  &Response{},
272
		start: time.Now(),
273
		ch:    ch,
274
	}
275
}
276

277
func (r *receiver) OnReceive(ctx context.Context, headers api.HeaderMap, data buffer.IoBuffer, _ api.HeaderMap) {
278
	r.data.Cost = time.Now().Sub(r.start)
279
	switch r.proto {
280
	case protocol.HTTP1:
281
		cmd := headers.(mosnhttp.ResponseHeader).ResponseHeader
282
		r.data.Header = utils.ReadFasthttpResponseHeaders(cmd)
283
		r.data.StatusCode = cmd.StatusCode()
284
	case protocol.HTTP2:
285
		cmd := headers.(*mosnhttp2.RspHeader)
286
		r.data.Header = mosnhttp2.EncodeHeader(cmd.HeaderMap)
287
		r.data.StatusCode = cmd.Rsp.StatusCode
288
	}
289
	if data != nil {
290
		r.data.Body = data.Bytes()
291
	}
292
	r.ch <- r.data
293
}
294

295
func (r *receiver) OnDecodeError(context context.Context, err error, _ api.HeaderMap) {
296
	r.data.Cost = time.Now().Sub(r.start)
297
	r.data.Cost = time.Now().Sub(r.start)
298
	r.data.StatusCode = http.StatusInternalServerError
299
	r.data.Header = map[string][]string{
300
		"X-Mosn-Error": []string{
301
			err.Error(),
302
		},
303
	}
304
	r.ch <- r.data
305
}
306

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.