mosn

Форк
0
/
faultinject_test.go 
321 строка · 8.1 Кб
1
package functiontest
2

3
import (
4
	"crypto/tls"
5
	"encoding/json"
6
	"fmt"
7
	"io/ioutil"
8
	"net"
9
	"net/http"
10
	"testing"
11
	"time"
12

13
	"golang.org/x/net/http2"
14
	v2 "mosn.io/mosn/pkg/config/v2"
15
	_ "mosn.io/mosn/pkg/filter/stream/faultinject"
16
	"mosn.io/mosn/pkg/protocol"
17
	"mosn.io/mosn/pkg/protocol/xprotocol/bolt"
18
	_ "mosn.io/mosn/pkg/stream/xprotocol"
19
	"mosn.io/mosn/test/integrate"
20
	"mosn.io/mosn/test/util"
21
	"mosn.io/mosn/test/util/mosn"
22
)
23

24
func AddFaultInject(mosn *v2.MOSNConfig, listenername string, faultstr string) error {
25
	// make v2 config
26
	cfg := make(map[string]interface{})
27
	if err := json.Unmarshal([]byte(faultstr), &cfg); err != nil {
28
		return err
29
	}
30
	listeners := mosn.Servers[0].Listeners
31
	for i := range listeners {
32
		l := listeners[i]
33
		if l.Name == listenername {
34
			fault := v2.Filter{
35
				Type:   v2.FaultStream,
36
				Config: cfg,
37
			}
38
			l.ListenerConfig.StreamFilters = append(l.ListenerConfig.StreamFilters, fault)
39
		}
40
		listeners[i] = l
41
	}
42
	return nil
43
}
44

45
func MakeFaultStr(status int, delay time.Duration) string {
46
	tmpl := `{
47
		"delay":{
48
			"fixed_delay":"%s",
49
			"percentage": 100
50
		},
51
		"abort": {
52
			"status": %d,
53
			"percentage": %d
54
		}
55
	}`
56
	abortPercent := 0
57
	if status != 0 {
58
		abortPercent = 100
59
	}
60
	return fmt.Sprintf(tmpl, delay.String(), status, abortPercent)
61
}
62

63
// Proxy Mode is ok
64
type faultInjectCase struct {
65
	*integrate.TestCase
66
	abortstatus int
67
	delay       time.Duration
68
}
69

70
func (c *faultInjectCase) StartProxy() {
71
	c.AppServer.GoServe()
72
	appAddr := c.AppServer.Addr()
73
	clientMeshAddr := util.CurrentMeshAddr()
74
	c.ClientMeshAddr = clientMeshAddr
75
	cfg := util.CreateProxyMesh(clientMeshAddr, []string{appAddr}, c.AppProtocol)
76
	faultstr := MakeFaultStr(c.abortstatus, c.delay)
77
	AddFaultInject(cfg, "proxyListener", faultstr)
78
	mesh := mosn.NewMosn(cfg)
79
	go mesh.Start()
80
	go func() {
81
		<-c.Finish
82
		c.AppServer.Close()
83
		mesh.Close()
84
		c.Finish <- true
85
	}()
86
	time.Sleep(5 * time.Second) //wait server and mesh start
87
}
88

89
func (c *faultInjectCase) RunCase(n int, interval int) {
90
	var call func() error
91
	switch c.AppProtocol {
92
	case protocol.HTTP1:
93
		expectedCode := http.StatusOK
94
		if c.abortstatus != 0 {
95
			expectedCode = c.abortstatus
96
		}
97
		call = func() error {
98
			start := time.Now()
99
			resp, err := http.Get(fmt.Sprintf("http://%s/", c.ClientMeshAddr))
100
			if err != nil {
101
				return err
102
			}
103
			cost := time.Now().Sub(start)
104
			defer resp.Body.Close()
105
			if resp.StatusCode != expectedCode {
106
				return fmt.Errorf("response status: %d", resp.StatusCode)
107
			}
108
			b, err := ioutil.ReadAll(resp.Body)
109
			if err != nil {
110
				return err
111
			}
112
			if c.delay > 0 {
113
				if cost < c.delay {
114
					return fmt.Errorf("expected delay %s, but cost %s", c.delay.String(), cost.String())
115
				}
116
			}
117
			c.T.Logf("HTTP client receive data: %s\n", string(b))
118
			return nil
119
		}
120
	case protocol.HTTP2:
121
		expectedCode := http.StatusOK
122
		if c.abortstatus != 0 {
123
			expectedCode = c.abortstatus
124
		}
125
		tr := &http2.Transport{
126
			AllowHTTP: true,
127
			DialTLS: func(netw, addr string, cfg *tls.Config) (net.Conn, error) {
128
				return net.Dial(netw, addr)
129
			},
130
		}
131
		httpClient := http.Client{Transport: tr}
132
		call = func() error {
133
			start := time.Now()
134
			resp, err := httpClient.Get(fmt.Sprintf("http://%s/", c.ClientMeshAddr))
135
			if err != nil {
136
				return err
137
			}
138
			cost := time.Now().Sub(start)
139
			defer resp.Body.Close()
140
			if resp.StatusCode != expectedCode {
141
				return fmt.Errorf("response status: %d", resp.StatusCode)
142

143
			}
144
			b, err := ioutil.ReadAll(resp.Body)
145
			if err != nil {
146
				return err
147
			}
148
			if c.delay > 0 {
149
				if cost < c.delay {
150
					return fmt.Errorf("expected delay %s, but cost %s", c.delay.String(), cost.String())
151
				}
152
			}
153
			c.T.Logf("HTTP2 client receive data: %s\n", string(b))
154
			return nil
155
		}
156
	}
157
	for i := 0; i < n; i++ {
158
		if err := call(); err != nil {
159
			c.C <- err
160
			return
161
		}
162
		time.Sleep(time.Duration(interval) * time.Millisecond)
163
	}
164
	c.C <- nil
165
}
166

167
func TestFaultInject(t *testing.T) {
168
	appaddr := "127.0.0.1:8080"
169
	testCases := []*faultInjectCase{
170
		// delay
171
		&faultInjectCase{
172
			TestCase: integrate.NewTestCase(t, protocol.HTTP1, protocol.HTTP1, util.NewHTTPServer(t, nil)),
173
			delay:    time.Second,
174
		},
175
		&faultInjectCase{
176
			TestCase: integrate.NewTestCase(t, protocol.HTTP2, protocol.HTTP2, util.NewUpstreamHTTP2(t, appaddr, nil)),
177
			delay:    time.Second,
178
		},
179
		// abort
180
		&faultInjectCase{
181
			TestCase:    integrate.NewTestCase(t, protocol.HTTP1, protocol.HTTP1, util.NewHTTPServer(t, nil)),
182
			abortstatus: 500,
183
		},
184
		&faultInjectCase{
185
			TestCase:    integrate.NewTestCase(t, protocol.HTTP2, protocol.HTTP2, util.NewUpstreamHTTP2(t, appaddr, nil)),
186
			abortstatus: 500,
187
		},
188
		// delay and abort
189
		&faultInjectCase{
190
			TestCase:    integrate.NewTestCase(t, protocol.HTTP1, protocol.HTTP1, util.NewHTTPServer(t, nil)),
191
			delay:       time.Second,
192
			abortstatus: 500,
193
		},
194
		&faultInjectCase{
195
			TestCase:    integrate.NewTestCase(t, protocol.HTTP2, protocol.HTTP2, util.NewUpstreamHTTP2(t, appaddr, nil)),
196
			delay:       time.Second,
197
			abortstatus: 500,
198
		},
199
	}
200
	for i, tc := range testCases {
201
		t.Logf("start case #%d\n", i)
202
		tc.StartProxy()
203
		go tc.RunCase(1, 0)
204
		select {
205
		case err := <-tc.C:
206
			if err != nil {
207
				t.Errorf("[ERROR MESSAGE] #%d %v to mesh %v test failed, error: %v\n", i, tc.AppProtocol, tc.MeshProtocol, err)
208
			}
209
		case <-time.After(15 * time.Second):
210
			t.Errorf("[ERROR MESSAGE] #%d %v to mesh %v hang\n", i, tc.AppProtocol, tc.MeshProtocol)
211
		}
212
		tc.FinishCase()
213
	}
214

215
}
216

217
type XFaultInjectCase struct {
218
	*integrate.XTestCase
219
	abortstatus int
220
	delay       time.Duration
221
}
222

223
func (c *XFaultInjectCase) StartProxy() {
224
	c.AppServer.GoServe()
225
	appAddr := c.AppServer.Addr()
226
	clientMeshAddr := util.CurrentMeshAddr()
227
	c.ClientMeshAddr = clientMeshAddr
228
	cfg := util.CreateXProtocolProxyMesh(clientMeshAddr, []string{appAddr}, c.SubProtocol)
229
	faultstr := MakeFaultStr(c.abortstatus, c.delay)
230
	AddFaultInject(cfg, "proxyListener", faultstr)
231
	mesh := mosn.NewMosn(cfg)
232
	go mesh.Start()
233
	go func() {
234
		<-c.Finish
235
		c.AppServer.Close()
236
		mesh.Close()
237
		c.Finish <- true
238
	}()
239
	time.Sleep(5 * time.Second) //wait server and mesh start
240
}
241

242
func (c *XFaultInjectCase) RunCase(n int, interval int) {
243
	var call func() error
244
	switch c.SubProtocol {
245
	case bolt.ProtocolName:
246
		server, ok := c.AppServer.(*util.RPCServer)
247
		if !ok {
248
			c.C <- fmt.Errorf("need a sofa rpc server")
249
			return
250
		}
251
		client := server.Client
252
		// TODO: rpc abort status have something wrong, fix it later
253
		if c.abortstatus != 0 {
254
			client.ExpectedStatus = int16(bolt.ResponseStatusUnknown)
255
		}
256
		if err := client.Connect(c.ClientMeshAddr); err != nil {
257
			c.C <- err
258
			return
259
		}
260
		defer client.Close()
261
		call = func() error {
262
			start := time.Now()
263
			client.SendRequest()
264
			if !util.WaitMapEmpty(&client.Waits, 2*time.Second) {
265
				return fmt.Errorf("request get no response")
266
			}
267
			cost := time.Now().Sub(start)
268
			if c.delay > 0 {
269
				if cost < c.delay {
270
					return fmt.Errorf("expected delay %s, but cost %s", c.delay.String(), cost.String())
271
				}
272
			}
273
			return nil
274
		}
275
	}
276
	for i := 0; i < n; i++ {
277
		if err := call(); err != nil {
278
			c.C <- err
279
			return
280
		}
281
		time.Sleep(time.Duration(interval) * time.Millisecond)
282
	}
283
	c.C <- nil
284
}
285

286
func TestXFaultInject(t *testing.T) {
287
	appaddr := "127.0.0.1:8080"
288
	testCases := []*XFaultInjectCase{
289
		// delay
290
		&XFaultInjectCase{
291
			XTestCase: integrate.NewXTestCase(t, bolt.ProtocolName, util.NewRPCServer(t, appaddr, bolt.ProtocolName)),
292
			delay:     time.Second,
293
		},
294
		// abort
295
		&XFaultInjectCase{
296
			XTestCase:   integrate.NewXTestCase(t, bolt.ProtocolName, util.NewRPCServer(t, appaddr, bolt.ProtocolName)),
297
			abortstatus: 500,
298
		},
299
		// delay and abort
300
		&XFaultInjectCase{
301
			XTestCase:   integrate.NewXTestCase(t, bolt.ProtocolName, util.NewRPCServer(t, appaddr, bolt.ProtocolName)),
302
			delay:       time.Second,
303
			abortstatus: 500,
304
		},
305
	}
306
	for i, tc := range testCases {
307
		t.Logf("start case #%d\n", i)
308
		tc.StartProxy()
309
		go tc.RunCase(1, 0)
310
		select {
311
		case err := <-tc.C:
312
			if err != nil {
313
				t.Errorf("[ERROR MESSAGE] #%d %v to mesh %v test failed, error: %v\n", i, tc.AppProtocol, tc.MeshProtocol, err)
314
			}
315
		case <-time.After(15 * time.Second):
316
			t.Errorf("[ERROR MESSAGE] #%d %v to mesh %v hang\n", i, tc.AppProtocol, tc.MeshProtocol)
317
		}
318
		tc.FinishCase()
319
	}
320

321
}
322

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.