9
"google.golang.org/grpc/connectivity"
11
"github.com/kumahq/kuma/pkg/core"
14
// poolLog is the logger used by the connection pool; it is derived from
// core.Log with the nested names "intercp", "client" and "pool".
var poolLog = core.Log.WithName("intercp").WithName("client").WithName("pool")
16
// accessedConn is the value stored in the pool's connections map. Besides the
// field shown here, the pool code reads its conn and url fields.
type accessedConn struct {
19
// lastAccessTime is refreshed by Pool.Client and compared against the idle
// deadline by Pool.cleanup.
lastAccessTime time.Time
26
// newConn is the factory Client calls with the server URL and the pool's TLS
// config when it needs a new connection.
newConn func(string, *TLSConfig) (Conn, error)
27
// idleDeadline is the idle time after which cleanup closes and drops a
// connection.
idleDeadline time.Duration
29
// connections holds the pooled connections, keyed by server URL.
connections map[string]*accessedConn
35
// TLSNotConfigured is the sentinel error returned by Pool.Client.
// NOTE(review): the guarding condition is not visible in this excerpt;
// presumably it is returned while no TLS config has been set — confirm.
var TLSNotConfigured = errors.New("tls config is not yet set")
38
newConn func(string, *TLSConfig) (Conn, error),
39
idleDeadline time.Duration,
44
idleDeadline: idleDeadline,
46
connections: map[string]*accessedConn{},
51
// Client looks up the pooled connection for serverURL. A new connection is
// created through c.newConn when no entry is cached, or when the cached
// connection reports connectivity.TransientFailure (the broken connection is
// closed first). The entry's lastAccessTime is refreshed and the entry is
// stored back in the pool.
//
// NOTE(review): parts of this function (the TLS guard condition, handling of
// the newConn error, the final return, and any locking) are not visible in
// this excerpt — confirm against the full file.
func (c *Pool) Client(serverURL string) (Conn, error) {
55
// NOTE(review): the condition guarding this return is not shown; presumably
// it fires while no TLS config has been set.
return nil, TLSNotConfigured
57
ac, ok := c.connections[serverURL]
58
// A missing entry always requires a new connection.
createNewConnection := !ok
59
// A cached connection in TransientFailure is treated as broken and is
// replaced rather than reused.
if ok && ac.conn.GetState() == connectivity.TransientFailure {
60
createNewConnection = true
61
poolLog.Info("closing broken connection", "url", serverURL)
62
// A failure to close the broken connection is logged.
if err := ac.conn.Close(); err != nil {
63
poolLog.Error(err, "cannot close the connection", "url", serverURL)
66
if createNewConnection {
67
poolLog.Info("creating new connection", "url", serverURL)
68
conn, err := c.newConn(serverURL, c.tlsCfg)
77
// Record the access time; cleanup compares it against idleDeadline.
ac.lastAccessTime = c.now()
78
c.connections[serverURL] = ac
84
// SetTLSConfig supplies the TLS configuration for the pool.
// NOTE(review): the body is not visible in this excerpt; presumably it
// assigns c.tlsCfg, which Client passes to newConn — confirm.
func (c *Pool) SetTLSConfig(tlsCfg *TLSConfig) {
90
// StartCleanup receives tick times from ticker.
// NOTE(review): most of the body is not visible in this excerpt; presumably
// each tick triggers c.cleanup(now) and the loop ends when ctx is done —
// confirm against the full file.
func (c *Pool) StartCleanup(ctx context.Context, ticker *time.Ticker) {
93
// now is the time delivered by the ticker for this tick.
case now := <-ticker.C:
101
// cleanup walks the pooled connections and, for each one whose last access
// lies more than c.idleDeadline before now, closes it and removes it from the
// pool.
// NOTE(review): lines between the signature and the loop, and the closing
// lines, are not visible in this excerpt (e.g. any locking) — confirm.
func (c *Pool) cleanup(now time.Time) {
104
// Deleting entries while ranging over a map is permitted in Go.
for url, accessedConn := range c.connections {
105
if now.Sub(accessedConn.lastAccessTime) > c.idleDeadline {
106
poolLog.Info("closing connection due to lack of activity", "url", accessedConn.url)
107
// A failure to close the idle connection is logged.
if err := accessedConn.conn.Close(); err != nil {
108
poolLog.Error(err, "cannot close the connection", "url", accessedConn.url)
110
delete(c.connections, url)