kraken

Форк
0
/
uploader.go 
192 строки · 5.1 Кб
1
// Copyright (c) 2016-2019 Uber Technologies, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
package blobclient
15

16
import (
17
	"bytes"
18
	"crypto/tls"
19
	"encoding/json"
20
	"errors"
21
	"fmt"
22
	"io"
23
	"net/url"
24
	"time"
25

26
	"github.com/uber/kraken/core"
27
	"github.com/uber/kraken/utils/httputil"
28
)
29

30
// uploader provides methods for executing a chunked upload.
31
type uploader interface {
32
	start(d core.Digest) (uid string, err error)
33
	patch(d core.Digest, uid string, start, stop int64, chunk io.Reader) error
34
	commit(d core.Digest, uid string) error
35
}
36

37
func runChunkedUpload(u uploader, d core.Digest, blob io.Reader, chunkSize int64) error {
38
	if err := runChunkedUploadHelper(u, d, blob, chunkSize); err != nil && !httputil.IsConflict(err) {
39
		return err
40
	}
41
	return nil
42
}
43

44
func runChunkedUploadHelper(u uploader, d core.Digest, blob io.Reader, chunkSize int64) error {
45
	uid, err := u.start(d)
46
	if err != nil {
47
		return err
48
	}
49
	var pos int64
50
	buf := make([]byte, chunkSize)
51
	for {
52
		n, err := blob.Read(buf)
53
		if err != nil {
54
			if err == io.EOF {
55
				break
56
			}
57
			return fmt.Errorf("read blob: %s", err)
58
		}
59
		chunk := bytes.NewReader(buf[:n])
60
		stop := pos + int64(n)
61
		if err := u.patch(d, uid, pos, stop, chunk); err != nil {
62
			return err
63
		}
64
		pos = stop
65
	}
66
	return u.commit(d, uid)
67
}
68

69
// transferClient executes chunked uploads for internal blob transfers.
70
type transferClient struct {
71
	addr string
72
	tls  *tls.Config
73
}
74

75
func newTransferClient(addr string, tls *tls.Config) *transferClient {
76
	return &transferClient{addr, tls}
77
}
78

79
func (c *transferClient) start(d core.Digest) (uid string, err error) {
80
	r, err := httputil.Post(
81
		fmt.Sprintf("http://%s/internal/blobs/%s/uploads", c.addr, d),
82
		httputil.SendTLS(c.tls))
83
	if err != nil {
84
		return "", err
85
	}
86
	uid = r.Header.Get("Location")
87
	if uid == "" {
88
		return "", errors.New("request succeeded, but Location header not set")
89
	}
90
	return uid, nil
91
}
92

93
func (c *transferClient) patch(
94
	d core.Digest, uid string, start, stop int64, chunk io.Reader) error {
95

96
	_, err := httputil.Patch(
97
		fmt.Sprintf("http://%s/internal/blobs/%s/uploads/%s", c.addr, d, uid),
98
		httputil.SendBody(chunk),
99
		httputil.SendHeaders(map[string]string{
100
			"Content-Range": fmt.Sprintf("%d-%d", start, stop),
101
		}),
102
		httputil.SendTLS(c.tls))
103
	return err
104
}
105

106
func (c *transferClient) commit(d core.Digest, uid string) error {
107
	_, err := httputil.Put(
108
		fmt.Sprintf("http://%s/internal/blobs/%s/uploads/%s", c.addr, d, uid),
109
		httputil.SendTimeout(15*time.Minute),
110
		httputil.SendTLS(c.tls))
111
	return err
112
}
113

114
type uploadType int
115

116
const (
117
	_publicUpload = iota + 1
118
	_duplicateUpload
119
)
120

121
// uploadClient executes chunked uploads for external cluster upload operations.
122
type uploadClient struct {
123
	addr       string
124
	namespace  string
125
	uploadType uploadType
126
	delay      time.Duration
127
	tls        *tls.Config
128
}
129

130
func newUploadClient(
131
	addr string, namespace string, t uploadType, delay time.Duration, tls *tls.Config) *uploadClient {
132

133
	return &uploadClient{addr, namespace, t, delay, tls}
134
}
135

136
func (c *uploadClient) start(d core.Digest) (uid string, err error) {
137
	r, err := httputil.Post(
138
		fmt.Sprintf("http://%s/namespace/%s/blobs/%s/uploads",
139
			c.addr, url.PathEscape(c.namespace), d),
140
		httputil.SendTLS(c.tls))
141
	if err != nil {
142
		return "", err
143
	}
144
	uid = r.Header.Get("Location")
145
	if uid == "" {
146
		return "", errors.New("request succeeded, but Location header not set")
147
	}
148
	return uid, nil
149
}
150

151
func (c *uploadClient) patch(
152
	d core.Digest, uid string, start, stop int64, chunk io.Reader) error {
153

154
	_, err := httputil.Patch(
155
		fmt.Sprintf("http://%s/namespace/%s/blobs/%s/uploads/%s",
156
			c.addr, url.PathEscape(c.namespace), d, uid),
157
		httputil.SendBody(chunk),
158
		httputil.SendHeaders(map[string]string{
159
			"Content-Range": fmt.Sprintf("%d-%d", start, stop),
160
		}),
161
		httputil.SendTLS(c.tls))
162
	return err
163
}
164

165
// DuplicateCommitUploadRequest defines HTTP request body.
166
type DuplicateCommitUploadRequest struct {
167
	Delay time.Duration `yaml:"delay"`
168
}
169

170
func (c *uploadClient) commit(d core.Digest, uid string) error {
171
	var template string
172
	var body io.Reader
173
	switch c.uploadType {
174
	case _publicUpload:
175
		template = "http://%s/namespace/%s/blobs/%s/uploads/%s"
176
	case _duplicateUpload:
177
		template = "http://%s/internal/duplicate/namespace/%s/blobs/%s/uploads/%s"
178
		b, err := json.Marshal(DuplicateCommitUploadRequest{c.delay})
179
		if err != nil {
180
			return fmt.Errorf("json: %s", err)
181
		}
182
		body = bytes.NewBuffer(b)
183
	default:
184
		return fmt.Errorf("unknown upload type: %d", c.uploadType)
185
	}
186
	_, err := httputil.Put(
187
		fmt.Sprintf(template, c.addr, url.PathEscape(c.namespace), d, uid),
188
		httputil.SendTimeout(15*time.Minute),
189
		httputil.SendBody(body),
190
		httputil.SendTLS(c.tls))
191
	return err
192
}
193

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.