cubefs

Форк
0
/
smux_conn_pool_test.go 
943 строки · 17.9 Кб
1
package util
2

3
import (
4
	"bytes"
5
	"encoding/json"
6
	"fmt"
7
	"io"
8
	"log"
9
	"math/rand"
10
	"net"
11
	"net/http"
12
	"sync"
13
	"testing"
14
	"time"
15

16
	"github.com/xtaci/smux"
17
)
18

19
// init launches a background HTTP listener on 0.0.0.0:9090 with the default
// serve mux. Whatever error ListenAndServe returns is written to the log.
func init() {
	const listenAddr = "0.0.0.0:9090"
	serve := func() {
		err := http.ListenAndServe(listenAddr, nil)
		log.Println(err)
	}
	go serve()
}
24

25
// setupServer starts new server listening on a random localhost port and
26
// returns address of the server, function to stop the server, new client
27
// connection to this server or an error.
28
func setupServer(tb testing.TB) (addr string, stopfunc func(), err error) {
29
	ln, err := net.Listen("tcp", "localhost:0")
30
	if err != nil {
31
		return "", nil, err
32
	}
33
	go func() {
34
		for {
35
			conn, err := ln.Accept()
36
			if err != nil {
37
				return
38
			}
39
			go handleConnection(conn)
40
		}
41
	}()
42
	return ln.Addr().String(), func() { ln.Close() }, nil
43
}
44

45
func handleConnection(conn net.Conn) {
46
	session, _ := smux.Server(conn, nil)
47
	for {
48
		if stream, err := session.AcceptStream(); err == nil {
49
			go func(s io.ReadWriteCloser) {
50
				buf := make([]byte, 65536)
51
				for {
52
					n, err := s.Read(buf)
53
					if err != nil {
54
						return
55
					}
56
					s.Write(buf[:n])
57
				}
58
			}(stream)
59
		} else {
60
			return
61
		}
62
	}
63
}
64

65
// setupServer starts new server listening on a random localhost port and
66
// returns address of the server, function to stop the server, new client
67
// connection to this server or an error.
68
func setupServerV2(tb testing.TB) (addr string, stopfunc func(), err error) {
69
	ln, err := net.Listen("tcp", "localhost:0")
70
	if err != nil {
71
		return "", nil, err
72
	}
73
	go func() {
74
		for {
75
			conn, err := ln.Accept()
76
			if err != nil {
77
				return
78
			}
79
			go handleConnectionV2(conn)
80
		}
81
	}()
82
	return ln.Addr().String(), func() { ln.Close() }, nil
83
}
84

85
func handleConnectionV2(conn net.Conn) {
86
	config := smux.DefaultConfig()
87
	config.Version = 2
88
	session, _ := smux.Server(conn, config)
89
	for {
90
		if stream, err := session.AcceptStream(); err == nil {
91
			go func(s io.ReadWriteCloser) {
92
				buf := make([]byte, 65536)
93
				for {
94
					n, err := s.Read(buf)
95
					if err != nil {
96
						return
97
					}
98
					s.Write(buf[:n])
99
				}
100
			}(stream)
101
		} else {
102
			return
103
		}
104
	}
105
}
106

107
// newConfig returns the default pool configuration with TotalStreams set to
// 1000000.
func newConfig() *SmuxConnPoolConfig {
	const totalStreams = 1000000
	poolCfg := DefaultSmuxConnPoolConfig()
	poolCfg.TotalStreams = totalStreams
	return poolCfg
}
112

113
// newPool builds a connection pool from the configuration returned by
// newConfig.
func newPool() *SmuxConnectPool {
	poolCfg := newConfig()
	return NewSmuxConnectPool(poolCfg)
}
116

117
// newPoolV2 builds a connection pool from newConfig's configuration with
// Version set to 2 and TotalStreams set to 1000000.
func newPoolV2() *SmuxConnectPool {
	v2Cfg := newConfig()
	v2Cfg.TotalStreams, v2Cfg.Version = 1000000, 2
	return NewSmuxConnectPool(v2Cfg)
}
123

124
func TestShiftAddrPort(t *testing.T) {
125
	if ans := ShiftAddrPort(":17010", 100); ans != ":17110" {
126
		t.Fatal()
127
	}
128
	if ans := ShiftAddrPort("1.0.0.0:0", 100); ans != "1.0.0.0:100" {
129
		t.Fatal()
130
	}
131
}
132

133
func TestVerifySmuxPoolConfig(t *testing.T) {
134
	cfg := DefaultSmuxConnPoolConfig()
135
	if err := VerifySmuxPoolConfig(cfg); err != nil {
136
		t.Fatal(err)
137
	}
138
}
139

140
func TestEcho(t *testing.T) {
141
	pool := newPool()
142
	addr, stop, err := setupServer(t)
143
	if err != nil {
144
		t.Fatal(err)
145
	}
146
	defer stop()
147
	stream, err := pool.GetConnect(addr)
148
	if err != nil {
149
		t.Fatal(err)
150
	}
151
	defer pool.PutConnect(stream, true)
152
	const N = 100
153
	buf := make([]byte, 10)
154
	var sent string
155
	var received string
156
	for i := 0; i < N; i++ {
157
		msg := fmt.Sprintf("hello%v", i)
158
		stream.Write([]byte(msg))
159
		sent += msg
160
		if n, err := stream.Read(buf); err != nil {
161
			t.Fatal(err)
162
		} else {
163
			received += string(buf[:n])
164
		}
165
	}
166
	if sent != received {
167
		t.Fatal("data mimatch")
168
	}
169
}
170

171
func TestWriteTo(t *testing.T) {
172
	pool := newPool()
173
	const N = 1 << 20
174
	// server
175
	ln, err := net.Listen("tcp", "localhost:0")
176
	if err != nil {
177
		t.Fatal(err)
178
	}
179
	defer ln.Close()
180

181
	go func() {
182
		conn, err := ln.Accept()
183
		if err != nil {
184
			return
185
		}
186
		session, _ := smux.Server(conn, nil)
187
		for {
188
			if stream, err := session.AcceptStream(); err == nil {
189
				go func(s io.ReadWriteCloser) {
190
					numBytes := 0
191
					buf := make([]byte, 65536)
192
					for {
193
						n, err := s.Read(buf)
194
						if err != nil {
195
							return
196
						}
197
						s.Write(buf[:n])
198
						numBytes += n
199

200
						if numBytes == N {
201
							s.Close()
202
							return
203
						}
204
					}
205
				}(stream)
206
			} else {
207
				return
208
			}
209
		}
210
	}()
211

212
	addr := ln.Addr().String()
213
	stream, err := pool.GetConnect(addr)
214
	defer pool.PutConnect(stream, true)
215
	if err != nil {
216
		t.Fatal(err)
217
	}
218

219
	sndbuf := make([]byte, N)
220
	for i := range sndbuf {
221
		sndbuf[i] = byte(rand.Int())
222
	}
223

224
	go stream.Write(sndbuf)
225

226
	var rcvbuf bytes.Buffer
227
	nw, ew := stream.WriteTo(&rcvbuf)
228
	if ew != io.EOF {
229
		t.Fatal(ew)
230
	}
231

232
	if nw != N {
233
		t.Fatal("WriteTo nw mismatch", nw)
234
	}
235

236
	if !bytes.Equal(sndbuf, rcvbuf.Bytes()) {
237
		t.Fatal("mismatched echo bytes")
238
	}
239
}
240

241
func TestWriteToV2(t *testing.T) {
242
	poolV2 := newPoolV2()
243
	config := smux.DefaultConfig()
244
	config.Version = 2
245
	const N = 1 << 20
246
	// server
247
	ln, err := net.Listen("tcp", "localhost:0")
248
	if err != nil {
249
		t.Fatal(err)
250
	}
251
	defer ln.Close()
252

253
	go func() {
254
		conn, err := ln.Accept()
255
		if err != nil {
256
			return
257
		}
258
		session, _ := smux.Server(conn, config)
259
		for {
260
			if stream, err := session.AcceptStream(); err == nil {
261
				go func(s io.ReadWriteCloser) {
262
					numBytes := 0
263
					buf := make([]byte, 65536)
264
					for {
265
						n, err := s.Read(buf)
266
						if err != nil {
267
							return
268
						}
269
						s.Write(buf[:n])
270
						numBytes += n
271

272
						if numBytes == N {
273
							s.Close()
274
							return
275
						}
276
					}
277
				}(stream)
278
			} else {
279
				return
280
			}
281
		}
282
	}()
283

284
	addr := ln.Addr().String()
285
	stream, err := poolV2.GetConnect(addr)
286
	if err != nil {
287
		t.Fatal(err)
288
	}
289
	defer poolV2.PutConnect(stream, true)
290

291
	// client
292
	sndbuf := make([]byte, N)
293
	for i := range sndbuf {
294
		sndbuf[i] = byte(rand.Int())
295
	}
296

297
	go stream.Write(sndbuf)
298

299
	var rcvbuf bytes.Buffer
300
	nw, ew := stream.WriteTo(&rcvbuf)
301
	if ew != io.EOF {
302
		t.Fatal(ew)
303
	}
304

305
	if nw != N {
306
		t.Fatal("WriteTo nw mismatch", nw)
307
	}
308

309
	if !bytes.Equal(sndbuf, rcvbuf.Bytes()) {
310
		t.Fatal("mismatched echo bytes")
311
	}
312
}
313

314
func TestSpeed(t *testing.T) {
315
	pool := newPool()
316
	addr, stop, err := setupServer(t)
317
	if err != nil {
318
		t.Fatal(err)
319
	}
320
	defer stop()
321
	stream, err := pool.GetConnect(addr)
322
	if err != nil {
323
		t.Fatal(err)
324
	}
325
	t.Log(stream.LocalAddr(), stream.RemoteAddr())
326

327
	start := time.Now()
328
	var wg sync.WaitGroup
329
	wg.Add(1)
330
	go func() {
331
		buf := make([]byte, 1024*1024)
332
		nrecv := 0
333
		for {
334
			n, err := stream.Read(buf)
335
			if err != nil {
336
				t.Error(err)
337
				break
338
			} else {
339
				nrecv += n
340
				if nrecv == 4096*4096 {
341
					break
342
				}
343
			}
344
		}
345
		pool.PutConnect(stream, true)
346
		t.Log("time for 16MB rtt", time.Since(start))
347
		wg.Done()
348
	}()
349
	msg := make([]byte, 8192)
350
	for i := 0; i < 2048; i++ {
351
		stream.Write(msg)
352
	}
353
	wg.Wait()
354
}
355

356
func TestTokenLimit(t *testing.T) {
357
	cfg := newConfig()
358
	cfg.TotalStreams = 1000
359
	pool := NewSmuxConnectPool(cfg)
360
	addr, stop, err := setupServer(t)
361
	if err != nil {
362
		t.Fatal(err)
363
	}
364
	defer stop()
365
	streams := make([]*smux.Stream, 0, 1001)
366
	for i := 0; i < 1000; i++ {
367
		s, err := pool.GetConnect(addr)
368
		if err != nil {
369
			t.Fatal(err)
370
		}
371
		streams = append(streams, s)
372
	}
373
	s, err := pool.GetConnect(addr)
374
	if err != ErrTooMuchSmuxStreams {
375
		t.Fatal(s, err)
376
	}
377
	for _, s := range streams {
378
		pool.PutConnect(s, true)
379
	}
380
}
381

382
func TestParallel(t *testing.T) {
383
	pool := newPool()
384
	addr, stop, err := setupServer(t)
385
	if err != nil {
386
		t.Fatal(err)
387
	}
388
	defer stop()
389
	par := 1000
390
	messages := 10
391
	wg := sync.WaitGroup{}
392
	wg.Add(par)
393
	for i := 0; i < par; i++ {
394
		stream, _ := pool.GetConnect(addr)
395
		go func(s *smux.Stream) {
396
			buf := make([]byte, 20)
397
			for j := 0; j < messages; j++ {
398
				msg := fmt.Sprintf("hello%v", j)
399
				s.Write([]byte(msg))
400
				if _, err := s.Read(buf); err != nil {
401
					t.Log(err)
402
					break
403
				}
404
			}
405
			pool.PutConnect(s, false)
406
			wg.Done()
407
		}(stream)
408
	}
409
	wg.Wait()
410
}
411

412
func TestParallelV2(t *testing.T) {
413
	poolV2 := newPoolV2()
414
	addr, stop, err := setupServerV2(t)
415
	if err != nil {
416
		t.Fatal(err)
417
	}
418
	defer stop()
419
	par := 1000
420
	messages := 10
421
	wg := sync.WaitGroup{}
422
	wg.Add(par)
423
	for i := 0; i < par; i++ {
424
		stream, _ := poolV2.GetConnect(addr)
425
		go func(s *smux.Stream) {
426
			buf := make([]byte, 20)
427
			for j := 0; j < messages; j++ {
428
				msg := fmt.Sprintf("hello%v", j)
429
				s.Write([]byte(msg))
430
				if _, err := s.Read(buf); err != nil {
431
					t.Log(err)
432
					break
433
				}
434
			}
435
			poolV2.PutConnect(s, false)
436
			wg.Done()
437
		}(stream)
438
	}
439
	wg.Wait()
440
}
441

442
func TestSmuxConnectPool_GetStat(t *testing.T) {
443
	addr, stop, err := setupServer(t)
444
	if err != nil {
445
		t.Fatal(err)
446
	}
447
	defer stop()
448
	pool := newPool()
449
	for _, streamCnt := range []int{1, 100, 10000} {
450
		streams := make([]*smux.Stream, streamCnt)
451
		for i := 0; i < streamCnt; i++ {
452
			streams[i], err = pool.GetConnect(addr)
453
			if err != nil {
454
				t.Fatal(err)
455
			}
456
		}
457
		stat := pool.GetStat()
458
		output, _ := json.MarshalIndent(stat, "", "\t")
459
		t.Logf("streamCnt:%d, stat:%s", streamCnt, string(output))
460
		if stat.TotalStreams != streamCnt {
461
			t.Fatal("stat.TotalStreams not correct!")
462
		}
463
		if stat.TotalSessions > pool.cfg.ConnsPerAddr {
464
			t.Fatal("too much connections!")
465
		}
466
		for _, s := range streams {
467
			pool.PutConnect(s, true)
468
		}
469
		stat = pool.GetStat()
470
		if stat.TotalStreams > 0 {
471
			t.Fatal("total streams not clean after closed")
472
		}
473
	}
474
}
475

476
func TestConcurrentPutConn(t *testing.T) {
477
	pool := newPool()
478
	addr, stop, err := setupServer(t)
479
	if err != nil {
480
		t.Fatal(err)
481
	}
482
	defer stop()
483
	numStreams := 300
484
	streams := make([]*smux.Stream, 0, numStreams)
485
	var wg sync.WaitGroup
486
	wg.Add(numStreams)
487
	for i := 0; i < numStreams; i++ {
488
		stream, _ := pool.GetConnect(addr)
489
		streams = append(streams, stream)
490
	}
491
	for _, stream := range streams {
492
		go func(stream *smux.Stream) {
493
			pool.PutConnect(stream, true)
494
			wg.Done()
495
		}(stream)
496
	}
497
	wg.Wait()
498
}
499

500
func TestConcurrentGetConn(t *testing.T) {
501
	pool := newPool()
502
	addr, stop, err := setupServer(t)
503
	if err != nil {
504
		t.Fatal(err)
505
	}
506
	defer stop()
507
	numStreams := 10000
508
	streamsCh := make(chan *smux.Stream, numStreams)
509
	var wg sync.WaitGroup
510
	wg.Add(numStreams)
511

512
	for i := 0; i < numStreams; i++ {
513
		go func() {
514
			defer wg.Done()
515
			stream, err := pool.GetConnect(addr)
516
			if err != nil {
517
				t.Fatal(err)
518
			}
519
			streamsCh <- stream
520
		}()
521
	}
522

523
	wg.Wait()
524
	if len(streamsCh) != numStreams {
525
		t.Fatalf("stream ch not correct, want %d, got %d", numStreams, len(streamsCh))
526
	}
527

528
	stat := pool.GetStat()
529
	output, _ := json.MarshalIndent(stat, "", "\t")
530
	t.Logf("streamCnt:%d, stat:%s", numStreams, string(output))
531

532
	if stat.TotalStreams != numStreams {
533
		t.Fatalf("stat total streams not right, want %d, got %d", numStreams, stat.TotalStreams)
534
	}
535

536
	if stat.TotalSessions > pool.cfg.ConnsPerAddr {
537
		t.Fatalf("sessions is too big, expect %d, got %d", pool.cfg.ConnsPerAddr, stat.TotalSessions)
538
	}
539

540
	close(streamsCh)
541
	for stream := range streamsCh {
542
		pool.PutConnect(stream, true)
543
	}
544
}
545

546
func TestConcurrentGetPutConn(t *testing.T) {
547
	pool := newPool()
548
	addr, stop, err := setupServer(t)
549
	if err != nil {
550
		t.Fatal(err)
551
	}
552
	defer stop()
553
	numStreams := 10000
554
	var wg sync.WaitGroup
555
	wg.Add(numStreams)
556

557
	for i := 0; i < numStreams; i++ {
558
		go func() {
559
			defer wg.Done()
560
			stream, err := pool.GetConnect(addr)
561
			if err != nil {
562
				t.Fatal(err)
563
			}
564

565
			time.Sleep(time.Millisecond * time.Duration(rand.Intn(10)+1))
566
			pool.PutConnect(stream, true)
567
		}()
568
	}
569

570
	stat := pool.GetStat()
571
	output, _ := json.MarshalIndent(stat, "", "\t")
572
	t.Logf("streamCnt:%d, stat:%s", numStreams, string(output))
573

574
	if stat.TotalSessions > pool.cfg.ConnsPerAddr {
575
		t.Fatalf("sessions is too big, expect %d, got %d", pool.cfg.ConnsPerAddr, stat.TotalSessions)
576
	}
577

578
	wg.Wait()
579
}
580

581
func TestTinyReadBuffer(t *testing.T) {
582
	pool := newPool()
583
	addr, stop, err := setupServer(t)
584
	if err != nil {
585
		t.Fatal(err)
586
	}
587
	defer stop()
588
	stream, err := pool.GetConnect(addr)
589
	if err != nil {
590
		t.Fatal(err)
591
	}
592
	defer pool.PutConnect(stream, true)
593
	const N = 100
594
	tinybuf := make([]byte, 6)
595
	var sent string
596
	var received string
597
	for i := 0; i < N; i++ {
598
		msg := fmt.Sprintf("hello%v", i)
599
		sent += msg
600
		nsent, err := stream.Write([]byte(msg))
601
		if err != nil {
602
			t.Fatal("cannot write")
603
		}
604
		nrecv := 0
605
		for nrecv < nsent {
606
			if n, err := stream.Read(tinybuf); err == nil {
607
				nrecv += n
608
				received += string(tinybuf[:n])
609
			} else {
610
				t.Fatal("cannot read with tiny buffer")
611
			}
612
		}
613
	}
614
	if sent != received {
615
		t.Fatal("data mimatch")
616
	}
617
}
618

619
func TestKeepAliveTimeout(t *testing.T) {
620
	addr, stop, err := setupServer(t)
621
	if err != nil {
622
		t.Fatal(err)
623
	}
624
	defer stop()
625
	const sec = 5
626
	cfg := DefaultSmuxConnPoolConfig()
627
	cfg.StreamIdleTimeout = int64(sec * time.Second)
628
	par := 1000
629
	errAllowed := make(chan error, par/2)
630
	notifyError := func(err error) {
631
		t.Log(err)
632
		select {
633
		case errAllowed <- err:
634
		default:
635
			t.Fatal(err)
636
		}
637
	}
638
	pool := NewSmuxConnectPool(cfg)
639
	wg := sync.WaitGroup{}
640
	wg.Add(par)
641
	for i := 0; i < par; i++ {
642
		go func() {
643
			defer wg.Done()
644
			stream, err := pool.GetConnect(addr)
645
			if err != nil {
646
				notifyError(err)
647
				return
648
			}
649
			defer func() {
650
				pool.PutConnect(stream, err != nil)
651
			}()
652
			buf := make([]byte, 10)
653
			_, err = stream.Write(buf)
654
			if err != nil {
655
				notifyError(err)
656
				return
657
			}
658
			_, err = stream.Read(buf)
659
			if err != nil {
660
				notifyError(err)
661
				return
662
			}
663
		}()
664
	}
665
	wg.Wait()
666
	time.Sleep(sec * time.Second)
667
	failedSec := 0
668
	for pool.GetStat().TotalStreams > 0 || pool.GetStat().TotalSessions > 0 {
669
		out, _ := json.MarshalIndent(pool.GetStat(), "", "\t")
670
		if failedSec > sec {
671
			t.Fatalf("(%v), still not clean after sec(%v)", string(out), failedSec)
672
		}
673
		failedSec++
674
		time.Sleep(time.Second)
675
	}
676

677
	t.Log("test session timeout success")
678
}
679

680
func TestServerEcho(t *testing.T) {
681
	pool := newPool()
682
	ln, err := net.Listen("tcp", "localhost:0")
683
	if err != nil {
684
		t.Fatal(err)
685
	}
686
	defer ln.Close()
687
	go func() {
688
		err := func() error {
689
			conn, err := ln.Accept()
690
			if err != nil {
691
				return err
692
			}
693
			defer conn.Close()
694
			session, err := smux.Server(conn, nil)
695
			if err != nil {
696
				return err
697
			}
698
			defer session.Close()
699
			buf := make([]byte, 10)
700
			stream, err := session.AcceptStream()
701
			if err != nil {
702
				return err
703
			}
704
			defer stream.Close()
705
			for i := 0; i < 100; i++ {
706
				msg := fmt.Sprintf("hello%v", i)
707
				stream.Write([]byte(msg))
708
				n, err := stream.Read(buf)
709
				if err != nil {
710
					return err
711
				}
712
				if got := string(buf[:n]); got != msg {
713
					return fmt.Errorf("got: %q, want: %q", got, msg)
714
				}
715
			}
716
			return nil
717
		}()
718
		if err != nil {
719
			t.Error(err)
720
		}
721
	}()
722
	if stream, err := pool.GetConnect(ln.Addr().String()); err == nil {
723
		buf := make([]byte, 65536)
724
		for {
725
			n, err := stream.Read(buf)
726
			if err != nil {
727
				break
728
			}
729
			stream.Write(buf[:n])
730
		}
731
		pool.PutConnect(stream, true)
732
	} else {
733
		t.Fatal(err)
734
	}
735
}
736

737
func TestSendWithoutRecv(t *testing.T) {
738
	pool := newPool()
739
	addr, stop, err := setupServer(t)
740
	if err != nil {
741
		t.Fatal(err)
742
	}
743
	defer stop()
744
	stream, err := pool.GetConnect(addr)
745
	if err != nil {
746
		t.Fatal(err)
747
	}
748
	const N = 100
749
	for i := 0; i < N; i++ {
750
		msg := fmt.Sprintf("hello%v", i)
751
		stream.Write([]byte(msg))
752
	}
753
	buf := make([]byte, 1)
754
	if _, err := stream.Read(buf); err != nil {
755
		t.Fatal(err)
756
	}
757
	pool.PutConnect(stream, true)
758
}
759

760
func BenchmarkGetConn(b *testing.B) {
761
	pool := newPool()
762
	addr, stop, err := setupServer(b)
763
	if err != nil {
764
		b.Fatal(err)
765
	}
766
	defer stop()
767
	created := make([]*smux.Stream, 0, b.N)
768
	for i := 0; i < b.N; i++ {
769
		if stream, err := pool.GetConnect(addr); err == nil {
770
			if stream == nil {
771
				panic("!!!")
772
			}
773
			created = append(created, stream)
774
		} else {
775
			b.Fatal(err)
776
		}
777
	}
778
	for _, s := range created {
779
		pool.PutConnect(s, true)
780
	}
781
}
782

783
func BenchmarkParallelGetConn(b *testing.B) {
784
	pool := newPool()
785
	addr, stop, err := setupServer(b)
786
	if err != nil {
787
		b.Fatal(err)
788
	}
789
	par := 32
790
	closeCh := make(chan struct{})
791
	closeOnce := sync.Once{}
792
	created := make(chan *smux.Stream, par)
793
	wg := sync.WaitGroup{}
794
	wg.Add(par)
795
	go func() {
796
		for range closeCh {
797
			wg.Wait()
798
			close(created)
799
			stop()
800
			return
801
		}
802
	}()
803
	for i := 0; i < par; i++ {
804
		go func() {
805
			defer wg.Done()
806
			for {
807
				select {
808
				case <-closeCh:
809
					return
810
				default:
811
					if stream, err := pool.GetConnect(addr); err == nil {
812
						created <- stream
813
					} else {
814
						panic(err)
815
					}
816
				}
817
			}
818
		}()
819
	}
820
	n := 0
821
	for stream := range created {
822
		n++
823
		if n >= b.N {
824
			closeOnce.Do(func() {
825
				close(closeCh)
826
			})
827
		}
828
		pool.PutConnect(stream, true)
829
	}
830
}
831

832
func BenchmarkConnSmux(b *testing.B) {
833
	cs, ss, recycle, err := getSmuxStreamPair()
834
	if err != nil {
835
		b.Fatal(err)
836
	}
837
	defer recycle()
838
	bench(b, cs, ss)
839
}
840

841
func BenchmarkConnTCP(b *testing.B) {
842
	cs, ss, err := getTCPConnectionPair()
843
	if err != nil {
844
		b.Fatal(err)
845
	}
846
	defer cs.Close()
847
	defer ss.Close()
848
	bench(b, cs, ss)
849
}
850

851
func getSmuxStreamPair() (cs *smux.Stream, ss *smux.Stream, recycle func(), err error) {
852
	pool := newPool()
853
	var lst net.Listener
854
	lst, err = net.Listen("tcp", "localhost:0")
855
	if err != nil {
856
		return nil, nil, nil, err
857
	}
858
	defer lst.Close()
859
	done := make(chan error)
860
	go func() {
861
		var (
862
			conn net.Conn
863
			s    *smux.Session
864
			rerr error
865
		)
866
		conn, rerr = lst.Accept()
867
		if rerr != nil {
868
			done <- rerr
869
		}
870
		s, rerr = smux.Server(conn, DefaultSmuxConfig())
871
		if rerr != nil {
872
			done <- rerr
873
		}
874
		ss, rerr = s.AcceptStream()
875
		done <- rerr
876
		close(done)
877
	}()
878
	cs, err = pool.GetConnect(lst.Addr().String())
879
	if err != nil {
880
		return nil, nil, nil, err
881
	}
882
	err = <-done
883
	if err != nil {
884
		return nil, nil, nil, err
885
	}
886
	return cs, ss, func() {
887
		pool.PutConnect(cs, false)
888
		ss.Close()
889
	}, err
890
}
891

892
// getTCPConnectionPair returns the two ends of a TCP connection established
// over a temporary localhost listener: first the accepted (server) end, then
// the dialed (client) end. The listener is closed before returning. On error
// both connections are nil and nothing is left open.
func getTCPConnectionPair() (net.Conn, net.Conn, error) {
	lst, err := net.Listen("tcp", "localhost:0")
	if err != nil {
		return nil, nil, err
	}
	defer lst.Close()

	type acceptResult struct {
		conn net.Conn
		err  error
	}
	// Buffered so the accept goroutine can finish even when Dial failed and
	// nobody is left to receive its result.
	accepted := make(chan acceptResult, 1)
	go func() {
		conn, err := lst.Accept()
		accepted <- acceptResult{conn: conn, err: err}
	}()

	conn1, err := net.Dial("tcp", lst.Addr().String())
	if err != nil {
		return nil, nil, err
	}

	res := <-accepted
	if res.err != nil {
		// Do not leak the dialed end when the accept side failed.
		conn1.Close()
		return nil, nil, res.err
	}
	return res.conn, conn1, nil
}
918

919
// bench writes b.N chunks of 128KiB to wr while a background goroutine reads
// from rd until the same number of bytes has arrived. It returns once the
// reader goroutine has finished. Read and write failures are reported through
// b.Error instead of being ignored.
func bench(b *testing.B, rd io.Reader, wr io.Writer) {
	const chunk = 128 * 1024
	buf := make([]byte, chunk)
	buf2 := make([]byte, chunk)
	b.SetBytes(chunk)
	b.ResetTimer()
	b.ReportAllocs()

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		want := chunk * b.N
		for count := 0; count < want; {
			n, err := rd.Read(buf2)
			count += n
			if err != nil {
				// A failing reader would otherwise spin here forever; an
				// error that arrives with the final bytes is not a failure.
				if count < want {
					b.Error(err)
				}
				return
			}
		}
	}()
	for i := 0; i < b.N; i++ {
		// b.Error (not b.Fatal) so the reader goroutine is still waited for.
		if _, err := wr.Write(buf); err != nil {
			b.Error(err)
			break
		}
	}
	wg.Wait()
}
944

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.