25
// TestPriorityQueue builds four delayTasks with increasing runAt times, feeds
// them in shuffled order, and then pops from pq via heap.Pop, failing if a pop
// is reported out of order.
// NOTE(review): this listing is a sampled excerpt — the bare integers are the
// original file's line numbers and the lines between them (including the
// definitions of t0 and pq, and the loop bodies) are not visible here.
func TestPriorityQueue(t *testing.T) {
29
// Four tasks spaced one hour apart, starting at t0.
t1 := &delayTask{runAt: t0.Add(0)}
30
t2 := &delayTask{runAt: t0.Add(1 * time.Hour)}
31
t3 := &delayTask{runAt: t0.Add(2 * time.Hour)}
32
t4 := &delayTask{runAt: t0.Add(3 * time.Hour)}
33
// The tasks listed by ascending runAt; this is the order the pops are compared against.
sorted := []*delayTask{t1, t2, t3, t4}
35
// The same tasks in a deliberately non-ascending order.
unsorted := []*delayTask{t4, t2, t3, t1}
36
// Loop body not visible; presumably pushes each task onto pq — TODO confirm.
for _, task := range unsorted {
41
// Walk the expected order and compare each entry with what the heap yields.
for i, task := range sorted {
43
popped := heap.Pop(pq)
45
// The guarding condition is not visible; presumably popped != task — TODO confirm.
t.Fatalf("pop %d was not in order", i)
48
// NOTE(review): the peek check that guards this failure is not visible in this excerpt.
t.Fatalf("did not peek at the next item to be popped")
53
// TestDelayQueueOrdering enqueues two delayed jobs (200ms and 100ms) and one
// undelayed job on a queue configured with DelayQueueWorkers(2), then asserts
// that the recorded timestamps satisfy t0 < t1 < t2.
// NOTE(review): sampled excerpt — the job bodies, the use of stop, and the
// select around the timeout are on lines not visible here.
func TestDelayQueueOrdering(t *testing.T) {
54
dq := NewDelayed(DelayQueueWorkers(2))
55
// NOTE(review): how stop is consumed is not visible; presumably handed to the queue's run loop — confirm.
stop := make(chan struct{})
60
// Timestamps compared at the end; which job writes which variable is not visible here.
var t0, t1, t2 time.Time
62
done := make(chan struct{})
63
// Longest delay is pushed first, so push order differs from the expected run order.
dq.PushDelayed(func() error {
69
}, 200*time.Millisecond)
70
dq.PushDelayed(func() error {
75
}, 100*time.Millisecond)
76
// Job pushed with no explicit delay.
dq.Push(func() error {
84
// 500ms timeout arm; the enclosing select and its other arms are not visible.
// NOTE(review): wall-clock thresholds like this can be flaky on loaded CI machines.
case <-time.After(500 * time.Millisecond):
89
// Requires strictly increasing timestamps: t0, then t1, then t2.
if !(t2.After(t1) && t1.After(t0)) {
90
t.Errorf("expected jobs to be run in order based on delays")
95
// TestDelayQueuePushBeforeRun exercises Push on a queue created with
// DelayQueueBuffer(0).
// NOTE(review): sampled excerpt — most of the body is not visible. Judging by
// the name, the push is expected to happen before the queue is running, but
// that ordering cannot be confirmed from the lines shown.
func TestDelayQueuePushBeforeRun(t *testing.T) {
97
// Buffer option set to 0 (presumably an unbuffered input — confirm against DelayQueueBuffer).
dq := NewDelayed(DelayQueueBuffer(0))
98
st := make(chan struct{})
107
dq.Push(func() error {
114
// Blocks the current goroutine for 10ms; the reason is not visible in this excerpt.
<-time.After(time.Millisecond * 10)
118
// TestDelayQueuePushNonblockingWithFullBuffer pushes queuedItems delayed jobs
// onto a queue created with DelayQueueBuffer(0) and DelayQueueWorkers(0),
// signals success once the pushes return, and then checks the internal queue
// length; a 500ms timer bounds the wait.
// NOTE(review): sampled excerpt — the definition of queuedItems, the goroutine
// (if any) around the push loop, and the select are on lines not visible here.
func TestDelayQueuePushNonblockingWithFullBuffer(t *testing.T) {
120
dq := NewDelayed(DelayQueueBuffer(0), DelayQueueWorkers(0))
122
success := make(chan struct{})
123
// Upper bound on how long the enqueues may take before the test gives up.
timeout := time.After(500 * time.Millisecond)
127
for i := 0; i < queuedItems; i++ {
128
// Delays shrink as i grows (queuedItems-i minutes), so each later push is due earlier than the previous one.
dq.PushDelayed(func() error { return nil }, time.Minute*time.Duration(queuedItems-i))
130
// Reports that the pushes completed.
success <- struct{}{}
135
// Shadow dq with its concrete type so the queue field can be inspected.
dq := dq.(*delayQueue)
137
if dq.queue.Len() < queuedItems {
138
// NOTE(review): the message hard-codes "50" while the check above compares against
// queuedItems; if queuedItems ever differs from 50 the message will be misleading.
// Consider formatting queuedItems into the message instead.
t.Fatalf("expected 50 items in the queue, got %d", dq.queue.Len())
143
// Presumably reached via the timeout channel firing first — the select is not visible.
t.Fatal("timed out waiting for enqueues")
147
// TestPriorityQueueShrinking fills pq with c tasks, checks that both len(pq)
// and cap(pq) are reported against c, and then, during a second loop over c
// iterations, checks that cap(pq) equals c/2 at iteration 1+c/2.
// NOTE(review): sampled excerpt — the definitions of c, pq and t0, and the
// push/pop calls inside the loops, are on lines not visible here.
func TestPriorityQueueShrinking(t *testing.T) {
153
for i := 0; i < c; i++ {
154
// Task i is due i hours after t0.
dt := &delayTask{runAt: t0.Add(time.Duration(i) * time.Hour)}
159
// Guard not visible; presumably len(pq) != c — TODO confirm.
t.Fatalf("the length of pq should be %d, but end up %d", c, len(pq))
163
// Guard not visible; presumably cap(pq) != c — TODO confirm.
t.Fatalf("the capacity of pq should be %d, but end up %d", c, cap(pq))
166
// Second pass; presumably pops one element per iteration — the call is not visible.
for i := 0; i < c; i++ {
168
// Only iteration 1+c/2 is checked: at that point the capacity must be exactly c/2.
if i == 1+c/2 && cap(pq) != c/2 {
169
t.Fatalf("the capacity of pq should be reduced to half its length %d, but got %d", c/2, cap(pq))
174
// TestDelayQueueRetry pushes a delayed task whose visible return path is an
// error, and asserts that the task ends up having run exactly maxTaskRetry+1
// times.
// NOTE(review): sampled excerpt — the construction of dq, the declaration and
// increment of count, the body of the count check, and the select around the
// timeout are on lines not visible here.
func TestDelayQueueRetry(t *testing.T) {
176
stop := make(chan struct{})
181
done := make(chan struct{})
182
dq.PushDelayed(func() error {
184
// Branch body not visible; presumably signals done and returns without error — TODO confirm.
if count == maxTaskRetry+1 {
187
// The error return visible for runs that do not take the branch above.
return fmt.Errorf("error count %d", count)
188
}, 200*time.Millisecond)
191
// NOTE(review): the 500ms budget has to cover the 200ms initial delay plus every
// retry; the retry spacing is not visible here, so confirm the margin is adequate.
case <-time.After(500 * time.Millisecond):
192
t.Errorf("timeout waiting for the task done")
196
// Expected number of runs is maxTaskRetry+1.
if count != maxTaskRetry+1 {
197
t.Errorf("running count %d != %d", count, maxTaskRetry+1)