kraken
285 строк · 8.7 Кб
1// Copyright (c) 2016-2019 Uber Technologies, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14package blobclient
15
16import (
17"crypto/tls"
18"encoding/json"
19"errors"
20"fmt"
21"io"
22"io/ioutil"
23"math"
24"net/http"
25"net/url"
26"strconv"
27"strings"
28"time"
29
30"github.com/uber/kraken/core"
31"github.com/uber/kraken/utils/httputil"
32"github.com/uber/kraken/utils/memsize"
33)
34
35// Client provides a wrapper around all Server HTTP endpoints.
36type Client interface {
37Addr() string
38
39Locations(d core.Digest) ([]string, error)
40DeleteBlob(d core.Digest) error
41TransferBlob(d core.Digest, blob io.Reader) error
42
43Stat(namespace string, d core.Digest) (*core.BlobInfo, error)
44StatLocal(namespace string, d core.Digest) (*core.BlobInfo, error)
45
46GetMetaInfo(namespace string, d core.Digest) (*core.MetaInfo, error)
47OverwriteMetaInfo(d core.Digest, pieceLength int64) error
48
49UploadBlob(namespace string, d core.Digest, blob io.Reader) error
50DuplicateUploadBlob(namespace string, d core.Digest, blob io.Reader, delay time.Duration) error
51
52DownloadBlob(namespace string, d core.Digest, dst io.Writer) error
53
54ReplicateToRemote(namespace string, d core.Digest, remoteDNS string) error
55
56GetPeerContext() (core.PeerContext, error)
57
58ForceCleanup(ttl time.Duration) error
59}
60
// HTTPClient defines the Client implementation. Instances are created with
// New and customized through Option values.
type HTTPClient struct {
	// addr is the server address every request URL is built from.
	addr string
	// chunkSize is the chunk size handed to chunked uploads.
	chunkSize uint64
	// tls is the TLS configuration passed along with every request. It is
	// nil unless set through WithTLS.
	tls *tls.Config
}
67
68// Option allows setting optional HTTPClient parameters.
69type Option func(*HTTPClient)
70
71// WithChunkSize configures an HTTPClient with a custom chunk size for uploads.
72func WithChunkSize(s uint64) Option {
73return func(c *HTTPClient) { c.chunkSize = s }
74}
75
76// WithTLS configures an HTTPClient with tls configuration.
77func WithTLS(tls *tls.Config) Option {
78return func(c *HTTPClient) { c.tls = tls }
79}
80
81// New returns a new HTTPClient scoped to addr.
82func New(addr string, opts ...Option) *HTTPClient {
83c := &HTTPClient{
84addr: addr,
85chunkSize: 32 * memsize.MB,
86}
87for _, opt := range opts {
88opt(c)
89}
90return c
91}
92
93// Addr returns the address of the server the client is provisioned for.
94func (c *HTTPClient) Addr() string {
95return c.addr
96}
97
98// Locations returns the origin server addresses which d is sharded on.
99func (c *HTTPClient) Locations(d core.Digest) ([]string, error) {
100r, err := httputil.Get(
101fmt.Sprintf("http://%s/blobs/%s/locations", c.addr, d),
102httputil.SendTimeout(5*time.Second),
103httputil.SendTLS(c.tls))
104if err != nil {
105return nil, err
106}
107locs := strings.Split(r.Header.Get("Origin-Locations"), ",")
108if len(locs) == 0 {
109return nil, errors.New("no locations found")
110}
111return locs, nil
112}
113
114// Stat returns blob info. It returns error if the origin does not have a blob
115// for d.
116func (c *HTTPClient) Stat(namespace string, d core.Digest) (*core.BlobInfo, error) {
117return c.stat(namespace, d, false)
118}
119
120// StatLocal returns blob info. It returns error if the origin does not have a blob
121// for d locally.
122func (c *HTTPClient) StatLocal(namespace string, d core.Digest) (*core.BlobInfo, error) {
123return c.stat(namespace, d, true)
124}
125
126func (c *HTTPClient) stat(namespace string, d core.Digest, local bool) (*core.BlobInfo, error) {
127u := fmt.Sprintf(
128"http://%s/internal/namespace/%s/blobs/%s",
129c.addr,
130url.PathEscape(namespace),
131d)
132if local {
133u += "?local=true"
134}
135
136r, err := httputil.Head(
137u,
138httputil.SendTimeout(15*time.Second),
139httputil.SendTLS(c.tls))
140if err != nil {
141if httputil.IsNotFound(err) {
142return nil, ErrBlobNotFound
143}
144return nil, err
145}
146var size int64
147hdr := r.Header.Get("Content-Length")
148if hdr != "" {
149size, err = strconv.ParseInt(hdr, 10, 64)
150if err != nil {
151return nil, err
152}
153}
154return core.NewBlobInfo(size), nil
155}
156
157// DeleteBlob deletes the blob corresponding to d.
158func (c *HTTPClient) DeleteBlob(d core.Digest) error {
159_, err := httputil.Delete(
160fmt.Sprintf("http://%s/internal/blobs/%s", c.addr, d),
161httputil.SendAcceptedCodes(http.StatusAccepted),
162httputil.SendTLS(c.tls))
163return err
164}
165
166// TransferBlob uploads a blob to a single origin server. Unlike its cousin UploadBlob,
167// TransferBlob is an internal API which does not replicate the blob.
168func (c *HTTPClient) TransferBlob(d core.Digest, blob io.Reader) error {
169tc := newTransferClient(c.addr, c.tls)
170return runChunkedUpload(tc, d, blob, int64(c.chunkSize))
171}
172
173// UploadBlob uploads and replicates blob to the origin cluster, asynchronously
174// backing the blob up to the remote storage configured for namespace.
175func (c *HTTPClient) UploadBlob(namespace string, d core.Digest, blob io.Reader) error {
176uc := newUploadClient(c.addr, namespace, _publicUpload, 0, c.tls)
177return runChunkedUpload(uc, d, blob, int64(c.chunkSize))
178}
179
180// DuplicateUploadBlob duplicates an blob upload request, which will attempt to
181// write-back at the given delay.
182func (c *HTTPClient) DuplicateUploadBlob(
183namespace string, d core.Digest, blob io.Reader, delay time.Duration) error {
184
185uc := newUploadClient(c.addr, namespace, _duplicateUpload, delay, c.tls)
186return runChunkedUpload(uc, d, blob, int64(c.chunkSize))
187}
188
189// DownloadBlob downloads blob for d. If the blob of d is not available yet
190// (i.e. still downloading), returns 202 httputil.StatusError, indicating that
191// the request shoudl be retried later. If not blob exists for d, returns a 404
192// httputil.StatusError.
193func (c *HTTPClient) DownloadBlob(namespace string, d core.Digest, dst io.Writer) error {
194r, err := httputil.Get(
195fmt.Sprintf("http://%s/namespace/%s/blobs/%s", c.addr, url.PathEscape(namespace), d),
196httputil.SendTLS(c.tls))
197if err != nil {
198return err
199}
200defer r.Body.Close()
201if _, err := io.Copy(dst, r.Body); err != nil {
202return fmt.Errorf("copy body: %s", err)
203}
204return nil
205}
206
207// ReplicateToRemote replicates the blob of d to a remote origin cluster. If the
208// blob of d is not available yet, returns 202 httputil.StatusError, indicating
209// that the request should be retried later.
210func (c *HTTPClient) ReplicateToRemote(namespace string, d core.Digest, remoteDNS string) error {
211_, err := httputil.Post(
212fmt.Sprintf("http://%s/namespace/%s/blobs/%s/remote/%s",
213c.addr, url.PathEscape(namespace), d, remoteDNS),
214httputil.SendTLS(c.tls))
215return err
216}
217
218// GetMetaInfo returns metainfo for d. If the blob of d is not available yet
219// (i.e. still downloading), returns a 202 httputil.StatusError, indicating that
220// the request should be retried later. If no blob exists for d, returns a 404
221// httputil.StatusError.
222func (c *HTTPClient) GetMetaInfo(namespace string, d core.Digest) (*core.MetaInfo, error) {
223r, err := httputil.Get(
224fmt.Sprintf("http://%s/internal/namespace/%s/blobs/%s/metainfo",
225c.addr, url.PathEscape(namespace), d),
226httputil.SendTimeout(15*time.Second),
227httputil.SendTLS(c.tls))
228if err != nil {
229return nil, err
230}
231defer r.Body.Close()
232raw, err := ioutil.ReadAll(r.Body)
233if err != nil {
234return nil, fmt.Errorf("read body: %s", err)
235}
236mi, err := core.DeserializeMetaInfo(raw)
237if err != nil {
238return nil, fmt.Errorf("deserialize metainfo: %s", err)
239}
240return mi, nil
241}
242
243// OverwriteMetaInfo overwrites existing metainfo for d with new metainfo
244// configured with pieceLength. Primarily intended for benchmarking purposes.
245func (c *HTTPClient) OverwriteMetaInfo(d core.Digest, pieceLength int64) error {
246_, err := httputil.Post(
247fmt.Sprintf("http://%s/internal/blobs/%s/metainfo?piece_length=%d", c.addr, d, pieceLength),
248httputil.SendTLS(c.tls))
249return err
250}
251
252// GetPeerContext gets the PeerContext of the p2p client running alongside the Server.
253func (c *HTTPClient) GetPeerContext() (core.PeerContext, error) {
254var pctx core.PeerContext
255r, err := httputil.Get(
256fmt.Sprintf("http://%s/internal/peercontext", c.addr),
257httputil.SendTimeout(5*time.Second),
258httputil.SendTLS(c.tls))
259if err != nil {
260return pctx, err
261}
262defer r.Body.Close()
263if err := json.NewDecoder(r.Body).Decode(&pctx); err != nil {
264return pctx, err
265}
266return pctx, nil
267}
268
269// ForceCleanup forces cache cleanup to run.
270func (c *HTTPClient) ForceCleanup(ttl time.Duration) error {
271v := url.Values{}
272v.Add("ttl_hr", strconv.Itoa(int(math.Ceil(float64(ttl)/float64(time.Hour)))))
273_, err := httputil.Post(
274fmt.Sprintf("http://%s/forcecleanup?%s", c.addr, v.Encode()),
275httputil.SendTimeout(2*time.Minute),
276httputil.SendTLS(c.tls))
277return err
278}
279
// min returns the smaller of a and b.
func min(a, b int64) int64 {
	if b <= a {
		return b
	}
	return a
}
286