kuma

Форк
0
113 строк · 2.9 Кб
1
package client
2

3
import (
4
	"context"
5
	"errors"
6
	"sync"
7
	"time"
8

9
	"google.golang.org/grpc/connectivity"
10

11
	"github.com/kumahq/kuma/pkg/core"
12
)
13

14
var poolLog = core.Log.WithName("intercp").WithName("client").WithName("pool")
15

16
type accessedConn struct {
17
	conn           Conn
18
	url            string
19
	lastAccessTime time.Time
20
}
21

22
// Pool keeps the list of clients to inter-cp servers.
23
// Because the list of inter-cp servers changes in runtime, we need to properly manage the connections to them (initialize, share, close etc.)
24
// Pool helps us to not reimplement this for every inter-cp service (catalog, envoyadmin, etc.)
25
type Pool struct {
26
	newConn      func(string, *TLSConfig) (Conn, error)
27
	idleDeadline time.Duration // the time after which we close the connection if it was not fetched from the pool
28
	now          func() time.Time
29
	connections  map[string]*accessedConn
30
	mut          sync.Mutex
31

32
	tlsCfg *TLSConfig
33
}
34

35
var TLSNotConfigured = errors.New("tls config is not yet set")
36

37
func NewPool(
38
	newConn func(string, *TLSConfig) (Conn, error),
39
	idleDeadline time.Duration,
40
	now func() time.Time,
41
) *Pool {
42
	return &Pool{
43
		newConn:      newConn,
44
		idleDeadline: idleDeadline,
45
		now:          now,
46
		connections:  map[string]*accessedConn{},
47
		mut:          sync.Mutex{},
48
	}
49
}
50

51
func (c *Pool) Client(serverURL string) (Conn, error) {
52
	c.mut.Lock()
53
	defer c.mut.Unlock()
54
	if c.tlsCfg == nil {
55
		return nil, TLSNotConfigured
56
	}
57
	ac, ok := c.connections[serverURL]
58
	createNewConnection := !ok
59
	if ok && ac.conn.GetState() == connectivity.TransientFailure {
60
		createNewConnection = true
61
		poolLog.Info("closing broken connection", "url", serverURL)
62
		if err := ac.conn.Close(); err != nil {
63
			poolLog.Error(err, "cannot close the connection", "url", serverURL)
64
		}
65
	}
66
	if createNewConnection {
67
		poolLog.Info("creating new connection", "url", serverURL)
68
		conn, err := c.newConn(serverURL, c.tlsCfg)
69
		if err != nil {
70
			return nil, err
71
		}
72
		ac = &accessedConn{
73
			conn: conn,
74
			url:  serverURL,
75
		}
76
	}
77
	ac.lastAccessTime = c.now()
78
	c.connections[serverURL] = ac
79
	return ac.conn, nil
80
}
81

82
// SetTLSConfig can configure TLS in runtime.
83
// Because CA of the inter-cp server is managed by the CP in the runtime we cannot configure it when we create the pool.
84
func (c *Pool) SetTLSConfig(tlsCfg *TLSConfig) {
85
	c.mut.Lock()
86
	c.tlsCfg = tlsCfg
87
	c.mut.Unlock()
88
}
89

90
func (c *Pool) StartCleanup(ctx context.Context, ticker *time.Ticker) {
91
	for {
92
		select {
93
		case now := <-ticker.C:
94
			c.cleanup(now)
95
		case <-ctx.Done():
96
			return
97
		}
98
	}
99
}
100

101
func (c *Pool) cleanup(now time.Time) {
102
	c.mut.Lock()
103
	defer c.mut.Unlock()
104
	for url, accessedConn := range c.connections {
105
		if now.Sub(accessedConn.lastAccessTime) > c.idleDeadline {
106
			poolLog.Info("closing connection due to lack of activity", "url", accessedConn.url)
107
			if err := accessedConn.conn.Close(); err != nil {
108
				poolLog.Error(err, "cannot close the connection", "url", accessedConn.url)
109
			}
110
			delete(c.connections, url)
111
		}
112
	}
113
}
114

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.