mosn

Форк
0
296 строк · 6.7 Кб
1
package boltv2
2

3
import (
4
	"context"
5
	"errors"
6
	"net"
7
	"sync/atomic"
8
	"time"
9

10
	"mosn.io/api"
11
	"mosn.io/mosn/pkg/log"
12
	"mosn.io/mosn/pkg/network"
13
	"mosn.io/mosn/pkg/protocol/xprotocol/bolt"
14
	"mosn.io/mosn/pkg/protocol/xprotocol/boltv2"
15
	"mosn.io/mosn/pkg/stream"
16
	_ "mosn.io/mosn/pkg/stream/xprotocol" // register xprotocol
17
	mtypes "mosn.io/mosn/pkg/types"
18
	"mosn.io/mosn/test/lib"
19
	"mosn.io/mosn/test/lib/types"
20
	"mosn.io/pkg/buffer"
21
)
22

23
func init() {
24
	lib.RegisterCreateClient("boltv2", NewBoltClient)
25
}
26

27
// MockBoltClient use mosn xprotocol.bolt protocol and stream
28
type MockBoltClient struct {
29
	// config
30
	config *BoltClientConfig
31
	// stats
32
	stats *types.ClientStats
33
	// connection pool
34
	curConnNum uint32
35
	maxConnNum uint32
36
	connPool   chan *BoltConn
37
}
38

39
func NewBoltClient(config interface{}) types.MockClient {
40
	cfg, err := NewBoltClientConfig(config)
41
	if err != nil {
42
		log.DefaultLogger.Errorf("new bolt client config error: %v", err)
43
		return nil
44
	}
45
	if cfg.MaxConn == 0 {
46
		cfg.MaxConn = 1
47
	}
48
	return &MockBoltClient{
49
		config:     cfg,
50
		stats:      types.NewClientStats(),
51
		maxConnNum: cfg.MaxConn,
52
		connPool:   make(chan *BoltConn, cfg.MaxConn),
53
	}
54
}
55

56
func (c *MockBoltClient) SyncCall() bool {
57
	conn, err := c.getOrCreateConnection()
58
	if err != nil {
59
		log.DefaultLogger.Errorf("get connection from pool error: %v", err)
60
		return false
61
	}
62
	defer func() {
63
		c.releaseConnection(conn)
64
	}()
65
	c.stats.Records().RecordRequest()
66
	resp, err := conn.SyncSendRequest(c.config.Request)
67
	status := false
68
	switch err {
69
	case ErrClosedConnection:
70
		c.stats.Records().RecordResponse(2)
71
	case ErrRequestTimeout:
72
		// TODO: support timeout verify
73
		c.stats.Records().RecordResponse(3)
74
	case nil:
75
		status = c.config.Verify.Verify(resp)
76
		c.stats.Records().RecordResponse(resp.GetResponseStatus())
77
	default:
78
		log.DefaultLogger.Errorf("unexpected error got: %v", err)
79
	}
80
	c.stats.Response(status)
81
	return status
82
}
83

84
// TODO: implement it
85
func (c *MockBoltClient) AsyncCall() {
86
}
87

88
func (c *MockBoltClient) Stats() types.ClientStatsReadOnly {
89
	return c.stats
90
}
91

92
// Close will close all the connections
93
func (c *MockBoltClient) Close() {
94
	for {
95
		select {
96
		case conn := <-c.connPool:
97
			conn.Close()
98
			c.releaseConnection(conn)
99
		default:
100
			return // no more connections
101
		}
102
	}
103

104
}
105

106
// connpool implementation
107
func (c *MockBoltClient) getOrCreateConnection() (*BoltConn, error) {
108
	select {
109
	case conn := <-c.connPool:
110
		if !conn.IsClosed() {
111
			return conn, nil
112
		}
113
		// got a closed connection, try to make a new one
114
		atomic.AddUint32(&c.curConnNum, ^uint32(0))
115
	default:
116
		// try to make a new connection
117
	}
118
	// connection is full, wait connection
119
	// TODO: add timeout
120
	if atomic.LoadUint32(&c.curConnNum) >= c.maxConnNum {
121
		return <-c.connPool, nil
122
	}
123
	conn, err := NewConn(c.config.TargetAddr, func() {
124
		c.stats.CloseConnection()
125
	})
126
	if err != nil {
127
		return nil, err
128
	}
129
	atomic.AddUint32(&c.curConnNum, 1)
130
	c.stats.ActiveConnection()
131
	return conn, nil
132

133
}
134

135
func (c *MockBoltClient) releaseConnection(conn *BoltConn) {
136
	if conn.IsClosed() {
137
		atomic.AddUint32(&c.curConnNum, ^uint32(0))
138
		return
139
	}
140
	select {
141
	case c.connPool <- conn:
142
	default:
143
	}
144
}
145

146
type BoltConn struct {
147
	conn          mtypes.ClientConnection
148
	stream        stream.Client
149
	stop          chan struct{}
150
	closeCallback func()
151
	reqId         uint32
152
}
153

154
func NewConn(addr string, cb func()) (*BoltConn, error) {
155
	remoteAddr, err := net.ResolveTCPAddr("tcp", addr)
156
	if err != nil {
157
		return nil, err
158
	}
159
	hconn := &BoltConn{
160
		stop:          make(chan struct{}),
161
		closeCallback: cb,
162
	}
163
	conn := network.NewClientConnection(time.Second, nil, remoteAddr, make(chan struct{}))
164
	if err := conn.Connect(); err != nil {
165
		return nil, err
166
	}
167
	conn.AddConnectionEventListener(hconn)
168
	hconn.conn = conn
169
	ctx := context.Background()
170
	s := stream.NewStreamClient(ctx, boltv2.ProtocolName, conn, nil)
171
	if s == nil {
172
		return nil, errors.New("protocol not registered")
173
	}
174
	hconn.stream = s
175
	return hconn, nil
176
}
177

178
func (c *BoltConn) OnEvent(event api.ConnectionEvent) {
179
	if event.IsClose() {
180
		close(c.stop)
181
		if c.closeCallback != nil {
182
			c.closeCallback()
183
		}
184
	}
185
}
186

187
func (c *BoltConn) Close() {
188
	c.conn.Close(api.NoFlush, api.LocalClose)
189
}
190

191
func (c *BoltConn) IsClosed() bool {
192
	select {
193
	case <-c.stop:
194
		return true
195
	default:
196
		return false
197
	}
198
}
199

200
func (c *BoltConn) ReqID() uint32 {
201
	return atomic.AddUint32(&c.reqId, 1)
202
}
203

204
func (c *BoltConn) AsyncSendRequest(receiver *receiver, req *RequestConfig) {
205
	headers, body := req.BuildRequest(receiver.requestId)
206
	ctx := context.Background()
207
	encoder := c.stream.NewStream(ctx, receiver)
208
	encoder.AppendHeaders(ctx, headers, body == nil)
209
	if body != nil {
210
		encoder.AppendData(ctx, body, true)
211
	}
212
}
213

214
var (
215
	ErrClosedConnection = errors.New("send request on closed connection")
216
	ErrRequestTimeout   = errors.New("sync call timeout")
217
)
218

219
func (c *BoltConn) SyncSendRequest(req *RequestConfig) (*Response, error) {
220
	select {
221
	case <-c.stop:
222
		return nil, ErrClosedConnection
223
	default:
224
		ch := make(chan *Response)
225
		r := newReceiver(c.ReqID(), ch)
226
		c.AsyncSendRequest(r, req)
227
		// set default timeout, if a timeout is configured, use it
228
		timeout := 5 * time.Second
229
		if req != nil && req.Timeout > 0 {
230
			timeout = req.Timeout
231
		}
232
		timer := time.NewTimer(timeout)
233
		select {
234
		case resp := <-ch:
235
			timer.Stop()
236
			return resp, nil
237
		case <-timer.C:
238
			return nil, ErrRequestTimeout
239
		}
240
	}
241
}
242

243
type receiver struct {
244
	requestId uint32
245
	data      *Response
246
	start     time.Time
247
	ch        chan<- *Response // write only
248
}
249

250
func newReceiver(id uint32, ch chan<- *Response) *receiver {
251
	return &receiver{
252
		requestId: id,
253
		data:      &Response{},
254
		start:     time.Now(),
255
		ch:        ch,
256
	}
257
}
258

259
func (r *receiver) OnReceive(ctx context.Context, headers api.HeaderMap, data buffer.IoBuffer, _ api.HeaderMap) {
260
	r.data.Cost = time.Now().Sub(r.start)
261
	cmd := headers.(api.XRespFrame)
262
	resp := cmd.(*boltv2.Response)
263
	r.data.Header = resp.ResponseHeader
264
	r.data.Header.RequestId = r.requestId
265
	r.data.Content = data
266
	r.ch <- r.data
267
}
268

269
func (r *receiver) OnDecodeError(context context.Context, err error, _ api.HeaderMap) {
270
	// build an error
271
	r.data.Cost = time.Now().Sub(r.start)
272
	r.data.Header = boltv2.ResponseHeader{
273
		Version1: boltv2.ProtocolVersion1,
274
		ResponseHeader: bolt.ResponseHeader{
275
			Protocol:       boltv2.ProtocolCode,
276
			CmdType:        bolt.CmdTypeResponse,
277
			CmdCode:        bolt.CmdCodeRpcResponse,
278
			Version:        boltv2.ProtocolVersion,
279
			Codec:          bolt.Hessian2Serialize,
280
			RequestId:      r.requestId,
281
			ResponseStatus: bolt.ResponseStatusError,
282
		},
283
	}
284
	r.data.Content = buffer.NewIoBufferString(err.Error())
285
	r.ch <- r.data
286
}
287

288
type Response struct {
289
	Header  boltv2.ResponseHeader
290
	Content buffer.IoBuffer
291
	Cost    time.Duration
292
}
293

294
func (r *Response) GetResponseStatus() int16 {
295
	return int16(r.Header.ResponseStatus)
296
}
297

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.