mosn

Форк
0
/
direct_test.go 
302 строки · 7.7 Кб
1
package functiontest
2

3
import (
4
	"crypto/tls"
5
	"fmt"
6
	"io/ioutil"
7
	"net"
8
	"net/http"
9
	"testing"
10
	"time"
11

12
	"golang.org/x/net/http2"
13
	v2 "mosn.io/mosn/pkg/config/v2"
14
	"mosn.io/mosn/pkg/protocol"
15
	"mosn.io/mosn/pkg/protocol/xprotocol/bolt"
16
	"mosn.io/mosn/pkg/types"
17
	"mosn.io/mosn/test/util"
18
	"mosn.io/mosn/test/util/mosn"
19
)
20

21
// Test Direct Response
22
// TODO: fix direct response with body in rpc
23
// TODO: something is wrong with the rpc status
24

25
// Direct Response ignore upstream cluster information and route rule config
26
func CreateDirectMeshProxy(addr string, proto types.ProtocolName, response *v2.DirectResponseAction) *v2.MOSNConfig {
27
	cmconfig := v2.ClusterManagerConfig{
28
		Clusters: []v2.Cluster{
29
			v2.Cluster{
30
				Name: "cluster",
31
			},
32
		},
33
	}
34
	routers := []v2.Router{
35
		NewDirectResponseHeaderRouter(".*", response),
36
		NewDirectResponsePrefixRouter("/", response),
37
	}
38
	chains := []v2.FilterChain{
39
		util.NewFilterChain("proxyVirtualHost", proto, proto, routers),
40
	}
41
	listener := util.NewListener("proxyListener", addr, chains)
42
	return util.NewMOSNConfig([]v2.Listener{listener}, cmconfig)
43
}
44

45
// Direct Response ignore upstream cluster information and route rule config
46
func CreateXDirectMeshProxy(addr string, proto types.ProtocolName, response *v2.DirectResponseAction) *v2.MOSNConfig {
47
	cmconfig := v2.ClusterManagerConfig{
48
		Clusters: []v2.Cluster{
49
			v2.Cluster{
50
				Name: "cluster",
51
			},
52
		},
53
	}
54
	routers := []v2.Router{
55
		NewDirectResponseHeaderRouter(".*", response),
56
		NewDirectResponsePrefixRouter("/", response),
57
	}
58
	chains := []v2.FilterChain{
59
		util.NewXProtocolFilterChain("proxyVirtualHost", proto, routers),
60
	}
61
	listener := util.NewListener("proxyListener", addr, chains)
62
	return util.NewMOSNConfig([]v2.Listener{listener}, cmconfig)
63
}
64

65
func NewDirectResponseHeaderRouter(value string, response *v2.DirectResponseAction) v2.Router {
66
	header := v2.HeaderMatcher{Name: "service", Value: value}
67
	return v2.Router{
68
		RouterConfig: v2.RouterConfig{
69
			Match:          v2.RouterMatch{Headers: []v2.HeaderMatcher{header}},
70
			DirectResponse: response,
71
		},
72
	}
73
}
74
func NewDirectResponsePrefixRouter(prefix string, response *v2.DirectResponseAction) v2.Router {
75
	return v2.Router{
76
		RouterConfig: v2.RouterConfig{
77
			Match:          v2.RouterMatch{Prefix: prefix},
78
			DirectResponse: response,
79
		},
80
	}
81
}
82

83
// Proxy Mode is OK
84
type DirectResponseCase struct {
85
	Protocol   types.ProtocolName
86
	RPCClient  *util.RPCClient
87
	C          chan error
88
	T          *testing.T
89
	ClientAddr string
90
	Finish     chan bool
91
	status     int
92
	body       string
93
}
94

95
func NewDirectResponseCase(t *testing.T, proto types.ProtocolName, status int, body string, client *util.RPCClient) *DirectResponseCase {
96
	return &DirectResponseCase{
97
		Protocol:  proto,
98
		RPCClient: client,
99
		C:         make(chan error),
100
		T:         t,
101
		Finish:    make(chan bool),
102
		status:    status,
103
		body:      body,
104
	}
105
}
106

107
func (c *DirectResponseCase) StartProxy() {
108
	addr := util.CurrentMeshAddr()
109
	c.ClientAddr = addr
110
	resp := &v2.DirectResponseAction{
111
		StatusCode: c.status,
112
		Body:       c.body,
113
	}
114
	cfg := CreateDirectMeshProxy(addr, c.Protocol, resp)
115
	mesh := mosn.NewMosn(cfg)
116
	go mesh.Start()
117
	go func() {
118
		<-c.Finish
119
		mesh.Close()
120
		c.Finish <- true
121
	}()
122
	time.Sleep(5 * time.Second) //wait server and mesh start
123
}
124

125
// FinishCase signals the shutdown goroutine started by StartProxy and blocks
// until that goroutine acknowledges, i.e. until the mesh has been closed.
func (c *DirectResponseCase) FinishCase() {
	handshake := c.Finish
	handshake <- true // request shutdown
	<-handshake       // wait for the acknowledgement
}
129

130
func (c *DirectResponseCase) RunCase(n int, interval time.Duration) {
131
	var call func() error
132
	switch c.Protocol {
133
	case protocol.HTTP1:
134
		call = func() error {
135
			url := fmt.Sprintf("http://%s/", c.ClientAddr)
136
			resp, err := http.Get(url)
137
			if err != nil {
138
				return err
139
			}
140
			defer resp.Body.Close()
141
			if resp.StatusCode != c.status {
142
				return fmt.Errorf("response status: %d", resp.StatusCode)
143
			}
144
			b, err := ioutil.ReadAll(resp.Body)
145
			if err != nil {
146
				return err
147
			}
148
			if string(b) != c.body {
149
				return fmt.Errorf("response body: %s", string(b))
150
			}
151
			return nil
152
		}
153
	case protocol.HTTP2:
154
		tr := &http2.Transport{
155
			AllowHTTP: true,
156
			DialTLS: func(netw, addr string, cfg *tls.Config) (net.Conn, error) {
157
				return net.Dial(netw, addr)
158
			},
159
		}
160
		httpClient := http.Client{Transport: tr}
161
		call = func() error {
162
			url := fmt.Sprintf("http://%s/", c.ClientAddr)
163
			resp, err := httpClient.Get(url)
164
			if err != nil {
165
				return err
166
			}
167
			defer resp.Body.Close()
168
			if resp.StatusCode != c.status {
169
				return fmt.Errorf("response status: %d", resp.StatusCode)
170
			}
171
			b, err := ioutil.ReadAll(resp.Body)
172
			if err != nil {
173
				return err
174
			}
175
			if string(b) != c.body {
176
				return fmt.Errorf("response body: %s", string(b))
177
			}
178
			return nil
179
		}
180
	}
181
	for i := 0; i < n; i++ {
182
		if err := call(); err != nil {
183
			c.C <- err
184
			return
185
		}
186
		time.Sleep(interval)
187
	}
188
	c.C <- nil
189
}
190

191
// xprotocl extend
192
type XDirectResponseCase struct {
193
	*DirectResponseCase
194
}
195

196
func NewXDirectResponseCase(t *testing.T, proto types.ProtocolName, status int, body string, client *util.RPCClient) *XDirectResponseCase {
197
	return &XDirectResponseCase{
198
		DirectResponseCase: &DirectResponseCase{
199
			Protocol:  proto,
200
			RPCClient: client,
201
			C:         make(chan error),
202
			T:         t,
203
			Finish:    make(chan bool),
204
			status:    status,
205
			body:      body,
206
		}}
207
}
208

209
func (c *XDirectResponseCase) StartProxy() {
210
	addr := util.CurrentMeshAddr()
211
	c.ClientAddr = addr
212
	resp := &v2.DirectResponseAction{
213
		StatusCode: c.status,
214
		Body:       c.body,
215
	}
216
	cfg := CreateXDirectMeshProxy(addr, c.Protocol, resp)
217
	mesh := mosn.NewMosn(cfg)
218
	go mesh.Start()
219
	go func() {
220
		<-c.Finish
221
		mesh.Close()
222
		c.Finish <- true
223
	}()
224
	time.Sleep(5 * time.Second) //wait server and mesh start
225
}
226
func (c *XDirectResponseCase) RunCase(n int, interval time.Duration) {
227
	var call func() error
228
	switch c.Protocol {
229
	case bolt.ProtocolName:
230
		client := c.RPCClient
231
		if err := client.Connect(c.ClientAddr); err != nil {
232
			c.C <- err
233
			return
234
		}
235
		if c.status != 200 {
236
			client.ExpectedStatus = int16(bolt.ResponseStatusUnknown)
237
		}
238
		defer client.Close()
239
		call = func() error {
240
			client.SendRequest()
241
			if !util.WaitMapEmpty(&client.Waits, 2*time.Second) {
242
				return fmt.Errorf("request get no response")
243
			}
244
			return nil
245
		}
246
	}
247
	for i := 0; i < n; i++ {
248
		if err := call(); err != nil {
249
			c.C <- err
250
			return
251
		}
252
		time.Sleep(interval)
253
	}
254
	c.C <- nil
255
}
256

257
func TestDirectResponse(t *testing.T) {
258
	testCases := []*DirectResponseCase{
259
		NewDirectResponseCase(t, protocol.HTTP1, 500, "", nil),
260
		NewDirectResponseCase(t, protocol.HTTP2, 500, "", nil),
261
		NewDirectResponseCase(t, protocol.HTTP1, 500, "internal error", nil),
262
		NewDirectResponseCase(t, protocol.HTTP2, 500, "internal error", nil),
263
		NewDirectResponseCase(t, protocol.HTTP1, 200, "testdata", nil),
264
		NewDirectResponseCase(t, protocol.HTTP2, 200, "testdata", nil),
265
	}
266
	for i, tc := range testCases {
267
		t.Logf("start case #%d\n", i)
268
		tc.StartProxy()
269
		go tc.RunCase(1, 0)
270
		select {
271
		case err := <-tc.C:
272
			if err != nil {
273
				t.Errorf("[ERROR MESSAGE] #%d protocol %v test failed, error: %v\n", i, tc.Protocol, err)
274
			}
275
		case <-time.After(15 * time.Second):
276
			t.Errorf("[ERROR MESSAGE] #%d protocol %v hang\n", i, tc.Protocol)
277
		}
278
		tc.FinishCase()
279
	}
280
}
281

282
func TestXDirectResponse(t *testing.T) {
283
	testCases := []*XDirectResponseCase{
284
		// RPC
285
		// FIXME: RPC cannot direct response success, code will be transfer
286
		NewXDirectResponseCase(t, bolt.ProtocolName, 500, "", util.NewRPCClient(t, "directfail", bolt.ProtocolName)),
287
	}
288
	for i, tc := range testCases {
289
		t.Logf("start case #%d\n", i)
290
		tc.StartProxy()
291
		go tc.RunCase(1, 0)
292
		select {
293
		case err := <-tc.C:
294
			if err != nil {
295
				t.Errorf("[ERROR MESSAGE] #%d protocol %v test failed, error: %v\n", i, tc.Protocol, err)
296
			}
297
		case <-time.After(15 * time.Second):
298
			t.Errorf("[ERROR MESSAGE] #%d protocol %v hang\n", i, tc.Protocol)
299
		}
300
		tc.FinishCase()
301
	}
302
}
303

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.