cubefs

Форк
0
3152 строки · 87.0 Кб
1
// Copyright 2015 The Go Authors. All rights reserved.
2
// Use of this source code is governed by a BSD-style
3
// license that can be found in the LICENSE file.
4

5
// Transport code.
6

7
package http2
8

9
import (
10
	"bufio"
11
	"bytes"
12
	"compress/gzip"
13
	"context"
14
	"crypto/rand"
15
	"crypto/tls"
16
	"errors"
17
	"fmt"
18
	"io"
19
	"io/fs"
20
	"log"
21
	"math"
22
	mathrand "math/rand"
23
	"net"
24
	"net/http"
25
	"net/http/httptrace"
26
	"net/textproto"
27
	"os"
28
	"sort"
29
	"strconv"
30
	"strings"
31
	"sync"
32
	"sync/atomic"
33
	"time"
34

35
	"golang.org/x/net/http/httpguts"
36
	"golang.org/x/net/http2/hpack"
37
	"golang.org/x/net/idna"
38
)
39

40
const (
41
	// transportDefaultConnFlow is how many connection-level flow control
42
	// tokens we give the server at start-up, past the default 64k.
43
	transportDefaultConnFlow = 1 << 30
44

45
	// transportDefaultStreamFlow is how many stream-level flow
46
	// control tokens we announce to the peer, and how many bytes
47
	// we buffer per stream.
48
	transportDefaultStreamFlow = 4 << 20
49

50
	defaultUserAgent = "Go-http-client/2.0"
51

52
	// initialMaxConcurrentStreams is a connections maxConcurrentStreams until
53
	// it's received servers initial SETTINGS frame, which corresponds with the
54
	// spec's minimum recommended value.
55
	initialMaxConcurrentStreams = 100
56

57
	// defaultMaxConcurrentStreams is a connections default maxConcurrentStreams
58
	// if the server doesn't include one in its initial SETTINGS frame.
59
	defaultMaxConcurrentStreams = 1000
60
)
61

62
// Transport is an HTTP/2 Transport.
63
//
64
// A Transport internally caches connections to servers. It is safe
65
// for concurrent use by multiple goroutines.
66
type Transport struct {
67
	// DialTLSContext specifies an optional dial function with context for
68
	// creating TLS connections for requests.
69
	//
70
	// If DialTLSContext and DialTLS is nil, tls.Dial is used.
71
	//
72
	// If the returned net.Conn has a ConnectionState method like tls.Conn,
73
	// it will be used to set http.Response.TLS.
74
	DialTLSContext func(ctx context.Context, network, addr string, cfg *tls.Config) (net.Conn, error)
75

76
	// DialTLS specifies an optional dial function for creating
77
	// TLS connections for requests.
78
	//
79
	// If DialTLSContext and DialTLS is nil, tls.Dial is used.
80
	//
81
	// Deprecated: Use DialTLSContext instead, which allows the transport
82
	// to cancel dials as soon as they are no longer needed.
83
	// If both are set, DialTLSContext takes priority.
84
	DialTLS func(network, addr string, cfg *tls.Config) (net.Conn, error)
85

86
	// TLSClientConfig specifies the TLS configuration to use with
87
	// tls.Client. If nil, the default configuration is used.
88
	TLSClientConfig *tls.Config
89

90
	// ConnPool optionally specifies an alternate connection pool to use.
91
	// If nil, the default is used.
92
	ConnPool ClientConnPool
93

94
	// DisableCompression, if true, prevents the Transport from
95
	// requesting compression with an "Accept-Encoding: gzip"
96
	// request header when the Request contains no existing
97
	// Accept-Encoding value. If the Transport requests gzip on
98
	// its own and gets a gzipped response, it's transparently
99
	// decoded in the Response.Body. However, if the user
100
	// explicitly requested gzip it is not automatically
101
	// uncompressed.
102
	DisableCompression bool
103

104
	// AllowHTTP, if true, permits HTTP/2 requests using the insecure,
105
	// plain-text "http" scheme. Note that this does not enable h2c support.
106
	AllowHTTP bool
107

108
	// MaxHeaderListSize is the http2 SETTINGS_MAX_HEADER_LIST_SIZE to
109
	// send in the initial settings frame. It is how many bytes
110
	// of response headers are allowed. Unlike the http2 spec, zero here
111
	// means to use a default limit (currently 10MB). If you actually
112
	// want to advertise an unlimited value to the peer, Transport
113
	// interprets the highest possible value here (0xffffffff or 1<<32-1)
114
	// to mean no limit.
115
	MaxHeaderListSize uint32
116

117
	// MaxReadFrameSize is the http2 SETTINGS_MAX_FRAME_SIZE to send in the
118
	// initial settings frame. It is the size in bytes of the largest frame
119
	// payload that the sender is willing to receive. If 0, no setting is
120
	// sent, and the value is provided by the peer, which should be 16384
121
	// according to the spec:
122
	// https://datatracker.ietf.org/doc/html/rfc7540#section-6.5.2.
123
	// Values are bounded in the range 16k to 16M.
124
	MaxReadFrameSize uint32
125

126
	// MaxDecoderHeaderTableSize optionally specifies the http2
127
	// SETTINGS_HEADER_TABLE_SIZE to send in the initial settings frame. It
128
	// informs the remote endpoint of the maximum size of the header compression
129
	// table used to decode header blocks, in octets. If zero, the default value
130
	// of 4096 is used.
131
	MaxDecoderHeaderTableSize uint32
132

133
	// MaxEncoderHeaderTableSize optionally specifies an upper limit for the
134
	// header compression table used for encoding request headers. Received
135
	// SETTINGS_HEADER_TABLE_SIZE settings are capped at this limit. If zero,
136
	// the default value of 4096 is used.
137
	MaxEncoderHeaderTableSize uint32
138

139
	// StrictMaxConcurrentStreams controls whether the server's
140
	// SETTINGS_MAX_CONCURRENT_STREAMS should be respected
141
	// globally. If false, new TCP connections are created to the
142
	// server as needed to keep each under the per-connection
143
	// SETTINGS_MAX_CONCURRENT_STREAMS limit. If true, the
144
	// server's SETTINGS_MAX_CONCURRENT_STREAMS is interpreted as
145
	// a global limit and callers of RoundTrip block when needed,
146
	// waiting for their turn.
147
	StrictMaxConcurrentStreams bool
148

149
	// ReadIdleTimeout is the timeout after which a health check using ping
150
	// frame will be carried out if no frame is received on the connection.
151
	// Note that a ping response will is considered a received frame, so if
152
	// there is no other traffic on the connection, the health check will
153
	// be performed every ReadIdleTimeout interval.
154
	// If zero, no health check is performed.
155
	ReadIdleTimeout time.Duration
156

157
	// PingTimeout is the timeout after which the connection will be closed
158
	// if a response to Ping is not received.
159
	// Defaults to 15s.
160
	PingTimeout time.Duration
161

162
	// WriteByteTimeout is the timeout after which the connection will be
163
	// closed no data can be written to it. The timeout begins when data is
164
	// available to write, and is extended whenever any bytes are written.
165
	WriteByteTimeout time.Duration
166

167
	// CountError, if non-nil, is called on HTTP/2 transport errors.
168
	// It's intended to increment a metric for monitoring, such
169
	// as an expvar or Prometheus metric.
170
	// The errType consists of only ASCII word characters.
171
	CountError func(errType string)
172

173
	// t1, if non-nil, is the standard library Transport using
174
	// this transport. Its settings are used (but not its
175
	// RoundTrip method, etc).
176
	t1 *http.Transport
177

178
	connPoolOnce  sync.Once
179
	connPoolOrDef ClientConnPool // non-nil version of ConnPool
180
}
181

182
func (t *Transport) maxHeaderListSize() uint32 {
183
	if t.MaxHeaderListSize == 0 {
184
		return 10 << 20
185
	}
186
	if t.MaxHeaderListSize == 0xffffffff {
187
		return 0
188
	}
189
	return t.MaxHeaderListSize
190
}
191

192
func (t *Transport) maxFrameReadSize() uint32 {
193
	if t.MaxReadFrameSize == 0 {
194
		return 0 // use the default provided by the peer
195
	}
196
	if t.MaxReadFrameSize < minMaxFrameSize {
197
		return minMaxFrameSize
198
	}
199
	if t.MaxReadFrameSize > maxFrameSize {
200
		return maxFrameSize
201
	}
202
	return t.MaxReadFrameSize
203
}
204

205
func (t *Transport) disableCompression() bool {
206
	return t.DisableCompression || (t.t1 != nil && t.t1.DisableCompression)
207
}
208

209
func (t *Transport) pingTimeout() time.Duration {
210
	if t.PingTimeout == 0 {
211
		return 15 * time.Second
212
	}
213
	return t.PingTimeout
214

215
}
216

217
// ConfigureTransport configures a net/http HTTP/1 Transport to use HTTP/2.
218
// It returns an error if t1 has already been HTTP/2-enabled.
219
//
220
// Use ConfigureTransports instead to configure the HTTP/2 Transport.
221
func ConfigureTransport(t1 *http.Transport) error {
222
	_, err := ConfigureTransports(t1)
223
	return err
224
}
225

226
// ConfigureTransports configures a net/http HTTP/1 Transport to use HTTP/2.
227
// It returns a new HTTP/2 Transport for further configuration.
228
// It returns an error if t1 has already been HTTP/2-enabled.
229
func ConfigureTransports(t1 *http.Transport) (*Transport, error) {
230
	return configureTransports(t1)
231
}
232

233
func configureTransports(t1 *http.Transport) (*Transport, error) {
234
	connPool := new(clientConnPool)
235
	t2 := &Transport{
236
		ConnPool: noDialClientConnPool{connPool},
237
		t1:       t1,
238
	}
239
	connPool.t = t2
240
	if err := registerHTTPSProtocol(t1, noDialH2RoundTripper{t2}); err != nil {
241
		return nil, err
242
	}
243
	if t1.TLSClientConfig == nil {
244
		t1.TLSClientConfig = new(tls.Config)
245
	}
246
	if !strSliceContains(t1.TLSClientConfig.NextProtos, "h2") {
247
		t1.TLSClientConfig.NextProtos = append([]string{"h2"}, t1.TLSClientConfig.NextProtos...)
248
	}
249
	if !strSliceContains(t1.TLSClientConfig.NextProtos, "http/1.1") {
250
		t1.TLSClientConfig.NextProtos = append(t1.TLSClientConfig.NextProtos, "http/1.1")
251
	}
252
	upgradeFn := func(authority string, c *tls.Conn) http.RoundTripper {
253
		addr := authorityAddr("https", authority)
254
		if used, err := connPool.addConnIfNeeded(addr, t2, c); err != nil {
255
			go c.Close()
256
			return erringRoundTripper{err}
257
		} else if !used {
258
			// Turns out we don't need this c.
259
			// For example, two goroutines made requests to the same host
260
			// at the same time, both kicking off TCP dials. (since protocol
261
			// was unknown)
262
			go c.Close()
263
		}
264
		return t2
265
	}
266
	if m := t1.TLSNextProto; len(m) == 0 {
267
		t1.TLSNextProto = map[string]func(string, *tls.Conn) http.RoundTripper{
268
			"h2": upgradeFn,
269
		}
270
	} else {
271
		m["h2"] = upgradeFn
272
	}
273
	return t2, nil
274
}
275

276
func (t *Transport) connPool() ClientConnPool {
277
	t.connPoolOnce.Do(t.initConnPool)
278
	return t.connPoolOrDef
279
}
280

281
func (t *Transport) initConnPool() {
282
	if t.ConnPool != nil {
283
		t.connPoolOrDef = t.ConnPool
284
	} else {
285
		t.connPoolOrDef = &clientConnPool{t: t}
286
	}
287
}
288

289
// ClientConn is the state of a single HTTP/2 client connection to an
290
// HTTP/2 server.
291
type ClientConn struct {
292
	t             *Transport
293
	tconn         net.Conn // usually *tls.Conn, except specialized impls
294
	tconnClosed   bool
295
	tlsState      *tls.ConnectionState // nil only for specialized impls
296
	reused        uint32               // whether conn is being reused; atomic
297
	singleUse     bool                 // whether being used for a single http.Request
298
	getConnCalled bool                 // used by clientConnPool
299

300
	// readLoop goroutine fields:
301
	readerDone chan struct{} // closed on error
302
	readerErr  error         // set before readerDone is closed
303

304
	idleTimeout time.Duration // or 0 for never
305
	idleTimer   *time.Timer
306

307
	mu              sync.Mutex // guards following
308
	cond            *sync.Cond // hold mu; broadcast on flow/closed changes
309
	flow            outflow    // our conn-level flow control quota (cs.outflow is per stream)
310
	inflow          inflow     // peer's conn-level flow control
311
	doNotReuse      bool       // whether conn is marked to not be reused for any future requests
312
	closing         bool
313
	closed          bool
314
	seenSettings    bool                     // true if we've seen a settings frame, false otherwise
315
	wantSettingsAck bool                     // we sent a SETTINGS frame and haven't heard back
316
	goAway          *GoAwayFrame             // if non-nil, the GoAwayFrame we received
317
	goAwayDebug     string                   // goAway frame's debug data, retained as a string
318
	streams         map[uint32]*clientStream // client-initiated
319
	streamsReserved int                      // incr by ReserveNewRequest; decr on RoundTrip
320
	nextStreamID    uint32
321
	pendingRequests int                       // requests blocked and waiting to be sent because len(streams) == maxConcurrentStreams
322
	pings           map[[8]byte]chan struct{} // in flight ping data to notification channel
323
	br              *bufio.Reader
324
	lastActive      time.Time
325
	lastIdle        time.Time // time last idle
326
	// Settings from peer: (also guarded by wmu)
327
	maxFrameSize           uint32
328
	maxConcurrentStreams   uint32
329
	peerMaxHeaderListSize  uint64
330
	peerMaxHeaderTableSize uint32
331
	initialWindowSize      uint32
332

333
	// reqHeaderMu is a 1-element semaphore channel controlling access to sending new requests.
334
	// Write to reqHeaderMu to lock it, read from it to unlock.
335
	// Lock reqmu BEFORE mu or wmu.
336
	reqHeaderMu chan struct{}
337

338
	// wmu is held while writing.
339
	// Acquire BEFORE mu when holding both, to avoid blocking mu on network writes.
340
	// Only acquire both at the same time when changing peer settings.
341
	wmu  sync.Mutex
342
	bw   *bufio.Writer
343
	fr   *Framer
344
	werr error        // first write error that has occurred
345
	hbuf bytes.Buffer // HPACK encoder writes into this
346
	henc *hpack.Encoder
347
}
348

349
// clientStream is the state for a single HTTP/2 stream. One of these
350
// is created for each Transport.RoundTrip call.
351
type clientStream struct {
352
	cc *ClientConn
353

354
	// Fields of Request that we may access even after the response body is closed.
355
	ctx       context.Context
356
	reqCancel <-chan struct{}
357

358
	trace         *httptrace.ClientTrace // or nil
359
	ID            uint32
360
	bufPipe       pipe // buffered pipe with the flow-controlled response payload
361
	requestedGzip bool
362
	isHead        bool
363

364
	abortOnce sync.Once
365
	abort     chan struct{} // closed to signal stream should end immediately
366
	abortErr  error         // set if abort is closed
367

368
	peerClosed chan struct{} // closed when the peer sends an END_STREAM flag
369
	donec      chan struct{} // closed after the stream is in the closed state
370
	on100      chan struct{} // buffered; written to if a 100 is received
371

372
	respHeaderRecv chan struct{}  // closed when headers are received
373
	res            *http.Response // set if respHeaderRecv is closed
374

375
	flow        outflow // guarded by cc.mu
376
	inflow      inflow  // guarded by cc.mu
377
	bytesRemain int64   // -1 means unknown; owned by transportResponseBody.Read
378
	readErr     error   // sticky read error; owned by transportResponseBody.Read
379

380
	reqBody              io.ReadCloser
381
	reqBodyContentLength int64         // -1 means unknown
382
	reqBodyClosed        chan struct{} // guarded by cc.mu; non-nil on Close, closed when done
383

384
	// owned by writeRequest:
385
	sentEndStream bool // sent an END_STREAM flag to the peer
386
	sentHeaders   bool
387

388
	// owned by clientConnReadLoop:
389
	firstByte    bool  // got the first response byte
390
	pastHeaders  bool  // got first MetaHeadersFrame (actual headers)
391
	pastTrailers bool  // got optional second MetaHeadersFrame (trailers)
392
	num1xx       uint8 // number of 1xx responses seen
393
	readClosed   bool  // peer sent an END_STREAM flag
394
	readAborted  bool  // read loop reset the stream
395

396
	trailer    http.Header  // accumulated trailers
397
	resTrailer *http.Header // client's Response.Trailer
398
}
399

400
var got1xxFuncForTests func(int, textproto.MIMEHeader) error
401

402
// get1xxTraceFunc returns the value of request's httptrace.ClientTrace.Got1xxResponse func,
403
// if any. It returns nil if not set or if the Go version is too old.
404
func (cs *clientStream) get1xxTraceFunc() func(int, textproto.MIMEHeader) error {
405
	if fn := got1xxFuncForTests; fn != nil {
406
		return fn
407
	}
408
	return traceGot1xxResponseFunc(cs.trace)
409
}
410

411
func (cs *clientStream) abortStream(err error) {
412
	cs.cc.mu.Lock()
413
	defer cs.cc.mu.Unlock()
414
	cs.abortStreamLocked(err)
415
}
416

417
func (cs *clientStream) abortStreamLocked(err error) {
418
	cs.abortOnce.Do(func() {
419
		cs.abortErr = err
420
		close(cs.abort)
421
	})
422
	if cs.reqBody != nil {
423
		cs.closeReqBodyLocked()
424
	}
425
	// TODO(dneil): Clean up tests where cs.cc.cond is nil.
426
	if cs.cc.cond != nil {
427
		// Wake up writeRequestBody if it is waiting on flow control.
428
		cs.cc.cond.Broadcast()
429
	}
430
}
431

432
func (cs *clientStream) abortRequestBodyWrite() {
433
	cc := cs.cc
434
	cc.mu.Lock()
435
	defer cc.mu.Unlock()
436
	if cs.reqBody != nil && cs.reqBodyClosed == nil {
437
		cs.closeReqBodyLocked()
438
		cc.cond.Broadcast()
439
	}
440
}
441

442
func (cs *clientStream) closeReqBodyLocked() {
443
	if cs.reqBodyClosed != nil {
444
		return
445
	}
446
	cs.reqBodyClosed = make(chan struct{})
447
	reqBodyClosed := cs.reqBodyClosed
448
	go func() {
449
		cs.reqBody.Close()
450
		close(reqBodyClosed)
451
	}()
452
}
453

454
type stickyErrWriter struct {
455
	conn    net.Conn
456
	timeout time.Duration
457
	err     *error
458
}
459

460
func (sew stickyErrWriter) Write(p []byte) (n int, err error) {
461
	if *sew.err != nil {
462
		return 0, *sew.err
463
	}
464
	for {
465
		if sew.timeout != 0 {
466
			sew.conn.SetWriteDeadline(time.Now().Add(sew.timeout))
467
		}
468
		nn, err := sew.conn.Write(p[n:])
469
		n += nn
470
		if n < len(p) && nn > 0 && errors.Is(err, os.ErrDeadlineExceeded) {
471
			// Keep extending the deadline so long as we're making progress.
472
			continue
473
		}
474
		if sew.timeout != 0 {
475
			sew.conn.SetWriteDeadline(time.Time{})
476
		}
477
		*sew.err = err
478
		return n, err
479
	}
480
}
481

482
// noCachedConnError is the concrete type of ErrNoCachedConn, which
483
// needs to be detected by net/http regardless of whether it's its
484
// bundled version (in h2_bundle.go with a rewritten type name) or
485
// from a user's x/net/http2. As such, as it has a unique method name
486
// (IsHTTP2NoCachedConnError) that net/http sniffs for via func
487
// isNoCachedConnError.
488
type noCachedConnError struct{}
489

490
func (noCachedConnError) IsHTTP2NoCachedConnError() {}
491
func (noCachedConnError) Error() string             { return "http2: no cached connection was available" }
492

493
// isNoCachedConnError reports whether err is of type noCachedConnError
494
// or its equivalent renamed type in net/http2's h2_bundle.go. Both types
495
// may coexist in the same running program.
496
func isNoCachedConnError(err error) bool {
497
	_, ok := err.(interface{ IsHTTP2NoCachedConnError() })
498
	return ok
499
}
500

501
var ErrNoCachedConn error = noCachedConnError{}
502

503
// RoundTripOpt are options for the Transport.RoundTripOpt method.
504
type RoundTripOpt struct {
505
	// OnlyCachedConn controls whether RoundTripOpt may
506
	// create a new TCP connection. If set true and
507
	// no cached connection is available, RoundTripOpt
508
	// will return ErrNoCachedConn.
509
	OnlyCachedConn bool
510
}
511

512
func (t *Transport) RoundTrip(req *http.Request) (*http.Response, error) {
513
	return t.RoundTripOpt(req, RoundTripOpt{})
514
}
515

516
// authorityAddr returns a given authority (a host/IP, or host:port / ip:port)
517
// and returns a host:port. The port 443 is added if needed.
518
func authorityAddr(scheme string, authority string) (addr string) {
519
	host, port, err := net.SplitHostPort(authority)
520
	if err != nil { // authority didn't have a port
521
		port = "443"
522
		if scheme == "http" {
523
			port = "80"
524
		}
525
		host = authority
526
	}
527
	if a, err := idna.ToASCII(host); err == nil {
528
		host = a
529
	}
530
	// IPv6 address literal, without a port:
531
	if strings.HasPrefix(host, "[") && strings.HasSuffix(host, "]") {
532
		return host + ":" + port
533
	}
534
	return net.JoinHostPort(host, port)
535
}
536

537
var retryBackoffHook func(time.Duration) *time.Timer
538

539
func backoffNewTimer(d time.Duration) *time.Timer {
540
	if retryBackoffHook != nil {
541
		return retryBackoffHook(d)
542
	}
543
	return time.NewTimer(d)
544
}
545

546
// RoundTripOpt is like RoundTrip, but takes options.
547
func (t *Transport) RoundTripOpt(req *http.Request, opt RoundTripOpt) (*http.Response, error) {
548
	if !(req.URL.Scheme == "https" || (req.URL.Scheme == "http" && t.AllowHTTP)) {
549
		return nil, errors.New("http2: unsupported scheme")
550
	}
551

552
	addr := authorityAddr(req.URL.Scheme, req.URL.Host)
553
	for retry := 0; ; retry++ {
554
		cc, err := t.connPool().GetClientConn(req, addr)
555
		if err != nil {
556
			t.vlogf("http2: Transport failed to get client conn for %s: %v", addr, err)
557
			return nil, err
558
		}
559
		reused := !atomic.CompareAndSwapUint32(&cc.reused, 0, 1)
560
		traceGotConn(req, cc, reused)
561
		res, err := cc.RoundTrip(req)
562
		if err != nil && retry <= 6 {
563
			if req, err = shouldRetryRequest(req, err); err == nil {
564
				// After the first retry, do exponential backoff with 10% jitter.
565
				if retry == 0 {
566
					t.vlogf("RoundTrip retrying after failure: %v", err)
567
					continue
568
				}
569
				backoff := float64(uint(1) << (uint(retry) - 1))
570
				backoff += backoff * (0.1 * mathrand.Float64())
571
				d := time.Second * time.Duration(backoff)
572
				timer := backoffNewTimer(d)
573
				select {
574
				case <-timer.C:
575
					t.vlogf("RoundTrip retrying after failure: %v", err)
576
					continue
577
				case <-req.Context().Done():
578
					timer.Stop()
579
					err = req.Context().Err()
580
				}
581
			}
582
		}
583
		if err != nil {
584
			t.vlogf("RoundTrip failure: %v", err)
585
			return nil, err
586
		}
587
		return res, nil
588
	}
589
}
590

591
// CloseIdleConnections closes any connections which were previously
592
// connected from previous requests but are now sitting idle.
593
// It does not interrupt any connections currently in use.
594
func (t *Transport) CloseIdleConnections() {
595
	if cp, ok := t.connPool().(clientConnPoolIdleCloser); ok {
596
		cp.closeIdleConnections()
597
	}
598
}
599

600
var (
601
	errClientConnClosed    = errors.New("http2: client conn is closed")
602
	errClientConnUnusable  = errors.New("http2: client conn not usable")
603
	errClientConnGotGoAway = errors.New("http2: Transport received Server's graceful shutdown GOAWAY")
604
)
605

606
// shouldRetryRequest is called by RoundTrip when a request fails to get
607
// response headers. It is always called with a non-nil error.
608
// It returns either a request to retry (either the same request, or a
609
// modified clone), or an error if the request can't be replayed.
610
func shouldRetryRequest(req *http.Request, err error) (*http.Request, error) {
611
	if !canRetryError(err) {
612
		return nil, err
613
	}
614
	// If the Body is nil (or http.NoBody), it's safe to reuse
615
	// this request and its Body.
616
	if req.Body == nil || req.Body == http.NoBody {
617
		return req, nil
618
	}
619

620
	// If the request body can be reset back to its original
621
	// state via the optional req.GetBody, do that.
622
	if req.GetBody != nil {
623
		body, err := req.GetBody()
624
		if err != nil {
625
			return nil, err
626
		}
627
		newReq := *req
628
		newReq.Body = body
629
		return &newReq, nil
630
	}
631

632
	// The Request.Body can't reset back to the beginning, but we
633
	// don't seem to have started to read from it yet, so reuse
634
	// the request directly.
635
	if err == errClientConnUnusable {
636
		return req, nil
637
	}
638

639
	return nil, fmt.Errorf("http2: Transport: cannot retry err [%v] after Request.Body was written; define Request.GetBody to avoid this error", err)
640
}
641

642
func canRetryError(err error) bool {
643
	if err == errClientConnUnusable || err == errClientConnGotGoAway {
644
		return true
645
	}
646
	if se, ok := err.(StreamError); ok {
647
		if se.Code == ErrCodeProtocol && se.Cause == errFromPeer {
648
			// See golang/go#47635, golang/go#42777
649
			return true
650
		}
651
		return se.Code == ErrCodeRefusedStream
652
	}
653
	return false
654
}
655

656
func (t *Transport) dialClientConn(ctx context.Context, addr string, singleUse bool) (*ClientConn, error) {
657
	host, _, err := net.SplitHostPort(addr)
658
	if err != nil {
659
		return nil, err
660
	}
661
	tconn, err := t.dialTLS(ctx, "tcp", addr, t.newTLSConfig(host))
662
	if err != nil {
663
		return nil, err
664
	}
665
	return t.newClientConn(tconn, singleUse)
666
}
667

668
func (t *Transport) newTLSConfig(host string) *tls.Config {
669
	cfg := new(tls.Config)
670
	if t.TLSClientConfig != nil {
671
		*cfg = *t.TLSClientConfig.Clone()
672
	}
673
	if !strSliceContains(cfg.NextProtos, NextProtoTLS) {
674
		cfg.NextProtos = append([]string{NextProtoTLS}, cfg.NextProtos...)
675
	}
676
	if cfg.ServerName == "" {
677
		cfg.ServerName = host
678
	}
679
	return cfg
680
}
681

682
func (t *Transport) dialTLS(ctx context.Context, network, addr string, tlsCfg *tls.Config) (net.Conn, error) {
683
	if t.DialTLSContext != nil {
684
		return t.DialTLSContext(ctx, network, addr, tlsCfg)
685
	} else if t.DialTLS != nil {
686
		return t.DialTLS(network, addr, tlsCfg)
687
	}
688

689
	tlsCn, err := t.dialTLSWithContext(ctx, network, addr, tlsCfg)
690
	if err != nil {
691
		return nil, err
692
	}
693
	state := tlsCn.ConnectionState()
694
	if p := state.NegotiatedProtocol; p != NextProtoTLS {
695
		return nil, fmt.Errorf("http2: unexpected ALPN protocol %q; want %q", p, NextProtoTLS)
696
	}
697
	if !state.NegotiatedProtocolIsMutual {
698
		return nil, errors.New("http2: could not negotiate protocol mutually")
699
	}
700
	return tlsCn, nil
701
}
702

703
// disableKeepAlives reports whether connections should be closed as
704
// soon as possible after handling the first request.
705
func (t *Transport) disableKeepAlives() bool {
706
	return t.t1 != nil && t.t1.DisableKeepAlives
707
}
708

709
func (t *Transport) expectContinueTimeout() time.Duration {
710
	if t.t1 == nil {
711
		return 0
712
	}
713
	return t.t1.ExpectContinueTimeout
714
}
715

716
func (t *Transport) maxDecoderHeaderTableSize() uint32 {
717
	if v := t.MaxDecoderHeaderTableSize; v > 0 {
718
		return v
719
	}
720
	return initialHeaderTableSize
721
}
722

723
func (t *Transport) maxEncoderHeaderTableSize() uint32 {
724
	if v := t.MaxEncoderHeaderTableSize; v > 0 {
725
		return v
726
	}
727
	return initialHeaderTableSize
728
}
729

730
func (t *Transport) NewClientConn(c net.Conn) (*ClientConn, error) {
731
	return t.newClientConn(c, t.disableKeepAlives())
732
}
733

734
func (t *Transport) newClientConn(c net.Conn, singleUse bool) (*ClientConn, error) {
735
	cc := &ClientConn{
736
		t:                     t,
737
		tconn:                 c,
738
		readerDone:            make(chan struct{}),
739
		nextStreamID:          1,
740
		maxFrameSize:          16 << 10,                    // spec default
741
		initialWindowSize:     65535,                       // spec default
742
		maxConcurrentStreams:  initialMaxConcurrentStreams, // "infinite", per spec. Use a smaller value until we have received server settings.
743
		peerMaxHeaderListSize: 0xffffffffffffffff,          // "infinite", per spec. Use 2^64-1 instead.
744
		streams:               make(map[uint32]*clientStream),
745
		singleUse:             singleUse,
746
		wantSettingsAck:       true,
747
		pings:                 make(map[[8]byte]chan struct{}),
748
		reqHeaderMu:           make(chan struct{}, 1),
749
	}
750
	if d := t.idleConnTimeout(); d != 0 {
751
		cc.idleTimeout = d
752
		cc.idleTimer = time.AfterFunc(d, cc.onIdleTimeout)
753
	}
754
	if VerboseLogs {
755
		t.vlogf("http2: Transport creating client conn %p to %v", cc, c.RemoteAddr())
756
	}
757

758
	cc.cond = sync.NewCond(&cc.mu)
759
	cc.flow.add(int32(initialWindowSize))
760

761
	// TODO: adjust this writer size to account for frame size +
762
	// MTU + crypto/tls record padding.
763
	cc.bw = bufio.NewWriter(stickyErrWriter{
764
		conn:    c,
765
		timeout: t.WriteByteTimeout,
766
		err:     &cc.werr,
767
	})
768
	cc.br = bufio.NewReader(c)
769
	cc.fr = NewFramer(cc.bw, cc.br)
770
	if t.maxFrameReadSize() != 0 {
771
		cc.fr.SetMaxReadFrameSize(t.maxFrameReadSize())
772
	}
773
	if t.CountError != nil {
774
		cc.fr.countError = t.CountError
775
	}
776
	maxHeaderTableSize := t.maxDecoderHeaderTableSize()
777
	cc.fr.ReadMetaHeaders = hpack.NewDecoder(maxHeaderTableSize, nil)
778
	cc.fr.MaxHeaderListSize = t.maxHeaderListSize()
779

780
	cc.henc = hpack.NewEncoder(&cc.hbuf)
781
	cc.henc.SetMaxDynamicTableSizeLimit(t.maxEncoderHeaderTableSize())
782
	cc.peerMaxHeaderTableSize = initialHeaderTableSize
783

784
	if t.AllowHTTP {
785
		cc.nextStreamID = 3
786
	}
787

788
	if cs, ok := c.(connectionStater); ok {
789
		state := cs.ConnectionState()
790
		cc.tlsState = &state
791
	}
792

793
	initialSettings := []Setting{
794
		{ID: SettingEnablePush, Val: 0},
795
		{ID: SettingInitialWindowSize, Val: transportDefaultStreamFlow},
796
	}
797
	if max := t.maxFrameReadSize(); max != 0 {
798
		initialSettings = append(initialSettings, Setting{ID: SettingMaxFrameSize, Val: max})
799
	}
800
	if max := t.maxHeaderListSize(); max != 0 {
801
		initialSettings = append(initialSettings, Setting{ID: SettingMaxHeaderListSize, Val: max})
802
	}
803
	if maxHeaderTableSize != initialHeaderTableSize {
804
		initialSettings = append(initialSettings, Setting{ID: SettingHeaderTableSize, Val: maxHeaderTableSize})
805
	}
806

807
	cc.bw.Write(clientPreface)
808
	cc.fr.WriteSettings(initialSettings...)
809
	cc.fr.WriteWindowUpdate(0, transportDefaultConnFlow)
810
	cc.inflow.init(transportDefaultConnFlow + initialWindowSize)
811
	cc.bw.Flush()
812
	if cc.werr != nil {
813
		cc.Close()
814
		return nil, cc.werr
815
	}
816

817
	go cc.readLoop()
818
	return cc, nil
819
}
820

821
func (cc *ClientConn) healthCheck() {
822
	pingTimeout := cc.t.pingTimeout()
823
	// We don't need to periodically ping in the health check, because the readLoop of ClientConn will
824
	// trigger the healthCheck again if there is no frame received.
825
	ctx, cancel := context.WithTimeout(context.Background(), pingTimeout)
826
	defer cancel()
827
	cc.vlogf("http2: Transport sending health check")
828
	err := cc.Ping(ctx)
829
	if err != nil {
830
		cc.vlogf("http2: Transport health check failure: %v", err)
831
		cc.closeForLostPing()
832
	} else {
833
		cc.vlogf("http2: Transport health check success")
834
	}
835
}
836

837
// SetDoNotReuse marks cc as not reusable for future HTTP requests.
838
func (cc *ClientConn) SetDoNotReuse() {
839
	cc.mu.Lock()
840
	defer cc.mu.Unlock()
841
	cc.doNotReuse = true
842
}
843

844
func (cc *ClientConn) setGoAway(f *GoAwayFrame) {
845
	cc.mu.Lock()
846
	defer cc.mu.Unlock()
847

848
	old := cc.goAway
849
	cc.goAway = f
850

851
	// Merge the previous and current GoAway error frames.
852
	if cc.goAwayDebug == "" {
853
		cc.goAwayDebug = string(f.DebugData())
854
	}
855
	if old != nil && old.ErrCode != ErrCodeNo {
856
		cc.goAway.ErrCode = old.ErrCode
857
	}
858
	last := f.LastStreamID
859
	for streamID, cs := range cc.streams {
860
		if streamID > last {
861
			cs.abortStreamLocked(errClientConnGotGoAway)
862
		}
863
	}
864
}
865

866
// CanTakeNewRequest reports whether the connection can take a new request,
867
// meaning it has not been closed or received or sent a GOAWAY.
868
//
869
// If the caller is going to immediately make a new request on this
870
// connection, use ReserveNewRequest instead.
871
func (cc *ClientConn) CanTakeNewRequest() bool {
872
	cc.mu.Lock()
873
	defer cc.mu.Unlock()
874
	return cc.canTakeNewRequestLocked()
875
}
876

877
// ReserveNewRequest is like CanTakeNewRequest but also reserves a
878
// concurrent stream in cc. The reservation is decremented on the
879
// next call to RoundTrip.
880
func (cc *ClientConn) ReserveNewRequest() bool {
881
	cc.mu.Lock()
882
	defer cc.mu.Unlock()
883
	if st := cc.idleStateLocked(); !st.canTakeNewRequest {
884
		return false
885
	}
886
	cc.streamsReserved++
887
	return true
888
}
889

890
// ClientConnState describes the state of a ClientConn.
891
type ClientConnState struct {
892
	// Closed is whether the connection is closed.
893
	Closed bool
894

895
	// Closing is whether the connection is in the process of
896
	// closing. It may be closing due to shutdown, being a
897
	// single-use connection, being marked as DoNotReuse, or
898
	// having received a GOAWAY frame.
899
	Closing bool
900

901
	// StreamsActive is how many streams are active.
902
	StreamsActive int
903

904
	// StreamsReserved is how many streams have been reserved via
905
	// ClientConn.ReserveNewRequest.
906
	StreamsReserved int
907

908
	// StreamsPending is how many requests have been sent in excess
909
	// of the peer's advertised MaxConcurrentStreams setting and
910
	// are waiting for other streams to complete.
911
	StreamsPending int
912

913
	// MaxConcurrentStreams is how many concurrent streams the
914
	// peer advertised as acceptable. Zero means no SETTINGS
915
	// frame has been received yet.
916
	MaxConcurrentStreams uint32
917

918
	// LastIdle, if non-zero, is when the connection last
919
	// transitioned to idle state.
920
	LastIdle time.Time
921
}
922

923
// State returns a snapshot of cc's state.
924
func (cc *ClientConn) State() ClientConnState {
925
	cc.wmu.Lock()
926
	maxConcurrent := cc.maxConcurrentStreams
927
	if !cc.seenSettings {
928
		maxConcurrent = 0
929
	}
930
	cc.wmu.Unlock()
931

932
	cc.mu.Lock()
933
	defer cc.mu.Unlock()
934
	return ClientConnState{
935
		Closed:               cc.closed,
936
		Closing:              cc.closing || cc.singleUse || cc.doNotReuse || cc.goAway != nil,
937
		StreamsActive:        len(cc.streams),
938
		StreamsReserved:      cc.streamsReserved,
939
		StreamsPending:       cc.pendingRequests,
940
		LastIdle:             cc.lastIdle,
941
		MaxConcurrentStreams: maxConcurrent,
942
	}
943
}
944

945
// clientConnIdleState describes the suitability of a client
946
// connection to initiate a new RoundTrip request.
947
type clientConnIdleState struct {
948
	canTakeNewRequest bool
949
}
950

951
func (cc *ClientConn) idleState() clientConnIdleState {
952
	cc.mu.Lock()
953
	defer cc.mu.Unlock()
954
	return cc.idleStateLocked()
955
}
956

957
func (cc *ClientConn) idleStateLocked() (st clientConnIdleState) {
958
	if cc.singleUse && cc.nextStreamID > 1 {
959
		return
960
	}
961
	var maxConcurrentOkay bool
962
	if cc.t.StrictMaxConcurrentStreams {
963
		// We'll tell the caller we can take a new request to
964
		// prevent the caller from dialing a new TCP
965
		// connection, but then we'll block later before
966
		// writing it.
967
		maxConcurrentOkay = true
968
	} else {
969
		maxConcurrentOkay = int64(len(cc.streams)+cc.streamsReserved+1) <= int64(cc.maxConcurrentStreams)
970
	}
971

972
	st.canTakeNewRequest = cc.goAway == nil && !cc.closed && !cc.closing && maxConcurrentOkay &&
973
		!cc.doNotReuse &&
974
		int64(cc.nextStreamID)+2*int64(cc.pendingRequests) < math.MaxInt32 &&
975
		!cc.tooIdleLocked()
976
	return
977
}
978

979
func (cc *ClientConn) canTakeNewRequestLocked() bool {
980
	st := cc.idleStateLocked()
981
	return st.canTakeNewRequest
982
}
983

984
// tooIdleLocked reports whether this connection has been been sitting idle
985
// for too much wall time.
986
func (cc *ClientConn) tooIdleLocked() bool {
987
	// The Round(0) strips the monontonic clock reading so the
988
	// times are compared based on their wall time. We don't want
989
	// to reuse a connection that's been sitting idle during
990
	// VM/laptop suspend if monotonic time was also frozen.
991
	return cc.idleTimeout != 0 && !cc.lastIdle.IsZero() && time.Since(cc.lastIdle.Round(0)) > cc.idleTimeout
992
}
993

994
// onIdleTimeout is called from a time.AfterFunc goroutine. It will
995
// only be called when we're idle, but because we're coming from a new
996
// goroutine, there could be a new request coming in at the same time,
997
// so this simply calls the synchronized closeIfIdle to shut down this
998
// connection. The timer could just call closeIfIdle, but this is more
999
// clear.
1000
func (cc *ClientConn) onIdleTimeout() {
1001
	cc.closeIfIdle()
1002
}
1003

1004
func (cc *ClientConn) closeConn() {
1005
	t := time.AfterFunc(250*time.Millisecond, cc.forceCloseConn)
1006
	defer t.Stop()
1007
	cc.tconn.Close()
1008
}
1009

1010
// A tls.Conn.Close can hang for a long time if the peer is unresponsive.
1011
// Try to shut it down more aggressively.
1012
func (cc *ClientConn) forceCloseConn() {
1013
	tc, ok := cc.tconn.(*tls.Conn)
1014
	if !ok {
1015
		return
1016
	}
1017
	if nc := tlsUnderlyingConn(tc); nc != nil {
1018
		nc.Close()
1019
	}
1020
}
1021

1022
func (cc *ClientConn) closeIfIdle() {
1023
	cc.mu.Lock()
1024
	if len(cc.streams) > 0 || cc.streamsReserved > 0 {
1025
		cc.mu.Unlock()
1026
		return
1027
	}
1028
	cc.closed = true
1029
	nextID := cc.nextStreamID
1030
	// TODO: do clients send GOAWAY too? maybe? Just Close:
1031
	cc.mu.Unlock()
1032

1033
	if VerboseLogs {
1034
		cc.vlogf("http2: Transport closing idle conn %p (forSingleUse=%v, maxStream=%v)", cc, cc.singleUse, nextID-2)
1035
	}
1036
	cc.closeConn()
1037
}
1038

1039
func (cc *ClientConn) isDoNotReuseAndIdle() bool {
1040
	cc.mu.Lock()
1041
	defer cc.mu.Unlock()
1042
	return cc.doNotReuse && len(cc.streams) == 0
1043
}
1044

1045
var shutdownEnterWaitStateHook = func() {}
1046

1047
// Shutdown gracefully closes the client connection, waiting for running streams to complete.
1048
func (cc *ClientConn) Shutdown(ctx context.Context) error {
1049
	if err := cc.sendGoAway(); err != nil {
1050
		return err
1051
	}
1052
	// Wait for all in-flight streams to complete or connection to close
1053
	done := make(chan struct{})
1054
	cancelled := false // guarded by cc.mu
1055
	go func() {
1056
		cc.mu.Lock()
1057
		defer cc.mu.Unlock()
1058
		for {
1059
			if len(cc.streams) == 0 || cc.closed {
1060
				cc.closed = true
1061
				close(done)
1062
				break
1063
			}
1064
			if cancelled {
1065
				break
1066
			}
1067
			cc.cond.Wait()
1068
		}
1069
	}()
1070
	shutdownEnterWaitStateHook()
1071
	select {
1072
	case <-done:
1073
		cc.closeConn()
1074
		return nil
1075
	case <-ctx.Done():
1076
		cc.mu.Lock()
1077
		// Free the goroutine above
1078
		cancelled = true
1079
		cc.cond.Broadcast()
1080
		cc.mu.Unlock()
1081
		return ctx.Err()
1082
	}
1083
}
1084

1085
func (cc *ClientConn) sendGoAway() error {
1086
	cc.mu.Lock()
1087
	closing := cc.closing
1088
	cc.closing = true
1089
	maxStreamID := cc.nextStreamID
1090
	cc.mu.Unlock()
1091
	if closing {
1092
		// GOAWAY sent already
1093
		return nil
1094
	}
1095

1096
	cc.wmu.Lock()
1097
	defer cc.wmu.Unlock()
1098
	// Send a graceful shutdown frame to server
1099
	if err := cc.fr.WriteGoAway(maxStreamID, ErrCodeNo, nil); err != nil {
1100
		return err
1101
	}
1102
	if err := cc.bw.Flush(); err != nil {
1103
		return err
1104
	}
1105
	// Prevent new requests
1106
	return nil
1107
}
1108

1109
// closes the client connection immediately. In-flight requests are interrupted.
1110
// err is sent to streams.
1111
func (cc *ClientConn) closeForError(err error) {
1112
	cc.mu.Lock()
1113
	cc.closed = true
1114
	for _, cs := range cc.streams {
1115
		cs.abortStreamLocked(err)
1116
	}
1117
	cc.cond.Broadcast()
1118
	cc.mu.Unlock()
1119
	cc.closeConn()
1120
}
1121

1122
// Close closes the client connection immediately.
1123
//
1124
// In-flight requests are interrupted. For a graceful shutdown, use Shutdown instead.
1125
func (cc *ClientConn) Close() error {
1126
	err := errors.New("http2: client connection force closed via ClientConn.Close")
1127
	cc.closeForError(err)
1128
	return nil
1129
}
1130

1131
// closes the client connection immediately. In-flight requests are interrupted.
1132
func (cc *ClientConn) closeForLostPing() {
1133
	err := errors.New("http2: client connection lost")
1134
	if f := cc.t.CountError; f != nil {
1135
		f("conn_close_lost_ping")
1136
	}
1137
	cc.closeForError(err)
1138
}
1139

1140
// errRequestCanceled is a copy of net/http's errRequestCanceled because it's not
1141
// exported. At least they'll be DeepEqual for h1-vs-h2 comparisons tests.
1142
var errRequestCanceled = errors.New("net/http: request canceled")
1143

1144
func commaSeparatedTrailers(req *http.Request) (string, error) {
1145
	keys := make([]string, 0, len(req.Trailer))
1146
	for k := range req.Trailer {
1147
		k = canonicalHeader(k)
1148
		switch k {
1149
		case "Transfer-Encoding", "Trailer", "Content-Length":
1150
			return "", fmt.Errorf("invalid Trailer key %q", k)
1151
		}
1152
		keys = append(keys, k)
1153
	}
1154
	if len(keys) > 0 {
1155
		sort.Strings(keys)
1156
		return strings.Join(keys, ","), nil
1157
	}
1158
	return "", nil
1159
}
1160

1161
func (cc *ClientConn) responseHeaderTimeout() time.Duration {
1162
	if cc.t.t1 != nil {
1163
		return cc.t.t1.ResponseHeaderTimeout
1164
	}
1165
	// No way to do this (yet?) with just an http2.Transport. Probably
1166
	// no need. Request.Cancel this is the new way. We only need to support
1167
	// this for compatibility with the old http.Transport fields when
1168
	// we're doing transparent http2.
1169
	return 0
1170
}
1171

1172
// checkConnHeaders checks whether req has any invalid connection-level headers.
1173
// per RFC 7540 section 8.1.2.2: Connection-Specific Header Fields.
1174
// Certain headers are special-cased as okay but not transmitted later.
1175
func checkConnHeaders(req *http.Request) error {
1176
	if v := req.Header.Get("Upgrade"); v != "" {
1177
		return fmt.Errorf("http2: invalid Upgrade request header: %q", req.Header["Upgrade"])
1178
	}
1179
	if vv := req.Header["Transfer-Encoding"]; len(vv) > 0 && (len(vv) > 1 || vv[0] != "" && vv[0] != "chunked") {
1180
		return fmt.Errorf("http2: invalid Transfer-Encoding request header: %q", vv)
1181
	}
1182
	if vv := req.Header["Connection"]; len(vv) > 0 && (len(vv) > 1 || vv[0] != "" && !asciiEqualFold(vv[0], "close") && !asciiEqualFold(vv[0], "keep-alive")) {
1183
		return fmt.Errorf("http2: invalid Connection request header: %q", vv)
1184
	}
1185
	return nil
1186
}
1187

1188
// actualContentLength returns a sanitized version of
1189
// req.ContentLength, where 0 actually means zero (not unknown) and -1
1190
// means unknown.
1191
func actualContentLength(req *http.Request) int64 {
1192
	if req.Body == nil || req.Body == http.NoBody {
1193
		return 0
1194
	}
1195
	if req.ContentLength != 0 {
1196
		return req.ContentLength
1197
	}
1198
	return -1
1199
}
1200

1201
func (cc *ClientConn) decrStreamReservations() {
1202
	cc.mu.Lock()
1203
	defer cc.mu.Unlock()
1204
	cc.decrStreamReservationsLocked()
1205
}
1206

1207
func (cc *ClientConn) decrStreamReservationsLocked() {
1208
	if cc.streamsReserved > 0 {
1209
		cc.streamsReserved--
1210
	}
1211
}
1212

1213
func (cc *ClientConn) RoundTrip(req *http.Request) (*http.Response, error) {
1214
	ctx := req.Context()
1215
	cs := &clientStream{
1216
		cc:                   cc,
1217
		ctx:                  ctx,
1218
		reqCancel:            req.Cancel,
1219
		isHead:               req.Method == "HEAD",
1220
		reqBody:              req.Body,
1221
		reqBodyContentLength: actualContentLength(req),
1222
		trace:                httptrace.ContextClientTrace(ctx),
1223
		peerClosed:           make(chan struct{}),
1224
		abort:                make(chan struct{}),
1225
		respHeaderRecv:       make(chan struct{}),
1226
		donec:                make(chan struct{}),
1227
	}
1228
	go cs.doRequest(req)
1229

1230
	waitDone := func() error {
1231
		select {
1232
		case <-cs.donec:
1233
			return nil
1234
		case <-ctx.Done():
1235
			return ctx.Err()
1236
		case <-cs.reqCancel:
1237
			return errRequestCanceled
1238
		}
1239
	}
1240

1241
	handleResponseHeaders := func() (*http.Response, error) {
1242
		res := cs.res
1243
		if res.StatusCode > 299 {
1244
			// On error or status code 3xx, 4xx, 5xx, etc abort any
1245
			// ongoing write, assuming that the server doesn't care
1246
			// about our request body. If the server replied with 1xx or
1247
			// 2xx, however, then assume the server DOES potentially
1248
			// want our body (e.g. full-duplex streaming:
1249
			// golang.org/issue/13444). If it turns out the server
1250
			// doesn't, they'll RST_STREAM us soon enough. This is a
1251
			// heuristic to avoid adding knobs to Transport. Hopefully
1252
			// we can keep it.
1253
			cs.abortRequestBodyWrite()
1254
		}
1255
		res.Request = req
1256
		res.TLS = cc.tlsState
1257
		if res.Body == noBody && actualContentLength(req) == 0 {
1258
			// If there isn't a request or response body still being
1259
			// written, then wait for the stream to be closed before
1260
			// RoundTrip returns.
1261
			if err := waitDone(); err != nil {
1262
				return nil, err
1263
			}
1264
		}
1265
		return res, nil
1266
	}
1267

1268
	for {
1269
		select {
1270
		case <-cs.respHeaderRecv:
1271
			return handleResponseHeaders()
1272
		case <-cs.abort:
1273
			select {
1274
			case <-cs.respHeaderRecv:
1275
				// If both cs.respHeaderRecv and cs.abort are signaling,
1276
				// pick respHeaderRecv. The server probably wrote the
1277
				// response and immediately reset the stream.
1278
				// golang.org/issue/49645
1279
				return handleResponseHeaders()
1280
			default:
1281
				waitDone()
1282
				return nil, cs.abortErr
1283
			}
1284
		case <-ctx.Done():
1285
			err := ctx.Err()
1286
			cs.abortStream(err)
1287
			return nil, err
1288
		case <-cs.reqCancel:
1289
			cs.abortStream(errRequestCanceled)
1290
			return nil, errRequestCanceled
1291
		}
1292
	}
1293
}
1294

1295
// doRequest runs for the duration of the request lifetime.
1296
//
1297
// It sends the request and performs post-request cleanup (closing Request.Body, etc.).
1298
func (cs *clientStream) doRequest(req *http.Request) {
1299
	err := cs.writeRequest(req)
1300
	cs.cleanupWriteRequest(err)
1301
}
1302

1303
// writeRequest sends a request.
1304
//
1305
// It returns nil after the request is written, the response read,
1306
// and the request stream is half-closed by the peer.
1307
//
1308
// It returns non-nil if the request ends otherwise.
1309
// If the returned error is StreamError, the error Code may be used in resetting the stream.
1310
func (cs *clientStream) writeRequest(req *http.Request) (err error) {
1311
	cc := cs.cc
1312
	ctx := cs.ctx
1313

1314
	if err := checkConnHeaders(req); err != nil {
1315
		return err
1316
	}
1317

1318
	// Acquire the new-request lock by writing to reqHeaderMu.
1319
	// This lock guards the critical section covering allocating a new stream ID
1320
	// (requires mu) and creating the stream (requires wmu).
1321
	if cc.reqHeaderMu == nil {
1322
		panic("RoundTrip on uninitialized ClientConn") // for tests
1323
	}
1324
	select {
1325
	case cc.reqHeaderMu <- struct{}{}:
1326
	case <-cs.reqCancel:
1327
		return errRequestCanceled
1328
	case <-ctx.Done():
1329
		return ctx.Err()
1330
	}
1331

1332
	cc.mu.Lock()
1333
	if cc.idleTimer != nil {
1334
		cc.idleTimer.Stop()
1335
	}
1336
	cc.decrStreamReservationsLocked()
1337
	if err := cc.awaitOpenSlotForStreamLocked(cs); err != nil {
1338
		cc.mu.Unlock()
1339
		<-cc.reqHeaderMu
1340
		return err
1341
	}
1342
	cc.addStreamLocked(cs) // assigns stream ID
1343
	if isConnectionCloseRequest(req) {
1344
		cc.doNotReuse = true
1345
	}
1346
	cc.mu.Unlock()
1347

1348
	// TODO(bradfitz): this is a copy of the logic in net/http. Unify somewhere?
1349
	if !cc.t.disableCompression() &&
1350
		req.Header.Get("Accept-Encoding") == "" &&
1351
		req.Header.Get("Range") == "" &&
1352
		!cs.isHead {
1353
		// Request gzip only, not deflate. Deflate is ambiguous and
1354
		// not as universally supported anyway.
1355
		// See: https://zlib.net/zlib_faq.html#faq39
1356
		//
1357
		// Note that we don't request this for HEAD requests,
1358
		// due to a bug in nginx:
1359
		//   http://trac.nginx.org/nginx/ticket/358
1360
		//   https://golang.org/issue/5522
1361
		//
1362
		// We don't request gzip if the request is for a range, since
1363
		// auto-decoding a portion of a gzipped document will just fail
1364
		// anyway. See https://golang.org/issue/8923
1365
		cs.requestedGzip = true
1366
	}
1367

1368
	continueTimeout := cc.t.expectContinueTimeout()
1369
	if continueTimeout != 0 {
1370
		if !httpguts.HeaderValuesContainsToken(req.Header["Expect"], "100-continue") {
1371
			continueTimeout = 0
1372
		} else {
1373
			cs.on100 = make(chan struct{}, 1)
1374
		}
1375
	}
1376

1377
	// Past this point (where we send request headers), it is possible for
1378
	// RoundTrip to return successfully. Since the RoundTrip contract permits
1379
	// the caller to "mutate or reuse" the Request after closing the Response's Body,
1380
	// we must take care when referencing the Request from here on.
1381
	err = cs.encodeAndWriteHeaders(req)
1382
	<-cc.reqHeaderMu
1383
	if err != nil {
1384
		return err
1385
	}
1386

1387
	hasBody := cs.reqBodyContentLength != 0
1388
	if !hasBody {
1389
		cs.sentEndStream = true
1390
	} else {
1391
		if continueTimeout != 0 {
1392
			traceWait100Continue(cs.trace)
1393
			timer := time.NewTimer(continueTimeout)
1394
			select {
1395
			case <-timer.C:
1396
				err = nil
1397
			case <-cs.on100:
1398
				err = nil
1399
			case <-cs.abort:
1400
				err = cs.abortErr
1401
			case <-ctx.Done():
1402
				err = ctx.Err()
1403
			case <-cs.reqCancel:
1404
				err = errRequestCanceled
1405
			}
1406
			timer.Stop()
1407
			if err != nil {
1408
				traceWroteRequest(cs.trace, err)
1409
				return err
1410
			}
1411
		}
1412

1413
		if err = cs.writeRequestBody(req); err != nil {
1414
			if err != errStopReqBodyWrite {
1415
				traceWroteRequest(cs.trace, err)
1416
				return err
1417
			}
1418
		} else {
1419
			cs.sentEndStream = true
1420
		}
1421
	}
1422

1423
	traceWroteRequest(cs.trace, err)
1424

1425
	var respHeaderTimer <-chan time.Time
1426
	var respHeaderRecv chan struct{}
1427
	if d := cc.responseHeaderTimeout(); d != 0 {
1428
		timer := time.NewTimer(d)
1429
		defer timer.Stop()
1430
		respHeaderTimer = timer.C
1431
		respHeaderRecv = cs.respHeaderRecv
1432
	}
1433
	// Wait until the peer half-closes its end of the stream,
1434
	// or until the request is aborted (via context, error, or otherwise),
1435
	// whichever comes first.
1436
	for {
1437
		select {
1438
		case <-cs.peerClosed:
1439
			return nil
1440
		case <-respHeaderTimer:
1441
			return errTimeout
1442
		case <-respHeaderRecv:
1443
			respHeaderRecv = nil
1444
			respHeaderTimer = nil // keep waiting for END_STREAM
1445
		case <-cs.abort:
1446
			return cs.abortErr
1447
		case <-ctx.Done():
1448
			return ctx.Err()
1449
		case <-cs.reqCancel:
1450
			return errRequestCanceled
1451
		}
1452
	}
1453
}
1454

1455
func (cs *clientStream) encodeAndWriteHeaders(req *http.Request) error {
1456
	cc := cs.cc
1457
	ctx := cs.ctx
1458

1459
	cc.wmu.Lock()
1460
	defer cc.wmu.Unlock()
1461

1462
	// If the request was canceled while waiting for cc.mu, just quit.
1463
	select {
1464
	case <-cs.abort:
1465
		return cs.abortErr
1466
	case <-ctx.Done():
1467
		return ctx.Err()
1468
	case <-cs.reqCancel:
1469
		return errRequestCanceled
1470
	default:
1471
	}
1472

1473
	// Encode headers.
1474
	//
1475
	// we send: HEADERS{1}, CONTINUATION{0,} + DATA{0,} (DATA is
1476
	// sent by writeRequestBody below, along with any Trailers,
1477
	// again in form HEADERS{1}, CONTINUATION{0,})
1478
	trailers, err := commaSeparatedTrailers(req)
1479
	if err != nil {
1480
		return err
1481
	}
1482
	hasTrailers := trailers != ""
1483
	contentLen := actualContentLength(req)
1484
	hasBody := contentLen != 0
1485
	hdrs, err := cc.encodeHeaders(req, cs.requestedGzip, trailers, contentLen)
1486
	if err != nil {
1487
		return err
1488
	}
1489

1490
	// Write the request.
1491
	endStream := !hasBody && !hasTrailers
1492
	cs.sentHeaders = true
1493
	err = cc.writeHeaders(cs.ID, endStream, int(cc.maxFrameSize), hdrs)
1494
	traceWroteHeaders(cs.trace)
1495
	return err
1496
}
1497

1498
// cleanupWriteRequest performs post-request tasks.
1499
//
1500
// If err (the result of writeRequest) is non-nil and the stream is not closed,
1501
// cleanupWriteRequest will send a reset to the peer.
1502
func (cs *clientStream) cleanupWriteRequest(err error) {
1503
	cc := cs.cc
1504

1505
	if cs.ID == 0 {
1506
		// We were canceled before creating the stream, so return our reservation.
1507
		cc.decrStreamReservations()
1508
	}
1509

1510
	// TODO: write h12Compare test showing whether
1511
	// Request.Body is closed by the Transport,
1512
	// and in multiple cases: server replies <=299 and >299
1513
	// while still writing request body
1514
	cc.mu.Lock()
1515
	mustCloseBody := false
1516
	if cs.reqBody != nil && cs.reqBodyClosed == nil {
1517
		mustCloseBody = true
1518
		cs.reqBodyClosed = make(chan struct{})
1519
	}
1520
	bodyClosed := cs.reqBodyClosed
1521
	cc.mu.Unlock()
1522
	if mustCloseBody {
1523
		cs.reqBody.Close()
1524
		close(bodyClosed)
1525
	}
1526
	if bodyClosed != nil {
1527
		<-bodyClosed
1528
	}
1529

1530
	if err != nil && cs.sentEndStream {
1531
		// If the connection is closed immediately after the response is read,
1532
		// we may be aborted before finishing up here. If the stream was closed
1533
		// cleanly on both sides, there is no error.
1534
		select {
1535
		case <-cs.peerClosed:
1536
			err = nil
1537
		default:
1538
		}
1539
	}
1540
	if err != nil {
1541
		cs.abortStream(err) // possibly redundant, but harmless
1542
		if cs.sentHeaders {
1543
			if se, ok := err.(StreamError); ok {
1544
				if se.Cause != errFromPeer {
1545
					cc.writeStreamReset(cs.ID, se.Code, err)
1546
				}
1547
			} else {
1548
				cc.writeStreamReset(cs.ID, ErrCodeCancel, err)
1549
			}
1550
		}
1551
		cs.bufPipe.CloseWithError(err) // no-op if already closed
1552
	} else {
1553
		if cs.sentHeaders && !cs.sentEndStream {
1554
			cc.writeStreamReset(cs.ID, ErrCodeNo, nil)
1555
		}
1556
		cs.bufPipe.CloseWithError(errRequestCanceled)
1557
	}
1558
	if cs.ID != 0 {
1559
		cc.forgetStreamID(cs.ID)
1560
	}
1561

1562
	cc.wmu.Lock()
1563
	werr := cc.werr
1564
	cc.wmu.Unlock()
1565
	if werr != nil {
1566
		cc.Close()
1567
	}
1568

1569
	close(cs.donec)
1570
}
1571

1572
// awaitOpenSlotForStreamLocked waits until len(streams) < maxConcurrentStreams.
1573
// Must hold cc.mu.
1574
func (cc *ClientConn) awaitOpenSlotForStreamLocked(cs *clientStream) error {
1575
	for {
1576
		cc.lastActive = time.Now()
1577
		if cc.closed || !cc.canTakeNewRequestLocked() {
1578
			return errClientConnUnusable
1579
		}
1580
		cc.lastIdle = time.Time{}
1581
		if int64(len(cc.streams)) < int64(cc.maxConcurrentStreams) {
1582
			return nil
1583
		}
1584
		cc.pendingRequests++
1585
		cc.cond.Wait()
1586
		cc.pendingRequests--
1587
		select {
1588
		case <-cs.abort:
1589
			return cs.abortErr
1590
		default:
1591
		}
1592
	}
1593
}
1594

1595
// requires cc.wmu be held
1596
func (cc *ClientConn) writeHeaders(streamID uint32, endStream bool, maxFrameSize int, hdrs []byte) error {
1597
	first := true // first frame written (HEADERS is first, then CONTINUATION)
1598
	for len(hdrs) > 0 && cc.werr == nil {
1599
		chunk := hdrs
1600
		if len(chunk) > maxFrameSize {
1601
			chunk = chunk[:maxFrameSize]
1602
		}
1603
		hdrs = hdrs[len(chunk):]
1604
		endHeaders := len(hdrs) == 0
1605
		if first {
1606
			cc.fr.WriteHeaders(HeadersFrameParam{
1607
				StreamID:      streamID,
1608
				BlockFragment: chunk,
1609
				EndStream:     endStream,
1610
				EndHeaders:    endHeaders,
1611
			})
1612
			first = false
1613
		} else {
1614
			cc.fr.WriteContinuation(streamID, endHeaders, chunk)
1615
		}
1616
	}
1617
	cc.bw.Flush()
1618
	return cc.werr
1619
}
1620

1621
// internal error values; they don't escape to callers
1622
var (
1623
	// abort request body write; don't send cancel
1624
	errStopReqBodyWrite = errors.New("http2: aborting request body write")
1625

1626
	// abort request body write, but send stream reset of cancel.
1627
	errStopReqBodyWriteAndCancel = errors.New("http2: canceling request")
1628

1629
	errReqBodyTooLong = errors.New("http2: request body larger than specified content length")
1630
)
1631

1632
// frameScratchBufferLen returns the length of a buffer to use for
1633
// outgoing request bodies to read/write to/from.
1634
//
1635
// It returns max(1, min(peer's advertised max frame size,
1636
// Request.ContentLength+1, 512KB)).
1637
func (cs *clientStream) frameScratchBufferLen(maxFrameSize int) int {
1638
	const max = 512 << 10
1639
	n := int64(maxFrameSize)
1640
	if n > max {
1641
		n = max
1642
	}
1643
	if cl := cs.reqBodyContentLength; cl != -1 && cl+1 < n {
1644
		// Add an extra byte past the declared content-length to
1645
		// give the caller's Request.Body io.Reader a chance to
1646
		// give us more bytes than they declared, so we can catch it
1647
		// early.
1648
		n = cl + 1
1649
	}
1650
	if n < 1 {
1651
		return 1
1652
	}
1653
	return int(n) // doesn't truncate; max is 512K
1654
}
1655

1656
var bufPool sync.Pool // of *[]byte
1657

1658
func (cs *clientStream) writeRequestBody(req *http.Request) (err error) {
1659
	cc := cs.cc
1660
	body := cs.reqBody
1661
	sentEnd := false // whether we sent the final DATA frame w/ END_STREAM
1662

1663
	hasTrailers := req.Trailer != nil
1664
	remainLen := cs.reqBodyContentLength
1665
	hasContentLen := remainLen != -1
1666

1667
	cc.mu.Lock()
1668
	maxFrameSize := int(cc.maxFrameSize)
1669
	cc.mu.Unlock()
1670

1671
	// Scratch buffer for reading into & writing from.
1672
	scratchLen := cs.frameScratchBufferLen(maxFrameSize)
1673
	var buf []byte
1674
	if bp, ok := bufPool.Get().(*[]byte); ok && len(*bp) >= scratchLen {
1675
		defer bufPool.Put(bp)
1676
		buf = *bp
1677
	} else {
1678
		buf = make([]byte, scratchLen)
1679
		defer bufPool.Put(&buf)
1680
	}
1681

1682
	var sawEOF bool
1683
	for !sawEOF {
1684
		n, err := body.Read(buf)
1685
		if hasContentLen {
1686
			remainLen -= int64(n)
1687
			if remainLen == 0 && err == nil {
1688
				// The request body's Content-Length was predeclared and
1689
				// we just finished reading it all, but the underlying io.Reader
1690
				// returned the final chunk with a nil error (which is one of
1691
				// the two valid things a Reader can do at EOF). Because we'd prefer
1692
				// to send the END_STREAM bit early, double-check that we're actually
1693
				// at EOF. Subsequent reads should return (0, EOF) at this point.
1694
				// If either value is different, we return an error in one of two ways below.
1695
				var scratch [1]byte
1696
				var n1 int
1697
				n1, err = body.Read(scratch[:])
1698
				remainLen -= int64(n1)
1699
			}
1700
			if remainLen < 0 {
1701
				err = errReqBodyTooLong
1702
				return err
1703
			}
1704
		}
1705
		if err != nil {
1706
			cc.mu.Lock()
1707
			bodyClosed := cs.reqBodyClosed != nil
1708
			cc.mu.Unlock()
1709
			switch {
1710
			case bodyClosed:
1711
				return errStopReqBodyWrite
1712
			case err == io.EOF:
1713
				sawEOF = true
1714
				err = nil
1715
			default:
1716
				return err
1717
			}
1718
		}
1719

1720
		remain := buf[:n]
1721
		for len(remain) > 0 && err == nil {
1722
			var allowed int32
1723
			allowed, err = cs.awaitFlowControl(len(remain))
1724
			if err != nil {
1725
				return err
1726
			}
1727
			cc.wmu.Lock()
1728
			data := remain[:allowed]
1729
			remain = remain[allowed:]
1730
			sentEnd = sawEOF && len(remain) == 0 && !hasTrailers
1731
			err = cc.fr.WriteData(cs.ID, sentEnd, data)
1732
			if err == nil {
1733
				// TODO(bradfitz): this flush is for latency, not bandwidth.
1734
				// Most requests won't need this. Make this opt-in or
1735
				// opt-out?  Use some heuristic on the body type? Nagel-like
1736
				// timers?  Based on 'n'? Only last chunk of this for loop,
1737
				// unless flow control tokens are low? For now, always.
1738
				// If we change this, see comment below.
1739
				err = cc.bw.Flush()
1740
			}
1741
			cc.wmu.Unlock()
1742
		}
1743
		if err != nil {
1744
			return err
1745
		}
1746
	}
1747

1748
	if sentEnd {
1749
		// Already sent END_STREAM (which implies we have no
1750
		// trailers) and flushed, because currently all
1751
		// WriteData frames above get a flush. So we're done.
1752
		return nil
1753
	}
1754

1755
	// Since the RoundTrip contract permits the caller to "mutate or reuse"
1756
	// a request after the Response's Body is closed, verify that this hasn't
1757
	// happened before accessing the trailers.
1758
	cc.mu.Lock()
1759
	trailer := req.Trailer
1760
	err = cs.abortErr
1761
	cc.mu.Unlock()
1762
	if err != nil {
1763
		return err
1764
	}
1765

1766
	cc.wmu.Lock()
1767
	defer cc.wmu.Unlock()
1768
	var trls []byte
1769
	if len(trailer) > 0 {
1770
		trls, err = cc.encodeTrailers(trailer)
1771
		if err != nil {
1772
			return err
1773
		}
1774
	}
1775

1776
	// Two ways to send END_STREAM: either with trailers, or
1777
	// with an empty DATA frame.
1778
	if len(trls) > 0 {
1779
		err = cc.writeHeaders(cs.ID, true, maxFrameSize, trls)
1780
	} else {
1781
		err = cc.fr.WriteData(cs.ID, true, nil)
1782
	}
1783
	if ferr := cc.bw.Flush(); ferr != nil && err == nil {
1784
		err = ferr
1785
	}
1786
	return err
1787
}
1788

1789
// awaitFlowControl waits for [1, min(maxBytes, cc.cs.maxFrameSize)] flow
1790
// control tokens from the server.
1791
// It returns either the non-zero number of tokens taken or an error
1792
// if the stream is dead.
1793
func (cs *clientStream) awaitFlowControl(maxBytes int) (taken int32, err error) {
1794
	cc := cs.cc
1795
	ctx := cs.ctx
1796
	cc.mu.Lock()
1797
	defer cc.mu.Unlock()
1798
	for {
1799
		if cc.closed {
1800
			return 0, errClientConnClosed
1801
		}
1802
		if cs.reqBodyClosed != nil {
1803
			return 0, errStopReqBodyWrite
1804
		}
1805
		select {
1806
		case <-cs.abort:
1807
			return 0, cs.abortErr
1808
		case <-ctx.Done():
1809
			return 0, ctx.Err()
1810
		case <-cs.reqCancel:
1811
			return 0, errRequestCanceled
1812
		default:
1813
		}
1814
		if a := cs.flow.available(); a > 0 {
1815
			take := a
1816
			if int(take) > maxBytes {
1817

1818
				take = int32(maxBytes) // can't truncate int; take is int32
1819
			}
1820
			if take > int32(cc.maxFrameSize) {
1821
				take = int32(cc.maxFrameSize)
1822
			}
1823
			cs.flow.take(take)
1824
			return take, nil
1825
		}
1826
		cc.cond.Wait()
1827
	}
1828
}
1829

1830
var errNilRequestURL = errors.New("http2: Request.URI is nil")
1831

1832
// requires cc.wmu be held.
1833
func (cc *ClientConn) encodeHeaders(req *http.Request, addGzipHeader bool, trailers string, contentLength int64) ([]byte, error) {
1834
	cc.hbuf.Reset()
1835
	if req.URL == nil {
1836
		return nil, errNilRequestURL
1837
	}
1838

1839
	host := req.Host
1840
	if host == "" {
1841
		host = req.URL.Host
1842
	}
1843
	host, err := httpguts.PunycodeHostPort(host)
1844
	if err != nil {
1845
		return nil, err
1846
	}
1847

1848
	var path string
1849
	if req.Method != "CONNECT" {
1850
		path = req.URL.RequestURI()
1851
		if !validPseudoPath(path) {
1852
			orig := path
1853
			path = strings.TrimPrefix(path, req.URL.Scheme+"://"+host)
1854
			if !validPseudoPath(path) {
1855
				if req.URL.Opaque != "" {
1856
					return nil, fmt.Errorf("invalid request :path %q from URL.Opaque = %q", orig, req.URL.Opaque)
1857
				} else {
1858
					return nil, fmt.Errorf("invalid request :path %q", orig)
1859
				}
1860
			}
1861
		}
1862
	}
1863

1864
	// Check for any invalid headers and return an error before we
1865
	// potentially pollute our hpack state. (We want to be able to
1866
	// continue to reuse the hpack encoder for future requests)
1867
	for k, vv := range req.Header {
1868
		if !httpguts.ValidHeaderFieldName(k) {
1869
			return nil, fmt.Errorf("invalid HTTP header name %q", k)
1870
		}
1871
		for _, v := range vv {
1872
			if !httpguts.ValidHeaderFieldValue(v) {
1873
				// Don't include the value in the error, because it may be sensitive.
1874
				return nil, fmt.Errorf("invalid HTTP header value for header %q", k)
1875
			}
1876
		}
1877
	}
1878

1879
	enumerateHeaders := func(f func(name, value string)) {
1880
		// 8.1.2.3 Request Pseudo-Header Fields
1881
		// The :path pseudo-header field includes the path and query parts of the
1882
		// target URI (the path-absolute production and optionally a '?' character
1883
		// followed by the query production (see Sections 3.3 and 3.4 of
1884
		// [RFC3986]).
1885
		f(":authority", host)
1886
		m := req.Method
1887
		if m == "" {
1888
			m = http.MethodGet
1889
		}
1890
		f(":method", m)
1891
		if req.Method != "CONNECT" {
1892
			f(":path", path)
1893
			f(":scheme", req.URL.Scheme)
1894
		}
1895
		if trailers != "" {
1896
			f("trailer", trailers)
1897
		}
1898

1899
		var didUA bool
1900
		for k, vv := range req.Header {
1901
			if asciiEqualFold(k, "host") || asciiEqualFold(k, "content-length") {
1902
				// Host is :authority, already sent.
1903
				// Content-Length is automatic, set below.
1904
				continue
1905
			} else if asciiEqualFold(k, "connection") ||
1906
				asciiEqualFold(k, "proxy-connection") ||
1907
				asciiEqualFold(k, "transfer-encoding") ||
1908
				asciiEqualFold(k, "upgrade") ||
1909
				asciiEqualFold(k, "keep-alive") {
1910
				// Per 8.1.2.2 Connection-Specific Header
1911
				// Fields, don't send connection-specific
1912
				// fields. We have already checked if any
1913
				// are error-worthy so just ignore the rest.
1914
				continue
1915
			} else if asciiEqualFold(k, "user-agent") {
1916
				// Match Go's http1 behavior: at most one
1917
				// User-Agent. If set to nil or empty string,
1918
				// then omit it. Otherwise if not mentioned,
1919
				// include the default (below).
1920
				didUA = true
1921
				if len(vv) < 1 {
1922
					continue
1923
				}
1924
				vv = vv[:1]
1925
				if vv[0] == "" {
1926
					continue
1927
				}
1928
			} else if asciiEqualFold(k, "cookie") {
1929
				// Per 8.1.2.5 To allow for better compression efficiency, the
1930
				// Cookie header field MAY be split into separate header fields,
1931
				// each with one or more cookie-pairs.
1932
				for _, v := range vv {
1933
					for {
1934
						p := strings.IndexByte(v, ';')
1935
						if p < 0 {
1936
							break
1937
						}
1938
						f("cookie", v[:p])
1939
						p++
1940
						// strip space after semicolon if any.
1941
						for p+1 <= len(v) && v[p] == ' ' {
1942
							p++
1943
						}
1944
						v = v[p:]
1945
					}
1946
					if len(v) > 0 {
1947
						f("cookie", v)
1948
					}
1949
				}
1950
				continue
1951
			}
1952

1953
			for _, v := range vv {
1954
				f(k, v)
1955
			}
1956
		}
1957
		if shouldSendReqContentLength(req.Method, contentLength) {
1958
			f("content-length", strconv.FormatInt(contentLength, 10))
1959
		}
1960
		if addGzipHeader {
1961
			f("accept-encoding", "gzip")
1962
		}
1963
		if !didUA {
1964
			f("user-agent", defaultUserAgent)
1965
		}
1966
	}
1967

1968
	// Do a first pass over the headers counting bytes to ensure
1969
	// we don't exceed cc.peerMaxHeaderListSize. This is done as a
1970
	// separate pass before encoding the headers to prevent
1971
	// modifying the hpack state.
1972
	hlSize := uint64(0)
1973
	enumerateHeaders(func(name, value string) {
1974
		hf := hpack.HeaderField{Name: name, Value: value}
1975
		hlSize += uint64(hf.Size())
1976
	})
1977

1978
	if hlSize > cc.peerMaxHeaderListSize {
1979
		return nil, errRequestHeaderListSize
1980
	}
1981

1982
	trace := httptrace.ContextClientTrace(req.Context())
1983
	traceHeaders := traceHasWroteHeaderField(trace)
1984

1985
	// Header list size is ok. Write the headers.
1986
	enumerateHeaders(func(name, value string) {
1987
		name, ascii := lowerHeader(name)
1988
		if !ascii {
1989
			// Skip writing invalid headers. Per RFC 7540, Section 8.1.2, header
1990
			// field names have to be ASCII characters (just as in HTTP/1.x).
1991
			return
1992
		}
1993
		cc.writeHeader(name, value)
1994
		if traceHeaders {
1995
			traceWroteHeaderField(trace, name, value)
1996
		}
1997
	})
1998

1999
	return cc.hbuf.Bytes(), nil
2000
}
2001

2002
// shouldSendReqContentLength reports whether the http2.Transport should send
2003
// a "content-length" request header. This logic is basically a copy of the net/http
2004
// transferWriter.shouldSendContentLength.
2005
// The contentLength is the corrected contentLength (so 0 means actually 0, not unknown).
2006
// -1 means unknown.
2007
func shouldSendReqContentLength(method string, contentLength int64) bool {
2008
	if contentLength > 0 {
2009
		return true
2010
	}
2011
	if contentLength < 0 {
2012
		return false
2013
	}
2014
	// For zero bodies, whether we send a content-length depends on the method.
2015
	// It also kinda doesn't matter for http2 either way, with END_STREAM.
2016
	switch method {
2017
	case "POST", "PUT", "PATCH":
2018
		return true
2019
	default:
2020
		return false
2021
	}
2022
}
2023

2024
// requires cc.wmu be held.
2025
func (cc *ClientConn) encodeTrailers(trailer http.Header) ([]byte, error) {
2026
	cc.hbuf.Reset()
2027

2028
	hlSize := uint64(0)
2029
	for k, vv := range trailer {
2030
		for _, v := range vv {
2031
			hf := hpack.HeaderField{Name: k, Value: v}
2032
			hlSize += uint64(hf.Size())
2033
		}
2034
	}
2035
	if hlSize > cc.peerMaxHeaderListSize {
2036
		return nil, errRequestHeaderListSize
2037
	}
2038

2039
	for k, vv := range trailer {
2040
		lowKey, ascii := lowerHeader(k)
2041
		if !ascii {
2042
			// Skip writing invalid headers. Per RFC 7540, Section 8.1.2, header
2043
			// field names have to be ASCII characters (just as in HTTP/1.x).
2044
			continue
2045
		}
2046
		// Transfer-Encoding, etc.. have already been filtered at the
2047
		// start of RoundTrip
2048
		for _, v := range vv {
2049
			cc.writeHeader(lowKey, v)
2050
		}
2051
	}
2052
	return cc.hbuf.Bytes(), nil
2053
}
2054

2055
func (cc *ClientConn) writeHeader(name, value string) {
2056
	if VerboseLogs {
2057
		log.Printf("http2: Transport encoding header %q = %q", name, value)
2058
	}
2059
	cc.henc.WriteField(hpack.HeaderField{Name: name, Value: value})
2060
}
2061

2062
type resAndError struct {
2063
	_   incomparable
2064
	res *http.Response
2065
	err error
2066
}
2067

2068
// requires cc.mu be held.
2069
func (cc *ClientConn) addStreamLocked(cs *clientStream) {
2070
	cs.flow.add(int32(cc.initialWindowSize))
2071
	cs.flow.setConnFlow(&cc.flow)
2072
	cs.inflow.init(transportDefaultStreamFlow)
2073
	cs.ID = cc.nextStreamID
2074
	cc.nextStreamID += 2
2075
	cc.streams[cs.ID] = cs
2076
	if cs.ID == 0 {
2077
		panic("assigned stream ID 0")
2078
	}
2079
}
2080

2081
func (cc *ClientConn) forgetStreamID(id uint32) {
2082
	cc.mu.Lock()
2083
	slen := len(cc.streams)
2084
	delete(cc.streams, id)
2085
	if len(cc.streams) != slen-1 {
2086
		panic("forgetting unknown stream id")
2087
	}
2088
	cc.lastActive = time.Now()
2089
	if len(cc.streams) == 0 && cc.idleTimer != nil {
2090
		cc.idleTimer.Reset(cc.idleTimeout)
2091
		cc.lastIdle = time.Now()
2092
	}
2093
	// Wake up writeRequestBody via clientStream.awaitFlowControl and
2094
	// wake up RoundTrip if there is a pending request.
2095
	cc.cond.Broadcast()
2096

2097
	closeOnIdle := cc.singleUse || cc.doNotReuse || cc.t.disableKeepAlives() || cc.goAway != nil
2098
	if closeOnIdle && cc.streamsReserved == 0 && len(cc.streams) == 0 {
2099
		if VerboseLogs {
2100
			cc.vlogf("http2: Transport closing idle conn %p (forSingleUse=%v, maxStream=%v)", cc, cc.singleUse, cc.nextStreamID-2)
2101
		}
2102
		cc.closed = true
2103
		defer cc.closeConn()
2104
	}
2105

2106
	cc.mu.Unlock()
2107
}
2108

2109
// clientConnReadLoop is the state owned by the clientConn's frame-reading readLoop.
2110
type clientConnReadLoop struct {
2111
	_  incomparable
2112
	cc *ClientConn
2113
}
2114

2115
// readLoop runs in its own goroutine and reads and dispatches frames.
2116
func (cc *ClientConn) readLoop() {
2117
	rl := &clientConnReadLoop{cc: cc}
2118
	defer rl.cleanup()
2119
	cc.readerErr = rl.run()
2120
	if ce, ok := cc.readerErr.(ConnectionError); ok {
2121
		cc.wmu.Lock()
2122
		cc.fr.WriteGoAway(0, ErrCode(ce), nil)
2123
		cc.wmu.Unlock()
2124
	}
2125
}
2126

2127
// GoAwayError is returned by the Transport when the server closes the
2128
// TCP connection after sending a GOAWAY frame.
2129
type GoAwayError struct {
2130
	LastStreamID uint32
2131
	ErrCode      ErrCode
2132
	DebugData    string
2133
}
2134

2135
func (e GoAwayError) Error() string {
2136
	return fmt.Sprintf("http2: server sent GOAWAY and closed the connection; LastStreamID=%v, ErrCode=%v, debug=%q",
2137
		e.LastStreamID, e.ErrCode, e.DebugData)
2138
}
2139

2140
func isEOFOrNetReadError(err error) bool {
2141
	if err == io.EOF {
2142
		return true
2143
	}
2144
	ne, ok := err.(*net.OpError)
2145
	return ok && ne.Op == "read"
2146
}
2147

2148
func (rl *clientConnReadLoop) cleanup() {
2149
	cc := rl.cc
2150
	cc.t.connPool().MarkDead(cc)
2151
	defer cc.closeConn()
2152
	defer close(cc.readerDone)
2153

2154
	if cc.idleTimer != nil {
2155
		cc.idleTimer.Stop()
2156
	}
2157

2158
	// Close any response bodies if the server closes prematurely.
2159
	// TODO: also do this if we've written the headers but not
2160
	// gotten a response yet.
2161
	err := cc.readerErr
2162
	cc.mu.Lock()
2163
	if cc.goAway != nil && isEOFOrNetReadError(err) {
2164
		err = GoAwayError{
2165
			LastStreamID: cc.goAway.LastStreamID,
2166
			ErrCode:      cc.goAway.ErrCode,
2167
			DebugData:    cc.goAwayDebug,
2168
		}
2169
	} else if err == io.EOF {
2170
		err = io.ErrUnexpectedEOF
2171
	}
2172
	cc.closed = true
2173

2174
	for _, cs := range cc.streams {
2175
		select {
2176
		case <-cs.peerClosed:
2177
			// The server closed the stream before closing the conn,
2178
			// so no need to interrupt it.
2179
		default:
2180
			cs.abortStreamLocked(err)
2181
		}
2182
	}
2183
	cc.cond.Broadcast()
2184
	cc.mu.Unlock()
2185
}
2186

2187
// countReadFrameError calls Transport.CountError with a string
2188
// representing err.
2189
func (cc *ClientConn) countReadFrameError(err error) {
2190
	f := cc.t.CountError
2191
	if f == nil || err == nil {
2192
		return
2193
	}
2194
	if ce, ok := err.(ConnectionError); ok {
2195
		errCode := ErrCode(ce)
2196
		f(fmt.Sprintf("read_frame_conn_error_%s", errCode.stringToken()))
2197
		return
2198
	}
2199
	if errors.Is(err, io.EOF) {
2200
		f("read_frame_eof")
2201
		return
2202
	}
2203
	if errors.Is(err, io.ErrUnexpectedEOF) {
2204
		f("read_frame_unexpected_eof")
2205
		return
2206
	}
2207
	if errors.Is(err, ErrFrameTooLarge) {
2208
		f("read_frame_too_large")
2209
		return
2210
	}
2211
	f("read_frame_other")
2212
}
2213

2214
func (rl *clientConnReadLoop) run() error {
2215
	cc := rl.cc
2216
	gotSettings := false
2217
	readIdleTimeout := cc.t.ReadIdleTimeout
2218
	var t *time.Timer
2219
	if readIdleTimeout != 0 {
2220
		t = time.AfterFunc(readIdleTimeout, cc.healthCheck)
2221
		defer t.Stop()
2222
	}
2223
	for {
2224
		f, err := cc.fr.ReadFrame()
2225
		if t != nil {
2226
			t.Reset(readIdleTimeout)
2227
		}
2228
		if err != nil {
2229
			cc.vlogf("http2: Transport readFrame error on conn %p: (%T) %v", cc, err, err)
2230
		}
2231
		if se, ok := err.(StreamError); ok {
2232
			if cs := rl.streamByID(se.StreamID); cs != nil {
2233
				if se.Cause == nil {
2234
					se.Cause = cc.fr.errDetail
2235
				}
2236
				rl.endStreamError(cs, se)
2237
			}
2238
			continue
2239
		} else if err != nil {
2240
			cc.countReadFrameError(err)
2241
			return err
2242
		}
2243
		if VerboseLogs {
2244
			cc.vlogf("http2: Transport received %s", summarizeFrame(f))
2245
		}
2246
		if !gotSettings {
2247
			if _, ok := f.(*SettingsFrame); !ok {
2248
				cc.logf("protocol error: received %T before a SETTINGS frame", f)
2249
				return ConnectionError(ErrCodeProtocol)
2250
			}
2251
			gotSettings = true
2252
		}
2253

2254
		switch f := f.(type) {
2255
		case *MetaHeadersFrame:
2256
			err = rl.processHeaders(f)
2257
		case *DataFrame:
2258
			err = rl.processData(f)
2259
		case *GoAwayFrame:
2260
			err = rl.processGoAway(f)
2261
		case *RSTStreamFrame:
2262
			err = rl.processResetStream(f)
2263
		case *SettingsFrame:
2264
			err = rl.processSettings(f)
2265
		case *PushPromiseFrame:
2266
			err = rl.processPushPromise(f)
2267
		case *WindowUpdateFrame:
2268
			err = rl.processWindowUpdate(f)
2269
		case *PingFrame:
2270
			err = rl.processPing(f)
2271
		default:
2272
			cc.logf("Transport: unhandled response frame type %T", f)
2273
		}
2274
		if err != nil {
2275
			if VerboseLogs {
2276
				cc.vlogf("http2: Transport conn %p received error from processing frame %v: %v", cc, summarizeFrame(f), err)
2277
			}
2278
			return err
2279
		}
2280
	}
2281
}
2282

2283
func (rl *clientConnReadLoop) processHeaders(f *MetaHeadersFrame) error {
2284
	cs := rl.streamByID(f.StreamID)
2285
	if cs == nil {
2286
		// We'd get here if we canceled a request while the
2287
		// server had its response still in flight. So if this
2288
		// was just something we canceled, ignore it.
2289
		return nil
2290
	}
2291
	if cs.readClosed {
2292
		rl.endStreamError(cs, StreamError{
2293
			StreamID: f.StreamID,
2294
			Code:     ErrCodeProtocol,
2295
			Cause:    errors.New("protocol error: headers after END_STREAM"),
2296
		})
2297
		return nil
2298
	}
2299
	if !cs.firstByte {
2300
		if cs.trace != nil {
2301
			// TODO(bradfitz): move first response byte earlier,
2302
			// when we first read the 9 byte header, not waiting
2303
			// until all the HEADERS+CONTINUATION frames have been
2304
			// merged. This works for now.
2305
			traceFirstResponseByte(cs.trace)
2306
		}
2307
		cs.firstByte = true
2308
	}
2309
	if !cs.pastHeaders {
2310
		cs.pastHeaders = true
2311
	} else {
2312
		return rl.processTrailers(cs, f)
2313
	}
2314

2315
	res, err := rl.handleResponse(cs, f)
2316
	if err != nil {
2317
		if _, ok := err.(ConnectionError); ok {
2318
			return err
2319
		}
2320
		// Any other error type is a stream error.
2321
		rl.endStreamError(cs, StreamError{
2322
			StreamID: f.StreamID,
2323
			Code:     ErrCodeProtocol,
2324
			Cause:    err,
2325
		})
2326
		return nil // return nil from process* funcs to keep conn alive
2327
	}
2328
	if res == nil {
2329
		// (nil, nil) special case. See handleResponse docs.
2330
		return nil
2331
	}
2332
	cs.resTrailer = &res.Trailer
2333
	cs.res = res
2334
	close(cs.respHeaderRecv)
2335
	if f.StreamEnded() {
2336
		rl.endStream(cs)
2337
	}
2338
	return nil
2339
}
2340

2341
// may return error types nil, or ConnectionError. Any other error value
2342
// is a StreamError of type ErrCodeProtocol. The returned error in that case
2343
// is the detail.
2344
//
2345
// As a special case, handleResponse may return (nil, nil) to skip the
2346
// frame (currently only used for 1xx responses).
2347
func (rl *clientConnReadLoop) handleResponse(cs *clientStream, f *MetaHeadersFrame) (*http.Response, error) {
2348
	if f.Truncated {
2349
		return nil, errResponseHeaderListSize
2350
	}
2351

2352
	status := f.PseudoValue("status")
2353
	if status == "" {
2354
		return nil, errors.New("malformed response from server: missing status pseudo header")
2355
	}
2356
	statusCode, err := strconv.Atoi(status)
2357
	if err != nil {
2358
		return nil, errors.New("malformed response from server: malformed non-numeric status pseudo header")
2359
	}
2360

2361
	regularFields := f.RegularFields()
2362
	strs := make([]string, len(regularFields))
2363
	header := make(http.Header, len(regularFields))
2364
	res := &http.Response{
2365
		Proto:      "HTTP/2.0",
2366
		ProtoMajor: 2,
2367
		Header:     header,
2368
		StatusCode: statusCode,
2369
		Status:     status + " " + http.StatusText(statusCode),
2370
	}
2371
	for _, hf := range regularFields {
2372
		key := canonicalHeader(hf.Name)
2373
		if key == "Trailer" {
2374
			t := res.Trailer
2375
			if t == nil {
2376
				t = make(http.Header)
2377
				res.Trailer = t
2378
			}
2379
			foreachHeaderElement(hf.Value, func(v string) {
2380
				t[canonicalHeader(v)] = nil
2381
			})
2382
		} else {
2383
			vv := header[key]
2384
			if vv == nil && len(strs) > 0 {
2385
				// More than likely this will be a single-element key.
2386
				// Most headers aren't multi-valued.
2387
				// Set the capacity on strs[0] to 1, so any future append
2388
				// won't extend the slice into the other strings.
2389
				vv, strs = strs[:1:1], strs[1:]
2390
				vv[0] = hf.Value
2391
				header[key] = vv
2392
			} else {
2393
				header[key] = append(vv, hf.Value)
2394
			}
2395
		}
2396
	}
2397

2398
	if statusCode >= 100 && statusCode <= 199 {
2399
		if f.StreamEnded() {
2400
			return nil, errors.New("1xx informational response with END_STREAM flag")
2401
		}
2402
		cs.num1xx++
2403
		const max1xxResponses = 5 // arbitrary bound on number of informational responses, same as net/http
2404
		if cs.num1xx > max1xxResponses {
2405
			return nil, errors.New("http2: too many 1xx informational responses")
2406
		}
2407
		if fn := cs.get1xxTraceFunc(); fn != nil {
2408
			if err := fn(statusCode, textproto.MIMEHeader(header)); err != nil {
2409
				return nil, err
2410
			}
2411
		}
2412
		if statusCode == 100 {
2413
			traceGot100Continue(cs.trace)
2414
			select {
2415
			case cs.on100 <- struct{}{}:
2416
			default:
2417
			}
2418
		}
2419
		cs.pastHeaders = false // do it all again
2420
		return nil, nil
2421
	}
2422

2423
	res.ContentLength = -1
2424
	if clens := res.Header["Content-Length"]; len(clens) == 1 {
2425
		if cl, err := strconv.ParseUint(clens[0], 10, 63); err == nil {
2426
			res.ContentLength = int64(cl)
2427
		} else {
2428
			// TODO: care? unlike http/1, it won't mess up our framing, so it's
2429
			// more safe smuggling-wise to ignore.
2430
		}
2431
	} else if len(clens) > 1 {
2432
		// TODO: care? unlike http/1, it won't mess up our framing, so it's
2433
		// more safe smuggling-wise to ignore.
2434
	} else if f.StreamEnded() && !cs.isHead {
2435
		res.ContentLength = 0
2436
	}
2437

2438
	if cs.isHead {
2439
		res.Body = noBody
2440
		return res, nil
2441
	}
2442

2443
	if f.StreamEnded() {
2444
		if res.ContentLength > 0 {
2445
			res.Body = missingBody{}
2446
		} else {
2447
			res.Body = noBody
2448
		}
2449
		return res, nil
2450
	}
2451

2452
	cs.bufPipe.setBuffer(&dataBuffer{expected: res.ContentLength})
2453
	cs.bytesRemain = res.ContentLength
2454
	res.Body = transportResponseBody{cs}
2455

2456
	if cs.requestedGzip && asciiEqualFold(res.Header.Get("Content-Encoding"), "gzip") {
2457
		res.Header.Del("Content-Encoding")
2458
		res.Header.Del("Content-Length")
2459
		res.ContentLength = -1
2460
		res.Body = &gzipReader{body: res.Body}
2461
		res.Uncompressed = true
2462
	}
2463
	return res, nil
2464
}
2465

2466
func (rl *clientConnReadLoop) processTrailers(cs *clientStream, f *MetaHeadersFrame) error {
2467
	if cs.pastTrailers {
2468
		// Too many HEADERS frames for this stream.
2469
		return ConnectionError(ErrCodeProtocol)
2470
	}
2471
	cs.pastTrailers = true
2472
	if !f.StreamEnded() {
2473
		// We expect that any headers for trailers also
2474
		// has END_STREAM.
2475
		return ConnectionError(ErrCodeProtocol)
2476
	}
2477
	if len(f.PseudoFields()) > 0 {
2478
		// No pseudo header fields are defined for trailers.
2479
		// TODO: ConnectionError might be overly harsh? Check.
2480
		return ConnectionError(ErrCodeProtocol)
2481
	}
2482

2483
	trailer := make(http.Header)
2484
	for _, hf := range f.RegularFields() {
2485
		key := canonicalHeader(hf.Name)
2486
		trailer[key] = append(trailer[key], hf.Value)
2487
	}
2488
	cs.trailer = trailer
2489

2490
	rl.endStream(cs)
2491
	return nil
2492
}
2493

2494
// transportResponseBody is the concrete type of Transport.RoundTrip's
2495
// Response.Body. It is an io.ReadCloser.
2496
type transportResponseBody struct {
2497
	cs *clientStream
2498
}
2499

2500
func (b transportResponseBody) Read(p []byte) (n int, err error) {
2501
	cs := b.cs
2502
	cc := cs.cc
2503

2504
	if cs.readErr != nil {
2505
		return 0, cs.readErr
2506
	}
2507
	n, err = b.cs.bufPipe.Read(p)
2508
	if cs.bytesRemain != -1 {
2509
		if int64(n) > cs.bytesRemain {
2510
			n = int(cs.bytesRemain)
2511
			if err == nil {
2512
				err = errors.New("net/http: server replied with more than declared Content-Length; truncated")
2513
				cs.abortStream(err)
2514
			}
2515
			cs.readErr = err
2516
			return int(cs.bytesRemain), err
2517
		}
2518
		cs.bytesRemain -= int64(n)
2519
		if err == io.EOF && cs.bytesRemain > 0 {
2520
			err = io.ErrUnexpectedEOF
2521
			cs.readErr = err
2522
			return n, err
2523
		}
2524
	}
2525
	if n == 0 {
2526
		// No flow control tokens to send back.
2527
		return
2528
	}
2529

2530
	cc.mu.Lock()
2531
	connAdd := cc.inflow.add(n)
2532
	var streamAdd int32
2533
	if err == nil { // No need to refresh if the stream is over or failed.
2534
		streamAdd = cs.inflow.add(n)
2535
	}
2536
	cc.mu.Unlock()
2537

2538
	if connAdd != 0 || streamAdd != 0 {
2539
		cc.wmu.Lock()
2540
		defer cc.wmu.Unlock()
2541
		if connAdd != 0 {
2542
			cc.fr.WriteWindowUpdate(0, mustUint31(connAdd))
2543
		}
2544
		if streamAdd != 0 {
2545
			cc.fr.WriteWindowUpdate(cs.ID, mustUint31(streamAdd))
2546
		}
2547
		cc.bw.Flush()
2548
	}
2549
	return
2550
}
2551

2552
var errClosedResponseBody = errors.New("http2: response body closed")
2553

2554
func (b transportResponseBody) Close() error {
2555
	cs := b.cs
2556
	cc := cs.cc
2557

2558
	unread := cs.bufPipe.Len()
2559
	if unread > 0 {
2560
		cc.mu.Lock()
2561
		// Return connection-level flow control.
2562
		connAdd := cc.inflow.add(unread)
2563
		cc.mu.Unlock()
2564

2565
		// TODO(dneil): Acquiring this mutex can block indefinitely.
2566
		// Move flow control return to a goroutine?
2567
		cc.wmu.Lock()
2568
		// Return connection-level flow control.
2569
		if connAdd > 0 {
2570
			cc.fr.WriteWindowUpdate(0, uint32(connAdd))
2571
		}
2572
		cc.bw.Flush()
2573
		cc.wmu.Unlock()
2574
	}
2575

2576
	cs.bufPipe.BreakWithError(errClosedResponseBody)
2577
	cs.abortStream(errClosedResponseBody)
2578

2579
	select {
2580
	case <-cs.donec:
2581
	case <-cs.ctx.Done():
2582
		// See golang/go#49366: The net/http package can cancel the
2583
		// request context after the response body is fully read.
2584
		// Don't treat this as an error.
2585
		return nil
2586
	case <-cs.reqCancel:
2587
		return errRequestCanceled
2588
	}
2589
	return nil
2590
}
2591

2592
func (rl *clientConnReadLoop) processData(f *DataFrame) error {
2593
	cc := rl.cc
2594
	cs := rl.streamByID(f.StreamID)
2595
	data := f.Data()
2596
	if cs == nil {
2597
		cc.mu.Lock()
2598
		neverSent := cc.nextStreamID
2599
		cc.mu.Unlock()
2600
		if f.StreamID >= neverSent {
2601
			// We never asked for this.
2602
			cc.logf("http2: Transport received unsolicited DATA frame; closing connection")
2603
			return ConnectionError(ErrCodeProtocol)
2604
		}
2605
		// We probably did ask for this, but canceled. Just ignore it.
2606
		// TODO: be stricter here? only silently ignore things which
2607
		// we canceled, but not things which were closed normally
2608
		// by the peer? Tough without accumulating too much state.
2609

2610
		// But at least return their flow control:
2611
		if f.Length > 0 {
2612
			cc.mu.Lock()
2613
			ok := cc.inflow.take(f.Length)
2614
			connAdd := cc.inflow.add(int(f.Length))
2615
			cc.mu.Unlock()
2616
			if !ok {
2617
				return ConnectionError(ErrCodeFlowControl)
2618
			}
2619
			if connAdd > 0 {
2620
				cc.wmu.Lock()
2621
				cc.fr.WriteWindowUpdate(0, uint32(connAdd))
2622
				cc.bw.Flush()
2623
				cc.wmu.Unlock()
2624
			}
2625
		}
2626
		return nil
2627
	}
2628
	if cs.readClosed {
2629
		cc.logf("protocol error: received DATA after END_STREAM")
2630
		rl.endStreamError(cs, StreamError{
2631
			StreamID: f.StreamID,
2632
			Code:     ErrCodeProtocol,
2633
		})
2634
		return nil
2635
	}
2636
	if !cs.firstByte {
2637
		cc.logf("protocol error: received DATA before a HEADERS frame")
2638
		rl.endStreamError(cs, StreamError{
2639
			StreamID: f.StreamID,
2640
			Code:     ErrCodeProtocol,
2641
		})
2642
		return nil
2643
	}
2644
	if f.Length > 0 {
2645
		if cs.isHead && len(data) > 0 {
2646
			cc.logf("protocol error: received DATA on a HEAD request")
2647
			rl.endStreamError(cs, StreamError{
2648
				StreamID: f.StreamID,
2649
				Code:     ErrCodeProtocol,
2650
			})
2651
			return nil
2652
		}
2653
		// Check connection-level flow control.
2654
		cc.mu.Lock()
2655
		if !takeInflows(&cc.inflow, &cs.inflow, f.Length) {
2656
			cc.mu.Unlock()
2657
			return ConnectionError(ErrCodeFlowControl)
2658
		}
2659
		// Return any padded flow control now, since we won't
2660
		// refund it later on body reads.
2661
		var refund int
2662
		if pad := int(f.Length) - len(data); pad > 0 {
2663
			refund += pad
2664
		}
2665

2666
		didReset := false
2667
		var err error
2668
		if len(data) > 0 {
2669
			if _, err = cs.bufPipe.Write(data); err != nil {
2670
				// Return len(data) now if the stream is already closed,
2671
				// since data will never be read.
2672
				didReset = true
2673
				refund += len(data)
2674
			}
2675
		}
2676

2677
		sendConn := cc.inflow.add(refund)
2678
		var sendStream int32
2679
		if !didReset {
2680
			sendStream = cs.inflow.add(refund)
2681
		}
2682
		cc.mu.Unlock()
2683

2684
		if sendConn > 0 || sendStream > 0 {
2685
			cc.wmu.Lock()
2686
			if sendConn > 0 {
2687
				cc.fr.WriteWindowUpdate(0, uint32(sendConn))
2688
			}
2689
			if sendStream > 0 {
2690
				cc.fr.WriteWindowUpdate(cs.ID, uint32(sendStream))
2691
			}
2692
			cc.bw.Flush()
2693
			cc.wmu.Unlock()
2694
		}
2695

2696
		if err != nil {
2697
			rl.endStreamError(cs, err)
2698
			return nil
2699
		}
2700
	}
2701

2702
	if f.StreamEnded() {
2703
		rl.endStream(cs)
2704
	}
2705
	return nil
2706
}
2707

2708
func (rl *clientConnReadLoop) endStream(cs *clientStream) {
2709
	// TODO: check that any declared content-length matches, like
2710
	// server.go's (*stream).endStream method.
2711
	if !cs.readClosed {
2712
		cs.readClosed = true
2713
		// Close cs.bufPipe and cs.peerClosed with cc.mu held to avoid a
2714
		// race condition: The caller can read io.EOF from Response.Body
2715
		// and close the body before we close cs.peerClosed, causing
2716
		// cleanupWriteRequest to send a RST_STREAM.
2717
		rl.cc.mu.Lock()
2718
		defer rl.cc.mu.Unlock()
2719
		cs.bufPipe.closeWithErrorAndCode(io.EOF, cs.copyTrailers)
2720
		close(cs.peerClosed)
2721
	}
2722
}
2723

2724
func (rl *clientConnReadLoop) endStreamError(cs *clientStream, err error) {
2725
	cs.readAborted = true
2726
	cs.abortStream(err)
2727
}
2728

2729
func (rl *clientConnReadLoop) streamByID(id uint32) *clientStream {
2730
	rl.cc.mu.Lock()
2731
	defer rl.cc.mu.Unlock()
2732
	cs := rl.cc.streams[id]
2733
	if cs != nil && !cs.readAborted {
2734
		return cs
2735
	}
2736
	return nil
2737
}
2738

2739
func (cs *clientStream) copyTrailers() {
2740
	for k, vv := range cs.trailer {
2741
		t := cs.resTrailer
2742
		if *t == nil {
2743
			*t = make(http.Header)
2744
		}
2745
		(*t)[k] = vv
2746
	}
2747
}
2748

2749
func (rl *clientConnReadLoop) processGoAway(f *GoAwayFrame) error {
2750
	cc := rl.cc
2751
	cc.t.connPool().MarkDead(cc)
2752
	if f.ErrCode != 0 {
2753
		// TODO: deal with GOAWAY more. particularly the error code
2754
		cc.vlogf("transport got GOAWAY with error code = %v", f.ErrCode)
2755
		if fn := cc.t.CountError; fn != nil {
2756
			fn("recv_goaway_" + f.ErrCode.stringToken())
2757
		}
2758
	}
2759
	cc.setGoAway(f)
2760
	return nil
2761
}
2762

2763
func (rl *clientConnReadLoop) processSettings(f *SettingsFrame) error {
2764
	cc := rl.cc
2765
	// Locking both mu and wmu here allows frame encoding to read settings with only wmu held.
2766
	// Acquiring wmu when f.IsAck() is unnecessary, but convenient and mostly harmless.
2767
	cc.wmu.Lock()
2768
	defer cc.wmu.Unlock()
2769

2770
	if err := rl.processSettingsNoWrite(f); err != nil {
2771
		return err
2772
	}
2773
	if !f.IsAck() {
2774
		cc.fr.WriteSettingsAck()
2775
		cc.bw.Flush()
2776
	}
2777
	return nil
2778
}
2779

2780
func (rl *clientConnReadLoop) processSettingsNoWrite(f *SettingsFrame) error {
2781
	cc := rl.cc
2782
	cc.mu.Lock()
2783
	defer cc.mu.Unlock()
2784

2785
	if f.IsAck() {
2786
		if cc.wantSettingsAck {
2787
			cc.wantSettingsAck = false
2788
			return nil
2789
		}
2790
		return ConnectionError(ErrCodeProtocol)
2791
	}
2792

2793
	var seenMaxConcurrentStreams bool
2794
	err := f.ForeachSetting(func(s Setting) error {
2795
		switch s.ID {
2796
		case SettingMaxFrameSize:
2797
			cc.maxFrameSize = s.Val
2798
		case SettingMaxConcurrentStreams:
2799
			cc.maxConcurrentStreams = s.Val
2800
			seenMaxConcurrentStreams = true
2801
		case SettingMaxHeaderListSize:
2802
			cc.peerMaxHeaderListSize = uint64(s.Val)
2803
		case SettingInitialWindowSize:
2804
			// Values above the maximum flow-control
2805
			// window size of 2^31-1 MUST be treated as a
2806
			// connection error (Section 5.4.1) of type
2807
			// FLOW_CONTROL_ERROR.
2808
			if s.Val > math.MaxInt32 {
2809
				return ConnectionError(ErrCodeFlowControl)
2810
			}
2811

2812
			// Adjust flow control of currently-open
2813
			// frames by the difference of the old initial
2814
			// window size and this one.
2815
			delta := int32(s.Val) - int32(cc.initialWindowSize)
2816
			for _, cs := range cc.streams {
2817
				cs.flow.add(delta)
2818
			}
2819
			cc.cond.Broadcast()
2820

2821
			cc.initialWindowSize = s.Val
2822
		case SettingHeaderTableSize:
2823
			cc.henc.SetMaxDynamicTableSize(s.Val)
2824
			cc.peerMaxHeaderTableSize = s.Val
2825
		default:
2826
			cc.vlogf("Unhandled Setting: %v", s)
2827
		}
2828
		return nil
2829
	})
2830
	if err != nil {
2831
		return err
2832
	}
2833

2834
	if !cc.seenSettings {
2835
		if !seenMaxConcurrentStreams {
2836
			// This was the servers initial SETTINGS frame and it
2837
			// didn't contain a MAX_CONCURRENT_STREAMS field so
2838
			// increase the number of concurrent streams this
2839
			// connection can establish to our default.
2840
			cc.maxConcurrentStreams = defaultMaxConcurrentStreams
2841
		}
2842
		cc.seenSettings = true
2843
	}
2844

2845
	return nil
2846
}
2847

2848
func (rl *clientConnReadLoop) processWindowUpdate(f *WindowUpdateFrame) error {
2849
	cc := rl.cc
2850
	cs := rl.streamByID(f.StreamID)
2851
	if f.StreamID != 0 && cs == nil {
2852
		return nil
2853
	}
2854

2855
	cc.mu.Lock()
2856
	defer cc.mu.Unlock()
2857

2858
	fl := &cc.flow
2859
	if cs != nil {
2860
		fl = &cs.flow
2861
	}
2862
	if !fl.add(int32(f.Increment)) {
2863
		return ConnectionError(ErrCodeFlowControl)
2864
	}
2865
	cc.cond.Broadcast()
2866
	return nil
2867
}
2868

2869
func (rl *clientConnReadLoop) processResetStream(f *RSTStreamFrame) error {
2870
	cs := rl.streamByID(f.StreamID)
2871
	if cs == nil {
2872
		// TODO: return error if server tries to RST_STREAM an idle stream
2873
		return nil
2874
	}
2875
	serr := streamError(cs.ID, f.ErrCode)
2876
	serr.Cause = errFromPeer
2877
	if f.ErrCode == ErrCodeProtocol {
2878
		rl.cc.SetDoNotReuse()
2879
	}
2880
	if fn := cs.cc.t.CountError; fn != nil {
2881
		fn("recv_rststream_" + f.ErrCode.stringToken())
2882
	}
2883
	cs.abortStream(serr)
2884

2885
	cs.bufPipe.CloseWithError(serr)
2886
	return nil
2887
}
2888

2889
// Ping sends a PING frame to the server and waits for the ack.
2890
func (cc *ClientConn) Ping(ctx context.Context) error {
2891
	c := make(chan struct{})
2892
	// Generate a random payload
2893
	var p [8]byte
2894
	for {
2895
		if _, err := rand.Read(p[:]); err != nil {
2896
			return err
2897
		}
2898
		cc.mu.Lock()
2899
		// check for dup before insert
2900
		if _, found := cc.pings[p]; !found {
2901
			cc.pings[p] = c
2902
			cc.mu.Unlock()
2903
			break
2904
		}
2905
		cc.mu.Unlock()
2906
	}
2907
	errc := make(chan error, 1)
2908
	go func() {
2909
		cc.wmu.Lock()
2910
		defer cc.wmu.Unlock()
2911
		if err := cc.fr.WritePing(false, p); err != nil {
2912
			errc <- err
2913
			return
2914
		}
2915
		if err := cc.bw.Flush(); err != nil {
2916
			errc <- err
2917
			return
2918
		}
2919
	}()
2920
	select {
2921
	case <-c:
2922
		return nil
2923
	case err := <-errc:
2924
		return err
2925
	case <-ctx.Done():
2926
		return ctx.Err()
2927
	case <-cc.readerDone:
2928
		// connection closed
2929
		return cc.readerErr
2930
	}
2931
}
2932

2933
func (rl *clientConnReadLoop) processPing(f *PingFrame) error {
2934
	if f.IsAck() {
2935
		cc := rl.cc
2936
		cc.mu.Lock()
2937
		defer cc.mu.Unlock()
2938
		// If ack, notify listener if any
2939
		if c, ok := cc.pings[f.Data]; ok {
2940
			close(c)
2941
			delete(cc.pings, f.Data)
2942
		}
2943
		return nil
2944
	}
2945
	cc := rl.cc
2946
	cc.wmu.Lock()
2947
	defer cc.wmu.Unlock()
2948
	if err := cc.fr.WritePing(true, f.Data); err != nil {
2949
		return err
2950
	}
2951
	return cc.bw.Flush()
2952
}
2953

2954
func (rl *clientConnReadLoop) processPushPromise(f *PushPromiseFrame) error {
2955
	// We told the peer we don't want them.
2956
	// Spec says:
2957
	// "PUSH_PROMISE MUST NOT be sent if the SETTINGS_ENABLE_PUSH
2958
	// setting of the peer endpoint is set to 0. An endpoint that
2959
	// has set this setting and has received acknowledgement MUST
2960
	// treat the receipt of a PUSH_PROMISE frame as a connection
2961
	// error (Section 5.4.1) of type PROTOCOL_ERROR."
2962
	return ConnectionError(ErrCodeProtocol)
2963
}
2964

2965
func (cc *ClientConn) writeStreamReset(streamID uint32, code ErrCode, err error) {
2966
	// TODO: map err to more interesting error codes, once the
2967
	// HTTP community comes up with some. But currently for
2968
	// RST_STREAM there's no equivalent to GOAWAY frame's debug
2969
	// data, and the error codes are all pretty vague ("cancel").
2970
	cc.wmu.Lock()
2971
	cc.fr.WriteRSTStream(streamID, code)
2972
	cc.bw.Flush()
2973
	cc.wmu.Unlock()
2974
}
2975

2976
var (
2977
	errResponseHeaderListSize = errors.New("http2: response header list larger than advertised limit")
2978
	errRequestHeaderListSize  = errors.New("http2: request header list larger than peer's advertised limit")
2979
)
2980

2981
func (cc *ClientConn) logf(format string, args ...interface{}) {
2982
	cc.t.logf(format, args...)
2983
}
2984

2985
func (cc *ClientConn) vlogf(format string, args ...interface{}) {
2986
	cc.t.vlogf(format, args...)
2987
}
2988

2989
func (t *Transport) vlogf(format string, args ...interface{}) {
2990
	if VerboseLogs {
2991
		t.logf(format, args...)
2992
	}
2993
}
2994

2995
func (t *Transport) logf(format string, args ...interface{}) {
2996
	log.Printf(format, args...)
2997
}
2998

2999
var noBody io.ReadCloser = noBodyReader{}
3000

3001
type noBodyReader struct{}
3002

3003
func (noBodyReader) Close() error             { return nil }
3004
func (noBodyReader) Read([]byte) (int, error) { return 0, io.EOF }
3005

3006
type missingBody struct{}
3007

3008
func (missingBody) Close() error             { return nil }
3009
func (missingBody) Read([]byte) (int, error) { return 0, io.ErrUnexpectedEOF }
3010

3011
func strSliceContains(ss []string, s string) bool {
3012
	for _, v := range ss {
3013
		if v == s {
3014
			return true
3015
		}
3016
	}
3017
	return false
3018
}
3019

3020
type erringRoundTripper struct{ err error }
3021

3022
func (rt erringRoundTripper) RoundTripErr() error                             { return rt.err }
3023
func (rt erringRoundTripper) RoundTrip(*http.Request) (*http.Response, error) { return nil, rt.err }
3024

3025
// gzipReader wraps a response body so it can lazily
3026
// call gzip.NewReader on the first call to Read
3027
type gzipReader struct {
3028
	_    incomparable
3029
	body io.ReadCloser // underlying Response.Body
3030
	zr   *gzip.Reader  // lazily-initialized gzip reader
3031
	zerr error         // sticky error
3032
}
3033

3034
func (gz *gzipReader) Read(p []byte) (n int, err error) {
3035
	if gz.zerr != nil {
3036
		return 0, gz.zerr
3037
	}
3038
	if gz.zr == nil {
3039
		gz.zr, err = gzip.NewReader(gz.body)
3040
		if err != nil {
3041
			gz.zerr = err
3042
			return 0, err
3043
		}
3044
	}
3045
	return gz.zr.Read(p)
3046
}
3047

3048
func (gz *gzipReader) Close() error {
3049
	if err := gz.body.Close(); err != nil {
3050
		return err
3051
	}
3052
	gz.zerr = fs.ErrClosed
3053
	return nil
3054
}
3055

3056
type errorReader struct{ err error }
3057

3058
func (r errorReader) Read(p []byte) (int, error) { return 0, r.err }
3059

3060
// isConnectionCloseRequest reports whether req should use its own
3061
// connection for a single request and then close the connection.
3062
func isConnectionCloseRequest(req *http.Request) bool {
3063
	return req.Close || httpguts.HeaderValuesContainsToken(req.Header["Connection"], "close")
3064
}
3065

3066
// registerHTTPSProtocol calls Transport.RegisterProtocol but
3067
// converting panics into errors.
3068
func registerHTTPSProtocol(t *http.Transport, rt noDialH2RoundTripper) (err error) {
3069
	defer func() {
3070
		if e := recover(); e != nil {
3071
			err = fmt.Errorf("%v", e)
3072
		}
3073
	}()
3074
	t.RegisterProtocol("https", rt)
3075
	return nil
3076
}
3077

3078
// noDialH2RoundTripper is a RoundTripper which only tries to complete the request
3079
// if there's already has a cached connection to the host.
3080
// (The field is exported so it can be accessed via reflect from net/http; tested
3081
// by TestNoDialH2RoundTripperType)
3082
type noDialH2RoundTripper struct{ *Transport }
3083

3084
func (rt noDialH2RoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
3085
	res, err := rt.Transport.RoundTrip(req)
3086
	if isNoCachedConnError(err) {
3087
		return nil, http.ErrSkipAltProtocol
3088
	}
3089
	return res, err
3090
}
3091

3092
func (t *Transport) idleConnTimeout() time.Duration {
3093
	if t.t1 != nil {
3094
		return t.t1.IdleConnTimeout
3095
	}
3096
	return 0
3097
}
3098

3099
func traceGetConn(req *http.Request, hostPort string) {
3100
	trace := httptrace.ContextClientTrace(req.Context())
3101
	if trace == nil || trace.GetConn == nil {
3102
		return
3103
	}
3104
	trace.GetConn(hostPort)
3105
}
3106

3107
func traceGotConn(req *http.Request, cc *ClientConn, reused bool) {
3108
	trace := httptrace.ContextClientTrace(req.Context())
3109
	if trace == nil || trace.GotConn == nil {
3110
		return
3111
	}
3112
	ci := httptrace.GotConnInfo{Conn: cc.tconn}
3113
	ci.Reused = reused
3114
	cc.mu.Lock()
3115
	ci.WasIdle = len(cc.streams) == 0 && reused
3116
	if ci.WasIdle && !cc.lastActive.IsZero() {
3117
		ci.IdleTime = time.Since(cc.lastActive)
3118
	}
3119
	cc.mu.Unlock()
3120

3121
	trace.GotConn(ci)
3122
}
3123

3124
func traceWroteHeaders(trace *httptrace.ClientTrace) {
3125
	if trace != nil && trace.WroteHeaders != nil {
3126
		trace.WroteHeaders()
3127
	}
3128
}
3129

3130
func traceGot100Continue(trace *httptrace.ClientTrace) {
3131
	if trace != nil && trace.Got100Continue != nil {
3132
		trace.Got100Continue()
3133
	}
3134
}
3135

3136
func traceWait100Continue(trace *httptrace.ClientTrace) {
3137
	if trace != nil && trace.Wait100Continue != nil {
3138
		trace.Wait100Continue()
3139
	}
3140
}
3141

3142
func traceWroteRequest(trace *httptrace.ClientTrace, err error) {
3143
	if trace != nil && trace.WroteRequest != nil {
3144
		trace.WroteRequest(httptrace.WroteRequestInfo{Err: err})
3145
	}
3146
}
3147

3148
func traceFirstResponseByte(trace *httptrace.ClientTrace) {
3149
	if trace != nil && trace.GotFirstResponseByte != nil {
3150
		trace.GotFirstResponseByte()
3151
	}
3152
}
3153

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.