kraken
192 строки · 5.1 Кб
1// Copyright (c) 2016-2019 Uber Technologies, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14package blobclient
15
16import (
17"bytes"
18"crypto/tls"
19"encoding/json"
20"errors"
21"fmt"
22"io"
23"net/url"
24"time"
25
26"github.com/uber/kraken/core"
27"github.com/uber/kraken/utils/httputil"
28)
29
30// uploader provides methods for executing a chunked upload.
31type uploader interface {
32start(d core.Digest) (uid string, err error)
33patch(d core.Digest, uid string, start, stop int64, chunk io.Reader) error
34commit(d core.Digest, uid string) error
35}
36
37func runChunkedUpload(u uploader, d core.Digest, blob io.Reader, chunkSize int64) error {
38if err := runChunkedUploadHelper(u, d, blob, chunkSize); err != nil && !httputil.IsConflict(err) {
39return err
40}
41return nil
42}
43
44func runChunkedUploadHelper(u uploader, d core.Digest, blob io.Reader, chunkSize int64) error {
45uid, err := u.start(d)
46if err != nil {
47return err
48}
49var pos int64
50buf := make([]byte, chunkSize)
51for {
52n, err := blob.Read(buf)
53if err != nil {
54if err == io.EOF {
55break
56}
57return fmt.Errorf("read blob: %s", err)
58}
59chunk := bytes.NewReader(buf[:n])
60stop := pos + int64(n)
61if err := u.patch(d, uid, pos, stop, chunk); err != nil {
62return err
63}
64pos = stop
65}
66return u.commit(d, uid)
67}
68
69// transferClient executes chunked uploads for internal blob transfers.
70type transferClient struct {
71addr string
72tls *tls.Config
73}
74
75func newTransferClient(addr string, tls *tls.Config) *transferClient {
76return &transferClient{addr, tls}
77}
78
79func (c *transferClient) start(d core.Digest) (uid string, err error) {
80r, err := httputil.Post(
81fmt.Sprintf("http://%s/internal/blobs/%s/uploads", c.addr, d),
82httputil.SendTLS(c.tls))
83if err != nil {
84return "", err
85}
86uid = r.Header.Get("Location")
87if uid == "" {
88return "", errors.New("request succeeded, but Location header not set")
89}
90return uid, nil
91}
92
93func (c *transferClient) patch(
94d core.Digest, uid string, start, stop int64, chunk io.Reader) error {
95
96_, err := httputil.Patch(
97fmt.Sprintf("http://%s/internal/blobs/%s/uploads/%s", c.addr, d, uid),
98httputil.SendBody(chunk),
99httputil.SendHeaders(map[string]string{
100"Content-Range": fmt.Sprintf("%d-%d", start, stop),
101}),
102httputil.SendTLS(c.tls))
103return err
104}
105
106func (c *transferClient) commit(d core.Digest, uid string) error {
107_, err := httputil.Put(
108fmt.Sprintf("http://%s/internal/blobs/%s/uploads/%s", c.addr, d, uid),
109httputil.SendTimeout(15*time.Minute),
110httputil.SendTLS(c.tls))
111return err
112}
113
114type uploadType int
115
116const (
117_publicUpload = iota + 1
118_duplicateUpload
119)
120
121// uploadClient executes chunked uploads for external cluster upload operations.
122type uploadClient struct {
123addr string
124namespace string
125uploadType uploadType
126delay time.Duration
127tls *tls.Config
128}
129
130func newUploadClient(
131addr string, namespace string, t uploadType, delay time.Duration, tls *tls.Config) *uploadClient {
132
133return &uploadClient{addr, namespace, t, delay, tls}
134}
135
136func (c *uploadClient) start(d core.Digest) (uid string, err error) {
137r, err := httputil.Post(
138fmt.Sprintf("http://%s/namespace/%s/blobs/%s/uploads",
139c.addr, url.PathEscape(c.namespace), d),
140httputil.SendTLS(c.tls))
141if err != nil {
142return "", err
143}
144uid = r.Header.Get("Location")
145if uid == "" {
146return "", errors.New("request succeeded, but Location header not set")
147}
148return uid, nil
149}
150
151func (c *uploadClient) patch(
152d core.Digest, uid string, start, stop int64, chunk io.Reader) error {
153
154_, err := httputil.Patch(
155fmt.Sprintf("http://%s/namespace/%s/blobs/%s/uploads/%s",
156c.addr, url.PathEscape(c.namespace), d, uid),
157httputil.SendBody(chunk),
158httputil.SendHeaders(map[string]string{
159"Content-Range": fmt.Sprintf("%d-%d", start, stop),
160}),
161httputil.SendTLS(c.tls))
162return err
163}
164
165// DuplicateCommitUploadRequest defines HTTP request body.
166type DuplicateCommitUploadRequest struct {
167Delay time.Duration `yaml:"delay"`
168}
169
170func (c *uploadClient) commit(d core.Digest, uid string) error {
171var template string
172var body io.Reader
173switch c.uploadType {
174case _publicUpload:
175template = "http://%s/namespace/%s/blobs/%s/uploads/%s"
176case _duplicateUpload:
177template = "http://%s/internal/duplicate/namespace/%s/blobs/%s/uploads/%s"
178b, err := json.Marshal(DuplicateCommitUploadRequest{c.delay})
179if err != nil {
180return fmt.Errorf("json: %s", err)
181}
182body = bytes.NewBuffer(b)
183default:
184return fmt.Errorf("unknown upload type: %d", c.uploadType)
185}
186_, err := httputil.Put(
187fmt.Sprintf(template, c.addr, url.PathEscape(c.namespace), d, uid),
188httputil.SendTimeout(15*time.Minute),
189httputil.SendBody(body),
190httputil.SendTLS(c.tls))
191return err
192}
193