cubefs

Форк
0
806 строк · 26.3 Кб
1
/*
2
 *
3
 * Copyright 2014 gRPC authors.
4
 *
5
 * Licensed under the Apache License, Version 2.0 (the "License");
6
 * you may not use this file except in compliance with the License.
7
 * You may obtain a copy of the License at
8
 *
9
 *     http://www.apache.org/licenses/LICENSE-2.0
10
 *
11
 * Unless required by applicable law or agreed to in writing, software
12
 * distributed under the License is distributed on an "AS IS" BASIS,
13
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
 * See the License for the specific language governing permissions and
15
 * limitations under the License.
16
 *
17
 */
18

19
// Package transport defines and implements message oriented communication
20
// channel to complete various transactions (e.g., an RPC).  It is meant for
21
// grpc-internal usage and is not intended to be imported directly by users.
22
package transport
23

24
import (
25
	"bytes"
26
	"context"
27
	"errors"
28
	"fmt"
29
	"io"
30
	"net"
31
	"sync"
32
	"sync/atomic"
33

34
	"google.golang.org/grpc/codes"
35
	"google.golang.org/grpc/credentials"
36
	"google.golang.org/grpc/keepalive"
37
	"google.golang.org/grpc/metadata"
38
	"google.golang.org/grpc/resolver"
39
	"google.golang.org/grpc/stats"
40
	"google.golang.org/grpc/status"
41
	"google.golang.org/grpc/tap"
42
)
43

44
const logLevel = 2
45

46
type bufferPool struct {
47
	pool sync.Pool
48
}
49

50
func newBufferPool() *bufferPool {
51
	return &bufferPool{
52
		pool: sync.Pool{
53
			New: func() interface{} {
54
				return new(bytes.Buffer)
55
			},
56
		},
57
	}
58
}
59

60
func (p *bufferPool) get() *bytes.Buffer {
61
	return p.pool.Get().(*bytes.Buffer)
62
}
63

64
func (p *bufferPool) put(b *bytes.Buffer) {
65
	p.pool.Put(b)
66
}
67

68
// recvMsg represents the received msg from the transport. All transport
69
// protocol specific info has been removed.
70
type recvMsg struct {
71
	buffer *bytes.Buffer
72
	// nil: received some data
73
	// io.EOF: stream is completed. data is nil.
74
	// other non-nil error: transport failure. data is nil.
75
	err error
76
}
77

78
// recvBuffer is an unbounded channel of recvMsg structs.
79
//
80
// Note: recvBuffer differs from buffer.Unbounded only in the fact that it
81
// holds a channel of recvMsg structs instead of objects implementing "item"
82
// interface. recvBuffer is written to much more often and using strict recvMsg
83
// structs helps avoid allocation in "recvBuffer.put"
84
type recvBuffer struct {
85
	c       chan recvMsg
86
	mu      sync.Mutex
87
	backlog []recvMsg
88
	err     error
89
}
90

91
func newRecvBuffer() *recvBuffer {
92
	b := &recvBuffer{
93
		c: make(chan recvMsg, 1),
94
	}
95
	return b
96
}
97

98
func (b *recvBuffer) put(r recvMsg) {
99
	b.mu.Lock()
100
	if b.err != nil {
101
		b.mu.Unlock()
102
		// An error had occurred earlier, don't accept more
103
		// data or errors.
104
		return
105
	}
106
	b.err = r.err
107
	if len(b.backlog) == 0 {
108
		select {
109
		case b.c <- r:
110
			b.mu.Unlock()
111
			return
112
		default:
113
		}
114
	}
115
	b.backlog = append(b.backlog, r)
116
	b.mu.Unlock()
117
}
118

119
func (b *recvBuffer) load() {
120
	b.mu.Lock()
121
	if len(b.backlog) > 0 {
122
		select {
123
		case b.c <- b.backlog[0]:
124
			b.backlog[0] = recvMsg{}
125
			b.backlog = b.backlog[1:]
126
		default:
127
		}
128
	}
129
	b.mu.Unlock()
130
}
131

132
// get returns the channel that receives a recvMsg in the buffer.
133
//
134
// Upon receipt of a recvMsg, the caller should call load to send another
135
// recvMsg onto the channel if there is any.
136
func (b *recvBuffer) get() <-chan recvMsg {
137
	return b.c
138
}
139

140
// recvBufferReader implements io.Reader interface to read the data from
141
// recvBuffer.
142
type recvBufferReader struct {
143
	closeStream func(error) // Closes the client transport stream with the given error and nil trailer metadata.
144
	ctx         context.Context
145
	ctxDone     <-chan struct{} // cache of ctx.Done() (for performance).
146
	recv        *recvBuffer
147
	last        *bytes.Buffer // Stores the remaining data in the previous calls.
148
	err         error
149
	freeBuffer  func(*bytes.Buffer)
150
}
151

152
// Read reads the next len(p) bytes from last. If last is drained, it tries to
153
// read additional data from recv. It blocks if there no additional data available
154
// in recv. If Read returns any non-nil error, it will continue to return that error.
155
func (r *recvBufferReader) Read(p []byte) (n int, err error) {
156
	if r.err != nil {
157
		return 0, r.err
158
	}
159
	if r.last != nil {
160
		// Read remaining data left in last call.
161
		copied, _ := r.last.Read(p)
162
		if r.last.Len() == 0 {
163
			r.freeBuffer(r.last)
164
			r.last = nil
165
		}
166
		return copied, nil
167
	}
168
	if r.closeStream != nil {
169
		n, r.err = r.readClient(p)
170
	} else {
171
		n, r.err = r.read(p)
172
	}
173
	return n, r.err
174
}
175

176
func (r *recvBufferReader) read(p []byte) (n int, err error) {
177
	select {
178
	case <-r.ctxDone:
179
		return 0, ContextErr(r.ctx.Err())
180
	case m := <-r.recv.get():
181
		return r.readAdditional(m, p)
182
	}
183
}
184

185
func (r *recvBufferReader) readClient(p []byte) (n int, err error) {
186
	// If the context is canceled, then closes the stream with nil metadata.
187
	// closeStream writes its error parameter to r.recv as a recvMsg.
188
	// r.readAdditional acts on that message and returns the necessary error.
189
	select {
190
	case <-r.ctxDone:
191
		// Note that this adds the ctx error to the end of recv buffer, and
192
		// reads from the head. This will delay the error until recv buffer is
193
		// empty, thus will delay ctx cancellation in Recv().
194
		//
195
		// It's done this way to fix a race between ctx cancel and trailer. The
196
		// race was, stream.Recv() may return ctx error if ctxDone wins the
197
		// race, but stream.Trailer() may return a non-nil md because the stream
198
		// was not marked as done when trailer is received. This closeStream
199
		// call will mark stream as done, thus fix the race.
200
		//
201
		// TODO: delaying ctx error seems like a unnecessary side effect. What
202
		// we really want is to mark the stream as done, and return ctx error
203
		// faster.
204
		r.closeStream(ContextErr(r.ctx.Err()))
205
		m := <-r.recv.get()
206
		return r.readAdditional(m, p)
207
	case m := <-r.recv.get():
208
		return r.readAdditional(m, p)
209
	}
210
}
211

212
func (r *recvBufferReader) readAdditional(m recvMsg, p []byte) (n int, err error) {
213
	r.recv.load()
214
	if m.err != nil {
215
		return 0, m.err
216
	}
217
	copied, _ := m.buffer.Read(p)
218
	if m.buffer.Len() == 0 {
219
		r.freeBuffer(m.buffer)
220
		r.last = nil
221
	} else {
222
		r.last = m.buffer
223
	}
224
	return copied, nil
225
}
226

227
type streamState uint32
228

229
const (
230
	streamActive    streamState = iota
231
	streamWriteDone             // EndStream sent
232
	streamReadDone              // EndStream received
233
	streamDone                  // the entire stream is finished.
234
)
235

236
// Stream represents an RPC in the transport layer.
237
type Stream struct {
238
	id           uint32
239
	st           ServerTransport    // nil for client side Stream
240
	ct           *http2Client       // nil for server side Stream
241
	ctx          context.Context    // the associated context of the stream
242
	cancel       context.CancelFunc // always nil for client side Stream
243
	done         chan struct{}      // closed at the end of stream to unblock writers. On the client side.
244
	ctxDone      <-chan struct{}    // same as done chan but for server side. Cache of ctx.Done() (for performance)
245
	method       string             // the associated RPC method of the stream
246
	recvCompress string
247
	sendCompress string
248
	buf          *recvBuffer
249
	trReader     io.Reader
250
	fc           *inFlow
251
	wq           *writeQuota
252

253
	// Callback to state application's intentions to read data. This
254
	// is used to adjust flow control, if needed.
255
	requestRead func(int)
256

257
	headerChan       chan struct{} // closed to indicate the end of header metadata.
258
	headerChanClosed uint32        // set when headerChan is closed. Used to avoid closing headerChan multiple times.
259
	// headerValid indicates whether a valid header was received.  Only
260
	// meaningful after headerChan is closed (always call waitOnHeader() before
261
	// reading its value).  Not valid on server side.
262
	headerValid bool
263

264
	// hdrMu protects header and trailer metadata on the server-side.
265
	hdrMu sync.Mutex
266
	// On client side, header keeps the received header metadata.
267
	//
268
	// On server side, header keeps the header set by SetHeader(). The complete
269
	// header will merged into this after t.WriteHeader() is called.
270
	header  metadata.MD
271
	trailer metadata.MD // the key-value map of trailer metadata.
272

273
	noHeaders bool // set if the client never received headers (set only after the stream is done).
274

275
	// On the server-side, headerSent is atomically set to 1 when the headers are sent out.
276
	headerSent uint32
277

278
	state streamState
279

280
	// On client-side it is the status error received from the server.
281
	// On server-side it is unused.
282
	status *status.Status
283

284
	bytesReceived uint32 // indicates whether any bytes have been received on this stream
285
	unprocessed   uint32 // set if the server sends a refused stream or GOAWAY including this stream
286

287
	// contentSubtype is the content-subtype for requests.
288
	// this must be lowercase or the behavior is undefined.
289
	contentSubtype string
290
}
291

292
// isHeaderSent is only valid on the server-side.
293
func (s *Stream) isHeaderSent() bool {
294
	return atomic.LoadUint32(&s.headerSent) == 1
295
}
296

297
// updateHeaderSent updates headerSent and returns true
298
// if it was alreay set. It is valid only on server-side.
299
func (s *Stream) updateHeaderSent() bool {
300
	return atomic.SwapUint32(&s.headerSent, 1) == 1
301
}
302

303
func (s *Stream) swapState(st streamState) streamState {
304
	return streamState(atomic.SwapUint32((*uint32)(&s.state), uint32(st)))
305
}
306

307
func (s *Stream) compareAndSwapState(oldState, newState streamState) bool {
308
	return atomic.CompareAndSwapUint32((*uint32)(&s.state), uint32(oldState), uint32(newState))
309
}
310

311
func (s *Stream) getState() streamState {
312
	return streamState(atomic.LoadUint32((*uint32)(&s.state)))
313
}
314

315
func (s *Stream) waitOnHeader() {
316
	if s.headerChan == nil {
317
		// On the server headerChan is always nil since a stream originates
318
		// only after having received headers.
319
		return
320
	}
321
	select {
322
	case <-s.ctx.Done():
323
		// Close the stream to prevent headers/trailers from changing after
324
		// this function returns.
325
		s.ct.CloseStream(s, ContextErr(s.ctx.Err()))
326
		// headerChan could possibly not be closed yet if closeStream raced
327
		// with operateHeaders; wait until it is closed explicitly here.
328
		<-s.headerChan
329
	case <-s.headerChan:
330
	}
331
}
332

333
// RecvCompress returns the compression algorithm applied to the inbound
334
// message. It is empty string if there is no compression applied.
335
func (s *Stream) RecvCompress() string {
336
	s.waitOnHeader()
337
	return s.recvCompress
338
}
339

340
// SetSendCompress sets the compression algorithm to the stream.
341
func (s *Stream) SetSendCompress(str string) {
342
	s.sendCompress = str
343
}
344

345
// Done returns a channel which is closed when it receives the final status
346
// from the server.
347
func (s *Stream) Done() <-chan struct{} {
348
	return s.done
349
}
350

351
// Header returns the header metadata of the stream.
352
//
353
// On client side, it acquires the key-value pairs of header metadata once it is
354
// available. It blocks until i) the metadata is ready or ii) there is no header
355
// metadata or iii) the stream is canceled/expired.
356
//
357
// On server side, it returns the out header after t.WriteHeader is called.  It
358
// does not block and must not be called until after WriteHeader.
359
func (s *Stream) Header() (metadata.MD, error) {
360
	if s.headerChan == nil {
361
		// On server side, return the header in stream. It will be the out
362
		// header after t.WriteHeader is called.
363
		return s.header.Copy(), nil
364
	}
365
	s.waitOnHeader()
366
	if !s.headerValid {
367
		return nil, s.status.Err()
368
	}
369
	return s.header.Copy(), nil
370
}
371

372
// TrailersOnly blocks until a header or trailers-only frame is received and
373
// then returns true if the stream was trailers-only.  If the stream ends
374
// before headers are received, returns true, nil.  Client-side only.
375
func (s *Stream) TrailersOnly() bool {
376
	s.waitOnHeader()
377
	return s.noHeaders
378
}
379

380
// Trailer returns the cached trailer metedata. Note that if it is not called
381
// after the entire stream is done, it could return an empty MD. Client
382
// side only.
383
// It can be safely read only after stream has ended that is either read
384
// or write have returned io.EOF.
385
func (s *Stream) Trailer() metadata.MD {
386
	c := s.trailer.Copy()
387
	return c
388
}
389

390
// ContentSubtype returns the content-subtype for a request. For example, a
391
// content-subtype of "proto" will result in a content-type of
392
// "application/grpc+proto". This will always be lowercase.  See
393
// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests for
394
// more details.
395
func (s *Stream) ContentSubtype() string {
396
	return s.contentSubtype
397
}
398

399
// Context returns the context of the stream.
400
func (s *Stream) Context() context.Context {
401
	return s.ctx
402
}
403

404
// Method returns the method for the stream.
405
func (s *Stream) Method() string {
406
	return s.method
407
}
408

409
// Status returns the status received from the server.
410
// Status can be read safely only after the stream has ended,
411
// that is, after Done() is closed.
412
func (s *Stream) Status() *status.Status {
413
	return s.status
414
}
415

416
// SetHeader sets the header metadata. This can be called multiple times.
417
// Server side only.
418
// This should not be called in parallel to other data writes.
419
func (s *Stream) SetHeader(md metadata.MD) error {
420
	if md.Len() == 0 {
421
		return nil
422
	}
423
	if s.isHeaderSent() || s.getState() == streamDone {
424
		return ErrIllegalHeaderWrite
425
	}
426
	s.hdrMu.Lock()
427
	s.header = metadata.Join(s.header, md)
428
	s.hdrMu.Unlock()
429
	return nil
430
}
431

432
// SendHeader sends the given header metadata. The given metadata is
433
// combined with any metadata set by previous calls to SetHeader and
434
// then written to the transport stream.
435
func (s *Stream) SendHeader(md metadata.MD) error {
436
	return s.st.WriteHeader(s, md)
437
}
438

439
// SetTrailer sets the trailer metadata which will be sent with the RPC status
440
// by the server. This can be called multiple times. Server side only.
441
// This should not be called parallel to other data writes.
442
func (s *Stream) SetTrailer(md metadata.MD) error {
443
	if md.Len() == 0 {
444
		return nil
445
	}
446
	if s.getState() == streamDone {
447
		return ErrIllegalHeaderWrite
448
	}
449
	s.hdrMu.Lock()
450
	s.trailer = metadata.Join(s.trailer, md)
451
	s.hdrMu.Unlock()
452
	return nil
453
}
454

455
func (s *Stream) write(m recvMsg) {
456
	s.buf.put(m)
457
}
458

459
// Read reads all p bytes from the wire for this stream.
460
func (s *Stream) Read(p []byte) (n int, err error) {
461
	// Don't request a read if there was an error earlier
462
	if er := s.trReader.(*transportReader).er; er != nil {
463
		return 0, er
464
	}
465
	s.requestRead(len(p))
466
	return io.ReadFull(s.trReader, p)
467
}
468

469
// tranportReader reads all the data available for this Stream from the transport and
470
// passes them into the decoder, which converts them into a gRPC message stream.
471
// The error is io.EOF when the stream is done or another non-nil error if
472
// the stream broke.
473
type transportReader struct {
474
	reader io.Reader
475
	// The handler to control the window update procedure for both this
476
	// particular stream and the associated transport.
477
	windowHandler func(int)
478
	er            error
479
}
480

481
func (t *transportReader) Read(p []byte) (n int, err error) {
482
	n, err = t.reader.Read(p)
483
	if err != nil {
484
		t.er = err
485
		return
486
	}
487
	t.windowHandler(n)
488
	return
489
}
490

491
// BytesReceived indicates whether any bytes have been received on this stream.
492
func (s *Stream) BytesReceived() bool {
493
	return atomic.LoadUint32(&s.bytesReceived) == 1
494
}
495

496
// Unprocessed indicates whether the server did not process this stream --
497
// i.e. it sent a refused stream or GOAWAY including this stream ID.
498
func (s *Stream) Unprocessed() bool {
499
	return atomic.LoadUint32(&s.unprocessed) == 1
500
}
501

502
// GoString is implemented by Stream so context.String() won't
503
// race when printing %#v.
504
func (s *Stream) GoString() string {
505
	return fmt.Sprintf("<stream: %p, %v>", s, s.method)
506
}
507

508
// state of transport
509
type transportState int
510

511
const (
512
	reachable transportState = iota
513
	closing
514
	draining
515
)
516

517
// ServerConfig consists of all the configurations to establish a server transport.
518
type ServerConfig struct {
519
	MaxStreams            uint32
520
	AuthInfo              credentials.AuthInfo
521
	InTapHandle           tap.ServerInHandle
522
	StatsHandler          stats.Handler
523
	KeepaliveParams       keepalive.ServerParameters
524
	KeepalivePolicy       keepalive.EnforcementPolicy
525
	InitialWindowSize     int32
526
	InitialConnWindowSize int32
527
	WriteBufferSize       int
528
	ReadBufferSize        int
529
	ChannelzParentID      int64
530
	MaxHeaderListSize     *uint32
531
	HeaderTableSize       *uint32
532
}
533

534
// NewServerTransport creates a ServerTransport with conn or non-nil error
535
// if it fails.
536
func NewServerTransport(protocol string, conn net.Conn, config *ServerConfig) (ServerTransport, error) {
537
	return newHTTP2Server(conn, config)
538
}
539

540
// ConnectOptions covers all relevant options for communicating with the server.
541
type ConnectOptions struct {
542
	// UserAgent is the application user agent.
543
	UserAgent string
544
	// Dialer specifies how to dial a network address.
545
	Dialer func(context.Context, string) (net.Conn, error)
546
	// FailOnNonTempDialError specifies if gRPC fails on non-temporary dial errors.
547
	FailOnNonTempDialError bool
548
	// PerRPCCredentials stores the PerRPCCredentials required to issue RPCs.
549
	PerRPCCredentials []credentials.PerRPCCredentials
550
	// TransportCredentials stores the Authenticator required to setup a client
551
	// connection. Only one of TransportCredentials and CredsBundle is non-nil.
552
	TransportCredentials credentials.TransportCredentials
553
	// CredsBundle is the credentials bundle to be used. Only one of
554
	// TransportCredentials and CredsBundle is non-nil.
555
	CredsBundle credentials.Bundle
556
	// KeepaliveParams stores the keepalive parameters.
557
	KeepaliveParams keepalive.ClientParameters
558
	// StatsHandler stores the handler for stats.
559
	StatsHandler stats.Handler
560
	// InitialWindowSize sets the initial window size for a stream.
561
	InitialWindowSize int32
562
	// InitialConnWindowSize sets the initial window size for a connection.
563
	InitialConnWindowSize int32
564
	// WriteBufferSize sets the size of write buffer which in turn determines how much data can be batched before it's written on the wire.
565
	WriteBufferSize int
566
	// ReadBufferSize sets the size of read buffer, which in turn determines how much data can be read at most for one read syscall.
567
	ReadBufferSize int
568
	// ChannelzParentID sets the addrConn id which initiate the creation of this client transport.
569
	ChannelzParentID int64
570
	// MaxHeaderListSize sets the max (uncompressed) size of header list that is prepared to be received.
571
	MaxHeaderListSize *uint32
572
	// UseProxy specifies if a proxy should be used.
573
	UseProxy bool
574
}
575

576
// NewClientTransport establishes the transport with the required ConnectOptions
577
// and returns it to the caller.
578
func NewClientTransport(connectCtx, ctx context.Context, addr resolver.Address, opts ConnectOptions, onPrefaceReceipt func(), onGoAway func(GoAwayReason), onClose func()) (ClientTransport, error) {
579
	return newHTTP2Client(connectCtx, ctx, addr, opts, onPrefaceReceipt, onGoAway, onClose)
580
}
581

582
// Options provides additional hints and information for message
583
// transmission.
584
type Options struct {
585
	// Last indicates whether this write is the last piece for
586
	// this stream.
587
	Last bool
588
}
589

590
// CallHdr carries the information of a particular RPC.
591
type CallHdr struct {
592
	// Host specifies the peer's host.
593
	Host string
594

595
	// Method specifies the operation to perform.
596
	Method string
597

598
	// SendCompress specifies the compression algorithm applied on
599
	// outbound message.
600
	SendCompress string
601

602
	// Creds specifies credentials.PerRPCCredentials for a call.
603
	Creds credentials.PerRPCCredentials
604

605
	// ContentSubtype specifies the content-subtype for a request. For example, a
606
	// content-subtype of "proto" will result in a content-type of
607
	// "application/grpc+proto". The value of ContentSubtype must be all
608
	// lowercase, otherwise the behavior is undefined. See
609
	// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests
610
	// for more details.
611
	ContentSubtype string
612

613
	PreviousAttempts int // value of grpc-previous-rpc-attempts header to set
614
}
615

616
// ClientTransport is the common interface for all gRPC client-side transport
617
// implementations.
618
type ClientTransport interface {
619
	// Close tears down this transport. Once it returns, the transport
620
	// should not be accessed any more. The caller must make sure this
621
	// is called only once.
622
	Close() error
623

624
	// GracefulClose starts to tear down the transport: the transport will stop
625
	// accepting new RPCs and NewStream will return error. Once all streams are
626
	// finished, the transport will close.
627
	//
628
	// It does not block.
629
	GracefulClose()
630

631
	// Write sends the data for the given stream. A nil stream indicates
632
	// the write is to be performed on the transport as a whole.
633
	Write(s *Stream, hdr []byte, data []byte, opts *Options) error
634

635
	// NewStream creates a Stream for an RPC.
636
	NewStream(ctx context.Context, callHdr *CallHdr) (*Stream, error)
637

638
	// CloseStream clears the footprint of a stream when the stream is
639
	// not needed any more. The err indicates the error incurred when
640
	// CloseStream is called. Must be called when a stream is finished
641
	// unless the associated transport is closing.
642
	CloseStream(stream *Stream, err error)
643

644
	// Error returns a channel that is closed when some I/O error
645
	// happens. Typically the caller should have a goroutine to monitor
646
	// this in order to take action (e.g., close the current transport
647
	// and create a new one) in error case. It should not return nil
648
	// once the transport is initiated.
649
	Error() <-chan struct{}
650

651
	// GoAway returns a channel that is closed when ClientTransport
652
	// receives the draining signal from the server (e.g., GOAWAY frame in
653
	// HTTP/2).
654
	GoAway() <-chan struct{}
655

656
	// GetGoAwayReason returns the reason why GoAway frame was received.
657
	GetGoAwayReason() GoAwayReason
658

659
	// RemoteAddr returns the remote network address.
660
	RemoteAddr() net.Addr
661

662
	// IncrMsgSent increments the number of message sent through this transport.
663
	IncrMsgSent()
664

665
	// IncrMsgRecv increments the number of message received through this transport.
666
	IncrMsgRecv()
667
}
668

669
// ServerTransport is the common interface for all gRPC server-side transport
670
// implementations.
671
//
672
// Methods may be called concurrently from multiple goroutines, but
673
// Write methods for a given Stream will be called serially.
674
type ServerTransport interface {
675
	// HandleStreams receives incoming streams using the given handler.
676
	HandleStreams(func(*Stream), func(context.Context, string) context.Context)
677

678
	// WriteHeader sends the header metadata for the given stream.
679
	// WriteHeader may not be called on all streams.
680
	WriteHeader(s *Stream, md metadata.MD) error
681

682
	// Write sends the data for the given stream.
683
	// Write may not be called on all streams.
684
	Write(s *Stream, hdr []byte, data []byte, opts *Options) error
685

686
	// WriteStatus sends the status of a stream to the client.  WriteStatus is
687
	// the final call made on a stream and always occurs.
688
	WriteStatus(s *Stream, st *status.Status) error
689

690
	// Close tears down the transport. Once it is called, the transport
691
	// should not be accessed any more. All the pending streams and their
692
	// handlers will be terminated asynchronously.
693
	Close() error
694

695
	// RemoteAddr returns the remote network address.
696
	RemoteAddr() net.Addr
697

698
	// Drain notifies the client this ServerTransport stops accepting new RPCs.
699
	Drain()
700

701
	// IncrMsgSent increments the number of message sent through this transport.
702
	IncrMsgSent()
703

704
	// IncrMsgRecv increments the number of message received through this transport.
705
	IncrMsgRecv()
706
}
707

708
// connectionErrorf creates an ConnectionError with the specified error description.
709
func connectionErrorf(temp bool, e error, format string, a ...interface{}) ConnectionError {
710
	return ConnectionError{
711
		Desc: fmt.Sprintf(format, a...),
712
		temp: temp,
713
		err:  e,
714
	}
715
}
716

717
// ConnectionError is an error that results in the termination of the
718
// entire connection and the retry of all the active streams.
719
type ConnectionError struct {
720
	Desc string
721
	temp bool
722
	err  error
723
}
724

725
func (e ConnectionError) Error() string {
726
	return fmt.Sprintf("connection error: desc = %q", e.Desc)
727
}
728

729
// Temporary indicates if this connection error is temporary or fatal.
730
func (e ConnectionError) Temporary() bool {
731
	return e.temp
732
}
733

734
// Origin returns the original error of this connection error.
735
func (e ConnectionError) Origin() error {
736
	// Never return nil error here.
737
	// If the original error is nil, return itself.
738
	if e.err == nil {
739
		return e
740
	}
741
	return e.err
742
}
743

744
var (
745
	// ErrConnClosing indicates that the transport is closing.
746
	ErrConnClosing = connectionErrorf(true, nil, "transport is closing")
747
	// errStreamDrain indicates that the stream is rejected because the
748
	// connection is draining. This could be caused by goaway or balancer
749
	// removing the address.
750
	errStreamDrain = status.Error(codes.Unavailable, "the connection is draining")
751
	// errStreamDone is returned from write at the client side to indiacte application
752
	// layer of an error.
753
	errStreamDone = errors.New("the stream is done")
754
	// StatusGoAway indicates that the server sent a GOAWAY that included this
755
	// stream's ID in unprocessed RPCs.
756
	statusGoAway = status.New(codes.Unavailable, "the stream is rejected because server is draining the connection")
757
)
758

759
// GoAwayReason contains the reason for the GoAway frame received.
760
type GoAwayReason uint8
761

762
const (
763
	// GoAwayInvalid indicates that no GoAway frame is received.
764
	GoAwayInvalid GoAwayReason = 0
765
	// GoAwayNoReason is the default value when GoAway frame is received.
766
	GoAwayNoReason GoAwayReason = 1
767
	// GoAwayTooManyPings indicates that a GoAway frame with
768
	// ErrCodeEnhanceYourCalm was received and that the debug data said
769
	// "too_many_pings".
770
	GoAwayTooManyPings GoAwayReason = 2
771
)
772

773
// channelzData is used to store channelz related data for http2Client and http2Server.
774
// These fields cannot be embedded in the original structs (e.g. http2Client), since to do atomic
775
// operation on int64 variable on 32-bit machine, user is responsible to enforce memory alignment.
776
// Here, by grouping those int64 fields inside a struct, we are enforcing the alignment.
777
type channelzData struct {
778
	kpCount int64
779
	// The number of streams that have started, including already finished ones.
780
	streamsStarted int64
781
	// Client side: The number of streams that have ended successfully by receiving
782
	// EoS bit set frame from server.
783
	// Server side: The number of streams that have ended successfully by sending
784
	// frame with EoS bit set.
785
	streamsSucceeded int64
786
	streamsFailed    int64
787
	// lastStreamCreatedTime stores the timestamp that the last stream gets created. It is of int64 type
788
	// instead of time.Time since it's more costly to atomically update time.Time variable than int64
789
	// variable. The same goes for lastMsgSentTime and lastMsgRecvTime.
790
	lastStreamCreatedTime int64
791
	msgSent               int64
792
	msgRecv               int64
793
	lastMsgSentTime       int64
794
	lastMsgRecvTime       int64
795
}
796

797
// ContextErr converts the error from context package into a status error.
798
func ContextErr(err error) error {
799
	switch err {
800
	case context.DeadlineExceeded:
801
		return status.Error(codes.DeadlineExceeded, err.Error())
802
	case context.Canceled:
803
		return status.Error(codes.Canceled, err.Error())
804
	}
805
	return status.Errorf(codes.Internal, "Unexpected error from context packet: %v", err)
806
}
807

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.