1
// Copyright (c) 2016-2019 Uber Technologies, Inc.
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
7
// http://www.apache.org/licenses/LICENSE-2.0
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
21
"github.com/uber/kraken/core"
22
"github.com/uber/kraken/lib/healthcheck"
23
"github.com/uber/kraken/lib/hostlist"
24
"github.com/uber/kraken/lib/hrw"
25
"github.com/uber/kraken/utils/stringset"
28
const _defaultWeight = 100
30
// Watcher allows clients to watch the ring for changes. Whenever membership
31
// changes, each registered Watcher is notified with the latest hosts.
32
type Watcher interface {
33
Notify(latest stringset.Set)
36
// Ring is a rendezvous hashing ring which calculates an ordered replica set
37
// of healthy addresses which own any given digest.
39
// Address membership within the ring is defined by a dynamic hostlist.List. On
40
// top of that, replica sets are filtered by the health status of their addresses.
41
// Membership and health status may be refreshed by using Monitor.
43
// Ring maintains the invariant that it is always non-empty and can always provide
44
// locations, although in some scenarios the provided locations are not guaranteed
45
// to be healthy (see Locations).
47
Locations(d core.Digest) []string
48
Contains(addr string) bool
49
Monitor(stop <-chan struct{})
56
filter healthcheck.Filter
58
mu sync.RWMutex // Protects the following fields:
60
hash *hrw.RendezvousHash
66
// Option allows setting custom parameters for ring.
67
type Option func(*ring)
69
// WithWatcher adds a watcher to the ring. Can be used multiple times.
70
func WithWatcher(w Watcher) Option {
71
return func(r *ring) { r.watchers = append(r.watchers, w) }
74
// New creates a new Ring whose members are defined by cluster.
76
config Config, cluster hostlist.List, filter healthcheck.Filter, opts ...Option) Ring {
78
config.applyDefaults()
84
for _, opt := range opts {
91
// Locations returns an ordered replica set of healthy addresses which own d.
92
// If all addresses in the replica set are unhealthy, then returns the next
93
// healthy address. If all addresses in the ring are unhealthy, then returns
94
// the first address which owns d (regardless of health). As such, Locations
95
// always returns a non-empty list.
96
func (r *ring) Locations(d core.Digest) []string {
100
nodes := r.hash.GetOrderedNodes(d.ShardID(), len(r.addrs))
101
if len(nodes) != len(r.addrs) {
102
// This should never happen.
103
log.Fatal("invariant violation: ordered hash nodes not equal to cluster size")
106
if len(r.healthy) == 0 {
107
return []string{nodes[0].Label}
111
for i := 0; i < len(nodes) && (len(locs) == 0 || i < r.config.MaxReplica); i++ {
112
addr := nodes[i].Label
113
if r.healthy.Has(addr) {
114
locs = append(locs, addr)
120
// Contains returns whether the ring contains addr.
121
func (r *ring) Contains(addr string) bool {
125
return r.addrs.Has(addr)
128
// Monitor refreshes the ring at the configured interval. Blocks until the
129
// provided stop channel is closed.
130
func (r *ring) Monitor(stop <-chan struct{}) {
135
case <-time.After(r.config.RefreshInterval):
141
// Refresh updates the membership and health information of r.
142
func (r *ring) Refresh() {
143
latest := r.cluster.Resolve()
145
healthy := r.filter.Run(latest)
148
if !stringset.Equal(r.addrs, latest) {
149
// Membership has changed -- update hash nodes.
150
hash = hrw.NewRendezvousHash(hrw.Murmur3Hash, hrw.UInt64ToFloat64)
151
for addr := range latest {
152
hash.AddNode(addr, _defaultWeight)
155
for _, w := range r.watchers {
156
w.Notify(latest.Copy())