/*
 *
 * Copyright 2014 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

package transport

import (
	"context"
	"fmt"
	"io"
	"math"
	"net"
	"strconv"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"golang.org/x/net/http2"
	"golang.org/x/net/http2/hpack"
	"google.golang.org/grpc/internal/grpcutil"
	imetadata "google.golang.org/grpc/internal/metadata"
	"google.golang.org/grpc/internal/transport/networktype"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/credentials"
	"google.golang.org/grpc/internal"
	"google.golang.org/grpc/internal/channelz"
	"google.golang.org/grpc/internal/syscall"
	"google.golang.org/grpc/keepalive"
	"google.golang.org/grpc/metadata"
	"google.golang.org/grpc/peer"
	"google.golang.org/grpc/resolver"
	"google.golang.org/grpc/stats"
	"google.golang.org/grpc/status"
)

// clientConnectionCounter counts the number of connections a client has
// initiated (equal to the number of http2Clients created). Must be accessed
// atomically.
var clientConnectionCounter uint64

// http2Client implements the ClientTransport interface with HTTP2.
type http2Client struct {
	lastRead   int64 // Keep this field 64-bit aligned. Accessed atomically.
	ctx        context.Context
	cancel     context.CancelFunc
	ctxDone    <-chan struct{} // Cache the ctx.Done() chan.
	userAgent  string
	md         metadata.MD
	conn       net.Conn // underlying communication channel
	loopy      *loopyWriter
	remoteAddr net.Addr
	localAddr  net.Addr
	authInfo   credentials.AuthInfo // auth info about the connection

	readerDone chan struct{} // sync point to enable testing.
	writerDone chan struct{} // sync point to enable testing.
	// goAway is closed to notify the upper layer (i.e., addrConn.transportMonitor)
	// that the server sent GoAway on this transport.
	goAway chan struct{}

	framer *framer
	// controlBuf delivers all the control related tasks (e.g., window
	// updates, reset streams, and various settings) to the controller.
	controlBuf *controlBuffer
	fc         *trInFlow
	// The scheme used: https if TLS is on, http otherwise.
	scheme string

	isSecure bool

	perRPCCreds []credentials.PerRPCCredentials

	kp               keepalive.ClientParameters
	keepaliveEnabled bool

	statsHandler stats.Handler

	initialWindowSize int32

	// configured by peer through SETTINGS_MAX_HEADER_LIST_SIZE
	maxSendHeaderListSize *uint32

	bdpEst *bdpEstimator
	// onPrefaceReceipt is a callback that the client transport calls upon
	// receiving the server preface to signal that a successful HTTP/2
	// connection was established.
	onPrefaceReceipt func()

	maxConcurrentStreams  uint32
	streamQuota           int64
	streamsQuotaAvailable chan struct{}
	waitingStreams        uint32
	nextID                uint32

	mu            sync.Mutex // guard the following variables
	state         transportState
	activeStreams map[uint32]*Stream
	// prevGoAwayID records the Last-Stream-ID in the previous GoAway frame.
	prevGoAwayID uint32
	// goAwayReason records the http2.ErrCode and debug data received with the
	// GoAway frame.
	goAwayReason GoAwayReason
	// A condition variable used to signal when the keepalive goroutine should
	// go dormant. The condition for dormancy is based on the number of active
	// streams and the `PermitWithoutStream` keepalive client parameter. And
	// since the number of active streams is guarded by the above mutex, we use
	// the same for this condition variable as well.
	kpDormancyCond *sync.Cond
	// A boolean to track whether the keepalive goroutine is dormant or not.
	// This is checked before attempting to signal the above condition
	// variable.
	kpDormant bool

	// Fields below are for channelz metric collection.
	channelzID int64 // channelz unique identification number
	czData     *channelzData

	onGoAway func(GoAwayReason)
	onClose  func()

	bufferPool *bufferPool

	connectionID uint64
}

func dial(ctx context.Context, fn func(context.Context, string) (net.Conn, error), addr resolver.Address, useProxy bool, grpcUA string) (net.Conn, error) {
	address := addr.Addr
	networkType, ok := networktype.Get(addr)
	if fn != nil {
		if networkType == "unix" && !strings.HasPrefix(address, "\x00") {
			// For backward compatibility, if the user dialed "unix:///path",
			// the passthrough resolver would be used and the user's custom
			// dialer would see "unix:///path". Since the unix resolver is used
			// and the address is now "/path", prepend "unix://" so the user's
			// custom dialer sees the same address.
			return fn(ctx, "unix://"+address)
		}
		return fn(ctx, address)
	}
	if !ok {
		networkType, address = parseDialTarget(address)
	}
	if networkType == "tcp" && useProxy {
		return proxyDial(ctx, address, grpcUA)
	}
	return (&net.Dialer{}).DialContext(ctx, networkType, address)
}
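
// Illustrative sketch (not part of the original file): a custom dialer with
// the signature expected by dial's fn parameter, supplied via
// ConnectOptions.Dialer (which newHTTP2Client below passes in as opts.Dialer).
// When fn is non-nil, it bypasses the default net.Dialer and the proxy path
// entirely; only the "unix://" re-prefixing above is applied first.
//
//	func myDialer(ctx context.Context, addr string) (net.Conn, error) {
//		// addr arrives verbatim from the resolver (hypothetical example).
//		d := net.Dialer{KeepAlive: 30 * time.Second}
//		return d.DialContext(ctx, "tcp", addr)
//	}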

func isTemporary(err error) bool {
	switch err := err.(type) {
	case interface {
		Temporary() bool
	}:
		return err.Temporary()
	case interface {
		Timeout() bool
	}:
		// Timeouts may be resolved upon retry, and are thus treated as
		// temporary.
		return err.Timeout()
	}
	return true
}
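
// Example (illustrative sketch): how a dial failure flows through
// isTemporary. A net.DialTimeout error implements the Timeout() interface
// checked above, so it is classified as temporary; unknown error types fall
// through to the final `return true`.
//
//	if _, err := net.DialTimeout("tcp", "192.0.2.1:443", time.Millisecond); err != nil {
//		_ = isTemporary(err) // true: timeouts may resolve on retry
//	}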

// newHTTP2Client constructs a connected ClientTransport to addr based on HTTP2
// and starts to receive messages on it. A non-nil error is returned if
// construction fails.
func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts ConnectOptions, onPrefaceReceipt func(), onGoAway func(GoAwayReason), onClose func()) (_ *http2Client, err error) {
	scheme := "http"
	ctx, cancel := context.WithCancel(ctx)
	defer func() {
		if err != nil {
			cancel()
		}
	}()

	conn, err := dial(connectCtx, opts.Dialer, addr, opts.UseProxy, opts.UserAgent)
	if err != nil {
		if opts.FailOnNonTempDialError {
			return nil, connectionErrorf(isTemporary(err), err, "transport: error while dialing: %v", err)
		}
		return nil, connectionErrorf(true, err, "transport: Error while dialing %v", err)
	}
	// Any further errors will close the underlying connection
	defer func(conn net.Conn) {
		if err != nil {
			conn.Close()
		}
	}(conn)
	kp := opts.KeepaliveParams
	// Validate keepalive parameters.
	if kp.Time == 0 {
		kp.Time = defaultClientKeepaliveTime
	}
	if kp.Timeout == 0 {
		kp.Timeout = defaultClientKeepaliveTimeout
	}
	keepaliveEnabled := false
	if kp.Time != infinity {
		if err = syscall.SetTCPUserTimeout(conn, kp.Timeout); err != nil {
			return nil, connectionErrorf(false, err, "transport: failed to set TCP_USER_TIMEOUT: %v", err)
		}
		keepaliveEnabled = true
	}
	var (
		isSecure bool
		authInfo credentials.AuthInfo
	)
	transportCreds := opts.TransportCredentials
	perRPCCreds := opts.PerRPCCredentials

	if b := opts.CredsBundle; b != nil {
		if t := b.TransportCredentials(); t != nil {
			transportCreds = t
		}
		if t := b.PerRPCCredentials(); t != nil {
			perRPCCreds = append(perRPCCreds, t)
		}
	}
	if transportCreds != nil {
		// gRPC, resolver, balancer etc. can specify arbitrary data in the
		// Attributes field of resolver.Address, which is shoved into connectCtx
		// and passed to the credential handshaker. This makes it possible for
		// address specific arbitrary data to reach the credential handshaker.
		contextWithHandshakeInfo := internal.NewClientHandshakeInfoContext.(func(context.Context, credentials.ClientHandshakeInfo) context.Context)
		connectCtx = contextWithHandshakeInfo(connectCtx, credentials.ClientHandshakeInfo{Attributes: addr.Attributes})
		conn, authInfo, err = transportCreds.ClientHandshake(connectCtx, addr.ServerName, conn)
		if err != nil {
			return nil, connectionErrorf(isTemporary(err), err, "transport: authentication handshake failed: %v", err)
		}
		for _, cd := range perRPCCreds {
			if cd.RequireTransportSecurity() {
				if ci, ok := authInfo.(interface {
					GetCommonAuthInfo() credentials.CommonAuthInfo
				}); ok {
					secLevel := ci.GetCommonAuthInfo().SecurityLevel
					if secLevel != credentials.InvalidSecurityLevel && secLevel < credentials.PrivacyAndIntegrity {
						return nil, connectionErrorf(true, nil, "transport: cannot send secure credentials on an insecure connection")
					}
				}
			}
		}
		isSecure = true
		if transportCreds.Info().SecurityProtocol == "tls" {
			scheme = "https"
		}
	}
	dynamicWindow := true
	icwz := int32(initialWindowSize)
	if opts.InitialConnWindowSize >= defaultWindowSize {
		icwz = opts.InitialConnWindowSize
		dynamicWindow = false
	}
	writeBufSize := opts.WriteBufferSize
	readBufSize := opts.ReadBufferSize
	maxHeaderListSize := defaultClientMaxHeaderListSize
	if opts.MaxHeaderListSize != nil {
		maxHeaderListSize = *opts.MaxHeaderListSize
	}
	t := &http2Client{
		ctx:                   ctx,
		ctxDone:               ctx.Done(), // Cache Done chan.
		cancel:                cancel,
		userAgent:             opts.UserAgent,
		conn:                  conn,
		remoteAddr:            conn.RemoteAddr(),
		localAddr:             conn.LocalAddr(),
		authInfo:              authInfo,
		readerDone:            make(chan struct{}),
		writerDone:            make(chan struct{}),
		goAway:                make(chan struct{}),
		framer:                newFramer(conn, writeBufSize, readBufSize, maxHeaderListSize),
		fc:                    &trInFlow{limit: uint32(icwz)},
		scheme:                scheme,
		activeStreams:         make(map[uint32]*Stream),
		isSecure:              isSecure,
		perRPCCreds:           perRPCCreds,
		kp:                    kp,
		statsHandler:          opts.StatsHandler,
		initialWindowSize:     initialWindowSize,
		onPrefaceReceipt:      onPrefaceReceipt,
		nextID:                1,
		maxConcurrentStreams:  defaultMaxStreamsClient,
		streamQuota:           defaultMaxStreamsClient,
		streamsQuotaAvailable: make(chan struct{}, 1),
		czData:                new(channelzData),
		onGoAway:              onGoAway,
		onClose:               onClose,
		keepaliveEnabled:      keepaliveEnabled,
		bufferPool:            newBufferPool(),
	}

	if md, ok := addr.Metadata.(*metadata.MD); ok {
		t.md = *md
	} else if md := imetadata.Get(addr); md != nil {
		t.md = md
	}
	t.controlBuf = newControlBuffer(t.ctxDone)
	if opts.InitialWindowSize >= defaultWindowSize {
		t.initialWindowSize = opts.InitialWindowSize
		dynamicWindow = false
	}
	if dynamicWindow {
		t.bdpEst = &bdpEstimator{
			bdp:               initialWindowSize,
			updateFlowControl: t.updateFlowControl,
		}
	}
	if t.statsHandler != nil {
		t.ctx = t.statsHandler.TagConn(t.ctx, &stats.ConnTagInfo{
			RemoteAddr: t.remoteAddr,
			LocalAddr:  t.localAddr,
		})
		connBegin := &stats.ConnBegin{
			Client: true,
		}
		t.statsHandler.HandleConn(t.ctx, connBegin)
	}
	if channelz.IsOn() {
		t.channelzID = channelz.RegisterNormalSocket(t, opts.ChannelzParentID, fmt.Sprintf("%s -> %s", t.localAddr, t.remoteAddr))
	}
	if t.keepaliveEnabled {
		t.kpDormancyCond = sync.NewCond(&t.mu)
		go t.keepalive()
	}
	// Start the reader goroutine for incoming messages. Each transport has a
	// dedicated goroutine which reads HTTP/2 frames from the network and then
	// dispatches each frame to the corresponding stream entity.
	go t.reader()

	// Send connection preface to server.
	n, err := t.conn.Write(clientPreface)
	if err != nil {
		t.Close()
		return nil, connectionErrorf(true, err, "transport: failed to write client preface: %v", err)
	}
	if n != len(clientPreface) {
		t.Close()
		return nil, connectionErrorf(true, err, "transport: preface mismatch, wrote %d bytes; want %d", n, len(clientPreface))
	}
	var ss []http2.Setting

	if t.initialWindowSize != defaultWindowSize {
		ss = append(ss, http2.Setting{
			ID:  http2.SettingInitialWindowSize,
			Val: uint32(t.initialWindowSize),
		})
	}
	if opts.MaxHeaderListSize != nil {
		ss = append(ss, http2.Setting{
			ID:  http2.SettingMaxHeaderListSize,
			Val: *opts.MaxHeaderListSize,
		})
	}
	err = t.framer.fr.WriteSettings(ss...)
	if err != nil {
		t.Close()
		return nil, connectionErrorf(true, err, "transport: failed to write initial settings frame: %v", err)
	}
	// Adjust the connection flow control window if needed.
	if delta := uint32(icwz - defaultWindowSize); delta > 0 {
		if err := t.framer.fr.WriteWindowUpdate(0, delta); err != nil {
			t.Close()
			return nil, connectionErrorf(true, err, "transport: failed to write window update: %v", err)
		}
	}

	t.connectionID = atomic.AddUint64(&clientConnectionCounter, 1)

	if err := t.framer.writer.Flush(); err != nil {
		return nil, err
	}
	go func() {
		t.loopy = newLoopyWriter(clientSide, t.framer, t.controlBuf, t.bdpEst)
		err := t.loopy.run()
		if err != nil {
			if logger.V(logLevel) {
				logger.Errorf("transport: loopyWriter.run returning. Err: %v", err)
			}
		}
		// If it's a connection error, let the reader goroutine handle it
		// since there might be data in the buffers.
		if _, ok := err.(net.Error); !ok {
			t.conn.Close()
		}
		close(t.writerDone)
	}()
	return t, nil
}

func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream {
	// TODO(zhaoq): Handle uint32 overflow of Stream.id.
	s := &Stream{
		ct:             t,
		done:           make(chan struct{}),
		method:         callHdr.Method,
		sendCompress:   callHdr.SendCompress,
		buf:            newRecvBuffer(),
		headerChan:     make(chan struct{}),
		contentSubtype: callHdr.ContentSubtype,
	}
	s.wq = newWriteQuota(defaultWriteQuota, s.done)
	s.requestRead = func(n int) {
		t.adjustWindow(s, uint32(n))
	}
	// The client side stream context should have exactly the same life cycle
	// as the user-provided context. That means s.ctx should be read-only, and
	// s.ctx is done iff ctx is done. So we use the original context here
	// instead of creating a copy.
	s.ctx = ctx
	s.trReader = &transportReader{
		reader: &recvBufferReader{
			ctx:     s.ctx,
			ctxDone: s.ctx.Done(),
			recv:    s.buf,
			closeStream: func(err error) {
				t.CloseStream(s, err)
			},
			freeBuffer: t.bufferPool.put,
		},
		windowHandler: func(n int) {
			t.updateWindow(s, uint32(n))
		},
	}
	return s
}

func (t *http2Client) getPeer() *peer.Peer {
	return &peer.Peer{
		Addr:     t.remoteAddr,
		AuthInfo: t.authInfo,
	}
}

func (t *http2Client) createHeaderFields(ctx context.Context, callHdr *CallHdr) ([]hpack.HeaderField, error) {
	aud := t.createAudience(callHdr)
	ri := credentials.RequestInfo{
		Method:   callHdr.Method,
		AuthInfo: t.authInfo,
	}
	ctxWithRequestInfo := internal.NewRequestInfoContext.(func(context.Context, credentials.RequestInfo) context.Context)(ctx, ri)
	authData, err := t.getTrAuthData(ctxWithRequestInfo, aud)
	if err != nil {
		return nil, err
	}
	callAuthData, err := t.getCallAuthData(ctxWithRequestInfo, aud, callHdr)
	if err != nil {
		return nil, err
	}
	// TODO(mmukhi): Benchmark whether performance improves if we count the
	// metadata and other header fields first and create a slice of that exact
	// size. Make the slice a certain predictable size to reduce allocations
	// made by append.
	hfLen := 7 // :method, :scheme, :path, :authority, content-type, user-agent, te
	hfLen += len(authData) + len(callAuthData)
	headerFields := make([]hpack.HeaderField, 0, hfLen)
	headerFields = append(headerFields, hpack.HeaderField{Name: ":method", Value: "POST"})
	headerFields = append(headerFields, hpack.HeaderField{Name: ":scheme", Value: t.scheme})
	headerFields = append(headerFields, hpack.HeaderField{Name: ":path", Value: callHdr.Method})
	headerFields = append(headerFields, hpack.HeaderField{Name: ":authority", Value: callHdr.Host})
	headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: grpcutil.ContentType(callHdr.ContentSubtype)})
	headerFields = append(headerFields, hpack.HeaderField{Name: "user-agent", Value: t.userAgent})
	headerFields = append(headerFields, hpack.HeaderField{Name: "te", Value: "trailers"})
	if callHdr.PreviousAttempts > 0 {
		headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-previous-rpc-attempts", Value: strconv.Itoa(callHdr.PreviousAttempts)})
	}

	if callHdr.SendCompress != "" {
		headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-encoding", Value: callHdr.SendCompress})
		headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-accept-encoding", Value: callHdr.SendCompress})
	}
	if dl, ok := ctx.Deadline(); ok {
		// Send out the timeout regardless of its value. The server can detect
		// a timeout context by itself.
		// TODO(mmukhi): Perhaps this field should be updated when actually
		// writing out to the wire.
		timeout := time.Until(dl)
		headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-timeout", Value: grpcutil.EncodeDuration(timeout)})
	}
	for k, v := range authData {
		headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
	}
	for k, v := range callAuthData {
		headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
	}
	if b := stats.OutgoingTags(ctx); b != nil {
		headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-tags-bin", Value: encodeBinHeader(b)})
	}
	if b := stats.OutgoingTrace(ctx); b != nil {
		headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-trace-bin", Value: encodeBinHeader(b)})
	}

	if md, added, ok := metadata.FromOutgoingContextRaw(ctx); ok {
		var k string
		for k, vv := range md {
			// HTTP doesn't allow you to set pseudo-headers after non-pseudo-headers were set.
			if isReservedHeader(k) {
				continue
			}
			for _, v := range vv {
				headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
			}
		}
		for _, vv := range added {
			for i, v := range vv {
				if i%2 == 0 {
					k = strings.ToLower(v)
					continue
				}
				// HTTP doesn't allow you to set pseudo-headers after non-pseudo-headers were set.
				if isReservedHeader(k) {
					continue
				}
				headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
			}
		}
	}
	for k, vv := range t.md {
		if isReservedHeader(k) {
			continue
		}
		for _, v := range vv {
			headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
		}
	}
	return headerFields, nil
}
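
// Worked example (illustrative, not from the original source): for a call
// with Method "/pkg.Service/Get" on Host "example.com" over TLS, with no
// compression and a 250ms deadline remaining, the fixed header block built
// above comes out, in order, as:
//
//	:method: POST
//	:scheme: https
//	:path: /pkg.Service/Get
//	:authority: example.com
//	content-type: application/grpc
//	user-agent: <opts.UserAgent>
//	te: trailers
//	grpc-timeout: 250m
//
// followed by per-RPC auth metadata, outgoing-context metadata, and t.md.
// The "250m" timeout encoding (milliseconds unit) is an assumption about
// grpcutil.EncodeDuration's output format.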

func (t *http2Client) createAudience(callHdr *CallHdr) string {
	// Create an audience string only if needed.
	if len(t.perRPCCreds) == 0 && callHdr.Creds == nil {
		return ""
	}
	// Construct URI required to get auth request metadata.
	// Omit port if it is the default one.
	host := strings.TrimSuffix(callHdr.Host, ":443")
	pos := strings.LastIndex(callHdr.Method, "/")
	if pos == -1 {
		pos = len(callHdr.Method)
	}
	return "https://" + host + callHdr.Method[:pos]
}
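
// Worked example (illustrative): with Host "example.com:443" and Method
// "/google.pubsub.v1.Publisher/Publish", the default ":443" port is trimmed
// and everything after the last "/" (the method name) is dropped, yielding
//
//	"https://example.com/google.pubsub.v1.Publisher"
//
// which is the audience string handed to the per-RPC credentials.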

func (t *http2Client) getTrAuthData(ctx context.Context, audience string) (map[string]string, error) {
	if len(t.perRPCCreds) == 0 {
		return nil, nil
	}
	authData := map[string]string{}
	for _, c := range t.perRPCCreds {
		data, err := c.GetRequestMetadata(ctx, audience)
		if err != nil {
			if _, ok := status.FromError(err); ok {
				return nil, err
			}

			return nil, status.Errorf(codes.Unauthenticated, "transport: %v", err)
		}
		for k, v := range data {
			// Capital header names are illegal in HTTP/2.
			k = strings.ToLower(k)
			authData[k] = v
		}
	}
	return authData, nil
}

func (t *http2Client) getCallAuthData(ctx context.Context, audience string, callHdr *CallHdr) (map[string]string, error) {
	var callAuthData map[string]string
	// Check if credentials.PerRPCCredentials were provided via call options.
	// Note: if these credentials are provided both via dial options and call
	// options, then both sets of credentials will be applied.
	if callCreds := callHdr.Creds; callCreds != nil {
		if callCreds.RequireTransportSecurity() {
			ri, _ := credentials.RequestInfoFromContext(ctx)
			if !t.isSecure || credentials.CheckSecurityLevel(ri.AuthInfo, credentials.PrivacyAndIntegrity) != nil {
				return nil, status.Error(codes.Unauthenticated, "transport: cannot send secure credentials on an insecure connection")
			}
		}
		data, err := callCreds.GetRequestMetadata(ctx, audience)
		if err != nil {
			return nil, status.Errorf(codes.Internal, "transport: %v", err)
		}
		callAuthData = make(map[string]string, len(data))
		for k, v := range data {
			// Capital header names are illegal in HTTP/2.
			k = strings.ToLower(k)
			callAuthData[k] = v
		}
	}
	return callAuthData, nil
}

// PerformedIOError wraps an error to indicate IO may have been performed
// before the error occurred.
type PerformedIOError struct {
	Err error
}

// Error implements error.
func (p PerformedIOError) Error() string {
	return p.Err.Error()
}

// NewStream creates a stream and registers it with the transport as an
// "active" stream.
func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Stream, err error) {
	ctx = peer.NewContext(ctx, t.getPeer())
	headerFields, err := t.createHeaderFields(ctx, callHdr)
	if err != nil {
		// We may have performed I/O in the per-RPC creds callback, so do not
		// allow transparent retry.
		return nil, PerformedIOError{err}
	}
	s := t.newStream(ctx, callHdr)
	cleanup := func(err error) {
		if s.swapState(streamDone) == streamDone {
			// If it was already done, return.
			return
		}
		// The stream was unprocessed by the server.
		atomic.StoreUint32(&s.unprocessed, 1)
		s.write(recvMsg{err: err})
		close(s.done)
		// If headerChan isn't closed, then close it.
		if atomic.CompareAndSwapUint32(&s.headerChanClosed, 0, 1) {
			close(s.headerChan)
		}
	}
	hdr := &headerFrame{
		hf:        headerFields,
		endStream: false,
		initStream: func(id uint32) error {
			t.mu.Lock()
			if state := t.state; state != reachable {
				t.mu.Unlock()
				// Do a quick cleanup.
				err := error(errStreamDrain)
				if state == closing {
					err = ErrConnClosing
				}
				cleanup(err)
				return err
			}
			t.activeStreams[id] = s
			if channelz.IsOn() {
				atomic.AddInt64(&t.czData.streamsStarted, 1)
				atomic.StoreInt64(&t.czData.lastStreamCreatedTime, time.Now().UnixNano())
			}
			// If the keepalive goroutine has gone dormant, wake it up.
			if t.kpDormant {
				t.kpDormancyCond.Signal()
			}
			t.mu.Unlock()
			return nil
		},
		onOrphaned: cleanup,
		wq:         s.wq,
	}
	firstTry := true
	var ch chan struct{}
	checkForStreamQuota := func(it interface{}) bool {
		if t.streamQuota <= 0 { // Can go negative if server decreases it.
			if firstTry {
				t.waitingStreams++
			}
			ch = t.streamsQuotaAvailable
			return false
		}
		if !firstTry {
			t.waitingStreams--
		}
		t.streamQuota--
		h := it.(*headerFrame)
		h.streamID = t.nextID
		t.nextID += 2
		s.id = h.streamID
		s.fc = &inFlow{limit: uint32(t.initialWindowSize)}
		if t.streamQuota > 0 && t.waitingStreams > 0 {
			select {
			case t.streamsQuotaAvailable <- struct{}{}:
			default:
			}
		}
		return true
	}
	var hdrListSizeErr error
	checkForHeaderListSize := func(it interface{}) bool {
		if t.maxSendHeaderListSize == nil {
			return true
		}
		hdrFrame := it.(*headerFrame)
		var sz int64
		for _, f := range hdrFrame.hf {
			if sz += int64(f.Size()); sz > int64(*t.maxSendHeaderListSize) {
				hdrListSizeErr = status.Errorf(codes.Internal, "header list size to send violates the maximum size (%d bytes) set by server", *t.maxSendHeaderListSize)
				return false
			}
		}
		return true
	}
	for {
		success, err := t.controlBuf.executeAndPut(func(it interface{}) bool {
			if !checkForStreamQuota(it) {
				return false
			}
			if !checkForHeaderListSize(it) {
				return false
			}
			return true
		}, hdr)
		if err != nil {
			return nil, err
		}
		if success {
			break
		}
		if hdrListSizeErr != nil {
			return nil, hdrListSizeErr
		}
		firstTry = false
		select {
		case <-ch:
		case <-s.ctx.Done():
			return nil, ContextErr(s.ctx.Err())
		case <-t.goAway:
			return nil, errStreamDrain
		case <-t.ctx.Done():
			return nil, ErrConnClosing
		}
	}
	if t.statsHandler != nil {
		header, ok := metadata.FromOutgoingContext(ctx)
		if ok {
			header.Set("user-agent", t.userAgent)
		} else {
			header = metadata.Pairs("user-agent", t.userAgent)
		}
		// Note: The header fields are compressed with hpack after this call returns.
		// No WireLength field is set here.
		outHeader := &stats.OutHeader{
			Client:      true,
			FullMethod:  callHdr.Method,
			RemoteAddr:  t.remoteAddr,
			LocalAddr:   t.localAddr,
			Compression: callHdr.SendCompress,
			Header:      header,
		}
		t.statsHandler.HandleRPC(s.ctx, outHeader)
	}
	return s, nil
}

// CloseStream clears the footprint of a stream when the stream is not needed
// any more. This must not be executed in the reader's goroutine.
func (t *http2Client) CloseStream(s *Stream, err error) {
	var (
		rst     bool
		rstCode http2.ErrCode
	)
	if err != nil {
		rst = true
		rstCode = http2.ErrCodeCancel
	}
	t.closeStream(s, err, rst, rstCode, status.Convert(err), nil, false)
}

func (t *http2Client) closeStream(s *Stream, err error, rst bool, rstCode http2.ErrCode, st *status.Status, mdata map[string][]string, eosReceived bool) {
	// Set stream status to done.
	if s.swapState(streamDone) == streamDone {
		// If it was already done, return.  If multiple closeStream calls
		// happen simultaneously, wait for the first to finish.
		<-s.done
		return
	}
	// status and trailers can be updated here without any synchronization
	// because the stream goroutine will only read them after it sees an io.EOF
	// error from read or write, and we'll write those errors only after
	// updating this.
	s.status = st
	if len(mdata) > 0 {
		s.trailer = mdata
	}
	if err != nil {
		// This will unblock reads eventually.
		s.write(recvMsg{err: err})
	}
	// If headerChan isn't closed, then close it.
	if atomic.CompareAndSwapUint32(&s.headerChanClosed, 0, 1) {
		s.noHeaders = true
		close(s.headerChan)
	}
	cleanup := &cleanupStream{
		streamID: s.id,
		onWrite: func() {
			t.mu.Lock()
			if t.activeStreams != nil {
				delete(t.activeStreams, s.id)
			}
			t.mu.Unlock()
			if channelz.IsOn() {
				if eosReceived {
					atomic.AddInt64(&t.czData.streamsSucceeded, 1)
				} else {
					atomic.AddInt64(&t.czData.streamsFailed, 1)
				}
			}
		},
		rst:     rst,
		rstCode: rstCode,
	}
	addBackStreamQuota := func(interface{}) bool {
		t.streamQuota++
		if t.streamQuota > 0 && t.waitingStreams > 0 {
			select {
			case t.streamsQuotaAvailable <- struct{}{}:
			default:
			}
		}
		return true
	}
	t.controlBuf.executeAndPut(addBackStreamQuota, cleanup)
	// This will unblock write.
	close(s.done)
}

// Close kicks off the shutdown process of the transport. This should be called
// only once on a transport. Once it is called, the transport should not be
// accessed any more.
//
// This method blocks until the addrConn that initiated this transport is
// re-connected. This happens because t.onClose() begins reconnect logic at the
// addrConn level and blocks until the addrConn is successfully connected.
func (t *http2Client) Close() error {
	t.mu.Lock()
	// Make sure we only Close once.
	if t.state == closing {
		t.mu.Unlock()
		return nil
	}
	// Call t.onClose before setting the state to closing to prevent the client
	// from attempting to create new streams ASAP.
	t.onClose()
	t.state = closing
	streams := t.activeStreams
	t.activeStreams = nil
	if t.kpDormant {
		// If the keepalive goroutine is blocked on this condition variable, we
		// should unblock it so that the goroutine eventually exits.
		t.kpDormancyCond.Signal()
	}
	t.mu.Unlock()
	t.controlBuf.finish()
	t.cancel()
	err := t.conn.Close()
	if channelz.IsOn() {
		channelz.RemoveEntry(t.channelzID)
	}
	// Notify all active streams.
	for _, s := range streams {
		t.closeStream(s, ErrConnClosing, false, http2.ErrCodeNo, status.New(codes.Unavailable, ErrConnClosing.Desc), nil, false)
	}
	if t.statsHandler != nil {
		connEnd := &stats.ConnEnd{
			Client: true,
		}
		t.statsHandler.HandleConn(t.ctx, connEnd)
	}
	return err
}

// GracefulClose sets the state to draining, which prevents new streams from
// being created and causes the transport to be closed when the last active
// stream is closed.  If there are no active streams, the transport is closed
// immediately.  This does nothing if the transport is already draining or
// closing.
func (t *http2Client) GracefulClose() {
	t.mu.Lock()
	// Make sure we move to draining only from active.
	if t.state == draining || t.state == closing {
		t.mu.Unlock()
		return
	}
	t.state = draining
	active := len(t.activeStreams)
	t.mu.Unlock()
	if active == 0 {
		t.Close()
		return
	}
	t.controlBuf.put(&incomingGoAway{})
}

// Write formats the data into HTTP2 data frame(s) and sends it out. The caller
// should proceed only if Write returns nil.
func (t *http2Client) Write(s *Stream, hdr []byte, data []byte, opts *Options) error {
	if opts.Last {
		// If it's the last message, update stream state.
		if !s.compareAndSwapState(streamActive, streamWriteDone) {
			return errStreamDone
		}
	} else if s.getState() != streamActive {
		return errStreamDone
	}
	df := &dataFrame{
		streamID:  s.id,
		endStream: opts.Last,
		h:         hdr,
		d:         data,
	}
	if hdr != nil || data != nil { // If it's not an empty data frame, check quota.
		if err := s.wq.get(int32(len(hdr) + len(data))); err != nil {
			return err
		}
	}
	return t.controlBuf.put(df)
}
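
// Usage sketch (assumption, not from this file): callers hand Write a framing
// header and a payload separately; for gRPC messages, hdr is typically the
// 5-byte length prefix (1 compression flag byte + 4-byte big-endian length).
// Write blocks on the per-stream write quota (s.wq) before queueing the frame
// on controlBuf for loopyWriter to send.
//
//	n := len(msg)
//	hdr := []byte{0, byte(n >> 24), byte(n >> 16), byte(n >> 8), byte(n)}
//	if err := t.Write(s, hdr, msg, &Options{Last: true}); err != nil {
//		// The stream is already done, or quota could not be acquired.
//	}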

func (t *http2Client) getStream(f http2.Frame) *Stream {
	t.mu.Lock()
	s := t.activeStreams[f.Header().StreamID]
	t.mu.Unlock()
	return s
}

// adjustWindow sends out an extra window update over the initial window size
// of the stream if the application is requesting more data than the window
// can hold.
func (t *http2Client) adjustWindow(s *Stream, n uint32) {
	if w := s.fc.maybeAdjust(n); w > 0 {
		t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id, increment: w})
	}
}

// updateWindow adjusts the inbound quota for the stream.
// Window updates will be sent out when the cumulative quota
// exceeds the corresponding threshold.
func (t *http2Client) updateWindow(s *Stream, n uint32) {
	if w := s.fc.onRead(n); w > 0 {
		t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id, increment: w})
	}
}

// updateFlowControl updates the incoming flow control windows
// for the transport and the stream based on the current bdp
// estimation.
func (t *http2Client) updateFlowControl(n uint32) {
	t.mu.Lock()
	for _, s := range t.activeStreams {
		s.fc.newLimit(n)
	}
	t.mu.Unlock()
	updateIWS := func(interface{}) bool {
		t.initialWindowSize = int32(n)
		return true
	}
	t.controlBuf.executeAndPut(updateIWS, &outgoingWindowUpdate{streamID: 0, increment: t.fc.newLimit(n)})
	t.controlBuf.put(&outgoingSettings{
		ss: []http2.Setting{
			{
				ID:  http2.SettingInitialWindowSize,
				Val: n,
			},
		},
	})
}

func (t *http2Client) handleData(f *http2.DataFrame) {
	size := f.Header().Length
	var sendBDPPing bool
	if t.bdpEst != nil {
		sendBDPPing = t.bdpEst.add(size)
	}
	// Decouple the connection's flow control from the application's read.
	// An update on the connection's flow control should not depend on
	// whether the user application has read the data or not. Such a
	// restriction is already imposed on the stream's flow control,
	// and therefore the sender will be blocked anyway.
	// Decoupling the connection flow control will prevent other
	// active (fast) streams from starving in the presence of slow or
	// inactive streams.
	if w := t.fc.onData(size); w > 0 {
		t.controlBuf.put(&outgoingWindowUpdate{
			streamID:  0,
			increment: w,
		})
	}
	if sendBDPPing {
		// Avoid excessive ping detection (e.g. in an L7 proxy)
		// by sending a window update prior to the BDP ping.

		if w := t.fc.reset(); w > 0 {
			t.controlBuf.put(&outgoingWindowUpdate{
				streamID:  0,
				increment: w,
			})
		}

		t.controlBuf.put(bdpPing)
	}
	// Select the right stream to dispatch.
	s := t.getStream(f)
	if s == nil {
		return
	}
	if size > 0 {
		if err := s.fc.onData(size); err != nil {
			t.closeStream(s, io.EOF, true, http2.ErrCodeFlowControl, status.New(codes.Internal, err.Error()), nil, false)
			return
		}
		if f.Header().Flags.Has(http2.FlagDataPadded) {
			if w := s.fc.onRead(size - uint32(len(f.Data()))); w > 0 {
				t.controlBuf.put(&outgoingWindowUpdate{s.id, w})
			}
		}
		// TODO(bradfitz, zhaoq): A copy is required here because there is no
		// guarantee f.Data() is consumed before the arrival of next frame.
		// Can this copy be eliminated?
		if len(f.Data()) > 0 {
			buffer := t.bufferPool.get()
			buffer.Reset()
			buffer.Write(f.Data())
			s.write(recvMsg{buffer: buffer})
		}
	}
	// The server has closed the stream without sending trailers.  Record that
	// the read direction is closed, and set the status appropriately.
	if f.FrameHeader.Flags.Has(http2.FlagDataEndStream) {
		t.closeStream(s, io.EOF, false, http2.ErrCodeNo, status.New(codes.Internal, "server closed the stream without sending trailers"), nil, true)
	}
}

func (t *http2Client) handleRSTStream(f *http2.RSTStreamFrame) {
	s := t.getStream(f)
	if s == nil {
		return
	}
	if f.ErrCode == http2.ErrCodeRefusedStream {
		// The stream was unprocessed by the server.
		atomic.StoreUint32(&s.unprocessed, 1)
	}
	statusCode, ok := http2ErrConvTab[f.ErrCode]
	if !ok {
		if logger.V(logLevel) {
			logger.Warningf("transport: http2Client.handleRSTStream found no mapped gRPC status for the received http2 error %v", f.ErrCode)
		}
		statusCode = codes.Unknown
	}
	if statusCode == codes.Canceled {
		if d, ok := s.ctx.Deadline(); ok && !d.After(time.Now()) {
			// Our deadline was already exceeded, and that was likely the cause
			// of this cancelation.  Alter the status code accordingly.
			statusCode = codes.DeadlineExceeded
		}
	}
	t.closeStream(s, io.EOF, false, http2.ErrCodeNo, status.Newf(statusCode, "stream terminated by RST_STREAM with error code: %v", f.ErrCode), nil, false)
}

func (t *http2Client) handleSettings(f *http2.SettingsFrame, isFirst bool) {
	if f.IsAck() {
		return
	}
	var maxStreams *uint32
	var ss []http2.Setting
	var updateFuncs []func()
	f.ForeachSetting(func(s http2.Setting) error {
		switch s.ID {
		case http2.SettingMaxConcurrentStreams:
			maxStreams = new(uint32)
			*maxStreams = s.Val
		case http2.SettingMaxHeaderListSize:
			updateFuncs = append(updateFuncs, func() {
				t.maxSendHeaderListSize = new(uint32)
				*t.maxSendHeaderListSize = s.Val
			})
		default:
			ss = append(ss, s)
		}
		return nil
	})
	if isFirst && maxStreams == nil {
		maxStreams = new(uint32)
		*maxStreams = math.MaxUint32
	}
	sf := &incomingSettings{
		ss: ss,
	}
	if maxStreams != nil {
		updateStreamQuota := func() {
			delta := int64(*maxStreams) - int64(t.maxConcurrentStreams)
			t.maxConcurrentStreams = *maxStreams
			t.streamQuota += delta
			if delta > 0 && t.waitingStreams > 0 {
				close(t.streamsQuotaAvailable) // wake all of them up.
				t.streamsQuotaAvailable = make(chan struct{}, 1)
			}
		}
		updateFuncs = append(updateFuncs, updateStreamQuota)
	}
	t.controlBuf.executeAndPut(func(interface{}) bool {
		for _, f := range updateFuncs {
			f()
		}
		return true
	}, sf)
}
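
// Worked example (illustrative): suppose the server first advertised
// MAX_CONCURRENT_STREAMS=100 and later lowers it to 60 while 80 streams are
// active (so t.streamQuota is 100-80 = 20). updateStreamQuota computes
// delta = 60 - 100 = -40, leaving t.streamQuota at -20. New RPCs then block
// in NewStream's quota loop until enough active streams finish and
// closeStream's addBackStreamQuota brings the quota back above zero.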

func (t *http2Client) handlePing(f *http2.PingFrame) {
	if f.IsAck() {
		// Maybe it's a BDP ping.
		if t.bdpEst != nil {
			t.bdpEst.calculate(f.Data)
		}
		return
	}
	pingAck := &ping{ack: true}
	copy(pingAck.data[:], f.Data[:])
	t.controlBuf.put(pingAck)
}

func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
	t.mu.Lock()
	if t.state == closing {
		t.mu.Unlock()
		return
	}
	if f.ErrCode == http2.ErrCodeEnhanceYourCalm {
		if logger.V(logLevel) {
			logger.Infof("Client received GoAway with http2.ErrCodeEnhanceYourCalm.")
		}
	}
	id := f.LastStreamID
	if id > 0 && id%2 != 1 {
		t.mu.Unlock()
		t.Close()
		return
	}
	// A client can receive multiple GoAways from the server (see
	// https://github.com/grpc/grpc-go/issues/1387).  The idea is that the first
	// GoAway will be sent with an ID of MaxInt32 and the second GoAway will be
	// sent after an RTT delay with the ID of the last stream the server will
	// process.
	//
	// Therefore, when we get the first GoAway we don't necessarily close any
	// streams, while on the second GoAway we close all streams created after
	// the GoAway ID. This way, streams that were in-flight while the GoAway
	// from the server was being sent don't get killed.
	select {
	case <-t.goAway: // t.goAway has been closed (i.e., multiple GoAways).
		// If there are multiple GoAways the first one should always have an ID greater than the following ones.
		if id > t.prevGoAwayID {
			t.mu.Unlock()
			t.Close()
			return
		}
	default:
		t.setGoAwayReason(f)
		close(t.goAway)
		t.controlBuf.put(&incomingGoAway{})
		// Notify the clientconn about the GOAWAY before we set the state to
		// draining, to allow the client to stop attempting to create streams
		// before disallowing new streams on this connection.
		t.onGoAway(t.goAwayReason)
		t.state = draining
	}
	// All streams with IDs greater than the GoAway ID
	// and smaller than the previous GoAway ID should be killed.
	upperLimit := t.prevGoAwayID
	if upperLimit == 0 { // This is the first GoAway Frame.
		upperLimit = math.MaxUint32 // Kill all streams after the GoAway ID.
	}
	for streamID, stream := range t.activeStreams {
		if streamID > id && streamID <= upperLimit {
			// The stream was unprocessed by the server.
			atomic.StoreUint32(&stream.unprocessed, 1)
			t.closeStream(stream, errStreamDrain, false, http2.ErrCodeNo, statusGoAway, nil, false)
		}
	}
	t.prevGoAwayID = id
	active := len(t.activeStreams)
	t.mu.Unlock()
	if active == 0 {
		t.Close()
	}
}
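
// Worked example (illustrative): a server draining gracefully first sends
// GOAWAY(LastStreamID=MaxInt32). No streams are closed (upperLimit becomes
// MaxUint32, and no client stream ID exceeds MaxInt32), but the transport
// enters draining so new RPCs go to other connections. After roughly an RTT
// it sends GOAWAY(LastStreamID=7): now upperLimit = prevGoAwayID = MaxInt32,
// so client streams 9, 11, ... are closed and marked unprocessed (making
// them safe to retry transparently), while streams 1..7 are left to finish.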

// setGoAwayReason sets the value of t.goAwayReason based
// on the GoAway frame received.
// It expects a lock on the transport's mutex to be held by
// the caller.
func (t *http2Client) setGoAwayReason(f *http2.GoAwayFrame) {
	t.goAwayReason = GoAwayNoReason
	switch f.ErrCode {
	case http2.ErrCodeEnhanceYourCalm:
		if string(f.DebugData()) == "too_many_pings" {
			t.goAwayReason = GoAwayTooManyPings
		}
	}
}

func (t *http2Client) GetGoAwayReason() GoAwayReason {
	t.mu.Lock()
	defer t.mu.Unlock()
	return t.goAwayReason
}

func (t *http2Client) handleWindowUpdate(f *http2.WindowUpdateFrame) {
	t.controlBuf.put(&incomingWindowUpdate{
		streamID:  f.Header().StreamID,
		increment: f.Increment,
	})
}

// operateHeaders takes action on the decoded headers.
func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
	s := t.getStream(frame)
	if s == nil {
		return
	}
	endStream := frame.StreamEnded()
	atomic.StoreUint32(&s.bytesReceived, 1)
	initialHeader := atomic.LoadUint32(&s.headerChanClosed) == 0

	if !initialHeader && !endStream {
		// As specified by gRPC over HTTP2, a HEADERS frame (and associated
		// CONTINUATION frames) can only appear at the start or end of a
		// stream. Therefore, a second HEADERS frame must have the EOS bit set.
		st := status.New(codes.Internal, "a HEADERS frame cannot appear in the middle of a stream")
		t.closeStream(s, st.Err(), true, http2.ErrCodeProtocol, st, nil, false)
		return
	}

	state := &decodeState{}
	// Initialize isGRPC to !initialHeader, since if a gRPC Response-Headers
	// has already been received, the peer is speaking gRPC and we are in gRPC
	// mode.
	state.data.isGRPC = !initialHeader
	if h2code, err := state.decodeHeader(frame); err != nil {
		t.closeStream(s, err, true, h2code, status.Convert(err), nil, endStream)
		return
	}

	isHeader := false
	defer func() {
		if t.statsHandler != nil {
			if isHeader {
				inHeader := &stats.InHeader{
					Client:      true,
					WireLength:  int(frame.Header().Length),
					Header:      s.header.Copy(),
					Compression: s.recvCompress,
				}
				t.statsHandler.HandleRPC(s.ctx, inHeader)
			} else {
				inTrailer := &stats.InTrailer{
					Client:     true,
					WireLength: int(frame.Header().Length),
					Trailer:    s.trailer.Copy(),
				}
				t.statsHandler.HandleRPC(s.ctx, inTrailer)
			}
		}
	}()

	// If headerChan hasn't been closed yet
	if atomic.CompareAndSwapUint32(&s.headerChanClosed, 0, 1) {
		s.headerValid = true
		if !endStream {
			// HEADERS frame block carries a Response-Headers.
			isHeader = true
			// These values can be set without any synchronization because
			// stream goroutine will read it only after seeing a closed
			// headerChan which we'll close after setting this.
			s.recvCompress = state.data.encoding
			if len(state.data.mdata) > 0 {
				s.header = state.data.mdata
			}
		} else {
			// HEADERS frame block carries a Trailers-Only.
			s.noHeaders = true
		}
		close(s.headerChan)
	}

	if !endStream {
		return
	}

	// if client received END_STREAM from server while stream was still active, send RST_STREAM
	rst := s.getState() == streamActive
	t.closeStream(s, io.EOF, rst, http2.ErrCodeNo, state.status(), state.data.mdata, true)
}

// reader runs as a separate goroutine in charge of reading data from network
// connection.
//
// TODO(zhaoq): currently one reader per transport. Investigate whether this is
// optimal.
// TODO(zhaoq): Check the validity of the incoming frame sequence.
func (t *http2Client) reader() {
	defer close(t.readerDone)
	// Check the validity of server preface.
	frame, err := t.framer.fr.ReadFrame()
	if err != nil {
		t.Close() // this kicks off resetTransport, so must be last before return
		return
	}
	t.conn.SetReadDeadline(time.Time{}) // reset deadline once we get the settings frame (we didn't time out, yay!)
	if t.keepaliveEnabled {
		atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
	}
	sf, ok := frame.(*http2.SettingsFrame)
	if !ok {
		t.Close() // this kicks off resetTransport, so must be last before return
		return
	}
	t.onPrefaceReceipt()
	t.handleSettings(sf, true)

	// loop to keep reading incoming messages on this transport.
	for {
		t.controlBuf.throttle()
		frame, err := t.framer.fr.ReadFrame()
		if t.keepaliveeEnabled := t.keepaliveEnabled; t.keepaliveeEnabled {
			atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
		}
		if err != nil {
			// Abort an active stream if the http2.Framer returns a
			// http2.StreamError. This can happen only if the server's response
			// is malformed http2.
			if se, ok := err.(http2.StreamError); ok {
				t.mu.Lock()
				s := t.activeStreams[se.StreamID]
				t.mu.Unlock()
				if s != nil {
					// use error detail to provide better err message
					code := http2ErrConvTab[se.Code]
					errorDetail := t.framer.fr.ErrorDetail()
					var msg string
					if errorDetail != nil {
						msg = errorDetail.Error()
					} else {
						msg = "received invalid frame"
					}
					t.closeStream(s, status.Error(code, msg), true, http2.ErrCodeProtocol, status.New(code, msg), nil, false)
				}
				continue
			} else {
				// Transport error.
				t.Close()
				return
			}
		}
		switch frame := frame.(type) {
		case *http2.MetaHeadersFrame:
			t.operateHeaders(frame)
		case *http2.DataFrame:
			t.handleData(frame)
		case *http2.RSTStreamFrame:
			t.handleRSTStream(frame)
		case *http2.SettingsFrame:
			t.handleSettings(frame, false)
		case *http2.PingFrame:
			t.handlePing(frame)
		case *http2.GoAwayFrame:
			t.handleGoAway(frame)
		case *http2.WindowUpdateFrame:
			t.handleWindowUpdate(frame)
		default:
			if logger.V(logLevel) {
				logger.Errorf("transport: http2Client.reader got unhandled frame type %v.", frame)
			}
		}
	}
}

func minTime(a, b time.Duration) time.Duration {
	if a < b {
		return a
	}
	return b
}

// keepalive, running in a separate goroutine, makes sure the connection is
// alive by sending pings.
func (t *http2Client) keepalive() {
	p := &ping{data: [8]byte{}}
	// True iff a ping has been sent, and no data has been received since then.
	outstandingPing := false
	// Amount of time remaining before which we should receive an ACK for the
	// last sent ping.
	timeoutLeft := time.Duration(0)
	// Records the last value of t.lastRead before we go block on the timer.
	// This is required to check for read activity since then.
	prevNano := time.Now().UnixNano()
	timer := time.NewTimer(t.kp.Time)
	for {
		select {
		case <-timer.C:
			lastRead := atomic.LoadInt64(&t.lastRead)
			if lastRead > prevNano {
				// There has been read activity since the last time we were here.
				outstandingPing = false
				// Next timer should fire at kp.Time seconds from lastRead time.
				timer.Reset(time.Duration(lastRead) + t.kp.Time - time.Duration(time.Now().UnixNano()))
				prevNano = lastRead
				continue
			}
			if outstandingPing && timeoutLeft <= 0 {
				t.Close()
				return
			}
			t.mu.Lock()
			if t.state == closing {
				// If the transport is closing, we should exit from the
				// keepalive goroutine here. If not, we could have a race
				// between the call to Signal() from Close() and the call to
				// Wait() here, whereby the keepalive goroutine ends up
				// blocking on the condition variable which will never be
				// signalled again.
				t.mu.Unlock()
				return
			}
			if len(t.activeStreams) < 1 && !t.kp.PermitWithoutStream {
				// If a ping was sent out previously (because there were active
				// streams at that point) which wasn't acked and its timeout
				// hadn't fired, but we got here and are about to go dormant,
				// we should make sure that we unconditionally send a ping once
				// we awaken.
				outstandingPing = false
				t.kpDormant = true
				t.kpDormancyCond.Wait()
			}
			t.kpDormant = false
			t.mu.Unlock()

			// We get here either because we were dormant and a new stream was
			// created which unblocked the Wait() call, or because the
			// keepalive timer expired. In both cases, we need to send a ping.
			if !outstandingPing {
				if channelz.IsOn() {
					atomic.AddInt64(&t.czData.kpCount, 1)
				}
				t.controlBuf.put(p)
				timeoutLeft = t.kp.Timeout
				outstandingPing = true
			}
			// The amount of time to sleep here is the minimum of kp.Time and
			// timeoutLeft. This will ensure that we wait only for kp.Time
			// before sending out the next ping (for cases where the ping is
			// acked).
			sleepDuration := minTime(t.kp.Time, timeoutLeft)
			timeoutLeft -= sleepDuration
			timer.Reset(sleepDuration)
		case <-t.ctx.Done():
			if !timer.Stop() {
				<-timer.C
			}
			return
		}
	}
}
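
// Configuration sketch (illustrative): the parameters driving this loop come
// from keepalive.ClientParameters (stored in t.kp above). The values here
// are hypothetical examples, not defaults from this file.
//
//	kp := keepalive.ClientParameters{
//		Time:                20 * time.Second, // ping after 20s with no read activity
//		Timeout:             5 * time.Second,  // close the transport if no ack within 5s
//		PermitWithoutStream: true,             // keep pinging even with no active streams
//	}
//
// With PermitWithoutStream set to false and zero active streams, the
// goroutine parks on kpDormancyCond instead of pinging, and is woken by
// initStream in NewStream or by Close.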

func (t *http2Client) Error() <-chan struct{} {
	return t.ctx.Done()
}

func (t *http2Client) GoAway() <-chan struct{} {
	return t.goAway
}

func (t *http2Client) ChannelzMetric() *channelz.SocketInternalMetric {
	s := channelz.SocketInternalMetric{
		StreamsStarted:                  atomic.LoadInt64(&t.czData.streamsStarted),
		StreamsSucceeded:                atomic.LoadInt64(&t.czData.streamsSucceeded),
		StreamsFailed:                   atomic.LoadInt64(&t.czData.streamsFailed),
		MessagesSent:                    atomic.LoadInt64(&t.czData.msgSent),
		MessagesReceived:                atomic.LoadInt64(&t.czData.msgRecv),
		KeepAlivesSent:                  atomic.LoadInt64(&t.czData.kpCount),
		LastLocalStreamCreatedTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastStreamCreatedTime)),
		LastMessageSentTimestamp:        time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgSentTime)),
		LastMessageReceivedTimestamp:    time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgRecvTime)),
		LocalFlowControlWindow:          int64(t.fc.getSize()),
		SocketOptions:                   channelz.GetSocketOption(t.conn),
		LocalAddr:                       t.localAddr,
		RemoteAddr:                      t.remoteAddr,
		// RemoteName :
	}
	if au, ok := t.authInfo.(credentials.ChannelzSecurityInfo); ok {
		s.Security = au.GetSecurityValue()
	}
	s.RemoteFlowControlWindow = t.getOutFlowWindow()
	return &s
}

func (t *http2Client) RemoteAddr() net.Addr { return t.remoteAddr }

func (t *http2Client) IncrMsgSent() {
	atomic.AddInt64(&t.czData.msgSent, 1)
	atomic.StoreInt64(&t.czData.lastMsgSentTime, time.Now().UnixNano())
}

func (t *http2Client) IncrMsgRecv() {
	atomic.AddInt64(&t.czData.msgRecv, 1)
	atomic.StoreInt64(&t.czData.lastMsgRecvTime, time.Now().UnixNano())
}

func (t *http2Client) getOutFlowWindow() int64 {
	resp := make(chan uint32, 1)
	timer := time.NewTimer(time.Second)
	defer timer.Stop()
	t.controlBuf.put(&outFlowControlSizeRequest{resp})
	select {
	case sz := <-resp:
		return int64(sz)
	case <-t.ctxDone:
		return -1
	case <-timer.C:
		return -2
	}
}