1
// Copyright (c) 2016-2019 Uber Technologies, Inc.
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
7
// http://www.apache.org/licenses/LICENSE-2.0
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
20
"github.com/uber/kraken/core"
21
"github.com/uber/kraken/lib/healthcheck"
22
"github.com/uber/kraken/lib/hostlist"
23
"github.com/uber/kraken/mocks/lib/hashring"
24
"github.com/uber/kraken/mocks/lib/hostlist"
25
"github.com/uber/kraken/utils/randutil"
26
"github.com/uber/kraken/utils/stringset"
28
"github.com/golang/mock/gomock"
29
"github.com/stretchr/testify/require"
32
// addrsFixture generates n addresses by calling randutil.Addr once per
// iteration and appending each result to addrs.
// NOTE(review): presumably addrs is what gets returned as the []string
// result -- confirm against the full function body.
func addrsFixture(n int) []string {
34
for i := 0; i < n; i++ {
35
addrs = append(addrs, randutil.Addr())
40
// TestRingLocationsDistribution is a table-driven test. For each case it
// samples r.Locations over sampleSize digests from core.DigestFixture and
// checks that the fraction of lookups naming each address is within 0.05 of
// the case's expectedDistribution.
func TestRingLocationsDistribution(t *testing.T) {
45
expectedDistribution float64
47
// NOTE(review): columns look like desc, clusterSize, maxReplica,
// expectedDistribution (inferred from the test.* field uses below) -- confirm
// against the struct definition.
{"single host", 1, 1, 1.0},
48
{"all replicas", 3, 3, 1.0},
49
{"below max replica", 2, 3, 1.0},
50
{"above max replica", 6, 3, 0.5},
52
for _, test := range tests {
53
t.Run(test.desc, func(t *testing.T) {
54
require := require.New(t)
56
addrs := addrsFixture(test.clusterSize)
59
// Arguments: MaxReplica from the test case, a fixed host list over addrs, and
// healthcheck.IdentityFilter{} as the health filter.
Config{MaxReplica: test.maxReplica},
60
hostlist.Fixture(addrs...),
61
healthcheck.IdentityFilter{})
65
// counts is keyed by address and read back below as counts[addr].
// NOTE(review): presumably incremented once per address returned by
// r.Locations in the sampling loop -- confirm.
counts := make(map[string]int)
66
for i := 0; i < sampleSize; i++ {
67
for _, addr := range r.Locations(core.DigestFixture()) {
72
for _, addr := range addrs {
73
// Fraction of the sampleSize lookups attributed to this address.
distribution := float64(counts[addr]) / float64(sampleSize)
74
require.InDelta(test.expectedDistribution, distribution, 0.05)
80
// TestRingLocationsFiltersOutUnhealthyHosts checks that after the first of
// three replicas for a digest is added to the manual filter's Unhealthy set,
// r.Locations for the same digest returns the remaining two replicas in
// their original order.
func TestRingLocationsFiltersOutUnhealthyHosts(t *testing.T) {
81
require := require.New(t)
83
filter := healthcheck.NewManualFilter()
86
Config{MaxReplica: 3},
87
hostlist.Fixture(addrsFixture(10)...),
90
d := core.DigestFixture()
92
// Baseline lookup: three replicas are expected for d.
replicas := r.Locations(d)
93
require.Len(replicas, 3)
95
filter.Unhealthy.Add(replicas[0])
98
// The same digest should now resolve to the original replicas minus the first.
result := r.Locations(d)
99
require.Equal(replicas[1:], result)
102
// TestRingLocationsReturnsNextHealthyHostWhenReplicaSetUnhealthy marks every
// original replica of a digest as unhealthy and checks that r.Locations
// returns an address outside that set, then marks that address unhealthy too
// and checks that a single, different address is returned.
func TestRingLocationsReturnsNextHealthyHostWhenReplicaSetUnhealthy(t *testing.T) {
103
require := require.New(t)
105
filter := healthcheck.NewManualFilter()
108
Config{MaxReplica: 3},
109
hostlist.Fixture(addrsFixture(10)...),
112
d := core.DigestFixture()
114
replicas := r.Locations(d)
115
require.Len(replicas, 3)
117
// Mark all the original replicas as unhealthy.
118
for _, addr := range replicas {
119
filter.Unhealthy.Add(addr)
123
// Should consistently select the next address.
125
// The lookup is repeated 10 times.
for i := 0; i < 10; i++ {
126
next = r.Locations(d)
128
require.NotContains(replicas, next[0])
131
// Mark the next address as unhealthy.
132
filter.Unhealthy.Add(next[0])
135
// Should consistently select the address after next.
136
for i := 0; i < 10; i++ {
137
nextNext := r.Locations(d)
138
require.Len(nextNext, 1)
139
// The result must be none of the original replicas nor the previous fallback.
require.NotContains(append(replicas, next[0]), nextNext[0])
143
// TestRingLocationsReturnsFirstHostWhenAllHostsUnhealthy sets the binary
// filter's Healthy field to false and checks that r.Locations then returns
// exactly one address, equal to the first of the replicas returned earlier.
func TestRingLocationsReturnsFirstHostWhenAllHostsUnhealthy(t *testing.T) {
144
require := require.New(t)
146
filter := healthcheck.NewBinaryFilter()
149
Config{MaxReplica: 3},
150
hostlist.Fixture(addrsFixture(10)...),
153
d := core.DigestFixture()
155
// Baseline lookup taken before Healthy is set to false.
replicas := r.Locations(d)
156
require.Len(replicas, 3)
158
filter.Healthy = false
161
// Should consistently select the first replica once all are unhealthy.
162
for i := 0; i < 10; i++ {
163
result := r.Locations(d)
164
require.Len(result, 1)
165
require.Equal(replicas[0], result[0])
169
// TestRingContains builds a ring from a fixed host list holding x and y and
// checks that Contains reports true for both and false for z.
// NOTE(review): x, y and z are defined on lines not shown here; presumably
// three distinct addresses -- confirm.
func TestRingContains(t *testing.T) {
170
require := require.New(t)
176
r := New(Config{}, hostlist.Fixture(x, y), healthcheck.IdentityFilter{})
178
require.True(r.Contains(x))
179
require.True(r.Contains(y))
180
// z was not passed to hostlist.Fixture above.
require.False(r.Contains(z))
183
// TestRingMonitor uses a mocked host list whose Resolve expectations return
// {x} and then {y}. With RefreshInterval set to one second, it checks that
// r.Locations yields [x] first and [y] after sleeping 1250ms.
// NOTE(review): the lines that start the monitor and use the stop channel
// are not visible here -- confirm how stop is wired up.
func TestRingMonitor(t *testing.T) {
184
require := require.New(t)
186
ctrl := gomock.NewController(t)
189
cluster := mockhostlist.NewMockList(ctrl)
195
// First Resolve yields only x; the next yields only y.
cluster.EXPECT().Resolve().Return(stringset.New(x)),
196
cluster.EXPECT().Resolve().Return(stringset.New(y)),
200
Config{RefreshInterval: time.Second},
202
healthcheck.IdentityFilter{})
204
stop := make(chan struct{})
208
d := core.DigestFixture()
210
require.Equal([]string{x}, r.Locations(d))
212
// Monitor should refresh the ring.
213
// The sleep is 250ms longer than the one-second RefreshInterval configured above.
time.Sleep(1250 * time.Millisecond)
215
require.Equal([]string{y}, r.Locations(d))
218
// TestRingRefreshUpdatesMembership uses a mocked host list whose Resolve
// expectations return {x, y} and then {y, z}. It checks that r.Locations
// matches {x, y} at first and {y, z} afterwards, ignoring order.
// NOTE(review): the call that triggers the second resolve is not visible
// here; presumably an explicit refresh on r -- confirm.
func TestRingRefreshUpdatesMembership(t *testing.T) {
219
require := require.New(t)
221
ctrl := gomock.NewController(t)
224
cluster := mockhostlist.NewMockList(ctrl)
230
// x is removed and z is added on the 2nd resolve.
232
cluster.EXPECT().Resolve().Return(stringset.New(x, y)),
233
cluster.EXPECT().Resolve().Return(stringset.New(y, z)),
236
// The zero-value Config is used here.
r := New(Config{}, cluster, healthcheck.IdentityFilter{})
238
d := core.DigestFixture()
240
// ElementsMatch compares contents without regard to order.
require.ElementsMatch([]string{x, y}, r.Locations(d))
244
require.ElementsMatch([]string{y, z}, r.Locations(d))
247
// TestRingNotifiesWatchersOnMembershipChanges sets up a mocked host list and
// a mocked watcher. Notify is expected with {x, y} after the first Resolve
// and with {x, y, z} after the second; no Notify expectation follows the
// third Resolve, which returns the same set as the second. The ring is built
// with the WithWatcher(watcher) option.
// NOTE(review): the calls that drive the second and third Resolve are not
// visible here -- confirm against the rest of the function.
func TestRingNotifiesWatchersOnMembershipChanges(t *testing.T) {
248
ctrl := gomock.NewController(t)
251
cluster := mockhostlist.NewMockList(ctrl)
253
watcher := mockhashring.NewMockWatcher(ctrl)
260
// Called during initial refresh when ring is created.
261
cluster.EXPECT().Resolve().Return(stringset.New(x, y)),
262
watcher.EXPECT().Notify(stringset.New(x, y)),
264
// Called on subsequent refresh.
265
cluster.EXPECT().Resolve().Return(stringset.New(x, y, z)),
266
watcher.EXPECT().Notify(stringset.New(x, y, z)),
268
// No changes does not notify.
269
cluster.EXPECT().Resolve().Return(stringset.New(x, y, z)),
272
r := New(Config{}, cluster, healthcheck.IdentityFilter{}, WithWatcher(watcher))