1
// Copyright Istio Authors
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
7
// http://www.apache.org/licenses/LICENSE-2.0
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
25
"istio.io/istio/pkg/test/util/assert"
26
"istio.io/istio/pkg/test/util/retry"
29
func TestOrdering(t *testing.T) {
32
q := NewQueue(1 * time.Microsecond)
33
stop := make(chan struct{})
36
wg := sync.WaitGroup{}
40
for i := 0; i < numValues; i++ {
51
// Start the queue at the halfway point.
57
// Wait for all tasks to be processed.
60
if len(out) != numValues {
61
t.Fatalf("expected output array length %d to equal %d", len(out), numValues)
64
for i := 0; i < numValues; i++ {
66
t.Fatalf("expected out[%d] %v to equal %v", i, out[i], i)
71
func TestRetry(t *testing.T) {
72
q := NewQueue(1 * time.Microsecond)
73
stop := make(chan struct{})
76
// Push a task that fails the first time and retries.
77
wg := sync.WaitGroup{}
86
return errors.New("fake error")
91
// wait for the task to run twice.
95
func TestResourceFree(t *testing.T) {
96
q := NewQueue(1 * time.Microsecond)
97
stop := make(chan struct{})
98
signal := make(chan struct{})
104
q.Push(func() error {
109
// Mock the queue blocking while it waits for the cond signal.
110
time.AfterFunc(10*time.Millisecond, func() {
115
case <-time.After(200 * time.Millisecond):
116
t.Error("close stop, method exit timeout.")
118
t.Log("queue return.")
122
func TestClosed(t *testing.T) {
123
t.Run("immediate close", func(t *testing.T) {
124
stop := make(chan struct{})
128
if err := WaitForClose(q, 10*time.Second); err != nil {
132
t.Run("no tasks after close", func(t *testing.T) {
133
stop := make(chan struct{})
135
taskComplete := atomic.NewBool(false)
136
q.Push(func() error {
141
if err := WaitForClose(q, 10*time.Second); err != nil {
144
q.Push(func() error {
145
taskComplete.Store(true)
148
if taskComplete.Load() {
149
t.Error("task ran on closed queue")
154
func TestSync(t *testing.T) {
155
handles := atomic.NewInt32(0)
156
task := func() error {
162
stop := make(chan struct{})
164
retry.UntilOrFail(t, q.HasSynced, retry.Delay(time.Microsecond))
165
// Must always be 1 since we are synced
166
assert.Equal(t, handles.Load(), 1)
168
assert.NoError(t, WaitForClose(q, time.Second))