9
v2 "mosn.io/mosn/pkg/config/v2"
10
_ "mosn.io/mosn/pkg/filter/stream/mirror"
11
"mosn.io/mosn/pkg/protocol"
12
"mosn.io/mosn/pkg/protocol/xprotocol/bolt"
13
_ "mosn.io/mosn/pkg/stream/http"
14
_ "mosn.io/mosn/pkg/stream/http2"
15
"mosn.io/mosn/pkg/types"
16
_ "mosn.io/mosn/pkg/upstream/cluster"
17
"mosn.io/mosn/test/util"
18
"mosn.io/mosn/test/util/mosn"
21
type MirrorCase struct {
23
Servers []util.UpstreamServer
24
upstreamAddrs []string
28
type mirrorHandler struct {
34
handlers = [3]mirrorHandler{}
37
func (h *mirrorHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
42
func NewMirrorCase(t *testing.T, serverProto, meshProto types.ProtocolName) *MirrorCase {
43
upstreamAddrs := [3]string{
48
var server []util.UpstreamServer
51
for i := 0; i < len(upstreamAddrs); i++ {
52
s := util.NewHTTPServer(t, &handlers[i])
53
server = append(server, s)
56
for i := 0; i < len(upstreamAddrs); i++ {
57
s := util.NewUpstreamHTTP2(t, upstreamAddrs[i], &handlers[i])
58
server = append(server, s)
62
tc := NewTestCase(t, serverProto, meshProto, util.NewRPCServer(t, "", bolt.ProtocolName)) // empty server
69
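// createMirrorProxyMesh builds a mosn config whose listener carries a stream filter (presumably the mirror filter imported above, not "inject" — confirm) and whose route sets RequestMirrorPolicies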
70
func createMirrorProxyMesh(addr string, hosts []string, proto types.ProtocolName) *v2.MOSNConfig {
71
clusterName := "default_server"
72
cmconfig := v2.ClusterManagerConfig{
73
Clusters: []v2.Cluster{
74
util.NewBasicCluster(clusterName, hosts),
77
routers := []v2.Router{
79
RouterConfig: v2.RouterConfig{
80
Match: v2.RouterMatch{
83
Route: v2.RouteAction{
84
RouterActionConfig: v2.RouterActionConfig{
85
ClusterName: clusterName,
88
RequestMirrorPolicies: &v2.RequestMirrorPolicy{
95
chains := []v2.FilterChain{
96
util.NewFilterChain("proxyVirtualHost", proto, proto, routers),
98
listener := util.NewListener("proxyListener", addr, chains)
99
listener.StreamFilters = []v2.Filter{
102
Config: map[string]interface{}{
108
cfg := util.NewMOSNConfig([]v2.Listener{listener}, cmconfig)
112
func (c *MirrorCase) StartProxy() {
114
for i := 0; i < len(c.Servers); i++ {
115
c.Servers[i].GoServe()
116
addrs = append(addrs, c.Servers[i].Addr())
118
clientMeshAddr := util.CurrentMeshAddr()
119
c.ClientMeshAddr = clientMeshAddr
120
fmt.Printf(" start mosn with protocol:%s, local addr:%s\n, upstreamaddr:%v",
121
string(c.AppProtocol), clientMeshAddr, addrs)
122
cfg := createMirrorProxyMesh(clientMeshAddr, addrs, c.AppProtocol)
123
// cfg := util.CreateProxyMesh(clientMeshAddr, c.upstreamAddrs, c.AppProtocol)
124
mesh := mosn.NewMosn(cfg)
128
for i := 0; i < len(c.Servers); i++ {
134
time.Sleep(5 * time.Second) //wait server and mesh start
137
func TestMirror(t *testing.T) {
138
testCases := []*MirrorCase{
139
NewMirrorCase(t, protocol.HTTP1, protocol.HTTP1),
140
// NewMirrorCase(t, protocol.HTTP1, protocol.HTTP2),
141
// NewMirrorCase(t, protocol.HTTP2, protocol.HTTP1),
142
NewMirrorCase(t, protocol.HTTP2, protocol.HTTP2),
145
for i, tc := range testCases {
146
t.Logf("start case #%d\n", i)
148
// at least run twice
149
go tc.RunCase(caseCount, 0)
153
t.Errorf("[ERROR MESSAGE] #%d %v to mesh %v test failed, error: %v\n", i, tc.AppProtocol, tc.MeshProtocol, err)
155
case <-time.After(15 * time.Second):
156
t.Errorf("[ERROR MESSAGE] #%d %v to mesh %v hang\n", i, tc.AppProtocol, tc.MeshProtocol)
158
time.Sleep(2 * time.Second)
162
for i := 0; i < len(handlers); i++ {
163
if handlers[i].ReqCnt != caseCount*len(testCases) {
164
t.Errorf("broadcast request not received, reqcnt: %d, addr: %s", handlers[i].ReqCnt, handlers[i].LocalAddr)