// Copyright (c) 2016-2019 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package blobclient

import (
	"errors"
	"fmt"
	"io"
	"math/rand"
	"net/http"
	"sync"
	"time"

	"github.com/cenkalti/backoff"

	"github.com/uber/kraken/core"
	"github.com/uber/kraken/lib/hostlist"
	"github.com/uber/kraken/utils/errutil"
	"github.com/uber/kraken/utils/httputil"
	"github.com/uber/kraken/utils/log"
)

// Locations queries cluster for the locations of d.
func Locations(p Provider, cluster hostlist.List, d core.Digest) (locs []string, err error) {
	addrs := cluster.Resolve().Sample(3)
	if len(addrs) == 0 {
		return nil, errors.New("cluster is empty")
	}
	for addr := range addrs {
		locs, err = p.Provide(addr).Locations(d)
		if err != nil {
			continue
		}
		break
	}
	return locs, err
}

// ClientResolver resolves digests into Clients of origins.
type ClientResolver interface {
	// Resolve must return an ordered, stable list of Clients for origins owning d.
	Resolve(d core.Digest) ([]Client, error)
}

type clientResolver struct {
	provider Provider
	cluster  hostlist.List
}

// NewClientResolver returns a new client resolver.
func NewClientResolver(p Provider, cluster hostlist.List) ClientResolver {
	return &clientResolver{p, cluster}
}

func (r *clientResolver) Resolve(d core.Digest) ([]Client, error) {
	locs, err := Locations(r.provider, r.cluster, d)
	if err != nil {
		return nil, err
	}
	var clients []Client
	for _, loc := range locs {
		clients = append(clients, r.provider.Provide(loc))
	}
	return clients, nil
}

// ClusterClient defines a top-level origin cluster client which handles blob
// location resolution and retries.
type ClusterClient interface {
	UploadBlob(namespace string, d core.Digest, blob io.Reader) error
	DownloadBlob(namespace string, d core.Digest, dst io.Writer) error
	GetMetaInfo(namespace string, d core.Digest) (*core.MetaInfo, error)
	Stat(namespace string, d core.Digest) (*core.BlobInfo, error)
	OverwriteMetaInfo(d core.Digest, pieceLength int64) error
	Owners(d core.Digest) ([]core.PeerContext, error)
	ReplicateToRemote(namespace string, d core.Digest, remoteDNS string) error
}

type clusterClient struct {
	resolver ClientResolver
}

// NewClusterClient returns a new ClusterClient.
func NewClusterClient(r ClientResolver) ClusterClient {
	return &clusterClient{r}
}
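
// Example (illustrative sketch): a ClusterClient is wired from a Provider and
// a hostlist.List via the constructors above. The identifiers p, cluster,
// namespace, d, and dst are caller-supplied placeholders, not values defined
// in this package:
//
//	cc := NewClusterClient(NewClientResolver(p, cluster))
//	err := cc.DownloadBlob(namespace, d, dst)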

// defaultPollBackOff returns the default backoff used on Poll operations.
func (c *clusterClient) defaultPollBackOff() backoff.BackOff {
	return &backoff.ExponentialBackOff{
		InitialInterval:     time.Second,
		RandomizationFactor: 0.05,
		Multiplier:          1.3,
		MaxInterval:         5 * time.Second,
		MaxElapsedTime:      15 * time.Minute,
		Clock:               backoff.SystemClock,
	}
}
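
// With the settings above, poll intervals grow roughly as 1s, 1.3s, 1.7s,
// 2.2s, 2.9s, 3.7s, 4.8s (each with ±5% jitter), then stay capped at the 5s
// MaxInterval; NextBackOff returns backoff.Stop once roughly 15 minutes
// (MaxElapsedTime) have passed since the last Reset.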

// UploadBlob uploads a blob to the origin cluster. See Client.UploadBlob for more details.
func (c *clusterClient) UploadBlob(namespace string, d core.Digest, blob io.Reader) (err error) {
	clients, err := c.resolver.Resolve(d)
	if err != nil {
		return fmt.Errorf("resolve clients: %s", err)
	}

	// We prefer the origin with the highest hashing score, so the first origin handles
	// replication to origins with lower scores. This reduces upload conflicts between
	// local replicas.
	for _, client := range clients {
		err = client.UploadBlob(namespace, d, blob)
		// Allow retry on another origin if the current upstream is temporarily
		// unavailable or under high load.
		if httputil.IsNetworkError(err) || httputil.IsRetryable(err) {
			continue
		}
		break
	}
	return err
}

// GetMetaInfo returns the metainfo for d. Does not handle polling.
func (c *clusterClient) GetMetaInfo(namespace string, d core.Digest) (mi *core.MetaInfo, err error) {
	clients, err := c.resolver.Resolve(d)
	if err != nil {
		return nil, fmt.Errorf("resolve clients: %s", err)
	}
	for _, client := range clients {
		mi, err = client.GetMetaInfo(namespace, d)
		// A 202 means the origin is asynchronously fetching the blob; do not
		// fail over to the next replica in that case.
		if err != nil && !httputil.IsAccepted(err) {
			continue
		}
		break
	}
	return mi, err
}

// Stat checks availability of a blob in the cluster.
func (c *clusterClient) Stat(namespace string, d core.Digest) (bi *core.BlobInfo, err error) {
	clients, err := c.resolver.Resolve(d)
	if err != nil {
		return nil, fmt.Errorf("resolve clients: %s", err)
	}

	shuffle(clients)
	for _, client := range clients {
		bi, err = client.Stat(namespace, d)
		if err != nil {
			continue
		}
		break
	}

	return bi, err
}

// OverwriteMetaInfo overwrites existing metainfo for d with new metainfo configured
// with pieceLength on every origin server. Returns an error if any origin was unable
// to overwrite metainfo. Primarily intended for benchmarking purposes.
func (c *clusterClient) OverwriteMetaInfo(d core.Digest, pieceLength int64) error {
	clients, err := c.resolver.Resolve(d)
	if err != nil {
		return fmt.Errorf("resolve clients: %s", err)
	}
	var errs []error
	for _, client := range clients {
		if err := client.OverwriteMetaInfo(d, pieceLength); err != nil {
			errs = append(errs, fmt.Errorf("origin %s: %s", client.Addr(), err))
		}
	}
	return errutil.Join(errs)
}

// DownloadBlob pulls a blob from the origin cluster.
func (c *clusterClient) DownloadBlob(namespace string, d core.Digest, dst io.Writer) error {
	err := Poll(c.resolver, c.defaultPollBackOff(), d, func(client Client) error {
		return client.DownloadBlob(namespace, d, dst)
	})
	if httputil.IsNotFound(err) {
		err = ErrBlobNotFound
	}
	return err
}

// Owners returns the origin peers which own d.
func (c *clusterClient) Owners(d core.Digest) ([]core.PeerContext, error) {
	clients, err := c.resolver.Resolve(d)
	if err != nil {
		return nil, fmt.Errorf("resolve clients: %s", err)
	}

	var mu sync.Mutex
	var peers []core.PeerContext
	var errs []error

	var wg sync.WaitGroup
	for _, client := range clients {
		wg.Add(1)
		go func(client Client) {
			defer wg.Done()
			pctx, err := client.GetPeerContext()
			mu.Lock()
			if err != nil {
				errs = append(errs, err)
			} else {
				peers = append(peers, pctx)
			}
			mu.Unlock()
		}(client)
	}
	wg.Wait()

	err = errutil.Join(errs)

	if len(peers) == 0 {
		if err != nil {
			return nil, err
		}
		return nil, errors.New("no origin peers found")
	}

	if err != nil {
		log.With("blob", d.Hex()).Errorf("Error getting all origin peers: %s", err)
	}
	return peers, nil
}

// ReplicateToRemote replicates d to a remote origin cluster.
func (c *clusterClient) ReplicateToRemote(namespace string, d core.Digest, remoteDNS string) error {
	// Re-use download backoff since replicate may download blobs.
	return Poll(c.resolver, c.defaultPollBackOff(), d, func(client Client) error {
		return client.ReplicateToRemote(namespace, d, remoteDNS)
	})
}

// shuffle randomizes the order of cs in place (Fisher-Yates).
func shuffle(cs []Client) {
	for i := range cs {
		j := rand.Intn(i + 1)
		cs[i], cs[j] = cs[j], cs[i]
	}
}

// Poll wraps requests for endpoints which require polling, due to a blob
// being asynchronously fetched from remote storage in the origin cluster.
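//
// Per-response behavior, as implemented below: a 202 backs off and retries
// the same origin until the backoff times out; any other error with status
// below 500 is returned immediately; 5xx and non-HTTP errors (e.g. network
// errors) move on to the next origin; if every origin fails, the joined
// errors are returned.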
func Poll(
	r ClientResolver, b backoff.BackOff, d core.Digest, makeRequest func(Client) error) error {

	// By looping over clients in order, we will always prefer the same origin
	// for making requests to loosely guarantee that only one origin needs to
	// fetch the file from remote backend.
	clients, err := r.Resolve(d)
	if err != nil {
		return fmt.Errorf("resolve clients: %s", err)
	}
	var errs []error
ORIGINS:
	for _, client := range clients {
		b.Reset()
	POLL:
		for {
			if err := makeRequest(client); err != nil {
				if serr, ok := err.(httputil.StatusError); ok {
					if serr.Status == http.StatusAccepted {
						d := b.NextBackOff()
						if d == backoff.Stop {
							break POLL // Backoff timed out.
						}
						time.Sleep(d)
						continue POLL
					}
					if serr.Status < 500 {
						// Non-202 statuses below 500 are terminal; do not try other origins.
						return err
					}
				}
				errs = append(errs, fmt.Errorf("origin %s: %s", client.Addr(), err))
				continue ORIGINS
			}
			return nil // Success!
		}
		errs = append(errs,
			fmt.Errorf("origin %s: backoff timed out on 202 responses", client.Addr()))
	}
	return fmt.Errorf("all origins unavailable: %s", errutil.Join(errs))
}