cubefs

Форк
0
/
taskpool_test.go 
192 строки · 3.7 Кб
1
// Copyright 2022 The CubeFS Authors.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12
// implied. See the License for the specific language governing
13
// permissions and limitations under the License.
14

15
package common
16

17
import (
18
	"sync"
19
	"sync/atomic"
20
	"testing"
21
	"time"
22

23
	"github.com/stretchr/testify/require"
24
)
25

26
var (
27
	poolSize = 10
28
	tpool    = []TaskPool{New(10, 10), New(10, 10)}
29
	batchCnt = 10000
30
	listNum  = 200000
31
)
32

33
func BenchmarkCheck_With_Chan_Con(b *testing.B) {
34
	t := &testing.T{}
35
	for i := 0; i < b.N; i++ {
36
		Test_Check_With_Chan_Con(t)
37
	}
38
}
39

40
func BenchmarkCheck_With_Chan_Serial(b *testing.B) {
41
	t := &testing.T{}
42
	for i := 0; i < b.N; i++ {
43
		Test_Check_With_Chan_Serial(t)
44
	}
45
}
46

47
func Test_Check_With_Chan_Con(t *testing.T) {
48
	nums := make([]int, 0, listNum)
49
	for i := 0; i < listNum; i++ {
50
		nums = append(nums, i)
51
	}
52
	in := func() <-chan int {
53
		out := make(chan int)
54
		go func() {
55
			for _, n := range nums {
56
				out <- n
57
			}
58
			close(out)
59
		}()
60
		return out
61
	}()
62
	getChanged := func(in <-chan int, batchCnt int) <-chan int {
63
		out := make(chan int)
64
		changed := make([]int, 0)
65
		tpool[0].Run(func() {
66
			tmp := make([]int, 0, batchCnt)
67
			for i := range in {
68
				tmp = append(tmp, i)
69
				if len(tmp) == batchCnt {
70
					changed = append(changed, batch(tmp)...)
71
					tmp = tmp[:0]
72
				}
73
			}
74
			if len(tmp) != 0 {
75
				changed = append(changed, batch(tmp)...)
76
			}
77

78
			for i := range changed {
79
				out <- changed[i]
80
			}
81
			close(out)
82
		})
83

84
		return out
85
	}
86
	chans := make([]<-chan int, poolSize)
87
	for i := range chans {
88
		chans[i] = getChanged(in, batchCnt)
89
	}
90
	var wg sync.WaitGroup
91
	var cnt int32
92

93
	for n := range merge(chans[:]) {
94
		wg.Add(1)
95
		nn := n
96
		tpool[1].Run(func() {
97
			if nn%2 == 1 {
98
				atomic.AddInt32(&cnt, 1)
99
				time.Sleep(time.Microsecond)
100
			}
101
			wg.Done()
102
		})
103
	}
104
	wg.Wait()
105
	require.Equal(t, cnt, int32(listNum/2))
106
}
107

108
func Test_Check_With_Chan_Serial(t *testing.T) {
109
	nums := make([]int, 0, listNum)
110
	for i := 0; i < listNum; i++ {
111
		nums = append(nums, i)
112
	}
113
	in := func() <-chan int {
114
		out := make(chan int)
115
		go func() {
116
			for _, n := range nums {
117
				out <- n
118
			}
119
			close(out)
120
		}()
121
		return out
122
	}()
123
	getChanged := func(in <-chan int, batchCnt int) <-chan int {
124
		out := make(chan int)
125
		changed := make([]int, 0)
126
		tpool[0].Run(func() {
127
			tmp := make([]int, 0, batchCnt)
128
			for i := range in {
129
				tmp = append(tmp, i)
130
				if len(tmp) == batchCnt {
131
					changed = append(changed, batch(tmp)...)
132
					tmp = tmp[:0]
133
				}
134
			}
135
			if len(tmp) != 0 {
136
				changed = append(changed, batch(tmp)...)
137
			}
138

139
			for i := range changed {
140
				out <- changed[i]
141
			}
142
			close(out)
143
		})
144

145
		return out
146
	}
147
	chans := make([]<-chan int, poolSize)
148
	for i := range chans {
149
		chans[i] = getChanged(in, batchCnt)
150
	}
151
	var cnt int32
152
	for n := range merge(chans[:]) {
153
		if n%2 == 1 {
154
			atomic.AddInt32(&cnt, 1)
155
			time.Sleep(time.Microsecond)
156
		}
157
	}
158

159
	require.Equal(t, cnt, int32(listNum/2))
160
}
161

162
func batch(inodes []int) []int {
163
	changed := make([]int, 0)
164
	for i := range inodes {
165
		if inodes[i]%2 == 1 {
166
			changed = append(changed, inodes[i])
167
		}
168
	}
169
	return changed
170
}
171

172
func merge(cs []<-chan int) <-chan int {
173
	var wg sync.WaitGroup
174
	out := make(chan int)
175

176
	wg.Add(len(cs))
177
	for i := range cs {
178
		cc := cs[i]
179
		go func() {
180
			for n := range cc {
181
				out <- n
182
			}
183
			wg.Done()
184
		}()
185

186
	}
187
	go func() {
188
		wg.Wait()
189
		close(out)
190
	}()
191
	return out
192
}
193

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.