mosn

Форк
0
209 строк · 4.1 Кб
1
package boltv1
2

3
import (
4
	"context"
5
	"net"
6
	"sync"
7

8
	"mosn.io/mosn/pkg/log"
9
	"mosn.io/mosn/pkg/protocol/xprotocol/bolt"
10
	mtypes "mosn.io/mosn/pkg/types"
11
	"mosn.io/mosn/test/lib"
12
	"mosn.io/mosn/test/lib/types"
13
	"mosn.io/pkg/buffer"
14
)
15

16
func init() {
17
	lib.RegisterCreateServer("bolt", NewBoltServer)
18
}
19

20
type MockBoltServer struct {
21
	mutex sync.Mutex
22
	stats *types.ServerStats
23
	mgr   *ConnectionManager
24
	// configs
25
	muxConfigs map[string]*ResponseConfig
26
	addr       string
27
	// running
28
	listener net.Listener
29
}
30

31
// TODO: make xprotocol server
32
func NewBoltServer(config interface{}) types.MockServer {
33
	c, err := NewBoltServerConfig(config)
34
	if err != nil {
35
		return nil
36
	}
37
	stats := types.NewServerStats()
38
	if len(c.Mux) == 0 { // use default config
39
		c.Mux = map[string]*ResponseConfig{
40
			".*": &ResponseConfig{
41
				Condition:     nil, //  always matched true
42
				CommonBuilder: DefaultSucessBuilder,
43
				ErrorBuilder:  DefaultErrorBuilder,
44
			},
45
		}
46
	}
47
	return &MockBoltServer{
48
		stats: stats,
49
		mgr: &ConnectionManager{
50
			connections: make(map[uint64]net.Conn),
51
			stats:       stats,
52
		},
53
		muxConfigs: c.Mux,
54
		addr:       c.Addr,
55
	}
56
}
57

58
func (s *MockBoltServer) Start() {
59
	s.mutex.Lock()
60
	defer s.mutex.Unlock()
61
	if s.listener != nil {
62
		return
63
	}
64
	ln, err := net.Listen("tcp", s.addr)
65
	if err != nil {
66
		log.DefaultLogger.Fatalf("listen %s failed", s.addr)
67
	}
68
	s.listener = ln
69
	go func() {
70
		for {
71
			conn, err := ln.Accept()
72
			if err != nil {
73
				if ne, ok := err.(net.Error); ok && ne.Temporary() {
74
					continue
75
				}
76
				return
77
			}
78
			go s.serveConn(conn)
79
		}
80
	}()
81

82
}
83

84
func (s *MockBoltServer) Stop() {
85
	s.mutex.Lock()
86
	defer s.mutex.Unlock()
87
	s.listener.Close()
88
	s.listener = nil
89
	s.mgr.Clean()
90
}
91

92
func (s *MockBoltServer) Stats() types.ServerStatsReadOnly {
93
	return s.stats
94
}
95

96
// TODO: mux support more protocol to make x protocol server
97
func (s *MockBoltServer) mux(req *bolt.Request) (result *ResponseConfig) {
98
	if resp, ok := s.muxConfigs[".*"]; ok {
99
		result = resp // if .* exists, use it as default
100
	}
101
	v, ok := req.Get(mtypes.RPCRouteMatchKey)
102
	if !ok {
103
		return
104
	}
105
	resp, ok := s.muxConfigs[v]
106
	if !ok {
107
		return
108
	}
109
	return resp
110

111
}
112

113
// TODO: HandleRequest support more protocol to make x protocol server
114
func (s *MockBoltServer) handle(buf buffer.IoBuffer) *ResponseToWrite {
115
	ctx := context.Background()
116
	engine := (&bolt.XCodec{}).NewXProtocol(ctx)
117
	cmd, err := engine.Decode(ctx, buf)
118
	if cmd == nil || err != nil {
119
		return nil
120
	}
121
	req, ok := cmd.(*bolt.Request)
122
	if !ok {
123
		return nil
124
	}
125
	// select mux
126
	respconfig := s.mux(req)
127
	resp, status := respconfig.HandleRequest(req, engine)
128
	if resp != nil {
129
		iobuf, err := engine.Encode(ctx, resp)
130
		if err != nil {
131
			log.DefaultLogger.Errorf("engine encode error: %v", err)
132
			return nil
133
		}
134
		return &ResponseToWrite{
135
			StatusCode: status,
136
			Body:       iobuf.Bytes(),
137
		}
138
	}
139
	return nil
140
}
141

142
func (s *MockBoltServer) serveConn(conn net.Conn) {
143
	id := s.mgr.Add(conn)
144
	defer func() {
145
		s.mgr.Delete(id)
146
	}()
147
	iobuf := buffer.NewIoBuffer(10240)
148
	for {
149
		buf := make([]byte, 1024)
150
		n, err := conn.Read(buf)
151
		if err != nil {
152
			if err, ok := err.(net.Error); ok && err.Temporary() {
153
				continue
154
			}
155
			return
156
		}
157
		if n > 0 {
158
			iobuf.Write(buf[:n])
159
			for iobuf.Len() > 0 {
160
				resp := s.handle(iobuf)
161
				if resp == nil {
162
					break
163
				}
164
				s.stats.Records().RecordRequest()
165
				conn.Write(resp.Body)
166
				s.stats.Records().RecordResponse(resp.StatusCode)
167
			}
168
		}
169

170
	}
171
}
172

173
type ConnectionManager struct {
174
	mutex       sync.Mutex
175
	index       uint64
176
	connections map[uint64]net.Conn
177
	stats       *types.ServerStats
178
}
179

180
func (mgr *ConnectionManager) Add(conn net.Conn) uint64 {
181
	mgr.mutex.Lock()
182
	defer mgr.mutex.Unlock()
183
	mgr.index++
184
	mgr.connections[mgr.index] = conn
185
	mgr.stats.ActiveConnection()
186
	return mgr.index
187
}
188

189
func (mgr *ConnectionManager) Delete(id uint64) {
190
	mgr.mutex.Lock()
191
	defer mgr.mutex.Unlock()
192
	delete(mgr.connections, id)
193
	mgr.stats.CloseConnection()
194
}
195

196
func (mgr *ConnectionManager) Clean() {
197
	mgr.mutex.Lock()
198
	defer mgr.mutex.Unlock()
199
	for id, conn := range mgr.connections {
200
		conn.Close()
201
		delete(mgr.connections, id)
202
		mgr.stats.CloseConnection()
203
	}
204
}
205

206
type ResponseToWrite struct {
207
	StatusCode int16
208
	Body       []byte
209
}
210

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.