kraken

Форк
0
/
ring.go 
165 строк · 4.4 Кб
1
// Copyright (c) 2016-2019 Uber Technologies, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
package hashring
15

16
import (
17
	"log"
18
	"sync"
19
	"time"
20

21
	"github.com/uber/kraken/core"
22
	"github.com/uber/kraken/lib/healthcheck"
23
	"github.com/uber/kraken/lib/hostlist"
24
	"github.com/uber/kraken/lib/hrw"
25
	"github.com/uber/kraken/utils/stringset"
26
)
27

28
const _defaultWeight = 100
29

30
// Watcher allows clients to watch the ring for changes. Whenever membership
31
// changes, each registered Watcher is notified with the latest hosts.
32
type Watcher interface {
33
	Notify(latest stringset.Set)
34
}
35

36
// Ring is a rendezvous hashing ring which calculates an ordered replica set
37
// of healthy addresses which own any given digest.
38
//
39
// Address membership within the ring is defined by a dynamic hostlist.List. On
40
// top of that, replica sets are filtered by the health status of their addresses.
41
// Membership and health status may be refreshed by using Monitor.
42
//
43
// Ring maintains the invariant that it is always non-empty and can always provide
44
// locations, although in some scenarios the provided locations are not guaranteed
45
// to be healthy (see Locations).
46
type Ring interface {
47
	Locations(d core.Digest) []string
48
	Contains(addr string) bool
49
	Monitor(stop <-chan struct{})
50
	Refresh()
51
}
52

53
type ring struct {
54
	config  Config
55
	cluster hostlist.List
56
	filter  healthcheck.Filter
57

58
	mu      sync.RWMutex // Protects the following fields:
59
	addrs   stringset.Set
60
	hash    *hrw.RendezvousHash
61
	healthy stringset.Set
62

63
	watchers []Watcher
64
}
65

66
// Option allows setting custom parameters for ring.
67
type Option func(*ring)
68

69
// WithWatcher adds a watcher to the ring. Can be used multiple times.
70
func WithWatcher(w Watcher) Option {
71
	return func(r *ring) { r.watchers = append(r.watchers, w) }
72
}
73

74
// New creates a new Ring whose members are defined by cluster.
75
func New(
76
	config Config, cluster hostlist.List, filter healthcheck.Filter, opts ...Option) Ring {
77

78
	config.applyDefaults()
79
	r := &ring{
80
		config:  config,
81
		cluster: cluster,
82
		filter:  filter,
83
	}
84
	for _, opt := range opts {
85
		opt(r)
86
	}
87
	r.Refresh()
88
	return r
89
}
90

91
// Locations returns an ordered replica set of healthy addresses which own d.
92
// If all addresses in the replica set are unhealthy, then returns the next
93
// healthy address. If all addresses in the ring are unhealthy, then returns
94
// the first address which owns d (regardless of health). As such, Locations
95
// always returns a non-empty list.
96
func (r *ring) Locations(d core.Digest) []string {
97
	r.mu.RLock()
98
	defer r.mu.RUnlock()
99

100
	nodes := r.hash.GetOrderedNodes(d.ShardID(), len(r.addrs))
101
	if len(nodes) != len(r.addrs) {
102
		// This should never happen.
103
		log.Fatal("invariant violation: ordered hash nodes not equal to cluster size")
104
	}
105

106
	if len(r.healthy) == 0 {
107
		return []string{nodes[0].Label}
108
	}
109

110
	var locs []string
111
	for i := 0; i < len(nodes) && (len(locs) == 0 || i < r.config.MaxReplica); i++ {
112
		addr := nodes[i].Label
113
		if r.healthy.Has(addr) {
114
			locs = append(locs, addr)
115
		}
116
	}
117
	return locs
118
}
119

120
// Contains returns whether the ring contains addr.
121
func (r *ring) Contains(addr string) bool {
122
	r.mu.RLock()
123
	defer r.mu.RUnlock()
124

125
	return r.addrs.Has(addr)
126
}
127

128
// Monitor refreshes the ring at the configured interval. Blocks until the
129
// provided stop channel is closed.
130
func (r *ring) Monitor(stop <-chan struct{}) {
131
	for {
132
		select {
133
		case <-stop:
134
			return
135
		case <-time.After(r.config.RefreshInterval):
136
			r.Refresh()
137
		}
138
	}
139
}
140

141
// Refresh updates the membership and health information of r.
142
func (r *ring) Refresh() {
143
	latest := r.cluster.Resolve()
144

145
	healthy := r.filter.Run(latest)
146

147
	hash := r.hash
148
	if !stringset.Equal(r.addrs, latest) {
149
		// Membership has changed -- update hash nodes.
150
		hash = hrw.NewRendezvousHash(hrw.Murmur3Hash, hrw.UInt64ToFloat64)
151
		for addr := range latest {
152
			hash.AddNode(addr, _defaultWeight)
153
		}
154
		// Notify watchers.
155
		for _, w := range r.watchers {
156
			w.Notify(latest.Copy())
157
		}
158
	}
159

160
	r.mu.Lock()
161
	r.addrs = latest
162
	r.hash = hash
163
	r.healthy = healthy
164
	r.mu.Unlock()
165
}
166

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.