mosn

Форк
0
/
keepalive_test.go 
108 строк · 2.8 Кб
1
package functiontest
2

3
import (
4
	"context"
5
	"net"
6
	"sync/atomic"
7
	"testing"
8
	"time"
9

10
	"mosn.io/api"
11

12
	"mosn.io/mosn/pkg/protocol/xprotocol/bolt"
13
	"mosn.io/mosn/pkg/types"
14
	"mosn.io/mosn/test/util"
15
	"mosn.io/mosn/test/util/mosn"
16
)
17

18
type heartBeatServer struct {
19
	util.UpstreamServer
20
	HeartBeatCount uint32
21
	boltProto      api.XProtocol
22
}
23

24
func (s *heartBeatServer) ServeBoltOrHeartbeat(t *testing.T, conn net.Conn) {
25
	response := func(iobuf types.IoBuffer) ([]byte, bool) {
26
		cmd, _ := s.boltProto.Decode(nil, iobuf)
27
		if cmd == nil {
28
			return nil, false
29
		}
30
		if req, ok := cmd.(*bolt.Request); ok {
31
			var iobufresp types.IoBuffer
32
			var err error
33
			switch req.CmdCode {
34
			case bolt.CmdCodeHeartbeat:
35
				hbAck := s.boltProto.Reply(context.TODO(), req)
36
				iobufresp, err = s.boltProto.Encode(context.Background(), hbAck)
37
				atomic.AddUint32(&s.HeartBeatCount, 1)
38
			case bolt.CmdCodeRpcRequest:
39
				resp := bolt.NewRpcResponse(req.RequestId, bolt.ResponseStatusSuccess, nil, nil)
40
				iobufresp, err = s.boltProto.Encode(context.Background(), resp)
41
			}
42
			if err != nil {
43
				return nil, true
44
			}
45
			return iobufresp.Bytes(), true
46
		}
47
		return nil, true
48
	}
49
	util.ServeRPC(t, conn, response)
50
}
51

52
// Test Proxy Mode
53
// TODO: support protocol convert
54
func TestKeepAlive(t *testing.T) {
55
	appAddr := "127.0.0.1:8080"
56
	server := &heartBeatServer{}
57
	server.UpstreamServer = util.NewUpstreamServer(t, appAddr, server.ServeBoltOrHeartbeat)
58
	server.boltProto = (&bolt.XCodec{}).NewXProtocol(context.Background())
59
	server.GoServe()
60
	clientMeshAddr := util.CurrentMeshAddr()
61
	cfg := util.CreateXProtocolProxyMesh(clientMeshAddr, []string{appAddr}, bolt.ProtocolName)
62
	mesh := mosn.NewMosn(cfg)
63
	go mesh.Start()
64
	stop := make(chan bool)
65
	go func() {
66
		<-stop
67
		server.Close()
68
		mesh.Close()
69
		stop <- true
70
	}()
71
	time.Sleep(5 * time.Second) //wait server and mesh start
72
	// start case
73
	client := util.NewRPCClient(t, "testKeepAlive", bolt.ProtocolName)
74
	if err := client.Connect(clientMeshAddr); err != nil {
75
		t.Fatal(err)
76
	}
77
	// send request, make a connection
78
	client.SendRequest()
79
	// sleep, makes the conn idle, mosn will keep alive with upstream
80
	// interval 15s, sleep to wait 2 heart beat
81
	time.Sleep(2*types.DefaultConnReadTimeout + 3*time.Second)
82
	// send request interval, to stop keep avlie
83
	st := make(chan bool)
84
	go func() {
85
		ticker := time.NewTicker(3 * time.Second)
86
		for {
87
			select {
88
			case <-st:
89
				ticker.Stop()
90
				st <- true
91
				return
92
			case <-ticker.C:
93
				client.SendRequest()
94
			}
95
		}
96
	}()
97
	time.Sleep(types.DefaultConnReadTimeout)
98
	// check, should have and only have 2 heart beat
99
	if server.HeartBeatCount != 2 {
100
		t.Errorf("server receive %d heart beats", server.HeartBeatCount)
101
	}
102
	// stop the ticker goroutine and then stop the case
103
	st <- true
104
	<-st
105
	// stop the case
106
	stop <- true
107
	<-stop
108
}
109

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.