1
// Copyright 2018 The CubeFS Authors.
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
7
// http://www.apache.org/licenses/LICENSE-2.0
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12
// implied. See the License for the specific language governing
13
// permissions and limitations under the License.
30
defaultConnectTimeout = 1
33
type ConnectPool struct {
35
pools map[string]*Pool
44
func NewConnectPool() (cp *ConnectPool) {
46
pools: make(map[string]*Pool),
49
timeout: int64(time.Second * ConnectIdleTime),
50
connectTimeout: defaultConnectTimeout,
51
closeCh: make(chan struct{}),
58
func NewConnectPoolWithTimeout(idleConnTimeout time.Duration, connectTimeout int64) (cp *ConnectPool) {
60
pools: make(map[string]*Pool),
63
timeout: int64(idleConnTimeout * time.Second),
64
connectTimeout: connectTimeout,
65
closeCh: make(chan struct{}),
72
func DailTimeOut(target string, timeout time.Duration) (c *net.TCPConn, err error) {
74
connect, err = net.DialTimeout("tcp", target, timeout)
76
conn := connect.(*net.TCPConn)
77
conn.SetKeepAlive(true)
84
func (cp *ConnectPool) GetConnect(targetAddr string) (c *net.TCPConn, err error) {
86
pool, ok := cp.pools[targetAddr]
89
newPool := NewPool(cp.mincap, cp.maxcap, cp.timeout, cp.connectTimeout, targetAddr)
91
pool, ok = cp.pools[targetAddr]
93
// pool = NewPool(cp.mincap, cp.maxcap, cp.timeout, cp.connectTimeout, targetAddr)
95
cp.pools[targetAddr] = pool
100
return pool.GetConnectFromPool()
103
func (cp *ConnectPool) PutConnect(c *net.TCPConn, forceClose bool) {
117
addr := c.RemoteAddr().String()
119
pool, ok := cp.pools[addr]
125
object := &Object{conn: c, idle: time.Now().UnixNano()}
126
pool.PutConnectObjectToPool(object)
129
func (cp *ConnectPool) autoRelease() {
130
timer := time.NewTimer(time.Second)
138
pools := make([]*Pool, 0)
140
for _, pool := range cp.pools {
141
pools = append(pools, pool)
144
for _, pool := range pools {
147
timer.Reset(time.Second)
151
func (cp *ConnectPool) releaseAll() {
152
pools := make([]*Pool, 0)
154
for _, pool := range cp.pools {
155
pools = append(pools, pool)
158
for _, pool := range pools {
163
func (cp *ConnectPool) Close() {
164
cp.closeOnce.Do(func() {
179
func NewPool(min, max int, timeout, connectTimeout int64, target string) (p *Pool) {
184
p.objects = make(chan *Object, max)
186
p.connectTimeout = connectTimeout
191
func (p *Pool) initAllConnect() {
192
for i := 0; i < p.mincap; i++ {
193
c, err := net.Dial("tcp", p.target)
195
conn := c.(*net.TCPConn)
196
conn.SetKeepAlive(true)
197
conn.SetNoDelay(true)
198
o := &Object{conn: conn, idle: time.Now().UnixNano()}
199
p.PutConnectObjectToPool(o)
204
func (p *Pool) PutConnectObjectToPool(o *Object) {
216
func (p *Pool) autoRelease() {
217
connectLen := len(p.objects)
218
for i := 0; i < connectLen; i++ {
220
case o := <-p.objects:
221
if time.Now().UnixNano()-int64(o.idle) > p.timeout {
224
p.PutConnectObjectToPool(o)
232
func (p *Pool) ReleaseAll() {
233
connectLen := len(p.objects)
234
for i := 0; i < connectLen; i++ {
236
case o := <-p.objects:
244
func (p *Pool) NewConnect(target string) (c *net.TCPConn, err error) {
246
connect, err = net.DialTimeout("tcp", p.target, time.Duration(p.connectTimeout)*time.Second)
248
conn := connect.(*net.TCPConn)
249
conn.SetKeepAlive(true)
250
conn.SetNoDelay(true)
256
func (p *Pool) GetConnectFromPool() (c *net.TCPConn, err error) {
260
case o = <-p.objects:
262
return p.NewConnect(p.target)
264
if time.Now().UnixNano()-int64(o.idle) > p.timeout {