istio

Форк
0
/
delay_test.go 
199 строк · 4.3 Кб
1
// Copyright Istio Authors
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14

15
package queue
16

17
import (
18
	"container/heap"
19
	"fmt"
20
	"sync"
21
	"testing"
22
	"time"
23
)
24

25
func TestPriorityQueue(t *testing.T) {
26
	pq := &pq{}
27

28
	t0 := time.Now()
29
	t1 := &delayTask{runAt: t0.Add(0)}
30
	t2 := &delayTask{runAt: t0.Add(1 * time.Hour)}
31
	t3 := &delayTask{runAt: t0.Add(2 * time.Hour)}
32
	t4 := &delayTask{runAt: t0.Add(3 * time.Hour)}
33
	sorted := []*delayTask{t1, t2, t3, t4}
34
	// fill in an unsorted order
35
	unsorted := []*delayTask{t4, t2, t3, t1}
36
	for _, task := range unsorted {
37
		heap.Push(pq, task)
38
	}
39

40
	// dequeue should be in order
41
	for i, task := range sorted {
42
		peeked := pq.Peek()
43
		popped := heap.Pop(pq)
44
		if task != popped {
45
			t.Fatalf("pop %d was not in order", i)
46
		}
47
		if peeked != popped {
48
			t.Fatalf("did not peek at the next item to be popped")
49
		}
50
	}
51
}
52

53
func TestDelayQueueOrdering(t *testing.T) {
54
	dq := NewDelayed(DelayQueueWorkers(2))
55
	stop := make(chan struct{})
56
	defer close(stop)
57
	go dq.Run(stop)
58

59
	mu := sync.Mutex{}
60
	var t0, t1, t2 time.Time
61

62
	done := make(chan struct{})
63
	dq.PushDelayed(func() error {
64
		mu.Lock()
65
		defer mu.Unlock()
66
		defer close(done)
67
		t2 = time.Now()
68
		return nil
69
	}, 200*time.Millisecond)
70
	dq.PushDelayed(func() error {
71
		mu.Lock()
72
		defer mu.Unlock()
73
		t1 = time.Now()
74
		return nil
75
	}, 100*time.Millisecond)
76
	dq.Push(func() error {
77
		mu.Lock()
78
		defer mu.Unlock()
79
		t0 = time.Now()
80
		return nil
81
	})
82

83
	select {
84
	case <-time.After(500 * time.Millisecond):
85
	case <-done:
86
	}
87

88
	mu.Lock()
89
	if !(t2.After(t1) && t1.After(t0)) {
90
		t.Errorf("expected jobs to be run in order based on delays")
91
	}
92
	mu.Unlock()
93
}
94

95
func TestDelayQueuePushBeforeRun(t *testing.T) {
96
	// This is a regression test to ensure we can push while Run() is called without a race
97
	dq := NewDelayed(DelayQueueBuffer(0))
98
	st := make(chan struct{})
99
	go func() {
100
		// Enqueue a bunch until we stop
101
		for {
102
			select {
103
			case <-st:
104
				return
105
			default:
106
			}
107
			dq.Push(func() error {
108
				return nil
109
			})
110
		}
111
	}()
112
	go dq.Run(st)
113
	// Wait a bit
114
	<-time.After(time.Millisecond * 10)
115
	close(st)
116
}
117

118
func TestDelayQueuePushNonblockingWithFullBuffer(t *testing.T) {
119
	queuedItems := 50
120
	dq := NewDelayed(DelayQueueBuffer(0), DelayQueueWorkers(0))
121

122
	success := make(chan struct{})
123
	timeout := time.After(500 * time.Millisecond)
124
	defer close(success)
125

126
	go func() {
127
		for i := 0; i < queuedItems; i++ {
128
			dq.PushDelayed(func() error { return nil }, time.Minute*time.Duration(queuedItems-i))
129
		}
130
		success <- struct{}{}
131
	}()
132

133
	select {
134
	case <-success:
135
		dq := dq.(*delayQueue)
136
		dq.mu.Lock()
137
		if dq.queue.Len() < queuedItems {
138
			t.Fatalf("expected 50 items in the queue, got %d", dq.queue.Len())
139
		}
140
		dq.mu.Unlock()
141
		return
142
	case <-timeout:
143
		t.Fatal("timed out waiting for enqueues")
144
	}
145
}
146

147
func TestPriorityQueueShrinking(t *testing.T) {
148
	c := 48
149
	pq := make(pq, 0, c)
150
	pqp := &pq
151

152
	t0 := time.Now()
153
	for i := 0; i < c; i++ {
154
		dt := &delayTask{runAt: t0.Add(time.Duration(i) * time.Hour)}
155
		heap.Push(pqp, dt)
156
	}
157

158
	if len(pq) != c {
159
		t.Fatalf("the length of pq should be %d, but end up %d", c, len(pq))
160
	}
161

162
	if cap(pq) != c {
163
		t.Fatalf("the capacity of pq should be %d, but end up %d", c, cap(pq))
164
	}
165

166
	for i := 0; i < c; i++ {
167
		_ = heap.Pop(pqp)
168
		if i == 1+c/2 && cap(pq) != c/2 {
169
			t.Fatalf("the capacity of pq should be reduced to half its length %d, but got %d", c/2, cap(pq))
170
		}
171
	}
172
}
173

174
func TestDelayQueueRetry(t *testing.T) {
175
	dq := NewDelayed()
176
	stop := make(chan struct{})
177
	defer close(stop)
178
	go dq.Run(stop)
179

180
	count := 0
181
	done := make(chan struct{})
182
	dq.PushDelayed(func() error {
183
		count++
184
		if count == maxTaskRetry+1 {
185
			close(done)
186
		}
187
		return fmt.Errorf("error count %d", count)
188
	}, 200*time.Millisecond)
189

190
	select {
191
	case <-time.After(500 * time.Millisecond):
192
		t.Errorf("timeout waiting for the task done")
193
	case <-done:
194
	}
195

196
	if count != maxTaskRetry+1 {
197
		t.Errorf("running count %d != %d", count, maxTaskRetry+1)
198
	}
199
}
200

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.