// Copyright (c) 2016-2019 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package blobclient

import (
	"crypto/tls"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"io/ioutil"
	"math"
	"net/http"
	"net/url"
	"strconv"
	"strings"
	"time"

	"github.com/uber/kraken/core"
	"github.com/uber/kraken/utils/httputil"
	"github.com/uber/kraken/utils/memsize"
)

// Client provides a wrapper around all Server HTTP endpoints.
type Client interface {
	Addr() string

	Locations(d core.Digest) ([]string, error)
	DeleteBlob(d core.Digest) error
	TransferBlob(d core.Digest, blob io.Reader) error

	Stat(namespace string, d core.Digest) (*core.BlobInfo, error)
	StatLocal(namespace string, d core.Digest) (*core.BlobInfo, error)

	GetMetaInfo(namespace string, d core.Digest) (*core.MetaInfo, error)
	OverwriteMetaInfo(d core.Digest, pieceLength int64) error

	UploadBlob(namespace string, d core.Digest, blob io.Reader) error
	DuplicateUploadBlob(namespace string, d core.Digest, blob io.Reader, delay time.Duration) error

	DownloadBlob(namespace string, d core.Digest, dst io.Writer) error

	ReplicateToRemote(namespace string, d core.Digest, remoteDNS string) error

	GetPeerContext() (core.PeerContext, error)

	ForceCleanup(ttl time.Duration) error
}

// HTTPClient defines the Client implementation.
type HTTPClient struct {
	addr      string
	chunkSize uint64
	tls       *tls.Config
}

// Option allows setting optional HTTPClient parameters.
type Option func(*HTTPClient)

// WithChunkSize configures an HTTPClient with a custom chunk size for uploads.
func WithChunkSize(s uint64) Option {
	return func(c *HTTPClient) { c.chunkSize = s }
}

// WithTLS configures an HTTPClient with tls configuration.
func WithTLS(tls *tls.Config) Option {
	return func(c *HTTPClient) { c.tls = tls }
}

// New returns a new HTTPClient scoped to addr.
func New(addr string, opts ...Option) *HTTPClient {
	c := &HTTPClient{
		addr:      addr,
		chunkSize: 32 * memsize.MB,
	}
	for _, opt := range opts {
		opt(c)
	}
	return c
}
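
// A minimal usage sketch from a caller's point of view. The address, option
// values, namespace, digest d, and tlsConfig below are hypothetical, shown
// only to illustrate how the constructor and options fit together:
//
//	c := blobclient.New(
//		"origin01-zone1.example.com:8081",
//		blobclient.WithChunkSize(16*memsize.MB),
//		blobclient.WithTLS(tlsConfig),
//	)
//	info, err := c.Stat("library/redis", d)
//	if err == blobclient.ErrBlobNotFound {
//		// No origin in the cluster has the blob.
//	}
//	_ = info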

// Addr returns the address of the server the client is provisioned for.
func (c *HTTPClient) Addr() string {
	return c.addr
}

// Locations returns the origin server addresses which d is sharded on.
func (c *HTTPClient) Locations(d core.Digest) ([]string, error) {
	r, err := httputil.Get(
		fmt.Sprintf("http://%s/blobs/%s/locations", c.addr, d),
		httputil.SendTimeout(5*time.Second),
		httputil.SendTLS(c.tls))
	if err != nil {
		return nil, err
	}
	locs := strings.Split(r.Header.Get("Origin-Locations"), ",")
	if len(locs) == 0 {
		return nil, errors.New("no locations found")
	}
	return locs, nil
}

// Stat returns blob info. It returns an error if the origin does not have a
// blob for d.
func (c *HTTPClient) Stat(namespace string, d core.Digest) (*core.BlobInfo, error) {
	return c.stat(namespace, d, false)
}

// StatLocal returns blob info. It returns an error if the origin does not have
// a blob for d locally.
func (c *HTTPClient) StatLocal(namespace string, d core.Digest) (*core.BlobInfo, error) {
	return c.stat(namespace, d, true)
}

func (c *HTTPClient) stat(namespace string, d core.Digest, local bool) (*core.BlobInfo, error) {
	u := fmt.Sprintf(
		"http://%s/internal/namespace/%s/blobs/%s",
		c.addr,
		url.PathEscape(namespace),
		d)
	if local {
		u += "?local=true"
	}

	r, err := httputil.Head(
		u,
		httputil.SendTimeout(15*time.Second),
		httputil.SendTLS(c.tls))
	if err != nil {
		if httputil.IsNotFound(err) {
			return nil, ErrBlobNotFound
		}
		return nil, err
	}
	var size int64
	hdr := r.Header.Get("Content-Length")
	if hdr != "" {
		size, err = strconv.ParseInt(hdr, 10, 64)
		if err != nil {
			return nil, err
		}
	}
	return core.NewBlobInfo(size), nil
}

// DeleteBlob deletes the blob corresponding to d.
func (c *HTTPClient) DeleteBlob(d core.Digest) error {
	_, err := httputil.Delete(
		fmt.Sprintf("http://%s/internal/blobs/%s", c.addr, d),
		httputil.SendAcceptedCodes(http.StatusAccepted),
		httputil.SendTLS(c.tls))
	return err
}

// TransferBlob uploads a blob to a single origin server. Unlike its cousin UploadBlob,
// TransferBlob is an internal API which does not replicate the blob.
func (c *HTTPClient) TransferBlob(d core.Digest, blob io.Reader) error {
	tc := newTransferClient(c.addr, c.tls)
	return runChunkedUpload(tc, d, blob, int64(c.chunkSize))
}

// UploadBlob uploads and replicates the blob to the origin cluster,
// asynchronously backing the blob up to the remote storage configured for
// namespace.
func (c *HTTPClient) UploadBlob(namespace string, d core.Digest, blob io.Reader) error {
	uc := newUploadClient(c.addr, namespace, _publicUpload, 0, c.tls)
	return runChunkedUpload(uc, d, blob, int64(c.chunkSize))
}
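
// A rough upload sketch from a caller's point of view (the file path,
// namespace, and digest d are hypothetical; the reader is consumed in
// chunkSize-sized chunks):
//
//	f, err := os.Open("/tmp/layer.tar.gz")
//	if err != nil {
//		return err
//	}
//	defer f.Close()
//	if err := c.UploadBlob("library/redis", d, f); err != nil {
//		return err
//	}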

// DuplicateUploadBlob duplicates a blob upload request, which will attempt to
// write back at the given delay.
func (c *HTTPClient) DuplicateUploadBlob(
	namespace string, d core.Digest, blob io.Reader, delay time.Duration) error {

	uc := newUploadClient(c.addr, namespace, _duplicateUpload, delay, c.tls)
	return runChunkedUpload(uc, d, blob, int64(c.chunkSize))
}

// DownloadBlob downloads the blob for d. If the blob of d is not available yet
// (i.e. still downloading), returns a 202 httputil.StatusError, indicating that
// the request should be retried later. If no blob exists for d, returns a 404
// httputil.StatusError.
func (c *HTTPClient) DownloadBlob(namespace string, d core.Digest, dst io.Writer) error {
	r, err := httputil.Get(
		fmt.Sprintf("http://%s/namespace/%s/blobs/%s", c.addr, url.PathEscape(namespace), d),
		httputil.SendTLS(c.tls))
	if err != nil {
		return err
	}
	defer r.Body.Close()
	if _, err := io.Copy(dst, r.Body); err != nil {
		return fmt.Errorf("copy body: %s", err)
	}
	return nil
}
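
// Callers commonly retry the 202 ("still downloading") case. A hedged sketch,
// assuming httputil.StatusError carries the response code in a Status field
// (as the comments above imply):
//
//	err := c.DownloadBlob(namespace, d, dst)
//	if serr, ok := err.(httputil.StatusError); ok && serr.Status == http.StatusAccepted {
//		// The origin is still fetching the blob; back off and retry.
//	}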

// ReplicateToRemote replicates the blob of d to a remote origin cluster. If the
// blob of d is not available yet, returns a 202 httputil.StatusError, indicating
// that the request should be retried later.
func (c *HTTPClient) ReplicateToRemote(namespace string, d core.Digest, remoteDNS string) error {
	_, err := httputil.Post(
		fmt.Sprintf("http://%s/namespace/%s/blobs/%s/remote/%s",
			c.addr, url.PathEscape(namespace), d, remoteDNS),
		httputil.SendTLS(c.tls))
	return err
}

// GetMetaInfo returns metainfo for d. If the blob of d is not available yet
// (i.e. still downloading), returns a 202 httputil.StatusError, indicating that
// the request should be retried later. If no blob exists for d, returns a 404
// httputil.StatusError.
func (c *HTTPClient) GetMetaInfo(namespace string, d core.Digest) (*core.MetaInfo, error) {
	r, err := httputil.Get(
		fmt.Sprintf("http://%s/internal/namespace/%s/blobs/%s/metainfo",
			c.addr, url.PathEscape(namespace), d),
		httputil.SendTimeout(15*time.Second),
		httputil.SendTLS(c.tls))
	if err != nil {
		return nil, err
	}
	defer r.Body.Close()
	raw, err := ioutil.ReadAll(r.Body)
	if err != nil {
		return nil, fmt.Errorf("read body: %s", err)
	}
	mi, err := core.DeserializeMetaInfo(raw)
	if err != nil {
		return nil, fmt.Errorf("deserialize metainfo: %s", err)
	}
	return mi, nil
}

// OverwriteMetaInfo overwrites existing metainfo for d with new metainfo
// configured with pieceLength. Primarily intended for benchmarking purposes.
func (c *HTTPClient) OverwriteMetaInfo(d core.Digest, pieceLength int64) error {
	_, err := httputil.Post(
		fmt.Sprintf("http://%s/internal/blobs/%s/metainfo?piece_length=%d", c.addr, d, pieceLength),
		httputil.SendTLS(c.tls))
	return err
}

// GetPeerContext gets the PeerContext of the p2p client running alongside the Server.
func (c *HTTPClient) GetPeerContext() (core.PeerContext, error) {
	var pctx core.PeerContext
	r, err := httputil.Get(
		fmt.Sprintf("http://%s/internal/peercontext", c.addr),
		httputil.SendTimeout(5*time.Second),
		httputil.SendTLS(c.tls))
	if err != nil {
		return pctx, err
	}
	defer r.Body.Close()
	if err := json.NewDecoder(r.Body).Decode(&pctx); err != nil {
		return pctx, err
	}
	return pctx, nil
}

// ForceCleanup forces cache cleanup to run.
func (c *HTTPClient) ForceCleanup(ttl time.Duration) error {
	v := url.Values{}
	// The endpoint takes the TTL in whole hours, so round up to avoid
	// truncating sub-hour TTLs to zero.
	v.Add("ttl_hr", strconv.Itoa(int(math.Ceil(float64(ttl)/float64(time.Hour)))))
	_, err := httputil.Post(
		fmt.Sprintf("http://%s/forcecleanup?%s", c.addr, v.Encode()),
		httputil.SendTimeout(2*time.Minute),
		httputil.SendTLS(c.tls))
	return err
}

// min returns the smaller of a and b.
func min(a, b int64) int64 {
	if a < b {
		return a
	}
	return b
}