mosn
321 строка · 8.1 Кб
1package functiontest
2
3import (
4"crypto/tls"
5"encoding/json"
6"fmt"
7"io/ioutil"
8"net"
9"net/http"
10"testing"
11"time"
12
13"golang.org/x/net/http2"
14v2 "mosn.io/mosn/pkg/config/v2"
15_ "mosn.io/mosn/pkg/filter/stream/faultinject"
16"mosn.io/mosn/pkg/protocol"
17"mosn.io/mosn/pkg/protocol/xprotocol/bolt"
18_ "mosn.io/mosn/pkg/stream/xprotocol"
19"mosn.io/mosn/test/integrate"
20"mosn.io/mosn/test/util"
21"mosn.io/mosn/test/util/mosn"
22)
23
24func AddFaultInject(mosn *v2.MOSNConfig, listenername string, faultstr string) error {
25// make v2 config
26cfg := make(map[string]interface{})
27if err := json.Unmarshal([]byte(faultstr), &cfg); err != nil {
28return err
29}
30listeners := mosn.Servers[0].Listeners
31for i := range listeners {
32l := listeners[i]
33if l.Name == listenername {
34fault := v2.Filter{
35Type: v2.FaultStream,
36Config: cfg,
37}
38l.ListenerConfig.StreamFilters = append(l.ListenerConfig.StreamFilters, fault)
39}
40listeners[i] = l
41}
42return nil
43}
44
45func MakeFaultStr(status int, delay time.Duration) string {
46tmpl := `{
47"delay":{
48"fixed_delay":"%s",
49"percentage": 100
50},
51"abort": {
52"status": %d,
53"percentage": %d
54}
55}`
56abortPercent := 0
57if status != 0 {
58abortPercent = 100
59}
60return fmt.Sprintf(tmpl, delay.String(), status, abortPercent)
61}
62
63// Proxy Mode is ok
64type faultInjectCase struct {
65*integrate.TestCase
66abortstatus int
67delay time.Duration
68}
69
70func (c *faultInjectCase) StartProxy() {
71c.AppServer.GoServe()
72appAddr := c.AppServer.Addr()
73clientMeshAddr := util.CurrentMeshAddr()
74c.ClientMeshAddr = clientMeshAddr
75cfg := util.CreateProxyMesh(clientMeshAddr, []string{appAddr}, c.AppProtocol)
76faultstr := MakeFaultStr(c.abortstatus, c.delay)
77AddFaultInject(cfg, "proxyListener", faultstr)
78mesh := mosn.NewMosn(cfg)
79go mesh.Start()
80go func() {
81<-c.Finish
82c.AppServer.Close()
83mesh.Close()
84c.Finish <- true
85}()
86time.Sleep(5 * time.Second) //wait server and mesh start
87}
88
89func (c *faultInjectCase) RunCase(n int, interval int) {
90var call func() error
91switch c.AppProtocol {
92case protocol.HTTP1:
93expectedCode := http.StatusOK
94if c.abortstatus != 0 {
95expectedCode = c.abortstatus
96}
97call = func() error {
98start := time.Now()
99resp, err := http.Get(fmt.Sprintf("http://%s/", c.ClientMeshAddr))
100if err != nil {
101return err
102}
103cost := time.Now().Sub(start)
104defer resp.Body.Close()
105if resp.StatusCode != expectedCode {
106return fmt.Errorf("response status: %d", resp.StatusCode)
107}
108b, err := ioutil.ReadAll(resp.Body)
109if err != nil {
110return err
111}
112if c.delay > 0 {
113if cost < c.delay {
114return fmt.Errorf("expected delay %s, but cost %s", c.delay.String(), cost.String())
115}
116}
117c.T.Logf("HTTP client receive data: %s\n", string(b))
118return nil
119}
120case protocol.HTTP2:
121expectedCode := http.StatusOK
122if c.abortstatus != 0 {
123expectedCode = c.abortstatus
124}
125tr := &http2.Transport{
126AllowHTTP: true,
127DialTLS: func(netw, addr string, cfg *tls.Config) (net.Conn, error) {
128return net.Dial(netw, addr)
129},
130}
131httpClient := http.Client{Transport: tr}
132call = func() error {
133start := time.Now()
134resp, err := httpClient.Get(fmt.Sprintf("http://%s/", c.ClientMeshAddr))
135if err != nil {
136return err
137}
138cost := time.Now().Sub(start)
139defer resp.Body.Close()
140if resp.StatusCode != expectedCode {
141return fmt.Errorf("response status: %d", resp.StatusCode)
142
143}
144b, err := ioutil.ReadAll(resp.Body)
145if err != nil {
146return err
147}
148if c.delay > 0 {
149if cost < c.delay {
150return fmt.Errorf("expected delay %s, but cost %s", c.delay.String(), cost.String())
151}
152}
153c.T.Logf("HTTP2 client receive data: %s\n", string(b))
154return nil
155}
156}
157for i := 0; i < n; i++ {
158if err := call(); err != nil {
159c.C <- err
160return
161}
162time.Sleep(time.Duration(interval) * time.Millisecond)
163}
164c.C <- nil
165}
166
167func TestFaultInject(t *testing.T) {
168appaddr := "127.0.0.1:8080"
169testCases := []*faultInjectCase{
170// delay
171&faultInjectCase{
172TestCase: integrate.NewTestCase(t, protocol.HTTP1, protocol.HTTP1, util.NewHTTPServer(t, nil)),
173delay: time.Second,
174},
175&faultInjectCase{
176TestCase: integrate.NewTestCase(t, protocol.HTTP2, protocol.HTTP2, util.NewUpstreamHTTP2(t, appaddr, nil)),
177delay: time.Second,
178},
179// abort
180&faultInjectCase{
181TestCase: integrate.NewTestCase(t, protocol.HTTP1, protocol.HTTP1, util.NewHTTPServer(t, nil)),
182abortstatus: 500,
183},
184&faultInjectCase{
185TestCase: integrate.NewTestCase(t, protocol.HTTP2, protocol.HTTP2, util.NewUpstreamHTTP2(t, appaddr, nil)),
186abortstatus: 500,
187},
188// delay and abort
189&faultInjectCase{
190TestCase: integrate.NewTestCase(t, protocol.HTTP1, protocol.HTTP1, util.NewHTTPServer(t, nil)),
191delay: time.Second,
192abortstatus: 500,
193},
194&faultInjectCase{
195TestCase: integrate.NewTestCase(t, protocol.HTTP2, protocol.HTTP2, util.NewUpstreamHTTP2(t, appaddr, nil)),
196delay: time.Second,
197abortstatus: 500,
198},
199}
200for i, tc := range testCases {
201t.Logf("start case #%d\n", i)
202tc.StartProxy()
203go tc.RunCase(1, 0)
204select {
205case err := <-tc.C:
206if err != nil {
207t.Errorf("[ERROR MESSAGE] #%d %v to mesh %v test failed, error: %v\n", i, tc.AppProtocol, tc.MeshProtocol, err)
208}
209case <-time.After(15 * time.Second):
210t.Errorf("[ERROR MESSAGE] #%d %v to mesh %v hang\n", i, tc.AppProtocol, tc.MeshProtocol)
211}
212tc.FinishCase()
213}
214
215}
216
217type XFaultInjectCase struct {
218*integrate.XTestCase
219abortstatus int
220delay time.Duration
221}
222
223func (c *XFaultInjectCase) StartProxy() {
224c.AppServer.GoServe()
225appAddr := c.AppServer.Addr()
226clientMeshAddr := util.CurrentMeshAddr()
227c.ClientMeshAddr = clientMeshAddr
228cfg := util.CreateXProtocolProxyMesh(clientMeshAddr, []string{appAddr}, c.SubProtocol)
229faultstr := MakeFaultStr(c.abortstatus, c.delay)
230AddFaultInject(cfg, "proxyListener", faultstr)
231mesh := mosn.NewMosn(cfg)
232go mesh.Start()
233go func() {
234<-c.Finish
235c.AppServer.Close()
236mesh.Close()
237c.Finish <- true
238}()
239time.Sleep(5 * time.Second) //wait server and mesh start
240}
241
242func (c *XFaultInjectCase) RunCase(n int, interval int) {
243var call func() error
244switch c.SubProtocol {
245case bolt.ProtocolName:
246server, ok := c.AppServer.(*util.RPCServer)
247if !ok {
248c.C <- fmt.Errorf("need a sofa rpc server")
249return
250}
251client := server.Client
252// TODO: rpc abort status have something wrong, fix it later
253if c.abortstatus != 0 {
254client.ExpectedStatus = int16(bolt.ResponseStatusUnknown)
255}
256if err := client.Connect(c.ClientMeshAddr); err != nil {
257c.C <- err
258return
259}
260defer client.Close()
261call = func() error {
262start := time.Now()
263client.SendRequest()
264if !util.WaitMapEmpty(&client.Waits, 2*time.Second) {
265return fmt.Errorf("request get no response")
266}
267cost := time.Now().Sub(start)
268if c.delay > 0 {
269if cost < c.delay {
270return fmt.Errorf("expected delay %s, but cost %s", c.delay.String(), cost.String())
271}
272}
273return nil
274}
275}
276for i := 0; i < n; i++ {
277if err := call(); err != nil {
278c.C <- err
279return
280}
281time.Sleep(time.Duration(interval) * time.Millisecond)
282}
283c.C <- nil
284}
285
286func TestXFaultInject(t *testing.T) {
287appaddr := "127.0.0.1:8080"
288testCases := []*XFaultInjectCase{
289// delay
290&XFaultInjectCase{
291XTestCase: integrate.NewXTestCase(t, bolt.ProtocolName, util.NewRPCServer(t, appaddr, bolt.ProtocolName)),
292delay: time.Second,
293},
294// abort
295&XFaultInjectCase{
296XTestCase: integrate.NewXTestCase(t, bolt.ProtocolName, util.NewRPCServer(t, appaddr, bolt.ProtocolName)),
297abortstatus: 500,
298},
299// delay and abort
300&XFaultInjectCase{
301XTestCase: integrate.NewXTestCase(t, bolt.ProtocolName, util.NewRPCServer(t, appaddr, bolt.ProtocolName)),
302delay: time.Second,
303abortstatus: 500,
304},
305}
306for i, tc := range testCases {
307t.Logf("start case #%d\n", i)
308tc.StartProxy()
309go tc.RunCase(1, 0)
310select {
311case err := <-tc.C:
312if err != nil {
313t.Errorf("[ERROR MESSAGE] #%d %v to mesh %v test failed, error: %v\n", i, tc.AppProtocol, tc.MeshProtocol, err)
314}
315case <-time.After(15 * time.Second):
316t.Errorf("[ERROR MESSAGE] #%d %v to mesh %v hang\n", i, tc.AppProtocol, tc.MeshProtocol)
317}
318tc.FinishCase()
319}
320
321}
322