kraken
296 строк · 8.4 Кб
// Copyright (c) 2016-2019 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14package blobclient
15
16import (
17"errors"
18"fmt"
19"io"
20"math/rand"
21"net/http"
22"sync"
23"time"
24
25"github.com/cenkalti/backoff"
26
27"github.com/uber/kraken/core"
28"github.com/uber/kraken/lib/hostlist"
29"github.com/uber/kraken/utils/errutil"
30"github.com/uber/kraken/utils/httputil"
31"github.com/uber/kraken/utils/log"
32)
33
34// Locations queries cluster for the locations of d.
35func Locations(p Provider, cluster hostlist.List, d core.Digest) (locs []string, err error) {
36addrs := cluster.Resolve().Sample(3)
37if len(addrs) == 0 {
38return nil, errors.New("cluster is empty")
39}
40for addr := range addrs {
41locs, err = p.Provide(addr).Locations(d)
42if err != nil {
43continue
44}
45break
46}
47return locs, err
48}
49
50// ClientResolver resolves digests into Clients of origins.
51type ClientResolver interface {
52// Resolve must return an ordered, stable list of Clients for origins owning d.
53Resolve(d core.Digest) ([]Client, error)
54}
55
56type clientResolver struct {
57provider Provider
58cluster hostlist.List
59}
60
61// NewClientResolver returns a new client resolver.
62func NewClientResolver(p Provider, cluster hostlist.List) ClientResolver {
63return &clientResolver{p, cluster}
64}
65
66func (r *clientResolver) Resolve(d core.Digest) ([]Client, error) {
67locs, err := Locations(r.provider, r.cluster, d)
68if err != nil {
69return nil, err
70}
71var clients []Client
72for _, loc := range locs {
73clients = append(clients, r.provider.Provide(loc))
74}
75return clients, nil
76}
77
78// ClusterClient defines a top-level origin cluster client which handles blob
79// location resolution and retries.
80type ClusterClient interface {
81UploadBlob(namespace string, d core.Digest, blob io.Reader) error
82DownloadBlob(namespace string, d core.Digest, dst io.Writer) error
83GetMetaInfo(namespace string, d core.Digest) (*core.MetaInfo, error)
84Stat(namespace string, d core.Digest) (*core.BlobInfo, error)
85OverwriteMetaInfo(d core.Digest, pieceLength int64) error
86Owners(d core.Digest) ([]core.PeerContext, error)
87ReplicateToRemote(namespace string, d core.Digest, remoteDNS string) error
88}
89
90type clusterClient struct {
91resolver ClientResolver
92}
93
94// NewClusterClient returns a new ClusterClient.
95func NewClusterClient(r ClientResolver) ClusterClient {
96return &clusterClient{r}
97}
98
99// defaultPollBackOff returns the default backoff used on Poll operations.
100func (c *clusterClient) defaultPollBackOff() backoff.BackOff {
101return &backoff.ExponentialBackOff{
102InitialInterval: time.Second,
103RandomizationFactor: 0.05,
104Multiplier: 1.3,
105MaxInterval: 5 * time.Second,
106MaxElapsedTime: 15 * time.Minute,
107Clock: backoff.SystemClock,
108}
109}
110
111// UploadBlob uploads blob to origin cluster. See Client.UploadBlob for more details.
112func (c *clusterClient) UploadBlob(namespace string, d core.Digest, blob io.Reader) (err error) {
113clients, err := c.resolver.Resolve(d)
114if err != nil {
115return fmt.Errorf("resolve clients: %s", err)
116}
117
118// We prefer the origin with highest hashing score so the first origin will handle
119// replication to origins with lower score. This is because we want to reduce upload
120// conflicts between local replicas.
121for _, client := range clients {
122err = client.UploadBlob(namespace, d, blob)
123// Allow retry on another origin if the current upstream is temporarily
124// unavailable or under high load.
125if httputil.IsNetworkError(err) || httputil.IsRetryable(err) {
126continue
127}
128break
129}
130return err
131}
132
133// GetMetaInfo returns the metainfo for d. Does not handle polling.
134func (c *clusterClient) GetMetaInfo(namespace string, d core.Digest) (mi *core.MetaInfo, err error) {
135clients, err := c.resolver.Resolve(d)
136if err != nil {
137return nil, fmt.Errorf("resolve clients: %s", err)
138}
139for _, client := range clients {
140mi, err = client.GetMetaInfo(namespace, d)
141// Do not try the next replica on 202 errors.
142if err != nil && !httputil.IsAccepted(err) {
143continue
144}
145break
146}
147return mi, err
148}
149
150// Stat checks availability of a blob in the cluster.
151func (c *clusterClient) Stat(namespace string, d core.Digest) (bi *core.BlobInfo, err error) {
152clients, err := c.resolver.Resolve(d)
153if err != nil {
154return nil, fmt.Errorf("resolve clients: %s", err)
155}
156
157shuffle(clients)
158for _, client := range clients {
159bi, err = client.Stat(namespace, d)
160if err != nil {
161continue
162}
163break
164}
165
166return bi, err
167}
168
169// OverwriteMetaInfo overwrites existing metainfo for d with new metainfo configured
170// with pieceLength on every origin server. Returns error if any origin was unable
171// to overwrite metainfo. Primarly intended for benchmarking purposes.
172func (c *clusterClient) OverwriteMetaInfo(d core.Digest, pieceLength int64) error {
173clients, err := c.resolver.Resolve(d)
174if err != nil {
175return fmt.Errorf("resolve clients: %s", err)
176}
177var errs []error
178for _, client := range clients {
179if err := client.OverwriteMetaInfo(d, pieceLength); err != nil {
180errs = append(errs, fmt.Errorf("origin %s: %s", client.Addr(), err))
181}
182}
183return errutil.Join(errs)
184}
185
186// DownloadBlob pulls a blob from the origin cluster.
187func (c *clusterClient) DownloadBlob(namespace string, d core.Digest, dst io.Writer) error {
188err := Poll(c.resolver, c.defaultPollBackOff(), d, func(client Client) error {
189return client.DownloadBlob(namespace, d, dst)
190})
191if httputil.IsNotFound(err) {
192err = ErrBlobNotFound
193}
194return err
195}
196
197// Owners returns the origin peers which own d.
198func (c *clusterClient) Owners(d core.Digest) ([]core.PeerContext, error) {
199clients, err := c.resolver.Resolve(d)
200if err != nil {
201return nil, fmt.Errorf("resolve clients: %s", err)
202}
203
204var mu sync.Mutex
205var peers []core.PeerContext
206var errs []error
207
208var wg sync.WaitGroup
209for _, client := range clients {
210wg.Add(1)
211go func(client Client) {
212defer wg.Done()
213pctx, err := client.GetPeerContext()
214mu.Lock()
215if err != nil {
216errs = append(errs, err)
217} else {
218peers = append(peers, pctx)
219}
220mu.Unlock()
221}(client)
222}
223wg.Wait()
224
225err = errutil.Join(errs)
226
227if len(peers) == 0 {
228if err != nil {
229return nil, err
230}
231return nil, errors.New("no origin peers found")
232}
233
234if err != nil {
235log.With("blob", d.Hex()).Errorf("Error getting all origin peers: %s", err)
236}
237return peers, nil
238}
239
240// ReplicateToRemote replicates d to a remote origin cluster.
241func (c *clusterClient) ReplicateToRemote(namespace string, d core.Digest, remoteDNS string) error {
242// Re-use download backoff since replicate may download blobs.
243return Poll(c.resolver, c.defaultPollBackOff(), d, func(client Client) error {
244return client.ReplicateToRemote(namespace, d, remoteDNS)
245})
246}
247
248func shuffle(cs []Client) {
249for i := range cs {
250j := rand.Intn(i + 1)
251cs[i], cs[j] = cs[j], cs[i]
252}
253}
254
255// Poll wraps requests for endpoints which require polling, due to a blob
256// being asynchronously fetched from remote storage in the origin cluster.
257func Poll(
258r ClientResolver, b backoff.BackOff, d core.Digest, makeRequest func(Client) error) error {
259
260// By looping over clients in order, we will always prefer the same origin
261// for making requests to loosely guarantee that only one origin needs to
262// fetch the file from remote backend.
263clients, err := r.Resolve(d)
264if err != nil {
265return fmt.Errorf("resolve clients: %s", err)
266}
267var errs []error
268ORIGINS:
269for _, client := range clients {
270b.Reset()
271POLL:
272for {
273if err := makeRequest(client); err != nil {
274if serr, ok := err.(httputil.StatusError); ok {
275if serr.Status == http.StatusAccepted {
276d := b.NextBackOff()
277if d == backoff.Stop {
278break POLL // Backoff timed out.
279}
280time.Sleep(d)
281continue POLL
282}
283if serr.Status < 500 {
284return err
285}
286}
287errs = append(errs, fmt.Errorf("origin %s: %s", client.Addr(), err))
288continue ORIGINS
289}
290return nil // Success!
291}
292errs = append(errs,
293fmt.Errorf("origin %s: backoff timed out on 202 responses", client.Addr()))
294}
295return fmt.Errorf("all origins unavailable: %s", errutil.Join(errs))
296}
297