cubefs

Форк
0
/
conn_pool.go 
271 строка · 5.4 Кб
1
// Copyright 2018 The CubeFS Authors.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12
// implied. See the License for the specific language governing
13
// permissions and limitations under the License.
14

15
package util
16

17
import (
18
	"net"
19
	"sync"
20
	"time"
21
)
22

23
type Object struct {
24
	conn *net.TCPConn
25
	idle int64
26
}
27

28
const (
29
	ConnectIdleTime       = 30
30
	defaultConnectTimeout = 1
31
)
32

33
type ConnectPool struct {
34
	sync.RWMutex
35
	pools          map[string]*Pool
36
	mincap         int
37
	maxcap         int
38
	timeout        int64
39
	connectTimeout int64
40
	closeCh        chan struct{}
41
	closeOnce      sync.Once
42
}
43

44
func NewConnectPool() (cp *ConnectPool) {
45
	cp = &ConnectPool{
46
		pools:          make(map[string]*Pool),
47
		mincap:         5,
48
		maxcap:         500,
49
		timeout:        int64(time.Second * ConnectIdleTime),
50
		connectTimeout: defaultConnectTimeout,
51
		closeCh:        make(chan struct{}),
52
	}
53
	go cp.autoRelease()
54

55
	return cp
56
}
57

58
func NewConnectPoolWithTimeout(idleConnTimeout time.Duration, connectTimeout int64) (cp *ConnectPool) {
59
	cp = &ConnectPool{
60
		pools:          make(map[string]*Pool),
61
		mincap:         5,
62
		maxcap:         80,
63
		timeout:        int64(idleConnTimeout * time.Second),
64
		connectTimeout: connectTimeout,
65
		closeCh:        make(chan struct{}),
66
	}
67
	go cp.autoRelease()
68

69
	return cp
70
}
71

72
func DailTimeOut(target string, timeout time.Duration) (c *net.TCPConn, err error) {
73
	var connect net.Conn
74
	connect, err = net.DialTimeout("tcp", target, timeout)
75
	if err == nil {
76
		conn := connect.(*net.TCPConn)
77
		conn.SetKeepAlive(true)
78
		conn.SetNoDelay(true)
79
		c = conn
80
	}
81
	return
82
}
83

84
func (cp *ConnectPool) GetConnect(targetAddr string) (c *net.TCPConn, err error) {
85
	cp.RLock()
86
	pool, ok := cp.pools[targetAddr]
87
	cp.RUnlock()
88
	if !ok {
89
		newPool := NewPool(cp.mincap, cp.maxcap, cp.timeout, cp.connectTimeout, targetAddr)
90
		cp.Lock()
91
		pool, ok = cp.pools[targetAddr]
92
		if !ok {
93
			// pool = NewPool(cp.mincap, cp.maxcap, cp.timeout, cp.connectTimeout, targetAddr)
94
			pool = newPool
95
			cp.pools[targetAddr] = pool
96
		}
97
		cp.Unlock()
98
	}
99

100
	return pool.GetConnectFromPool()
101
}
102

103
func (cp *ConnectPool) PutConnect(c *net.TCPConn, forceClose bool) {
104
	if c == nil {
105
		return
106
	}
107
	if forceClose {
108
		_ = c.Close()
109
		return
110
	}
111
	select {
112
	case <-cp.closeCh:
113
		_ = c.Close()
114
		return
115
	default:
116
	}
117
	addr := c.RemoteAddr().String()
118
	cp.RLock()
119
	pool, ok := cp.pools[addr]
120
	cp.RUnlock()
121
	if !ok {
122
		c.Close()
123
		return
124
	}
125
	object := &Object{conn: c, idle: time.Now().UnixNano()}
126
	pool.PutConnectObjectToPool(object)
127
}
128

129
func (cp *ConnectPool) autoRelease() {
130
	timer := time.NewTimer(time.Second)
131
	for {
132
		select {
133
		case <-cp.closeCh:
134
			timer.Stop()
135
			return
136
		case <-timer.C:
137
		}
138
		pools := make([]*Pool, 0)
139
		cp.RLock()
140
		for _, pool := range cp.pools {
141
			pools = append(pools, pool)
142
		}
143
		cp.RUnlock()
144
		for _, pool := range pools {
145
			pool.autoRelease()
146
		}
147
		timer.Reset(time.Second)
148
	}
149
}
150

151
func (cp *ConnectPool) releaseAll() {
152
	pools := make([]*Pool, 0)
153
	cp.RLock()
154
	for _, pool := range cp.pools {
155
		pools = append(pools, pool)
156
	}
157
	cp.RUnlock()
158
	for _, pool := range pools {
159
		pool.ReleaseAll()
160
	}
161
}
162

163
func (cp *ConnectPool) Close() {
164
	cp.closeOnce.Do(func() {
165
		close(cp.closeCh)
166
		cp.releaseAll()
167
	})
168
}
169

170
type Pool struct {
171
	objects        chan *Object
172
	mincap         int
173
	maxcap         int
174
	target         string
175
	timeout        int64
176
	connectTimeout int64
177
}
178

179
func NewPool(min, max int, timeout, connectTimeout int64, target string) (p *Pool) {
180
	p = new(Pool)
181
	p.mincap = min
182
	p.maxcap = max
183
	p.target = target
184
	p.objects = make(chan *Object, max)
185
	p.timeout = timeout
186
	p.connectTimeout = connectTimeout
187
	p.initAllConnect()
188
	return p
189
}
190

191
func (p *Pool) initAllConnect() {
192
	for i := 0; i < p.mincap; i++ {
193
		c, err := net.Dial("tcp", p.target)
194
		if err == nil {
195
			conn := c.(*net.TCPConn)
196
			conn.SetKeepAlive(true)
197
			conn.SetNoDelay(true)
198
			o := &Object{conn: conn, idle: time.Now().UnixNano()}
199
			p.PutConnectObjectToPool(o)
200
		}
201
	}
202
}
203

204
func (p *Pool) PutConnectObjectToPool(o *Object) {
205
	select {
206
	case p.objects <- o:
207
		return
208
	default:
209
		if o.conn != nil {
210
			o.conn.Close()
211
		}
212
		return
213
	}
214
}
215

216
func (p *Pool) autoRelease() {
217
	connectLen := len(p.objects)
218
	for i := 0; i < connectLen; i++ {
219
		select {
220
		case o := <-p.objects:
221
			if time.Now().UnixNano()-int64(o.idle) > p.timeout {
222
				o.conn.Close()
223
			} else {
224
				p.PutConnectObjectToPool(o)
225
			}
226
		default:
227
			return
228
		}
229
	}
230
}
231

232
func (p *Pool) ReleaseAll() {
233
	connectLen := len(p.objects)
234
	for i := 0; i < connectLen; i++ {
235
		select {
236
		case o := <-p.objects:
237
			o.conn.Close()
238
		default:
239
			return
240
		}
241
	}
242
}
243

244
func (p *Pool) NewConnect(target string) (c *net.TCPConn, err error) {
245
	var connect net.Conn
246
	connect, err = net.DialTimeout("tcp", p.target, time.Duration(p.connectTimeout)*time.Second)
247
	if err == nil {
248
		conn := connect.(*net.TCPConn)
249
		conn.SetKeepAlive(true)
250
		conn.SetNoDelay(true)
251
		c = conn
252
	}
253
	return
254
}
255

256
func (p *Pool) GetConnectFromPool() (c *net.TCPConn, err error) {
257
	var o *Object
258
	for {
259
		select {
260
		case o = <-p.objects:
261
		default:
262
			return p.NewConnect(p.target)
263
		}
264
		if time.Now().UnixNano()-int64(o.idle) > p.timeout {
265
			_ = o.conn.Close()
266
			o = nil
267
			continue
268
		}
269
		return o.conn, nil
270
	}
271
}
272

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.