kraken

Форк
0
/
ring_test.go 
275 строк · 6.2 Кб
1
// Copyright (c) 2016-2019 Uber Technologies, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
package hashring
15

16
import (
17
	"testing"
18
	"time"
19

20
	"github.com/uber/kraken/core"
21
	"github.com/uber/kraken/lib/healthcheck"
22
	"github.com/uber/kraken/lib/hostlist"
23
	"github.com/uber/kraken/mocks/lib/hashring"
24
	"github.com/uber/kraken/mocks/lib/hostlist"
25
	"github.com/uber/kraken/utils/randutil"
26
	"github.com/uber/kraken/utils/stringset"
27

28
	"github.com/golang/mock/gomock"
29
	"github.com/stretchr/testify/require"
30
)
31

32
func addrsFixture(n int) []string {
33
	var addrs []string
34
	for i := 0; i < n; i++ {
35
		addrs = append(addrs, randutil.Addr())
36
	}
37
	return addrs
38
}
39

40
func TestRingLocationsDistribution(t *testing.T) {
41
	tests := []struct {
42
		desc                 string
43
		clusterSize          int
44
		maxReplica           int
45
		expectedDistribution float64
46
	}{
47
		{"single host", 1, 1, 1.0},
48
		{"all replicas", 3, 3, 1.0},
49
		{"below max replica", 2, 3, 1.0},
50
		{"above max replica", 6, 3, 0.5},
51
	}
52
	for _, test := range tests {
53
		t.Run(test.desc, func(t *testing.T) {
54
			require := require.New(t)
55

56
			addrs := addrsFixture(test.clusterSize)
57

58
			r := New(
59
				Config{MaxReplica: test.maxReplica},
60
				hostlist.Fixture(addrs...),
61
				healthcheck.IdentityFilter{})
62

63
			sampleSize := 2000
64

65
			counts := make(map[string]int)
66
			for i := 0; i < sampleSize; i++ {
67
				for _, addr := range r.Locations(core.DigestFixture()) {
68
					counts[addr]++
69
				}
70
			}
71

72
			for _, addr := range addrs {
73
				distribution := float64(counts[addr]) / float64(sampleSize)
74
				require.InDelta(test.expectedDistribution, distribution, 0.05)
75
			}
76
		})
77
	}
78
}
79

80
func TestRingLocationsFiltersOutUnhealthyHosts(t *testing.T) {
81
	require := require.New(t)
82

83
	filter := healthcheck.NewManualFilter()
84

85
	r := New(
86
		Config{MaxReplica: 3},
87
		hostlist.Fixture(addrsFixture(10)...),
88
		filter)
89

90
	d := core.DigestFixture()
91

92
	replicas := r.Locations(d)
93
	require.Len(replicas, 3)
94

95
	filter.Unhealthy.Add(replicas[0])
96
	r.Refresh()
97

98
	result := r.Locations(d)
99
	require.Equal(replicas[1:], result)
100
}
101

102
func TestRingLocationsReturnsNextHealthyHostWhenReplicaSetUnhealthy(t *testing.T) {
103
	require := require.New(t)
104

105
	filter := healthcheck.NewManualFilter()
106

107
	r := New(
108
		Config{MaxReplica: 3},
109
		hostlist.Fixture(addrsFixture(10)...),
110
		filter)
111

112
	d := core.DigestFixture()
113

114
	replicas := r.Locations(d)
115
	require.Len(replicas, 3)
116

117
	// Mark all the original replicas as unhealthy.
118
	for _, addr := range replicas {
119
		filter.Unhealthy.Add(addr)
120
	}
121
	r.Refresh()
122

123
	// Should consistently select the next address.
124
	var next []string
125
	for i := 0; i < 10; i++ {
126
		next = r.Locations(d)
127
		require.Len(next, 1)
128
		require.NotContains(replicas, next[0])
129
	}
130

131
	// Mark the next address as unhealthy.
132
	filter.Unhealthy.Add(next[0])
133
	r.Refresh()
134

135
	// Should consistently select the address after next.
136
	for i := 0; i < 10; i++ {
137
		nextNext := r.Locations(d)
138
		require.Len(nextNext, 1)
139
		require.NotContains(append(replicas, next[0]), nextNext[0])
140
	}
141
}
142

143
func TestRingLocationsReturnsFirstHostWhenAllHostsUnhealthy(t *testing.T) {
144
	require := require.New(t)
145

146
	filter := healthcheck.NewBinaryFilter()
147

148
	r := New(
149
		Config{MaxReplica: 3},
150
		hostlist.Fixture(addrsFixture(10)...),
151
		filter)
152

153
	d := core.DigestFixture()
154

155
	replicas := r.Locations(d)
156
	require.Len(replicas, 3)
157

158
	filter.Healthy = false
159
	r.Refresh()
160

161
	// Should consistently select the first replica once all are unhealthy.
162
	for i := 0; i < 10; i++ {
163
		result := r.Locations(d)
164
		require.Len(result, 1)
165
		require.Equal(replicas[0], result[0])
166
	}
167
}
168

169
func TestRingContains(t *testing.T) {
170
	require := require.New(t)
171

172
	x := "x:80"
173
	y := "y:80"
174
	z := "z:80"
175

176
	r := New(Config{}, hostlist.Fixture(x, y), healthcheck.IdentityFilter{})
177

178
	require.True(r.Contains(x))
179
	require.True(r.Contains(y))
180
	require.False(r.Contains(z))
181
}
182

183
func TestRingMonitor(t *testing.T) {
184
	require := require.New(t)
185

186
	ctrl := gomock.NewController(t)
187
	defer ctrl.Finish()
188

189
	cluster := mockhostlist.NewMockList(ctrl)
190

191
	x := "x:80"
192
	y := "y:80"
193

194
	gomock.InOrder(
195
		cluster.EXPECT().Resolve().Return(stringset.New(x)),
196
		cluster.EXPECT().Resolve().Return(stringset.New(y)),
197
	)
198

199
	r := New(
200
		Config{RefreshInterval: time.Second},
201
		cluster,
202
		healthcheck.IdentityFilter{})
203

204
	stop := make(chan struct{})
205
	defer close(stop)
206
	go r.Monitor(stop)
207

208
	d := core.DigestFixture()
209

210
	require.Equal([]string{x}, r.Locations(d))
211

212
	// Monitor should refresh the ring.
213
	time.Sleep(1250 * time.Millisecond)
214

215
	require.Equal([]string{y}, r.Locations(d))
216
}
217

218
func TestRingRefreshUpdatesMembership(t *testing.T) {
219
	require := require.New(t)
220

221
	ctrl := gomock.NewController(t)
222
	defer ctrl.Finish()
223

224
	cluster := mockhostlist.NewMockList(ctrl)
225

226
	x := "x:80"
227
	y := "y:80"
228
	z := "z:80"
229

230
	// x is removed and z is added on the 2nd resolve.
231
	gomock.InOrder(
232
		cluster.EXPECT().Resolve().Return(stringset.New(x, y)),
233
		cluster.EXPECT().Resolve().Return(stringset.New(y, z)),
234
	)
235

236
	r := New(Config{}, cluster, healthcheck.IdentityFilter{})
237

238
	d := core.DigestFixture()
239

240
	require.ElementsMatch([]string{x, y}, r.Locations(d))
241

242
	r.Refresh()
243

244
	require.ElementsMatch([]string{y, z}, r.Locations(d))
245
}
246

247
func TestRingNotifiesWatchersOnMembershipChanges(t *testing.T) {
248
	ctrl := gomock.NewController(t)
249
	defer ctrl.Finish()
250

251
	cluster := mockhostlist.NewMockList(ctrl)
252

253
	watcher := mockhashring.NewMockWatcher(ctrl)
254

255
	x := "x:80"
256
	y := "y:80"
257
	z := "z:80"
258

259
	gomock.InOrder(
260
		// Called during initial refresh when ring is created.
261
		cluster.EXPECT().Resolve().Return(stringset.New(x, y)),
262
		watcher.EXPECT().Notify(stringset.New(x, y)),
263

264
		// Called on subsequent refresh.
265
		cluster.EXPECT().Resolve().Return(stringset.New(x, y, z)),
266
		watcher.EXPECT().Notify(stringset.New(x, y, z)),
267

268
		// No changes does not notify.
269
		cluster.EXPECT().Resolve().Return(stringset.New(x, y, z)),
270
	)
271

272
	r := New(Config{}, cluster, healthcheck.IdentityFilter{}, WithWatcher(watcher))
273
	r.Refresh()
274
	r.Refresh()
275
}
276

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.