16
"github.com/xtaci/smux"
21
log.Println(http.ListenAndServe("0.0.0.0:9090", nil))
25
// setupServer starts a new server listening on a random localhost port and
26
// returns the address of the server, a function that stops the server, and
27
// an error, if any.
28
func setupServer(tb testing.TB) (addr string, stopfunc func(), err error) {
29
ln, err := net.Listen("tcp", "localhost:0")
35
conn, err := ln.Accept()
39
go handleConnection(conn)
42
return ln.Addr().String(), func() { ln.Close() }, nil
45
func handleConnection(conn net.Conn) {
46
session, _ := smux.Server(conn, nil)
48
if stream, err := session.AcceptStream(); err == nil {
49
go func(s io.ReadWriteCloser) {
50
buf := make([]byte, 65536)
65
// setupServerV2 starts a new server listening on a random localhost port,
66
// serving accepted connections with handleConnectionV2. It returns the address
67
// of the server, a function that stops the server, and an error, if any.
68
func setupServerV2(tb testing.TB) (addr string, stopfunc func(), err error) {
69
ln, err := net.Listen("tcp", "localhost:0")
75
conn, err := ln.Accept()
79
go handleConnectionV2(conn)
82
return ln.Addr().String(), func() { ln.Close() }, nil
85
func handleConnectionV2(conn net.Conn) {
86
config := smux.DefaultConfig()
88
session, _ := smux.Server(conn, config)
90
if stream, err := session.AcceptStream(); err == nil {
91
go func(s io.ReadWriteCloser) {
92
buf := make([]byte, 65536)
107
func newConfig() *SmuxConnPoolConfig {
108
cfg := DefaultSmuxConnPoolConfig()
109
cfg.TotalStreams = 1000000
113
func newPool() *SmuxConnectPool {
114
return NewSmuxConnectPool(newConfig())
117
func newPoolV2() *SmuxConnectPool {
119
cfg.TotalStreams = 1000000
121
return NewSmuxConnectPool(cfg)
124
func TestShiftAddrPort(t *testing.T) {
125
if ans := ShiftAddrPort(":17010", 100); ans != ":17110" {
128
if ans := ShiftAddrPort("1.0.0.0:0", 100); ans != "1.0.0.0:100" {
133
func TestVerifySmuxPoolConfig(t *testing.T) {
134
cfg := DefaultSmuxConnPoolConfig()
135
if err := VerifySmuxPoolConfig(cfg); err != nil {
140
func TestEcho(t *testing.T) {
142
addr, stop, err := setupServer(t)
147
stream, err := pool.GetConnect(addr)
151
defer pool.PutConnect(stream, true)
153
buf := make([]byte, 10)
156
for i := 0; i < N; i++ {
157
msg := fmt.Sprintf("hello%v", i)
158
stream.Write([]byte(msg))
160
if n, err := stream.Read(buf); err != nil {
163
received += string(buf[:n])
166
if sent != received {
167
t.Fatal("data mimatch")
171
func TestWriteTo(t *testing.T) {
175
ln, err := net.Listen("tcp", "localhost:0")
182
conn, err := ln.Accept()
186
session, _ := smux.Server(conn, nil)
188
if stream, err := session.AcceptStream(); err == nil {
189
go func(s io.ReadWriteCloser) {
191
buf := make([]byte, 65536)
193
n, err := s.Read(buf)
212
addr := ln.Addr().String()
213
stream, err := pool.GetConnect(addr)
214
defer pool.PutConnect(stream, true)
219
sndbuf := make([]byte, N)
220
for i := range sndbuf {
221
sndbuf[i] = byte(rand.Int())
224
go stream.Write(sndbuf)
226
var rcvbuf bytes.Buffer
227
nw, ew := stream.WriteTo(&rcvbuf)
233
t.Fatal("WriteTo nw mismatch", nw)
236
if !bytes.Equal(sndbuf, rcvbuf.Bytes()) {
237
t.Fatal("mismatched echo bytes")
241
func TestWriteToV2(t *testing.T) {
242
poolV2 := newPoolV2()
243
config := smux.DefaultConfig()
247
ln, err := net.Listen("tcp", "localhost:0")
254
conn, err := ln.Accept()
258
session, _ := smux.Server(conn, config)
260
if stream, err := session.AcceptStream(); err == nil {
261
go func(s io.ReadWriteCloser) {
263
buf := make([]byte, 65536)
265
n, err := s.Read(buf)
284
addr := ln.Addr().String()
285
stream, err := poolV2.GetConnect(addr)
289
defer poolV2.PutConnect(stream, true)
292
sndbuf := make([]byte, N)
293
for i := range sndbuf {
294
sndbuf[i] = byte(rand.Int())
297
go stream.Write(sndbuf)
299
var rcvbuf bytes.Buffer
300
nw, ew := stream.WriteTo(&rcvbuf)
306
t.Fatal("WriteTo nw mismatch", nw)
309
if !bytes.Equal(sndbuf, rcvbuf.Bytes()) {
310
t.Fatal("mismatched echo bytes")
314
func TestSpeed(t *testing.T) {
316
addr, stop, err := setupServer(t)
321
stream, err := pool.GetConnect(addr)
325
t.Log(stream.LocalAddr(), stream.RemoteAddr())
328
var wg sync.WaitGroup
331
buf := make([]byte, 1024*1024)
334
n, err := stream.Read(buf)
340
if nrecv == 4096*4096 {
345
pool.PutConnect(stream, true)
346
t.Log("time for 16MB rtt", time.Since(start))
349
msg := make([]byte, 8192)
350
for i := 0; i < 2048; i++ {
356
func TestTokenLimit(t *testing.T) {
358
cfg.TotalStreams = 1000
359
pool := NewSmuxConnectPool(cfg)
360
addr, stop, err := setupServer(t)
365
streams := make([]*smux.Stream, 0, 1001)
366
for i := 0; i < 1000; i++ {
367
s, err := pool.GetConnect(addr)
371
streams = append(streams, s)
373
s, err := pool.GetConnect(addr)
374
if err != ErrTooMuchSmuxStreams {
377
for _, s := range streams {
378
pool.PutConnect(s, true)
382
func TestParallel(t *testing.T) {
384
addr, stop, err := setupServer(t)
391
wg := sync.WaitGroup{}
393
for i := 0; i < par; i++ {
394
stream, _ := pool.GetConnect(addr)
395
go func(s *smux.Stream) {
396
buf := make([]byte, 20)
397
for j := 0; j < messages; j++ {
398
msg := fmt.Sprintf("hello%v", j)
400
if _, err := s.Read(buf); err != nil {
405
pool.PutConnect(s, false)
412
func TestParallelV2(t *testing.T) {
413
poolV2 := newPoolV2()
414
addr, stop, err := setupServerV2(t)
421
wg := sync.WaitGroup{}
423
for i := 0; i < par; i++ {
424
stream, _ := poolV2.GetConnect(addr)
425
go func(s *smux.Stream) {
426
buf := make([]byte, 20)
427
for j := 0; j < messages; j++ {
428
msg := fmt.Sprintf("hello%v", j)
430
if _, err := s.Read(buf); err != nil {
435
poolV2.PutConnect(s, false)
442
func TestSmuxConnectPool_GetStat(t *testing.T) {
443
addr, stop, err := setupServer(t)
449
for _, streamCnt := range []int{1, 100, 10000} {
450
streams := make([]*smux.Stream, streamCnt)
451
for i := 0; i < streamCnt; i++ {
452
streams[i], err = pool.GetConnect(addr)
457
stat := pool.GetStat()
458
output, _ := json.MarshalIndent(stat, "", "\t")
459
t.Logf("streamCnt:%d, stat:%s", streamCnt, string(output))
460
if stat.TotalStreams != streamCnt {
461
t.Fatal("stat.TotalStreams not correct!")
463
if stat.TotalSessions > pool.cfg.ConnsPerAddr {
464
t.Fatal("too much connections!")
466
for _, s := range streams {
467
pool.PutConnect(s, true)
469
stat = pool.GetStat()
470
if stat.TotalStreams > 0 {
471
t.Fatal("total streams not clean after closed")
476
func TestConcurrentPutConn(t *testing.T) {
478
addr, stop, err := setupServer(t)
484
streams := make([]*smux.Stream, 0, numStreams)
485
var wg sync.WaitGroup
487
for i := 0; i < numStreams; i++ {
488
stream, _ := pool.GetConnect(addr)
489
streams = append(streams, stream)
491
for _, stream := range streams {
492
go func(stream *smux.Stream) {
493
pool.PutConnect(stream, true)
500
func TestConcurrentGetConn(t *testing.T) {
502
addr, stop, err := setupServer(t)
508
streamsCh := make(chan *smux.Stream, numStreams)
509
var wg sync.WaitGroup
512
for i := 0; i < numStreams; i++ {
515
stream, err := pool.GetConnect(addr)
524
if len(streamsCh) != numStreams {
525
t.Fatalf("stream ch not correct, want %d, got %d", numStreams, len(streamsCh))
528
stat := pool.GetStat()
529
output, _ := json.MarshalIndent(stat, "", "\t")
530
t.Logf("streamCnt:%d, stat:%s", numStreams, string(output))
532
if stat.TotalStreams != numStreams {
533
t.Fatalf("stat total streams not right, want %d, got %d", numStreams, stat.TotalStreams)
536
if stat.TotalSessions > pool.cfg.ConnsPerAddr {
537
t.Fatalf("sessions is too big, expect %d, got %d", pool.cfg.ConnsPerAddr, stat.TotalSessions)
541
for stream := range streamsCh {
542
pool.PutConnect(stream, true)
546
func TestConcurrentGetPutConn(t *testing.T) {
548
addr, stop, err := setupServer(t)
554
var wg sync.WaitGroup
557
for i := 0; i < numStreams; i++ {
560
stream, err := pool.GetConnect(addr)
565
time.Sleep(time.Millisecond * time.Duration(rand.Intn(10)+1))
566
pool.PutConnect(stream, true)
570
stat := pool.GetStat()
571
output, _ := json.MarshalIndent(stat, "", "\t")
572
t.Logf("streamCnt:%d, stat:%s", numStreams, string(output))
574
if stat.TotalSessions > pool.cfg.ConnsPerAddr {
575
t.Fatalf("sessions is too big, expect %d, got %d", pool.cfg.ConnsPerAddr, stat.TotalSessions)
581
func TestTinyReadBuffer(t *testing.T) {
583
addr, stop, err := setupServer(t)
588
stream, err := pool.GetConnect(addr)
592
defer pool.PutConnect(stream, true)
594
tinybuf := make([]byte, 6)
597
for i := 0; i < N; i++ {
598
msg := fmt.Sprintf("hello%v", i)
600
nsent, err := stream.Write([]byte(msg))
602
t.Fatal("cannot write")
606
if n, err := stream.Read(tinybuf); err == nil {
608
received += string(tinybuf[:n])
610
t.Fatal("cannot read with tiny buffer")
614
if sent != received {
615
t.Fatal("data mimatch")
619
func TestKeepAliveTimeout(t *testing.T) {
620
addr, stop, err := setupServer(t)
626
cfg := DefaultSmuxConnPoolConfig()
627
cfg.StreamIdleTimeout = int64(sec * time.Second)
629
errAllowed := make(chan error, par/2)
630
notifyError := func(err error) {
633
case errAllowed <- err:
638
pool := NewSmuxConnectPool(cfg)
639
wg := sync.WaitGroup{}
641
for i := 0; i < par; i++ {
644
stream, err := pool.GetConnect(addr)
650
pool.PutConnect(stream, err != nil)
652
buf := make([]byte, 10)
653
_, err = stream.Write(buf)
658
_, err = stream.Read(buf)
666
time.Sleep(sec * time.Second)
668
for pool.GetStat().TotalStreams > 0 || pool.GetStat().TotalSessions > 0 {
669
out, _ := json.MarshalIndent(pool.GetStat(), "", "\t")
671
t.Fatalf("(%v), still not clean after sec(%v)", string(out), failedSec)
674
time.Sleep(time.Second)
677
t.Log("test session timeout success")
680
func TestServerEcho(t *testing.T) {
682
ln, err := net.Listen("tcp", "localhost:0")
688
err := func() error {
689
conn, err := ln.Accept()
694
session, err := smux.Server(conn, nil)
698
defer session.Close()
699
buf := make([]byte, 10)
700
stream, err := session.AcceptStream()
705
for i := 0; i < 100; i++ {
706
msg := fmt.Sprintf("hello%v", i)
707
stream.Write([]byte(msg))
708
n, err := stream.Read(buf)
712
if got := string(buf[:n]); got != msg {
713
return fmt.Errorf("got: %q, want: %q", got, msg)
722
if stream, err := pool.GetConnect(ln.Addr().String()); err == nil {
723
buf := make([]byte, 65536)
725
n, err := stream.Read(buf)
729
stream.Write(buf[:n])
731
pool.PutConnect(stream, true)
737
func TestSendWithoutRecv(t *testing.T) {
739
addr, stop, err := setupServer(t)
744
stream, err := pool.GetConnect(addr)
749
for i := 0; i < N; i++ {
750
msg := fmt.Sprintf("hello%v", i)
751
stream.Write([]byte(msg))
753
buf := make([]byte, 1)
754
if _, err := stream.Read(buf); err != nil {
757
pool.PutConnect(stream, true)
760
func BenchmarkGetConn(b *testing.B) {
762
addr, stop, err := setupServer(b)
767
created := make([]*smux.Stream, 0, b.N)
768
for i := 0; i < b.N; i++ {
769
if stream, err := pool.GetConnect(addr); err == nil {
773
created = append(created, stream)
778
for _, s := range created {
779
pool.PutConnect(s, true)
783
func BenchmarkParallelGetConn(b *testing.B) {
785
addr, stop, err := setupServer(b)
790
closeCh := make(chan struct{})
791
closeOnce := sync.Once{}
792
created := make(chan *smux.Stream, par)
793
wg := sync.WaitGroup{}
803
for i := 0; i < par; i++ {
811
if stream, err := pool.GetConnect(addr); err == nil {
821
for stream := range created {
824
closeOnce.Do(func() {
828
pool.PutConnect(stream, true)
832
func BenchmarkConnSmux(b *testing.B) {
833
cs, ss, recycle, err := getSmuxStreamPair()
841
func BenchmarkConnTCP(b *testing.B) {
842
cs, ss, err := getTCPConnectionPair()
851
func getSmuxStreamPair() (cs *smux.Stream, ss *smux.Stream, recycle func(), err error) {
854
lst, err = net.Listen("tcp", "localhost:0")
856
return nil, nil, nil, err
859
done := make(chan error)
866
conn, rerr = lst.Accept()
870
s, rerr = smux.Server(conn, DefaultSmuxConfig())
874
ss, rerr = s.AcceptStream()
878
cs, err = pool.GetConnect(lst.Addr().String())
880
return nil, nil, nil, err
884
return nil, nil, nil, err
886
return cs, ss, func() {
887
pool.PutConnect(cs, false)
892
func getTCPConnectionPair() (net.Conn, net.Conn, error) {
893
lst, err := net.Listen("tcp", "localhost:0")
901
done := make(chan struct{})
903
conn0, err0 = lst.Accept()
907
conn1, err := net.Dial("tcp", lst.Addr().String())
914
return nil, nil, err0
916
return conn0, conn1, nil
919
func bench(b *testing.B, rd io.Reader, wr io.Writer) {
920
buf := make([]byte, 128*1024)
921
buf2 := make([]byte, 128*1024)
922
b.SetBytes(128 * 1024)
926
var wg sync.WaitGroup
932
n, _ := rd.Read(buf2)
934
if count == 128*1024*b.N {
939
for i := 0; i < b.N; i++ {