podman

Форк
0
/x
/
client_conn_pool.go 
311 строк · 8.4 Кб
1
// Copyright 2015 The Go Authors. All rights reserved.
2
// Use of this source code is governed by a BSD-style
3
// license that can be found in the LICENSE file.
4

5
// Transport code's client connection pooling.
6

7
package http2
8

9
import (
10
	"context"
11
	"crypto/tls"
12
	"errors"
13
	"net/http"
14
	"sync"
15
)
16

17
// ClientConnPool manages a pool of HTTP/2 client connections.
18
type ClientConnPool interface {
19
	// GetClientConn returns a specific HTTP/2 connection (usually
20
	// a TLS-TCP connection) to an HTTP/2 server. On success, the
21
	// returned ClientConn accounts for the upcoming RoundTrip
22
	// call, so the caller should not omit it. If the caller needs
23
	// to, ClientConn.RoundTrip can be called with a bogus
24
	// new(http.Request) to release the stream reservation.
25
	GetClientConn(req *http.Request, addr string) (*ClientConn, error)
26
	MarkDead(*ClientConn)
27
}
28

29
// clientConnPoolIdleCloser is the interface implemented by ClientConnPool
30
// implementations which can close their idle connections.
31
type clientConnPoolIdleCloser interface {
32
	ClientConnPool
33
	closeIdleConnections()
34
}
35

36
var (
37
	_ clientConnPoolIdleCloser = (*clientConnPool)(nil)
38
	_ clientConnPoolIdleCloser = noDialClientConnPool{}
39
)
40

41
// TODO: use singleflight for dialing and addConnCalls?
42
type clientConnPool struct {
43
	t *Transport
44

45
	mu sync.Mutex // TODO: maybe switch to RWMutex
46
	// TODO: add support for sharing conns based on cert names
47
	// (e.g. share conn for googleapis.com and appspot.com)
48
	conns        map[string][]*ClientConn // key is host:port
49
	dialing      map[string]*dialCall     // currently in-flight dials
50
	keys         map[*ClientConn][]string
51
	addConnCalls map[string]*addConnCall // in-flight addConnIfNeeded calls
52
}
53

54
func (p *clientConnPool) GetClientConn(req *http.Request, addr string) (*ClientConn, error) {
55
	return p.getClientConn(req, addr, dialOnMiss)
56
}
57

58
const (
59
	dialOnMiss   = true
60
	noDialOnMiss = false
61
)
62

63
func (p *clientConnPool) getClientConn(req *http.Request, addr string, dialOnMiss bool) (*ClientConn, error) {
64
	// TODO(dneil): Dial a new connection when t.DisableKeepAlives is set?
65
	if isConnectionCloseRequest(req) && dialOnMiss {
66
		// It gets its own connection.
67
		traceGetConn(req, addr)
68
		const singleUse = true
69
		cc, err := p.t.dialClientConn(req.Context(), addr, singleUse)
70
		if err != nil {
71
			return nil, err
72
		}
73
		return cc, nil
74
	}
75
	for {
76
		p.mu.Lock()
77
		for _, cc := range p.conns[addr] {
78
			if cc.ReserveNewRequest() {
79
				// When a connection is presented to us by the net/http package,
80
				// the GetConn hook has already been called.
81
				// Don't call it a second time here.
82
				if !cc.getConnCalled {
83
					traceGetConn(req, addr)
84
				}
85
				cc.getConnCalled = false
86
				p.mu.Unlock()
87
				return cc, nil
88
			}
89
		}
90
		if !dialOnMiss {
91
			p.mu.Unlock()
92
			return nil, ErrNoCachedConn
93
		}
94
		traceGetConn(req, addr)
95
		call := p.getStartDialLocked(req.Context(), addr)
96
		p.mu.Unlock()
97
		<-call.done
98
		if shouldRetryDial(call, req) {
99
			continue
100
		}
101
		cc, err := call.res, call.err
102
		if err != nil {
103
			return nil, err
104
		}
105
		if cc.ReserveNewRequest() {
106
			return cc, nil
107
		}
108
	}
109
}
110

111
// dialCall is an in-flight Transport dial call to a host.
112
type dialCall struct {
113
	_ incomparable
114
	p *clientConnPool
115
	// the context associated with the request
116
	// that created this dialCall
117
	ctx  context.Context
118
	done chan struct{} // closed when done
119
	res  *ClientConn   // valid after done is closed
120
	err  error         // valid after done is closed
121
}
122

123
// requires p.mu is held.
124
func (p *clientConnPool) getStartDialLocked(ctx context.Context, addr string) *dialCall {
125
	if call, ok := p.dialing[addr]; ok {
126
		// A dial is already in-flight. Don't start another.
127
		return call
128
	}
129
	call := &dialCall{p: p, done: make(chan struct{}), ctx: ctx}
130
	if p.dialing == nil {
131
		p.dialing = make(map[string]*dialCall)
132
	}
133
	p.dialing[addr] = call
134
	go call.dial(call.ctx, addr)
135
	return call
136
}
137

138
// run in its own goroutine.
139
func (c *dialCall) dial(ctx context.Context, addr string) {
140
	const singleUse = false // shared conn
141
	c.res, c.err = c.p.t.dialClientConn(ctx, addr, singleUse)
142

143
	c.p.mu.Lock()
144
	delete(c.p.dialing, addr)
145
	if c.err == nil {
146
		c.p.addConnLocked(addr, c.res)
147
	}
148
	c.p.mu.Unlock()
149

150
	close(c.done)
151
}
152

153
// addConnIfNeeded makes a NewClientConn out of c if a connection for key doesn't
154
// already exist. It coalesces concurrent calls with the same key.
155
// This is used by the http1 Transport code when it creates a new connection. Because
156
// the http1 Transport doesn't de-dup TCP dials to outbound hosts (because it doesn't know
157
// the protocol), it can get into a situation where it has multiple TLS connections.
158
// This code decides which ones live or die.
159
// The return value used is whether c was used.
160
// c is never closed.
161
func (p *clientConnPool) addConnIfNeeded(key string, t *Transport, c *tls.Conn) (used bool, err error) {
162
	p.mu.Lock()
163
	for _, cc := range p.conns[key] {
164
		if cc.CanTakeNewRequest() {
165
			p.mu.Unlock()
166
			return false, nil
167
		}
168
	}
169
	call, dup := p.addConnCalls[key]
170
	if !dup {
171
		if p.addConnCalls == nil {
172
			p.addConnCalls = make(map[string]*addConnCall)
173
		}
174
		call = &addConnCall{
175
			p:    p,
176
			done: make(chan struct{}),
177
		}
178
		p.addConnCalls[key] = call
179
		go call.run(t, key, c)
180
	}
181
	p.mu.Unlock()
182

183
	<-call.done
184
	if call.err != nil {
185
		return false, call.err
186
	}
187
	return !dup, nil
188
}
189

190
type addConnCall struct {
191
	_    incomparable
192
	p    *clientConnPool
193
	done chan struct{} // closed when done
194
	err  error
195
}
196

197
func (c *addConnCall) run(t *Transport, key string, tc *tls.Conn) {
198
	cc, err := t.NewClientConn(tc)
199

200
	p := c.p
201
	p.mu.Lock()
202
	if err != nil {
203
		c.err = err
204
	} else {
205
		cc.getConnCalled = true // already called by the net/http package
206
		p.addConnLocked(key, cc)
207
	}
208
	delete(p.addConnCalls, key)
209
	p.mu.Unlock()
210
	close(c.done)
211
}
212

213
// p.mu must be held
214
func (p *clientConnPool) addConnLocked(key string, cc *ClientConn) {
215
	for _, v := range p.conns[key] {
216
		if v == cc {
217
			return
218
		}
219
	}
220
	if p.conns == nil {
221
		p.conns = make(map[string][]*ClientConn)
222
	}
223
	if p.keys == nil {
224
		p.keys = make(map[*ClientConn][]string)
225
	}
226
	p.conns[key] = append(p.conns[key], cc)
227
	p.keys[cc] = append(p.keys[cc], key)
228
}
229

230
func (p *clientConnPool) MarkDead(cc *ClientConn) {
231
	p.mu.Lock()
232
	defer p.mu.Unlock()
233
	for _, key := range p.keys[cc] {
234
		vv, ok := p.conns[key]
235
		if !ok {
236
			continue
237
		}
238
		newList := filterOutClientConn(vv, cc)
239
		if len(newList) > 0 {
240
			p.conns[key] = newList
241
		} else {
242
			delete(p.conns, key)
243
		}
244
	}
245
	delete(p.keys, cc)
246
}
247

248
func (p *clientConnPool) closeIdleConnections() {
249
	p.mu.Lock()
250
	defer p.mu.Unlock()
251
	// TODO: don't close a cc if it was just added to the pool
252
	// milliseconds ago and has never been used. There's currently
253
	// a small race window with the HTTP/1 Transport's integration
254
	// where it can add an idle conn just before using it, and
255
	// somebody else can concurrently call CloseIdleConns and
256
	// break some caller's RoundTrip.
257
	for _, vv := range p.conns {
258
		for _, cc := range vv {
259
			cc.closeIfIdle()
260
		}
261
	}
262
}
263

264
func filterOutClientConn(in []*ClientConn, exclude *ClientConn) []*ClientConn {
265
	out := in[:0]
266
	for _, v := range in {
267
		if v != exclude {
268
			out = append(out, v)
269
		}
270
	}
271
	// If we filtered it out, zero out the last item to prevent
272
	// the GC from seeing it.
273
	if len(in) != len(out) {
274
		in[len(in)-1] = nil
275
	}
276
	return out
277
}
278

279
// noDialClientConnPool is an implementation of http2.ClientConnPool
280
// which never dials. We let the HTTP/1.1 client dial and use its TLS
281
// connection instead.
282
type noDialClientConnPool struct{ *clientConnPool }
283

284
func (p noDialClientConnPool) GetClientConn(req *http.Request, addr string) (*ClientConn, error) {
285
	return p.getClientConn(req, addr, noDialOnMiss)
286
}
287

288
// shouldRetryDial reports whether the current request should
289
// retry dialing after the call finished unsuccessfully, for example
290
// if the dial was canceled because of a context cancellation or
291
// deadline expiry.
292
func shouldRetryDial(call *dialCall, req *http.Request) bool {
293
	if call.err == nil {
294
		// No error, no need to retry
295
		return false
296
	}
297
	if call.ctx == req.Context() {
298
		// If the call has the same context as the request, the dial
299
		// should not be retried, since any cancellation will have come
300
		// from this request.
301
		return false
302
	}
303
	if !errors.Is(call.err, context.Canceled) && !errors.Is(call.err, context.DeadlineExceeded) {
304
		// If the call error is not because of a context cancellation or a deadline expiry,
305
		// the dial should not be retried.
306
		return false
307
	}
308
	// Only retry if the error is a context cancellation error or deadline expiry
309
	// and the context associated with the call was canceled or expired.
310
	return call.ctx.Err() != nil
311
}
312

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.