mosn

Форк
0
/
mirror_test.go 
167 строк · 4.2 Кб
1
package integrate
2

3
import (
4
	"fmt"
5
	"net/http"
6
	"testing"
7
	"time"
8

9
	v2 "mosn.io/mosn/pkg/config/v2"
10
	_ "mosn.io/mosn/pkg/filter/stream/mirror"
11
	"mosn.io/mosn/pkg/protocol"
12
	"mosn.io/mosn/pkg/protocol/xprotocol/bolt"
13
	_ "mosn.io/mosn/pkg/stream/http"
14
	_ "mosn.io/mosn/pkg/stream/http2"
15
	"mosn.io/mosn/pkg/types"
16
	_ "mosn.io/mosn/pkg/upstream/cluster"
17
	"mosn.io/mosn/test/util"
18
	"mosn.io/mosn/test/util/mosn"
19
)
20

21
type MirrorCase struct {
22
	*TestCase
23
	Servers       []util.UpstreamServer
24
	upstreamAddrs []string
25
}
26

27
// Server Implement
28
type mirrorHandler struct {
29
	ReqCnt    int
30
	LocalAddr string
31
}
32

33
var (
34
	handlers = [3]mirrorHandler{}
35
)
36

37
func (h *mirrorHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
38
	h.ReqCnt++
39
	w.WriteHeader(200)
40
}
41

42
func NewMirrorCase(t *testing.T, serverProto, meshProto types.ProtocolName) *MirrorCase {
43
	upstreamAddrs := [3]string{
44
		"127.0.0.1:9080",
45
		"127.0.0.1:9081",
46
		"127.0.0.1:9082",
47
	}
48
	var server []util.UpstreamServer
49
	switch serverProto {
50
	case protocol.HTTP1:
51
		for i := 0; i < len(upstreamAddrs); i++ {
52
			s := util.NewHTTPServer(t, &handlers[i])
53
			server = append(server, s)
54
		}
55
	case protocol.HTTP2:
56
		for i := 0; i < len(upstreamAddrs); i++ {
57
			s := util.NewUpstreamHTTP2(t, upstreamAddrs[i], &handlers[i])
58
			server = append(server, s)
59
		}
60
	}
61

62
	tc := NewTestCase(t, serverProto, meshProto, util.NewRPCServer(t, "", bolt.ProtocolName)) // empty server
63
	return &MirrorCase{
64
		TestCase: tc,
65
		Servers:  server,
66
	}
67
}
68

69
// mosn config with stream filter called inject
70
func createMirrorProxyMesh(addr string, hosts []string, proto types.ProtocolName) *v2.MOSNConfig {
71
	clusterName := "default_server"
72
	cmconfig := v2.ClusterManagerConfig{
73
		Clusters: []v2.Cluster{
74
			util.NewBasicCluster(clusterName, hosts),
75
		},
76
	}
77
	routers := []v2.Router{
78
		{
79
			RouterConfig: v2.RouterConfig{
80
				Match: v2.RouterMatch{
81
					Prefix: "/",
82
				},
83
				Route: v2.RouteAction{
84
					RouterActionConfig: v2.RouterActionConfig{
85
						ClusterName: clusterName,
86
					},
87
				},
88
				RequestMirrorPolicies: &v2.RequestMirrorPolicy{
89
					Cluster: clusterName,
90
					Percent: 100,
91
				},
92
			},
93
		},
94
	}
95
	chains := []v2.FilterChain{
96
		util.NewFilterChain("proxyVirtualHost", proto, proto, routers),
97
	}
98
	listener := util.NewListener("proxyListener", addr, chains)
99
	listener.StreamFilters = []v2.Filter{
100
		{
101
			Type: "mirror",
102
			Config: map[string]interface{}{
103
				"amplification": 1,
104
				"broadcast":     true,
105
			},
106
		},
107
	}
108
	cfg := util.NewMOSNConfig([]v2.Listener{listener}, cmconfig)
109
	return cfg
110
}
111

112
func (c *MirrorCase) StartProxy() {
113
	var addrs []string
114
	for i := 0; i < len(c.Servers); i++ {
115
		c.Servers[i].GoServe()
116
		addrs = append(addrs, c.Servers[i].Addr())
117
	}
118
	clientMeshAddr := util.CurrentMeshAddr()
119
	c.ClientMeshAddr = clientMeshAddr
120
	fmt.Printf(" start mosn with protocol:%s, local addr:%s\n, upstreamaddr:%v",
121
		string(c.AppProtocol), clientMeshAddr, addrs)
122
	cfg := createMirrorProxyMesh(clientMeshAddr, addrs, c.AppProtocol)
123
	// cfg := util.CreateProxyMesh(clientMeshAddr, c.upstreamAddrs, c.AppProtocol)
124
	mesh := mosn.NewMosn(cfg)
125
	go mesh.Start()
126
	go func() {
127
		<-c.Finish
128
		for i := 0; i < len(c.Servers); i++ {
129
			c.Servers[i].Close()
130
		}
131
		mesh.Close()
132
		c.Finish <- true
133
	}()
134
	time.Sleep(5 * time.Second) //wait server and mesh start
135
}
136

137
func TestMirror(t *testing.T) {
138
	testCases := []*MirrorCase{
139
		NewMirrorCase(t, protocol.HTTP1, protocol.HTTP1),
140
		//	NewMirrorCase(t, protocol.HTTP1, protocol.HTTP2),
141
		//	NewMirrorCase(t, protocol.HTTP2, protocol.HTTP1),
142
		NewMirrorCase(t, protocol.HTTP2, protocol.HTTP2),
143
	}
144
	caseCount := 2
145
	for i, tc := range testCases {
146
		t.Logf("start case #%d\n", i)
147
		tc.StartProxy()
148
		// at least run twice
149
		go tc.RunCase(caseCount, 0)
150
		select {
151
		case err := <-tc.C:
152
			if err != nil {
153
				t.Errorf("[ERROR MESSAGE] #%d %v to mesh %v test failed, error: %v\n", i, tc.AppProtocol, tc.MeshProtocol, err)
154
			}
155
		case <-time.After(15 * time.Second):
156
			t.Errorf("[ERROR MESSAGE] #%d %v to mesh %v hang\n", i, tc.AppProtocol, tc.MeshProtocol)
157
		}
158
		time.Sleep(2 * time.Second)
159
		tc.FinishCase()
160
	}
161

162
	for i := 0; i < len(handlers); i++ {
163
		if handlers[i].ReqCnt != caseCount*len(testCases) {
164
			t.Errorf("broadcast request not received, reqcnt: %d, addr: %s", handlers[i].ReqCnt, handlers[i].LocalAddr)
165
		}
166
	}
167
}
168

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.