9
"mosn.io/mosn/pkg/protocol/xprotocol/bolt"
10
mtypes "mosn.io/mosn/pkg/types"
11
"mosn.io/mosn/test/lib"
12
"mosn.io/mosn/test/lib/types"
17
lib.RegisterCreateServer("bolt", NewBoltServer)
20
type MockBoltServer struct {
22
stats *types.ServerStats
23
mgr *ConnectionManager
25
muxConfigs map[string]*ResponseConfig
31
// TODO: make xprotocol server
32
func NewBoltServer(config interface{}) types.MockServer {
33
c, err := NewBoltServerConfig(config)
37
stats := types.NewServerStats()
38
if len(c.Mux) == 0 { // use default config
39
c.Mux = map[string]*ResponseConfig{
40
".*": &ResponseConfig{
41
Condition: nil, // always matched true
42
CommonBuilder: DefaultSucessBuilder,
43
ErrorBuilder: DefaultErrorBuilder,
47
return &MockBoltServer{
49
mgr: &ConnectionManager{
50
connections: make(map[uint64]net.Conn),
58
func (s *MockBoltServer) Start() {
60
defer s.mutex.Unlock()
61
if s.listener != nil {
64
ln, err := net.Listen("tcp", s.addr)
66
log.DefaultLogger.Fatalf("listen %s failed", s.addr)
71
conn, err := ln.Accept()
73
if ne, ok := err.(net.Error); ok && ne.Temporary() {
84
func (s *MockBoltServer) Stop() {
86
defer s.mutex.Unlock()
92
func (s *MockBoltServer) Stats() types.ServerStatsReadOnly {
96
// TODO: mux support more protocol to make x protocol server
97
func (s *MockBoltServer) mux(req *bolt.Request) (result *ResponseConfig) {
98
if resp, ok := s.muxConfigs[".*"]; ok {
99
result = resp // if .* exists, use it as default
101
v, ok := req.Get(mtypes.RPCRouteMatchKey)
105
resp, ok := s.muxConfigs[v]
113
// TODO: HandleRequest support more protocol to make x protocol server
114
func (s *MockBoltServer) handle(buf buffer.IoBuffer) *ResponseToWrite {
115
ctx := context.Background()
116
engine := (&bolt.XCodec{}).NewXProtocol(ctx)
117
cmd, err := engine.Decode(ctx, buf)
118
if cmd == nil || err != nil {
121
req, ok := cmd.(*bolt.Request)
126
respconfig := s.mux(req)
127
resp, status := respconfig.HandleRequest(req, engine)
129
iobuf, err := engine.Encode(ctx, resp)
131
log.DefaultLogger.Errorf("engine encode error: %v", err)
134
return &ResponseToWrite{
142
func (s *MockBoltServer) serveConn(conn net.Conn) {
143
id := s.mgr.Add(conn)
147
iobuf := buffer.NewIoBuffer(10240)
149
buf := make([]byte, 1024)
150
n, err := conn.Read(buf)
152
if err, ok := err.(net.Error); ok && err.Temporary() {
159
for iobuf.Len() > 0 {
160
resp := s.handle(iobuf)
164
s.stats.Records().RecordRequest()
165
conn.Write(resp.Body)
166
s.stats.Records().RecordResponse(resp.StatusCode)
173
type ConnectionManager struct {
176
connections map[uint64]net.Conn
177
stats *types.ServerStats
180
func (mgr *ConnectionManager) Add(conn net.Conn) uint64 {
182
defer mgr.mutex.Unlock()
184
mgr.connections[mgr.index] = conn
185
mgr.stats.ActiveConnection()
189
func (mgr *ConnectionManager) Delete(id uint64) {
191
defer mgr.mutex.Unlock()
192
delete(mgr.connections, id)
193
mgr.stats.CloseConnection()
196
func (mgr *ConnectionManager) Clean() {
198
defer mgr.mutex.Unlock()
199
for id, conn := range mgr.connections {
201
delete(mgr.connections, id)
202
mgr.stats.CloseConnection()
206
type ResponseToWrite struct {