cubefs

Форк
0
217 строк · 5.6 Кб
1
/*
2
 *
3
 * Copyright 2014 gRPC authors.
4
 *
5
 * Licensed under the Apache License, Version 2.0 (the "License");
6
 * you may not use this file except in compliance with the License.
7
 * You may obtain a copy of the License at
8
 *
9
 *     http://www.apache.org/licenses/LICENSE-2.0
10
 *
11
 * Unless required by applicable law or agreed to in writing, software
12
 * distributed under the License is distributed on an "AS IS" BASIS,
13
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
 * See the License for the specific language governing permissions and
15
 * limitations under the License.
16
 *
17
 */
18

19
package transport
20

21
import (
22
	"fmt"
23
	"math"
24
	"sync"
25
	"sync/atomic"
26
)
27

28
// writeQuota tracks how many bytes a stream may still schedule for writing
// before some of the already scheduled data has been written out. It is a
// soft limit: get only requires the quota to be positive, so a single call
// may push it below zero.
type writeQuota struct {
	// quota is the remaining budget. It is read and modified atomically.
	quota int32
	// ch wakes up a get call that is waiting because the quota is zero or
	// negative. realReplenish sends on it when the quota becomes positive
	// again.
	ch chan struct{}
	// done fires in the error case and makes a waiting get give up.
	done <-chan struct{}
	// replenish hands quota back and is called by loopyWriter. It is a
	// field rather than a method so that tests can replace it.
	replenish func(n int)
}
42

43
func newWriteQuota(sz int32, done <-chan struct{}) *writeQuota {
44
	w := &writeQuota{
45
		quota: sz,
46
		ch:    make(chan struct{}, 1),
47
		done:  done,
48
	}
49
	w.replenish = w.realReplenish
50
	return w
51
}
52

53
func (w *writeQuota) get(sz int32) error {
54
	for {
55
		if atomic.LoadInt32(&w.quota) > 0 {
56
			atomic.AddInt32(&w.quota, -sz)
57
			return nil
58
		}
59
		select {
60
		case <-w.ch:
61
			continue
62
		case <-w.done:
63
			return errStreamDone
64
		}
65
	}
66
}
67

68
func (w *writeQuota) realReplenish(n int) {
69
	sz := int32(n)
70
	a := atomic.AddInt32(&w.quota, sz)
71
	b := a - sz
72
	if b <= 0 && a > 0 {
73
		select {
74
		case w.ch <- struct{}{}:
75
		default:
76
		}
77
	}
78
}
79

80
// trInFlow keeps inbound flow-control counters: a window limit, the number
// of received bytes not yet reported back, and the difference of the two.
type trInFlow struct {
	// limit is the current window limit; newLimit replaces it.
	limit uint32
	// unacked accumulates the sizes passed to onData until onData or reset
	// returns the total and clears it.
	unacked uint32
	// effectiveWindowSize holds limit-unacked as of the last update. It is
	// stored and loaded with atomic operations.
	effectiveWindowSize uint32
}
85

86
// newLimit sets the window limit to n, refreshes the effective window size
// and returns n minus the previous limit. The subtraction is unsigned, so
// the result wraps around if n is smaller than the previous limit.
func (f *trInFlow) newLimit(n uint32) uint32 {
	old := f.limit
	f.limit = n
	f.updateEffectiveWindowSize()
	return n - old
}
92

93
// onData records n newly received bytes as unacknowledged. Once the
// unacknowledged total reaches a quarter of the limit, that total is
// returned and the counter is cleared; otherwise 0 is returned. The
// effective window size is refreshed in both cases.
func (f *trInFlow) onData(n uint32) uint32 {
	f.unacked += n
	var ack uint32
	if threshold := f.limit / 4; f.unacked >= threshold {
		ack, f.unacked = f.unacked, 0
	}
	f.updateEffectiveWindowSize()
	return ack
}
104

105
// reset clears the unacknowledged byte count, refreshes the effective
// window size and returns the count that was cleared.
func (f *trInFlow) reset() uint32 {
	var flushed uint32
	flushed, f.unacked = f.unacked, 0
	f.updateEffectiveWindowSize()
	return flushed
}
111

112
// updateEffectiveWindowSize atomically stores limit minus unacked into
// effectiveWindowSize.
func (f *trInFlow) updateEffectiveWindowSize() {
	remaining := f.limit - f.unacked
	atomic.StoreUint32(&f.effectiveWindowSize, remaining)
}
115

116
// getSize atomically loads and returns the stored effective window size.
func (f *trInFlow) getSize() uint32 {
	size := atomic.LoadUint32(&f.effectiveWindowSize)
	return size
}
119

120
// TODO(mmukhi): Simplify this code.
// inFlow handles inbound flow control.
type inFlow struct {
	mu sync.Mutex
	// limit is the inbound flow-control limit for pending data.
	limit uint32
	// pendingData counts bytes that have been received but not yet consumed
	// by the application.
	pendingData uint32
	// pendingUpdate counts bytes the application has consumed for which no
	// window update has been sent yet. Batching them reduces how often
	// window updates are sent.
	pendingUpdate uint32
	// delta is the extra window granted by the receiver when the application
	// reads a message larger than the inFlow limit.
	delta uint32
}
136

137
// newLimit updates the inflow window to a new value n.
138
// It assumes that n is always greater than the old limit.
139
func (f *inFlow) newLimit(n uint32) uint32 {
140
	f.mu.Lock()
141
	d := n - f.limit
142
	f.limit = n
143
	f.mu.Unlock()
144
	return d
145
}
146

147
// maybeAdjust decides whether an extra window update is needed so that the
// sender can deliver a message of n bytes the application is about to read.
// n is capped at math.MaxInt32. It returns the extra window, which is also
// stored in delta, or 0 when no extra update is needed.
func (f *inFlow) maybeAdjust(n uint32) uint32 {
	const maxInt32 = uint32(math.MaxInt32)
	if n > maxInt32 {
		n = maxInt32
	}
	f.mu.Lock()
	defer f.mu.Unlock()
	// senderQuota is the receiver's view of the maximum number of bytes the
	// sender can send without a window update.
	senderQuota := int32(f.limit - (f.pendingData + f.pendingUpdate))
	// untransmitted is the maximum number of bytes the sender might not have
	// put on the wire yet. Zero or less means we have already received all,
	// or more, of the bytes the application is asking to read. It is cast to
	// int32 because the difference can be negative.
	untransmitted := int32(n - f.pendingData)
	if untransmitted <= senderQuota {
		// The sender can already send the rest of the message.
		return 0
	}
	// Without a window update the sender cannot send all the bytes of this
	// message, and the application has an active read request, so grant
	// window beyond the limit.
	if f.limit+n > maxWindowSize {
		// The sender's window must not exceed 2^31 - 1, as specified in the
		// HTTP spec.
		f.delta = maxWindowSize - f.limit
	} else {
		// Grant the whole message size rather than only the difference
		// between untransmitted and senderQuota. This helps when the
		// message is padded; we fall back on the currently available window
		// (at least a quarter of the limit).
		f.delta = n
	}
	return f.delta
}
177

178
// onData is invoked when some data frame is received. It updates pendingData.
179
func (f *inFlow) onData(n uint32) error {
180
	f.mu.Lock()
181
	f.pendingData += n
182
	if f.pendingData+f.pendingUpdate > f.limit+f.delta {
183
		limit := f.limit
184
		rcvd := f.pendingData + f.pendingUpdate
185
		f.mu.Unlock()
186
		return fmt.Errorf("received %d-bytes data exceeding the limit %d bytes", rcvd, limit)
187
	}
188
	f.mu.Unlock()
189
	return nil
190
}
191

192
// onRead is invoked when the application reads the data. It returns the window size
193
// to be sent to the peer.
194
func (f *inFlow) onRead(n uint32) uint32 {
195
	f.mu.Lock()
196
	if f.pendingData == 0 {
197
		f.mu.Unlock()
198
		return 0
199
	}
200
	f.pendingData -= n
201
	if n > f.delta {
202
		n -= f.delta
203
		f.delta = 0
204
	} else {
205
		f.delta -= n
206
		n = 0
207
	}
208
	f.pendingUpdate += n
209
	if f.pendingUpdate >= f.limit/4 {
210
		wu := f.pendingUpdate
211
		f.pendingUpdate = 0
212
		f.mu.Unlock()
213
		return wu
214
	}
215
	f.mu.Unlock()
216
	return 0
217
}
218

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.