podman

Форк
0
184 строки · 4.4 Кб
1
// Copyright 2014 The Go Authors. All rights reserved.
2
// Use of this source code is governed by a BSD-style
3
// license that can be found in the LICENSE file.
4

5
package http2
6

7
import (
8
	"errors"
9
	"io"
10
	"sync"
11
)
12

13
// pipe is a goroutine-safe io.Reader/io.Writer pair. It's like
14
// io.Pipe except there are no PipeReader/PipeWriter halves, and the
15
// underlying buffer is an interface. (io.Pipe is always unbuffered)
16
type pipe struct {
17
	mu       sync.Mutex
18
	c        sync.Cond     // c.L lazily initialized to &p.mu
19
	b        pipeBuffer    // nil when done reading
20
	unread   int           // bytes unread when done
21
	err      error         // read error once empty. non-nil means closed.
22
	breakErr error         // immediate read error (caller doesn't see rest of b)
23
	donec    chan struct{} // closed on error
24
	readFn   func()        // optional code to run in Read before error
25
}
26

27
type pipeBuffer interface {
28
	Len() int
29
	io.Writer
30
	io.Reader
31
}
32

33
// setBuffer initializes the pipe buffer.
34
// It has no effect if the pipe is already closed.
35
func (p *pipe) setBuffer(b pipeBuffer) {
36
	p.mu.Lock()
37
	defer p.mu.Unlock()
38
	if p.err != nil || p.breakErr != nil {
39
		return
40
	}
41
	p.b = b
42
}
43

44
func (p *pipe) Len() int {
45
	p.mu.Lock()
46
	defer p.mu.Unlock()
47
	if p.b == nil {
48
		return p.unread
49
	}
50
	return p.b.Len()
51
}
52

53
// Read waits until data is available and copies bytes
54
// from the buffer into p.
55
func (p *pipe) Read(d []byte) (n int, err error) {
56
	p.mu.Lock()
57
	defer p.mu.Unlock()
58
	if p.c.L == nil {
59
		p.c.L = &p.mu
60
	}
61
	for {
62
		if p.breakErr != nil {
63
			return 0, p.breakErr
64
		}
65
		if p.b != nil && p.b.Len() > 0 {
66
			return p.b.Read(d)
67
		}
68
		if p.err != nil {
69
			if p.readFn != nil {
70
				p.readFn()     // e.g. copy trailers
71
				p.readFn = nil // not sticky like p.err
72
			}
73
			p.b = nil
74
			return 0, p.err
75
		}
76
		p.c.Wait()
77
	}
78
}
79

80
var (
81
	errClosedPipeWrite        = errors.New("write on closed buffer")
82
	errUninitializedPipeWrite = errors.New("write on uninitialized buffer")
83
)
84

85
// Write copies bytes from p into the buffer and wakes a reader.
86
// It is an error to write more data than the buffer can hold.
87
func (p *pipe) Write(d []byte) (n int, err error) {
88
	p.mu.Lock()
89
	defer p.mu.Unlock()
90
	if p.c.L == nil {
91
		p.c.L = &p.mu
92
	}
93
	defer p.c.Signal()
94
	if p.err != nil || p.breakErr != nil {
95
		return 0, errClosedPipeWrite
96
	}
97
	// pipe.setBuffer is never invoked, leaving the buffer uninitialized.
98
	// We shouldn't try to write to an uninitialized pipe,
99
	// but returning an error is better than panicking.
100
	if p.b == nil {
101
		return 0, errUninitializedPipeWrite
102
	}
103
	return p.b.Write(d)
104
}
105

106
// CloseWithError causes the next Read (waking up a current blocked
107
// Read if needed) to return the provided err after all data has been
108
// read.
109
//
110
// The error must be non-nil.
111
func (p *pipe) CloseWithError(err error) { p.closeWithError(&p.err, err, nil) }
112

113
// BreakWithError causes the next Read (waking up a current blocked
114
// Read if needed) to return the provided err immediately, without
115
// waiting for unread data.
116
func (p *pipe) BreakWithError(err error) { p.closeWithError(&p.breakErr, err, nil) }
117

118
// closeWithErrorAndCode is like CloseWithError but also sets some code to run
119
// in the caller's goroutine before returning the error.
120
func (p *pipe) closeWithErrorAndCode(err error, fn func()) { p.closeWithError(&p.err, err, fn) }
121

122
func (p *pipe) closeWithError(dst *error, err error, fn func()) {
123
	if err == nil {
124
		panic("err must be non-nil")
125
	}
126
	p.mu.Lock()
127
	defer p.mu.Unlock()
128
	if p.c.L == nil {
129
		p.c.L = &p.mu
130
	}
131
	defer p.c.Signal()
132
	if *dst != nil {
133
		// Already been done.
134
		return
135
	}
136
	p.readFn = fn
137
	if dst == &p.breakErr {
138
		if p.b != nil {
139
			p.unread += p.b.Len()
140
		}
141
		p.b = nil
142
	}
143
	*dst = err
144
	p.closeDoneLocked()
145
}
146

147
// requires p.mu be held.
148
func (p *pipe) closeDoneLocked() {
149
	if p.donec == nil {
150
		return
151
	}
152
	// Close if unclosed. This isn't racy since we always
153
	// hold p.mu while closing.
154
	select {
155
	case <-p.donec:
156
	default:
157
		close(p.donec)
158
	}
159
}
160

161
// Err returns the error (if any) first set by BreakWithError or CloseWithError.
162
func (p *pipe) Err() error {
163
	p.mu.Lock()
164
	defer p.mu.Unlock()
165
	if p.breakErr != nil {
166
		return p.breakErr
167
	}
168
	return p.err
169
}
170

171
// Done returns a channel which is closed if and when this pipe is closed
172
// with CloseWithError.
173
func (p *pipe) Done() <-chan struct{} {
174
	p.mu.Lock()
175
	defer p.mu.Unlock()
176
	if p.donec == nil {
177
		p.donec = make(chan struct{})
178
		if p.err != nil || p.breakErr != nil {
179
			// Already hit an error.
180
			p.closeDoneLocked()
181
		}
182
	}
183
	return p.donec
184
}
185

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.