mosn

Форк
0
/
retry_test.go 
338 строк · 9.2 Кб
1
package integrate
2

3
import (
4
	"context"
5
	"fmt"
6
	"net"
7
	"net/http"
8
	"strings"
9
	"testing"
10
	"time"
11

12
	"mosn.io/mosn/pkg/protocol"
13
	"mosn.io/mosn/pkg/protocol/xprotocol/bolt"
14
	"mosn.io/mosn/pkg/types"
15
	"mosn.io/mosn/test/util"
16
	"mosn.io/mosn/test/util/mosn"
17
)
18

19
type RetryCase struct {
20
	*TestCase
21
	GoodServer util.UpstreamServer
22
	BadServer  util.UpstreamServer
23
	BadIsClose bool
24
}
25

26
func NewRetryCase(t *testing.T, serverProto, meshProto types.ProtocolName, isClose bool) *RetryCase {
27
	app1 := "127.0.0.1:8080"
28
	app2 := "127.0.0.1:8081"
29
	var good, bad util.UpstreamServer
30
	switch serverProto {
31
	case protocol.HTTP1:
32
		good = util.NewHTTPServer(t, &PathHTTPHandler{})
33
		bad = util.NewHTTPServer(t, &BadHTTPHandler{})
34
	case protocol.HTTP2:
35
		good = util.NewUpstreamHTTP2(t, app1, &PathHTTPHandler{})
36
		bad = util.NewUpstreamHTTP2(t, app2, &BadHTTPHandler{})
37
	}
38
	tc := NewTestCase(t, serverProto, meshProto, util.NewRPCServer(t, "", bolt.ProtocolName)) // Empty RPC server for get rpc client
39
	return &RetryCase{
40
		TestCase:   tc,
41
		GoodServer: good,
42
		BadServer:  bad,
43
		BadIsClose: isClose,
44
	}
45
}
46
// StartProxy brings up both upstreams and a single proxy mesh in front of
// them, then blocks for five seconds to let everything start.
//
// A background goroutine waits for a value on c.Finish, closes the upstreams
// and the mesh, and then sends true back on c.Finish.
func (c *RetryCase) StartProxy() {
	c.GoodServer.GoServe()
	c.BadServer.GoServe()

	// Addresses are read before the bad upstream is (optionally) closed.
	upstreams := []string{c.GoodServer.Addr(), c.BadServer.Addr()}
	if c.BadIsClose {
		c.BadServer.Close()
	}

	meshAddr := util.CurrentMeshAddr()
	c.ClientMeshAddr = meshAddr

	proxy := mosn.NewMosn(util.CreateProxyMesh(meshAddr, upstreams, c.AppProtocol))
	go proxy.Start()

	// Teardown runs once the case signals completion on c.Finish.
	go func() {
		<-c.Finish
		c.GoodServer.Close()
		if !c.BadIsClose {
			// Already closed above when BadIsClose is set.
			c.BadServer.Close()
		}
		proxy.Close()
		c.Finish <- true
	}()

	// Wait for the servers and the mesh to start.
	time.Sleep(5 * time.Second)
}
71

72
// Start brings up both upstreams and a mesh-to-mesh deployment (client mesh
// and server mesh addresses, both taken from util.CurrentMeshAddr), then
// blocks for five seconds to let everything start. tls is forwarded to
// util.CreateMeshToMeshConfig.
//
// A background goroutine waits for a value on c.Finish, closes the upstreams
// and the mesh, and then sends true back on c.Finish.
func (c *RetryCase) Start(tls bool) {
	c.GoodServer.GoServe()
	c.BadServer.GoServe()

	// Addresses are read before the bad upstream is (optionally) closed.
	upstreams := []string{c.GoodServer.Addr(), c.BadServer.Addr()}
	if c.BadIsClose {
		c.BadServer.Close()
	}

	// The client address is requested first, the server address second.
	clientAddr := util.CurrentMeshAddr()
	c.ClientMeshAddr = clientAddr
	serverAddr := util.CurrentMeshAddr()

	meshCfg := util.CreateMeshToMeshConfig(clientAddr, serverAddr, c.AppProtocol, c.MeshProtocol, upstreams, tls)
	m := mosn.NewMosn(meshCfg)
	go m.Start()

	// Teardown runs once the case signals completion on c.Finish.
	go func() {
		<-c.Finish
		c.GoodServer.Close()
		if !c.BadIsClose {
			// Already closed above when BadIsClose is set.
			c.BadServer.Close()
		}
		m.Close()
		c.Finish <- true
	}()

	// Wait for the servers and the mesh to start.
	time.Sleep(5 * time.Second)
}
97

98
type PathHTTPHandler struct{}
99

100
func (h *PathHTTPHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
101
	w.Header().Set("Content-Type", "text/plain")
102
	if strings.Trim(r.URL.Path, "/") != HTTPTestPath {
103
		w.WriteHeader(http.StatusInternalServerError)
104
	}
105
	fmt.Fprintf(w, "\nRequestId:%s\n", r.Header.Get("Requestid"))
106
}
107

108
// BadHTTPHandler is the handler used by the bad upstream: every request is
// answered with HTTP 500.
type BadHTTPHandler struct{}

// ServeHTTP replies with status 500 and a plain-text body echoing the
// "Requestid" request header, regardless of the request path.
func (h *BadHTTPHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	requestID := r.Header.Get("Requestid")

	w.Header().Set("Content-Type", "text/plain")
	w.WriteHeader(http.StatusInternalServerError)
	fmt.Fprintf(w, "\nRequestId:%s\n", requestID)
}
116

117
func TestRetry(t *testing.T) {
118
	util.StartRetry = true
119
	defer func() {
120
		util.StartRetry = false
121
	}()
122
	testCases := []*RetryCase{
123
		// A server reponse not success
124
		NewRetryCase(t, protocol.HTTP1, protocol.HTTP1, false),
125
		//NewRetryCase(t, protocol.HTTP1, protocol.HTTP2, false),
126
		//NewRetryCase(t, protocol.HTTP2, protocol.HTTP1, false),
127
		NewRetryCase(t, protocol.HTTP2, protocol.HTTP2, false),
128
		// A server is shutdown
129
		NewRetryCase(t, protocol.HTTP1, protocol.HTTP1, true),
130
		// NewRetryCase(t, protocol.HTTP1, protocol.HTTP2, true),
131
		NewRetryCase(t, protocol.HTTP2, protocol.HTTP2, true),
132
		// HTTP2 and SofaRPC will create connection to upstream before send request to upstream
133
		// If upstream is closed, it will failed directly, and we cannot do a retry before we send a request to upstream
134
		/*
135
			NewRetryCase(t, protocol.HTTP2, protocol.HTTP1, true),
136
			NewRetryCase(t, protocol.HTTP2, protocol.HTTP2, true),
137
		*/
138
	}
139
	for i, tc := range testCases {
140
		t.Logf("start case #%d\n", i)
141
		tc.Start(false)
142
		// at least run twice
143
		go tc.RunCase(2, 0)
144
		select {
145
		case err := <-tc.C:
146
			if err != nil {
147
				t.Errorf("[ERROR MESSAGE] #%d %v to mesh %v test failed, error: %v\n", i, tc.AppProtocol, tc.MeshProtocol, err)
148
			}
149
		case <-time.After(15 * time.Second):
150
			t.Errorf("[ERROR MESSAGE] #%d %v to mesh %v hang\n", i, tc.AppProtocol, tc.MeshProtocol)
151
		}
152
		tc.FinishCase()
153
	}
154
}
155

156
func TestRetryProxy(t *testing.T) {
157
	util.StartRetry = true
158
	defer func() {
159
		util.StartRetry = false
160
	}()
161
	testCases := []*RetryCase{
162
		NewRetryCase(t, protocol.HTTP1, protocol.HTTP1, false),
163
		NewRetryCase(t, protocol.HTTP2, protocol.HTTP2, false),
164
		//NewRetryCase(t, protocol.HTTP1, protocol.HTTP1, true),
165
		//NewRetryCase(t, protocol.HTTP2, protocol.HTTP2, true),
166
		//NewRetryCase(t, protocol.SofaRPC, protocol.SofaRPC, true),
167
	}
168
	for i, tc := range testCases {
169
		t.Logf("start case #%d\n", i)
170
		tc.StartProxy()
171
		go tc.RunCase(10, 0)
172
		select {
173
		case err := <-tc.C:
174
			if err != nil {
175
				t.Errorf("[ERROR MESSAGE] #%d %v to mesh %v test failed, error: %v\n", i, tc.AppProtocol, tc.MeshProtocol, err)
176
			}
177
		case <-time.After(30 * time.Second):
178
			t.Errorf("[ERROR MESSAGE] #%d %v to mesh %v hang\n", i, tc.AppProtocol, tc.MeshProtocol)
179
		}
180
		tc.FinishCase()
181

182
	}
183
}
184

185
type XRetryCase struct {
186
	*XTestCase
187
	GoodServer util.UpstreamServer
188
	BadServer  util.UpstreamServer
189
	BadIsClose bool
190
}
191

192
func NewXRetryCase(t *testing.T, subProtocol types.ProtocolName, isClose bool) *XRetryCase {
193
	app1 := "127.0.0.1:8080"
194
	app2 := "127.0.0.1:8081"
195
	var good, bad util.UpstreamServer
196
	switch subProtocol {
197
	case bolt.ProtocolName:
198
		good = util.NewRPCServer(t, app1, bolt.ProtocolName)
199
		bad = util.RPCServer{
200
			Client:         util.NewRPCClient(t, "rpcClient", bolt.ProtocolName),
201
			Name:           app2,
202
			UpstreamServer: util.NewUpstreamServer(t, app2, ServeBadBoltV1),
203
		}
204
	}
205
	tc := NewXTestCase(t, subProtocol, util.NewRPCServer(t, "", subProtocol)) // Empty RPC server for get rpc client
206
	return &XRetryCase{
207
		XTestCase:  tc,
208
		GoodServer: good,
209
		BadServer:  bad,
210
		BadIsClose: isClose,
211
	}
212
}
213
// StartProxy brings up both upstreams and a single xprotocol proxy mesh in
// front of them, then blocks for five seconds to let everything start.
//
// A background goroutine waits for a value on c.Finish, closes the upstreams
// and the mesh, and then sends true back on c.Finish.
func (c *XRetryCase) StartProxy() {
	c.GoodServer.GoServe()
	c.BadServer.GoServe()

	// Addresses are read before the bad upstream is (optionally) closed.
	upstreams := []string{c.GoodServer.Addr(), c.BadServer.Addr()}
	if c.BadIsClose {
		c.BadServer.Close()
	}

	meshAddr := util.CurrentMeshAddr()
	c.ClientMeshAddr = meshAddr

	proxy := mosn.NewMosn(util.CreateXProtocolProxyMesh(meshAddr, upstreams, c.SubProtocol))
	go proxy.Start()

	// Teardown runs once the case signals completion on c.Finish.
	go func() {
		<-c.Finish
		c.GoodServer.Close()
		if !c.BadIsClose {
			// Already closed above when BadIsClose is set.
			c.BadServer.Close()
		}
		proxy.Close()
		c.Finish <- true
	}()

	// Wait for the servers and the mesh to start.
	time.Sleep(5 * time.Second)
}
238

239
// Start brings up both upstreams and an xprotocol mesh-to-mesh deployment
// (client mesh and server mesh addresses, both taken from
// util.CurrentMeshAddr), then blocks for five seconds to let everything
// start. tls is forwarded to util.CreateXProtocolMesh.
//
// A background goroutine waits for a value on c.Finish, closes the upstreams
// and the mesh, and then sends true back on c.Finish.
func (c *XRetryCase) Start(tls bool) {
	c.GoodServer.GoServe()
	c.BadServer.GoServe()

	// Addresses are read before the bad upstream is (optionally) closed.
	upstreams := []string{c.GoodServer.Addr(), c.BadServer.Addr()}
	if c.BadIsClose {
		c.BadServer.Close()
	}

	// The client address is requested first, the server address second.
	clientAddr := util.CurrentMeshAddr()
	c.ClientMeshAddr = clientAddr
	serverAddr := util.CurrentMeshAddr()

	meshCfg := util.CreateXProtocolMesh(clientAddr, serverAddr, c.SubProtocol, upstreams, tls)
	m := mosn.NewMosn(meshCfg)
	go m.Start()

	// Teardown runs once the case signals completion on c.Finish.
	go func() {
		<-c.Finish
		c.GoodServer.Close()
		if !c.BadIsClose {
			// Already closed above when BadIsClose is set.
			c.BadServer.Close()
		}
		m.Close()
		c.Finish <- true
	}()

	// Wait for the servers and the mesh to start.
	time.Sleep(5 * time.Second)
}
264

265
func ServeBadBoltV1(t *testing.T, conn net.Conn) {
266

267
	proto := (&bolt.XCodec{}).NewXProtocol(context.Background())
268
	response := func(iobuf types.IoBuffer) ([]byte, bool) {
269
		cmd, _ := proto.Decode(nil, iobuf)
270
		if cmd == nil {
271
			return nil, false
272
		}
273
		if req, ok := cmd.(*bolt.Request); ok {
274
			resp := bolt.NewRpcResponse(req.RequestId, bolt.ResponseStatusServerException, nil, nil)
275
			iobufresp, err := proto.Encode(nil, resp)
276
			if err != nil {
277
				t.Errorf("Build response error: %v\n", err)
278
				return nil, true
279
			}
280
			return iobufresp.Bytes(), true
281
		}
282
		return nil, true
283
	}
284
	util.ServeRPC(t, conn, response)
285
}
286

287
func TestXRetry(t *testing.T) {
288
	util.StartRetry = true
289
	defer func() {
290
		util.StartRetry = false
291
	}()
292
	testCases := []*XRetryCase{
293
		// A server reponse not success
294
		NewXRetryCase(t, bolt.ProtocolName, true),
295
		//TODO: boltv2, dubbo, tars
296
	}
297
	for i, tc := range testCases {
298
		t.Logf("start case #%d\n", i)
299
		tc.Start(false)
300
		// at least run twice
301
		go tc.RunCase(2, 0)
302
		select {
303
		case err := <-tc.C:
304
			if err != nil {
305
				t.Errorf("[ERROR MESSAGE] #%d %v to mesh %v test xprotocol %s failed, error: %v\n", i, tc.AppProtocol, tc.MeshProtocol, tc.SubProtocol, err)
306
			}
307
		case <-time.After(15 * time.Second):
308
			t.Errorf("[ERROR MESSAGE] #%d %v to mesh %v xprotocol %s  hang\n", i, tc.AppProtocol, tc.MeshProtocol, tc.SubProtocol)
309
		}
310
		tc.FinishCase()
311
	}
312
}
313

314
func TestXRetryProxy(t *testing.T) {
315
	util.StartRetry = true
316
	defer func() {
317
		util.StartRetry = false
318
	}()
319
	testCases := []*XRetryCase{
320
		NewXRetryCase(t, bolt.ProtocolName, true),
321
		//TODO: boltv2, dubbo, tars
322
	}
323
	for i, tc := range testCases {
324
		t.Logf("start case #%d\n", i)
325
		tc.StartProxy()
326
		go tc.RunCase(10, 0)
327
		select {
328
		case err := <-tc.C:
329
			if err != nil {
330
				t.Errorf("[ERROR MESSAGE] #%d %v to mesh %v xprotocol %s test failed, error: %v\n", i, tc.AppProtocol, tc.MeshProtocol, tc.SubProtocol, err)
331
			}
332
		case <-time.After(30 * time.Second):
333
			t.Errorf("[ERROR MESSAGE] #%d %v to mesh %v xprotocol %s hang\n", i, tc.AppProtocol, tc.MeshProtocol, tc.SubProtocol)
334
		}
335
		tc.FinishCase()
336

337
	}
338
}
339

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.