1
// Copyright 2014 The Go Authors. All rights reserved.
2
// Use of this source code is governed by a BSD-style
3
// license that can be found in the LICENSE file.
13
// pipe is a goroutine-safe io.Reader/io.Writer pair. It's like
14
// io.Pipe except there are no PipeReader/PipeWriter halves, and the
15
// underlying buffer is an interface. (io.Pipe is always unbuffered)
18
c sync.Cond // c.L lazily initialized to &p.mu
19
b pipeBuffer // nil when done reading
20
unread int // bytes unread when done
21
err error // read error once empty. non-nil means closed.
22
breakErr error // immediate read error (caller doesn't see rest of b)
23
donec chan struct{} // closed on error
24
readFn func() // optional code to run in Read before error
27
type pipeBuffer interface {
33
// setBuffer initializes the pipe buffer.
34
// It has no effect if the pipe is already closed.
35
func (p *pipe) setBuffer(b pipeBuffer) {
38
if p.err != nil || p.breakErr != nil {
44
func (p *pipe) Len() int {
53
// Read waits until data is available and copies bytes
54
// from the buffer into d.
55
func (p *pipe) Read(d []byte) (n int, err error) {
62
if p.breakErr != nil {
65
if p.b != nil && p.b.Len() > 0 {
70
p.readFn() // e.g. copy trailers
71
p.readFn = nil // not sticky like p.err
81
errClosedPipeWrite = errors.New("write on closed buffer")
82
errUninitializedPipeWrite = errors.New("write on uninitialized buffer")
85
// Write copies bytes from d into the buffer and wakes a reader.
86
// It is an error to write more data than the buffer can hold.
87
func (p *pipe) Write(d []byte) (n int, err error) {
94
if p.err != nil || p.breakErr != nil {
95
return 0, errClosedPipeWrite
97
// pipe.setBuffer is never invoked, leaving the buffer uninitialized.
98
// We shouldn't try to write to an uninitialized pipe,
99
// but returning an error is better than panicking.
101
return 0, errUninitializedPipeWrite
106
// CloseWithError causes the next Read (waking up a current blocked
107
// Read if needed) to return the provided err after all data has been
110
// The error must be non-nil.
111
func (p *pipe) CloseWithError(err error) {
	// Record err in p.err, the error Read reports once the buffer is
	// empty; no extra callback is installed (fn is nil).
	p.closeWithError(&p.err, err, nil)
}
113
// BreakWithError causes the next Read (waking up a current blocked
114
// Read if needed) to return the provided err immediately, without
115
// waiting for unread data.
116
func (p *pipe) BreakWithError(err error) {
	// Record err in p.breakErr, the immediate read error; no extra
	// callback is installed (fn is nil).
	p.closeWithError(&p.breakErr, err, nil)
}
118
// closeWithErrorAndCode is like CloseWithError but also sets some code to run
119
// in the caller's goroutine before returning the error.
120
func (p *pipe) closeWithErrorAndCode(err error, fn func()) {
	// Same destination as CloseWithError (p.err), but fn is forwarded
	// as the callback instead of nil.
	p.closeWithError(&p.err, err, fn)
}
122
func (p *pipe) closeWithError(dst *error, err error, fn func()) {
124
panic("err must be non-nil")
133
// Already been done.
137
if dst == &p.breakErr {
139
p.unread += p.b.Len()
147
// requires p.mu be held.
148
func (p *pipe) closeDoneLocked() {
152
// Close if unclosed. This isn't racy since we always
153
// hold p.mu while closing.
161
// Err returns the error (if any) first set by BreakWithError or CloseWithError.
162
func (p *pipe) Err() error {
165
if p.breakErr != nil {
171
// Done returns a channel which is closed if and when this pipe is closed
172
// with CloseWithError or BreakWithError.
173
func (p *pipe) Done() <-chan struct{} {
177
p.donec = make(chan struct{})
178
if p.err != nil || p.breakErr != nil {
179
// Already hit an error.