1
// Copyright 2022 The CubeFS Authors.
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
7
// http://www.apache.org/licenses/LICENSE-2.0
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12
// implied. See the License for the specific language governing
13
// permissions and limitations under the License.
23
"github.com/stretchr/testify/require"
28
tpool = []TaskPool{New(10, 10), New(10, 10)}
33
// BenchmarkCheck_With_Chan_Con calls Test_Check_With_Chan_Con once per
// benchmark iteration, b.N times in total.
// NOTE(review): t is not declared inside this function; presumably it is a
// package-level *testing.T. Confirm it is usable, because the callee passes
// it on to require.Equal.
func BenchmarkCheck_With_Chan_Con(b *testing.B) {
35
for i := 0; i < b.N; i++ {
36
Test_Check_With_Chan_Con(t)
40
// BenchmarkCheck_With_Chan_Serial calls Test_Check_With_Chan_Serial once per
// benchmark iteration, b.N times in total.
// NOTE(review): t is not declared inside this function; presumably it is a
// package-level *testing.T. Confirm it is usable, because the callee passes
// it on to require.Equal.
func BenchmarkCheck_With_Chan_Serial(b *testing.B) {
42
for i := 0; i < b.N; i++ {
43
Test_Check_With_Chan_Serial(t)
47
// Test_Check_With_Chan_Con fills nums with the integers 0..listNum-1, creates
// poolSize channels through getChanged, consumes their merge, and finally
// asserts that cnt equals listNum/2.
func Test_Check_With_Chan_Con(t *testing.T) {
48
// Input data: every integer in [0, listNum).
nums := make([]int, 0, listNum)
49
for i := 0; i < listNum; i++ {
50
nums = append(nums, i)
52
// in is a closure typed to return a receive-only int channel; it iterates
// over nums.
in := func() <-chan int {
55
for _, n := range nums {
62
// getChanged takes an input channel and a batch size and returns a
// receive-only int channel. Results of batch are accumulated in changed.
getChanged := func(in <-chan int, batchCnt int) <-chan int {
64
changed := make([]int, 0)
66
// tmp is the current batch buffer, pre-sized to batchCnt.
tmp := make([]int, 0, batchCnt)
69
// Once tmp holds batchCnt values, filter it through batch and keep the result.
if len(tmp) == batchCnt {
70
changed = append(changed, batch(tmp)...)
75
// NOTE(review): presumably this second call flushes a final partial
// batch left in tmp — confirm.
changed = append(changed, batch(tmp)...)
78
for i := range changed {
86
// One result channel per pool slot, each produced by getChanged.
chans := make([]<-chan int, poolSize)
87
for i := range chans {
88
chans[i] = getChanged(in, batchCnt)
93
// Consume everything that merge delivers from the per-slot channels.
for n := range merge(chans[:]) {
98
// cnt is incremented atomically.
atomic.AddInt32(&cnt, 1)
99
// NOTE(review): presumably the sleep simulates a small per-item cost — confirm.
time.Sleep(time.Microsecond)
105
// listNum/2 is the number of odd values in 0..listNum-1; batch keeps only
// odd values.
// NOTE(review): testify's Equal takes (t, expected, actual); cnt is passed
// in the expected position here, which only affects the failure message.
// NOTE(review): cnt is updated via atomic.AddInt32 but read plainly here;
// confirm all writers have finished by this point.
require.Equal(t, cnt, int32(listNum/2))
108
// Test_Check_With_Chan_Serial fills nums with the integers 0..listNum-1,
// creates poolSize channels through getChanged (whose work is handed to
// tpool[0].Run), consumes their merge, and finally asserts that cnt equals
// listNum/2.
func Test_Check_With_Chan_Serial(t *testing.T) {
109
// Input data: every integer in [0, listNum).
nums := make([]int, 0, listNum)
110
for i := 0; i < listNum; i++ {
111
nums = append(nums, i)
113
// in is a closure typed to return a receive-only int channel; it creates an
// unbuffered channel and iterates over nums.
in := func() <-chan int {
114
out := make(chan int)
116
for _, n := range nums {
123
// getChanged takes an input channel and a batch size and returns a
// receive-only int channel. Results of batch are accumulated in changed.
getChanged := func(in <-chan int, batchCnt int) <-chan int {
124
out := make(chan int)
125
changed := make([]int, 0)
126
// The batching work is submitted to the first task pool.
tpool[0].Run(func() {
127
// tmp is the current batch buffer, pre-sized to batchCnt.
tmp := make([]int, 0, batchCnt)
130
// Once tmp holds batchCnt values, filter it through batch and keep the result.
if len(tmp) == batchCnt {
131
changed = append(changed, batch(tmp)...)
136
// NOTE(review): presumably this second call flushes a final partial
// batch left in tmp — confirm.
changed = append(changed, batch(tmp)...)
139
for i := range changed {
147
// One result channel per pool slot, each produced by getChanged.
chans := make([]<-chan int, poolSize)
148
for i := range chans {
149
chans[i] = getChanged(in, batchCnt)
152
// Consume everything that merge delivers from the per-slot channels.
for n := range merge(chans[:]) {
154
// cnt is incremented atomically.
atomic.AddInt32(&cnt, 1)
155
// NOTE(review): presumably the sleep simulates a small per-item cost — confirm.
time.Sleep(time.Microsecond)
159
// listNum/2 is the number of odd values in 0..listNum-1; batch keeps only
// odd values.
// NOTE(review): testify's Equal takes (t, expected, actual); cnt is passed
// in the expected position here, which only affects the failure message.
// NOTE(review): cnt is updated via atomic.AddInt32 but read plainly here;
// confirm all writers have finished by this point.
require.Equal(t, cnt, int32(listNum/2))
162
// batch walks inodes and appends every odd value (inodes[i]%2 == 1) to a
// freshly allocated slice, changed.
// NOTE(review): presumably changed is the slice that gets returned — confirm.
func batch(inodes []int) []int {
163
changed := make([]int, 0)
164
for i := range inodes {
165
// Keep only values with remainder 1 when divided by 2.
if inodes[i]%2 == 1 {
166
changed = append(changed, inodes[i])
172
func merge(cs []<-chan int) <-chan int {
173
var wg sync.WaitGroup
174
out := make(chan int)