cubefs
1// Copyright 2015 The Go Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style
3// license that can be found in the LICENSE file.
4
5// Transport code's client connection pooling.
6
7package http28
9import (10"context"11"crypto/tls"12"errors"13"net/http"14"sync"15)
16
17// ClientConnPool manages a pool of HTTP/2 client connections.
18type ClientConnPool interface {19// GetClientConn returns a specific HTTP/2 connection (usually20// a TLS-TCP connection) to an HTTP/2 server. On success, the21// returned ClientConn accounts for the upcoming RoundTrip22// call, so the caller should not omit it. If the caller needs23// to, ClientConn.RoundTrip can be called with a bogus24// new(http.Request) to release the stream reservation.25GetClientConn(req *http.Request, addr string) (*ClientConn, error)26MarkDead(*ClientConn)27}
28
29// clientConnPoolIdleCloser is the interface implemented by ClientConnPool
30// implementations which can close their idle connections.
31type clientConnPoolIdleCloser interface {32ClientConnPool
33closeIdleConnections()34}
35
36var (37_ clientConnPoolIdleCloser = (*clientConnPool)(nil)38_ clientConnPoolIdleCloser = noDialClientConnPool{}39)
40
41// TODO: use singleflight for dialing and addConnCalls?
42type clientConnPool struct {43t *Transport44
45mu sync.Mutex // TODO: maybe switch to RWMutex46// TODO: add support for sharing conns based on cert names47// (e.g. share conn for googleapis.com and appspot.com)48conns map[string][]*ClientConn // key is host:port49dialing map[string]*dialCall // currently in-flight dials50keys map[*ClientConn][]string51addConnCalls map[string]*addConnCall // in-flight addConnIfNeeded calls52}
53
54func (p *clientConnPool) GetClientConn(req *http.Request, addr string) (*ClientConn, error) {55return p.getClientConn(req, addr, dialOnMiss)56}
57
58const (59dialOnMiss = true60noDialOnMiss = false61)
62
63func (p *clientConnPool) getClientConn(req *http.Request, addr string, dialOnMiss bool) (*ClientConn, error) {64// TODO(dneil): Dial a new connection when t.DisableKeepAlives is set?65if isConnectionCloseRequest(req) && dialOnMiss {66// It gets its own connection.67traceGetConn(req, addr)68const singleUse = true69cc, err := p.t.dialClientConn(req.Context(), addr, singleUse)70if err != nil {71return nil, err72}73return cc, nil74}75for {76p.mu.Lock()77for _, cc := range p.conns[addr] {78if cc.ReserveNewRequest() {79// When a connection is presented to us by the net/http package,80// the GetConn hook has already been called.81// Don't call it a second time here.82if !cc.getConnCalled {83traceGetConn(req, addr)84}85cc.getConnCalled = false86p.mu.Unlock()87return cc, nil88}89}90if !dialOnMiss {91p.mu.Unlock()92return nil, ErrNoCachedConn93}94traceGetConn(req, addr)95call := p.getStartDialLocked(req.Context(), addr)96p.mu.Unlock()97<-call.done98if shouldRetryDial(call, req) {99continue100}101cc, err := call.res, call.err102if err != nil {103return nil, err104}105if cc.ReserveNewRequest() {106return cc, nil107}108}109}
110
111// dialCall is an in-flight Transport dial call to a host.
112type dialCall struct {113_ incomparable114p *clientConnPool115// the context associated with the request116// that created this dialCall117ctx context.Context118done chan struct{} // closed when done119res *ClientConn // valid after done is closed120err error // valid after done is closed121}
122
123// requires p.mu is held.
124func (p *clientConnPool) getStartDialLocked(ctx context.Context, addr string) *dialCall {125if call, ok := p.dialing[addr]; ok {126// A dial is already in-flight. Don't start another.127return call128}129call := &dialCall{p: p, done: make(chan struct{}), ctx: ctx}130if p.dialing == nil {131p.dialing = make(map[string]*dialCall)132}133p.dialing[addr] = call134go call.dial(call.ctx, addr)135return call136}
137
138// run in its own goroutine.
139func (c *dialCall) dial(ctx context.Context, addr string) {140const singleUse = false // shared conn141c.res, c.err = c.p.t.dialClientConn(ctx, addr, singleUse)142
143c.p.mu.Lock()144delete(c.p.dialing, addr)145if c.err == nil {146c.p.addConnLocked(addr, c.res)147}148c.p.mu.Unlock()149
150close(c.done)151}
152
153// addConnIfNeeded makes a NewClientConn out of c if a connection for key doesn't
154// already exist. It coalesces concurrent calls with the same key.
155// This is used by the http1 Transport code when it creates a new connection. Because
156// the http1 Transport doesn't de-dup TCP dials to outbound hosts (because it doesn't know
157// the protocol), it can get into a situation where it has multiple TLS connections.
158// This code decides which ones live or die.
159// The return value used is whether c was used.
160// c is never closed.
161func (p *clientConnPool) addConnIfNeeded(key string, t *Transport, c *tls.Conn) (used bool, err error) {162p.mu.Lock()163for _, cc := range p.conns[key] {164if cc.CanTakeNewRequest() {165p.mu.Unlock()166return false, nil167}168}169call, dup := p.addConnCalls[key]170if !dup {171if p.addConnCalls == nil {172p.addConnCalls = make(map[string]*addConnCall)173}174call = &addConnCall{175p: p,176done: make(chan struct{}),177}178p.addConnCalls[key] = call179go call.run(t, key, c)180}181p.mu.Unlock()182
183<-call.done184if call.err != nil {185return false, call.err186}187return !dup, nil188}
189
190type addConnCall struct {191_ incomparable192p *clientConnPool193done chan struct{} // closed when done194err error195}
196
197func (c *addConnCall) run(t *Transport, key string, tc *tls.Conn) {198cc, err := t.NewClientConn(tc)199
200p := c.p201p.mu.Lock()202if err != nil {203c.err = err204} else {205cc.getConnCalled = true // already called by the net/http package206p.addConnLocked(key, cc)207}208delete(p.addConnCalls, key)209p.mu.Unlock()210close(c.done)211}
212
213// p.mu must be held
214func (p *clientConnPool) addConnLocked(key string, cc *ClientConn) {215for _, v := range p.conns[key] {216if v == cc {217return218}219}220if p.conns == nil {221p.conns = make(map[string][]*ClientConn)222}223if p.keys == nil {224p.keys = make(map[*ClientConn][]string)225}226p.conns[key] = append(p.conns[key], cc)227p.keys[cc] = append(p.keys[cc], key)228}
229
230func (p *clientConnPool) MarkDead(cc *ClientConn) {231p.mu.Lock()232defer p.mu.Unlock()233for _, key := range p.keys[cc] {234vv, ok := p.conns[key]235if !ok {236continue237}238newList := filterOutClientConn(vv, cc)239if len(newList) > 0 {240p.conns[key] = newList241} else {242delete(p.conns, key)243}244}245delete(p.keys, cc)246}
247
248func (p *clientConnPool) closeIdleConnections() {249p.mu.Lock()250defer p.mu.Unlock()251// TODO: don't close a cc if it was just added to the pool252// milliseconds ago and has never been used. There's currently253// a small race window with the HTTP/1 Transport's integration254// where it can add an idle conn just before using it, and255// somebody else can concurrently call CloseIdleConns and256// break some caller's RoundTrip.257for _, vv := range p.conns {258for _, cc := range vv {259cc.closeIfIdle()260}261}262}
263
264func filterOutClientConn(in []*ClientConn, exclude *ClientConn) []*ClientConn {265out := in[:0]266for _, v := range in {267if v != exclude {268out = append(out, v)269}270}271// If we filtered it out, zero out the last item to prevent272// the GC from seeing it.273if len(in) != len(out) {274in[len(in)-1] = nil275}276return out277}
278
279// noDialClientConnPool is an implementation of http2.ClientConnPool
280// which never dials. We let the HTTP/1.1 client dial and use its TLS
281// connection instead.
282type noDialClientConnPool struct{ *clientConnPool }283
284func (p noDialClientConnPool) GetClientConn(req *http.Request, addr string) (*ClientConn, error) {285return p.getClientConn(req, addr, noDialOnMiss)286}
287
288// shouldRetryDial reports whether the current request should
289// retry dialing after the call finished unsuccessfully, for example
290// if the dial was canceled because of a context cancellation or
291// deadline expiry.
292func shouldRetryDial(call *dialCall, req *http.Request) bool {293if call.err == nil {294// No error, no need to retry295return false296}297if call.ctx == req.Context() {298// If the call has the same context as the request, the dial299// should not be retried, since any cancellation will have come300// from this request.301return false302}303if !errors.Is(call.err, context.Canceled) && !errors.Is(call.err, context.DeadlineExceeded) {304// If the call error is not because of a context cancellation or a deadline expiry,305// the dial should not be retried.306return false307}308// Only retry if the error is a context cancellation error or deadline expiry309// and the context associated with the call was canceled or expired.310return call.ctx.Err() != nil311}
312