1
// Copyright 2024 The Go Authors. All rights reserved.
2
// Use of this source code is governed by a BSD-style
3
// license that can be found in the LICENSE file.
12
// testSyncHooks coordinates goroutines in tests.
14
// For example, a call to ClientConn.RoundTrip involves several goroutines, including:
15
// - the goroutine running RoundTrip;
16
// - the clientStream.doRequest goroutine, which writes the request; and
17
// - the clientStream.readLoop goroutine, which reads the response.
19
// Using testSyncHooks, a test can start a RoundTrip and identify when all these goroutines
20
// are blocked waiting for some condition such as reading the Request.Body or waiting for
21
// flow control to become available.
23
// The testSyncHooks also manage timers and synthetic time in tests.
24
// This permits us to, for example, start a request and cause it to time out waiting for
25
// response headers without resorting to time.Sleep calls.
26
type testSyncHooks struct {
27
// active/inactive act as a mutex and condition variable.
29
// - neither chan contains a value: testSyncHooks is locked.
30
// - active contains a value: unlocked, and at least one goroutine is not blocked
31
// - inactive contains a value: unlocked, and all goroutines are blocked
33
inactive chan struct{}
36
total int // total goroutines
37
condwait map[*sync.Cond]int // blocked in sync.Cond.Wait
38
blocked []*testBlockedGoroutine // otherwise blocked
44
// Transport testing: Report various events.
45
newclientconn func(*ClientConn)
46
newstream func(*clientStream)
49
// testBlockedGoroutine is a blocked goroutine.
50
type testBlockedGoroutine struct {
51
f func() bool // blocked until f returns true
52
ch chan struct{} // closed when unblocked
55
func newTestSyncHooks() *testSyncHooks {
57
active: make(chan struct{}, 1),
58
inactive: make(chan struct{}, 1),
59
condwait: map[*sync.Cond]int{},
61
h.inactive <- struct{}{}
62
h.now = time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC)
66
// lock acquires the testSyncHooks mutex.
67
func (h *testSyncHooks) lock() {
74
// waitInactive waits for all goroutines to become inactive.
75
func (h *testSyncHooks) waitInactive() {
84
// unlock releases the testSyncHooks mutex.
85
// It reports whether any goroutines are active.
86
func (h *testSyncHooks) unlock() (active bool) {
87
// Look for a blocked goroutine which can be unblocked.
88
blocked := h.blocked[:0]
90
for _, b := range h.blocked {
91
if !unblocked && b.f() {
95
blocked = append(blocked, b)
100
// Count goroutines blocked on condition variables.
102
for _, count := range h.condwait {
106
if h.total > condwait+len(blocked) {
107
h.active <- struct{}{}
110
h.inactive <- struct{}{}
115
// goRun starts a new goroutine.
116
func (h *testSyncHooks) goRun(f func()) {
130
// blockUntil indicates that a goroutine is blocked waiting for some condition to become true.
131
// It waits until f returns true before proceeding.
135
// h.blockUntil(func() bool {
136
// // Is the context done yet?
144
// // Wait for the context to become done.
147
// The function f passed to blockUntil must be non-blocking and idempotent.
148
func (h *testSyncHooks) blockUntil(f func() bool) {
152
ch := make(chan struct{})
154
h.blocked = append(h.blocked, &testBlockedGoroutine{
162
// condBroadcast is sync.Cond.Broadcast.
163
func (h *testSyncHooks) condBroadcast(cond *sync.Cond) {
165
delete(h.condwait, cond)
170
// condWait is sync.Cond.Wait.
171
func (h *testSyncHooks) condWait(cond *sync.Cond) {
177
// newTimer creates a new fake timer.
178
func (h *testSyncHooks) newTimer(d time.Duration) timer {
184
c: make(chan time.Time),
186
h.timers = append(h.timers, t)
190
// afterFunc creates a new fake AfterFunc timer.
191
func (h *testSyncHooks) afterFunc(d time.Duration, f func()) timer {
199
h.timers = append(h.timers, t)
203
func (h *testSyncHooks) contextWithTimeout(ctx context.Context, d time.Duration) (context.Context, context.CancelFunc) {
204
ctx, cancel := context.WithCancel(ctx)
205
t := h.afterFunc(d, cancel)
212
func (h *testSyncHooks) timeUntilEvent() time.Duration {
216
for _, t := range h.timers {
217
if next.IsZero() || t.when.Before(next) {
221
if d := next.Sub(h.now); d > 0 {
227
// advance advances time and causes synthetic timers to fire.
228
func (h *testSyncHooks) advance(d time.Duration) {
232
timers := h.timers[:0]
233
for _, t := range h.timers {
234
t := t // remove after go.mod depends on go1.22
237
case t.when.After(h.now):
238
timers = append(timers, t)
239
case t.when.IsZero():
263
// A timer wraps a time.Timer, or a synthetic equivalent in tests.
264
// Unlike time.Timer, timer is single-use: The timer channel is closed when the timer expires.
265
type timer interface {
268
Reset(d time.Duration) bool
271
// timeTimer implements timer using real time.
272
type timeTimer struct {
277
// newTimeTimer creates a new timer using real time.
278
func newTimeTimer(d time.Duration) timer {
279
ch := make(chan time.Time)
280
t := time.AfterFunc(d, func() {
283
return &timeTimer{t, ch}
286
// newTimeAfterFunc creates an AfterFunc timer using real time.
287
func newTimeAfterFunc(d time.Duration, f func()) timer {
289
t: time.AfterFunc(d, f),
293
// C returns the channel held by the timer.
func (t timeTimer) C() <-chan time.Time {
	return t.c
}
294
// Stop forwards to the Stop method of the wrapped timer and returns its result.
func (t timeTimer) Stop() bool {
	result := t.t.Stop()
	return result
}
295
// Reset forwards d to the Reset method of the wrapped timer and returns its result.
func (t timeTimer) Reset(d time.Duration) bool {
	result := t.t.Reset(d)
	return result
}
297
// fakeTimer implements timer using fake time.
298
type fakeTimer struct {
302
when time.Time // when the timer will fire
303
c chan time.Time // closed when the timer fires; mutually exclusive with f
304
f func() // called when the timer fires; mutually exclusive with c
307
// C returns the timer's channel, which is closed when the timer fires.
func (t *fakeTimer) C() <-chan time.Time {
	return t.c
}
309
func (t *fakeTimer) Stop() bool {
312
stopped := t.when.IsZero()
317
func (t *fakeTimer) Reset(d time.Duration) bool {
318
if t.c != nil || t.f == nil {
319
panic("fakeTimer only supports Reset on AfterFunc timers")
324
defer t.hooks.unlock()
325
active := !t.when.IsZero()
326
t.when = t.hooks.now.Add(d)
328
t.hooks.timers = append(t.hooks.timers, t)