Dragonfly2

Форк
0
458 строк · 13.3 Кб
1
/*
2
 *     Copyright 2020 The Dragonfly Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *      http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package transport
18

19
import (
20
	"bytes"
21
	"context"
22
	"crypto/tls"
23
	"fmt"
24
	"io"
25
	"math"
26
	"net"
27
	"net/http"
28
	"net/http/httputil"
29
	"regexp"
30
	"strconv"
31
	"strings"
32
	"time"
33

34
	"github.com/go-http-utils/headers"
35
	"go.opentelemetry.io/otel"
36
	"go.opentelemetry.io/otel/propagation"
37
	"go.opentelemetry.io/otel/trace"
38
	"google.golang.org/grpc/status"
39

40
	commonv1 "d7y.io/api/v2/pkg/apis/common/v1"
41
	errordetailsv1 "d7y.io/api/v2/pkg/apis/errordetails/v1"
42

43
	"d7y.io/dragonfly/v2/client/config"
44
	"d7y.io/dragonfly/v2/client/daemon/metrics"
45
	"d7y.io/dragonfly/v2/client/daemon/peer"
46
	logger "d7y.io/dragonfly/v2/internal/dflog"
47
	nethttp "d7y.io/dragonfly/v2/pkg/net/http"
48
)
49

50
// Reference a dflog type so the logger import always stays in use
// (pin this package for no log code generation).
var _ = (*logger.SugaredLoggerOnWith)(nil)
51

52
var (
53
	// layerReg the regex to determine if it is an image download
54
	layerReg     = regexp.MustCompile("^.+/blobs/sha256.*$")
55
	traceContext = propagation.TraceContext{}
56
)
57

58
// transport implements RoundTripper for dragonfly.
59
// It uses http.fileTransport to serve requests that need to use dragonfly,
60
// and uses http.Transport to serve the other requests.
61
type transport struct {
62
	// baseRoundTripper is an implementation of RoundTripper that supports HTTP
63
	baseRoundTripper http.RoundTripper
64

65
	// shouldUseDragonfly is used to determine to download resources with or without dragonfly
66
	shouldUseDragonfly func(req *http.Request) bool
67

68
	// peerTaskManager is the peer task manager
69
	peerTaskManager peer.TaskManager
70

71
	// defaultFilter is used when http request without X-Dragonfly-Filter Header
72
	defaultFilter string
73

74
	// defaultTag is used when http request without X-Dragonfly-Tag Header
75
	defaultTag string
76

77
	// defaultTag is used when http request without X-Dragonfly-Tag Header
78
	defaultApplication string
79

80
	// defaultPriority is used when http request without X-Dragonfly-Priority Header
81
	defaultPriority commonv1.Priority
82

83
	// dumpHTTPContent indicates to dump http request header and response header
84
	dumpHTTPContent bool
85

86
	peerIDGenerator peer.IDGenerator
87
}
88

89
// Option is functional config for transport.
90
type Option func(rt *transport) *transport
91

92
// WithPeerIDGenerator sets the peerIDGenerator for transport
93
func WithPeerIDGenerator(peerIDGenerator peer.IDGenerator) Option {
94
	return func(rt *transport) *transport {
95
		rt.peerIDGenerator = peerIDGenerator
96
		return rt
97
	}
98
}
99

100
// WithPeerTaskManager sets the peerTaskManager for transport
101
func WithPeerTaskManager(peerTaskManager peer.TaskManager) Option {
102
	return func(rt *transport) *transport {
103
		rt.peerTaskManager = peerTaskManager
104
		return rt
105
	}
106
}
107

108
// WithTLS configures TLS config used for http transport.
109
func WithTLS(cfg *tls.Config) Option {
110
	return func(rt *transport) *transport {
111
		rt.baseRoundTripper = defaultHTTPTransport(cfg)
112
		return rt
113
	}
114
}
115

116
// WithCondition configures how to decide whether to use dragonfly or not.
117
func WithCondition(c func(r *http.Request) bool) Option {
118
	return func(rt *transport) *transport {
119
		rt.shouldUseDragonfly = c
120
		return rt
121
	}
122
}
123

124
// WithDefaultFilter sets default filter for http requests with X-Dragonfly-Filter Header
125
func WithDefaultFilter(f string) Option {
126
	return func(rt *transport) *transport {
127
		rt.defaultFilter = f
128
		return rt
129
	}
130
}
131

132
// WithDefaultTag sets default tag for http requests with X-Dragonfly-Tag Header
133
func WithDefaultTag(b string) Option {
134
	return func(rt *transport) *transport {
135
		rt.defaultTag = b
136
		return rt
137
	}
138
}
139

140
// WithDefaultApplication sets default Application for http requests with X-Dragonfly-Application Header
141
func WithDefaultApplication(b string) Option {
142
	return func(rt *transport) *transport {
143
		rt.defaultApplication = b
144
		return rt
145
	}
146
}
147

148
// WithDefaultPriority sets default Priority for http requests with X-Dragonfly-Priority Header
149
func WithDefaultPriority(p commonv1.Priority) Option {
150
	return func(rt *transport) *transport {
151
		rt.defaultPriority = p
152
		return rt
153
	}
154

155
}
156

157
// WithDumpHTTPContent returns an Option that, when b is true, makes the
// transport log a dump of each request's and response's headers (without
// bodies) at debug level after a successful round trip.
func WithDumpHTTPContent(b bool) Option {
	apply := func(t *transport) *transport {
		t.dumpHTTPContent = b
		return t
	}
	return apply
}
163

164
var tracer trace.Tracer
165

166
func init() {
167
	tracer = otel.Tracer("dfget-transport")
168
}
169

170
// New constructs a new instance of a RoundTripper with additional options.
171
func New(options ...Option) (http.RoundTripper, error) {
172
	rt := &transport{
173
		baseRoundTripper:   defaultHTTPTransport(nil),
174
		shouldUseDragonfly: NeedUseDragonfly,
175
	}
176

177
	for _, opt := range options {
178
		opt(rt)
179
	}
180

181
	return rt, nil
182
}
183

184
// RoundTrip only process first redirect at present
185
func (rt *transport) RoundTrip(req *http.Request) (resp *http.Response, err error) {
186
	if rt.shouldUseDragonfly(req) {
187
		// delete the Accept-Encoding header to avoid returning the same cached
188
		// result for different requests
189
		req.Header.Del("Accept-Encoding")
190

191
		ctx := req.Context()
192
		if req.URL.Scheme == "https" {
193
			// for https, the trace info is in request header
194
			ctx = traceContext.Extract(req.Context(), propagation.HeaderCarrier(req.Header))
195
		}
196

197
		logger.Debugf("round trip with dragonfly: %s", req.URL.String())
198
		metrics.ProxyRequestViaDragonflyCount.Add(1)
199
		resp, err = rt.download(ctx, req)
200
	} else {
201
		logger.Debugf("round trip directly, method: %s, url: %s", req.Method, req.URL.String())
202
		req.Host = req.URL.Host
203
		req.Header.Set("Host", req.Host)
204
		metrics.ProxyRequestNotViaDragonflyCount.Add(1)
205
		resp, err = rt.baseRoundTripper.RoundTrip(req)
206
	}
207

208
	if err != nil {
209
		logger.With("method", req.Method, "url", req.URL.String()).Errorf("round trip error: %s", err)
210
		return resp, err
211
	}
212

213
	if resp.ContentLength > 0 {
214
		metrics.ProxyRequestBytesCount.WithLabelValues(req.Method).Add(float64(resp.ContentLength))
215
	}
216

217
	rt.processDumpHTTPContent(req, resp)
218
	return resp, err
219
}
220

221
// NeedUseDragonfly is the default value for shouldUseDragonfly, which downloads all
222
// images layers with dragonfly.
223
func NeedUseDragonfly(req *http.Request) bool {
224
	return req.Method == http.MethodGet && layerReg.MatchString(req.URL.Path)
225
}
226

227
// download uses dragonfly to download.
228
// the ctx has span info from transport, did not use the ctx from request
229
func (rt *transport) download(ctx context.Context, req *http.Request) (*http.Response, error) {
230
	url := req.URL.String()
231
	peerID := rt.peerIDGenerator.PeerID()
232

233
	ctx, span := tracer.Start(ctx, config.SpanTransport, trace.WithSpanKind(trace.SpanKindClient))
234
	defer span.End()
235
	logKV := []any{
236
		"peer", peerID, "component", "transport",
237
	}
238
	if span.SpanContext().TraceID().IsValid() {
239
		logKV = append(logKV, "trace", span.SpanContext().TraceID().String())
240
	}
241
	log := logger.With(logKV...)
242

243
	log.Infof("start download with url: %s", url)
244
	log.Debugf("request url: %s, with header: %#v", url, req.Header)
245

246
	// Init meta value
247
	meta := &commonv1.UrlMeta{Header: map[string]string{}}
248
	var rg *nethttp.Range
249

250
	// Set meta range's value
251
	if rangeHeader := req.Header.Get("Range"); len(rangeHeader) > 0 {
252
		rgs, err := nethttp.ParseRange(rangeHeader, math.MaxInt64)
253
		if err != nil {
254
			span.RecordError(err)
255
			return badRequest(req, err.Error())
256
		}
257
		if len(rgs) > 1 {
258
			// TODO support multiple range request
259
			return notImplemented(req, "multiple range is not supported")
260
		} else if len(rgs) == 0 {
261
			return requestedRangeNotSatisfiable(req, "zero range is not supported")
262
		}
263
		rg = &rgs[0]
264
		// range in dragonfly is without "bytes="
265
		meta.Range = strings.TrimPrefix(rangeHeader, "bytes=")
266
	}
267

268
	// Pick header's parameters
269
	filter := nethttp.PickHeader(req.Header, config.HeaderDragonflyFilter, rt.defaultFilter)
270
	tag := nethttp.PickHeader(req.Header, config.HeaderDragonflyTag, rt.defaultTag)
271
	application := nethttp.PickHeader(req.Header, config.HeaderDragonflyApplication, rt.defaultApplication)
272
	var priority = rt.defaultPriority
273
	priorityString := nethttp.PickHeader(req.Header, config.HeaderDragonflyPriority, fmt.Sprintf("%d", rt.defaultPriority))
274
	priorityInt, err := strconv.ParseInt(priorityString, 10, 32)
275
	if err == nil {
276
		priority = commonv1.Priority(priorityInt)
277
	}
278

279
	// Delete hop-by-hop headers
280
	delHopHeaders(req.Header)
281

282
	meta.Header = nethttp.HeaderToMap(req.Header)
283
	meta.Tag = tag
284
	meta.Filter = filter
285
	meta.Application = application
286
	meta.Priority = priority
287

288
	body, attr, err := rt.peerTaskManager.StartStreamTask(
289
		ctx,
290
		&peer.StreamTaskRequest{
291
			URL:     url,
292
			URLMeta: meta,
293
			Range:   rg,
294
			PeerID:  peerID,
295
		},
296
	)
297
	if err != nil {
298
		log.Errorf("start stream task error: %v", err)
299
		// check underlay status code
300
		if st, ok := status.FromError(err); ok {
301
			for _, detail := range st.Details() {
302
				switch d := detail.(type) {
303
				case *errordetailsv1.SourceError:
304
					hdr := nethttp.MapToHeader(attr)
305
					for k, v := range d.Metadata.Header {
306
						hdr.Set(k, v)
307
					}
308
					resp := &http.Response{
309
						StatusCode: int(d.Metadata.StatusCode),
310
						Body:       io.NopCloser(bytes.NewBufferString(d.Metadata.Status)),
311
						Header:     hdr,
312
						Proto:      req.Proto,
313
						ProtoMajor: req.ProtoMajor,
314
						ProtoMinor: req.ProtoMinor,
315
					}
316
					log.Errorf("underlay response code: %d", d.Metadata.StatusCode)
317
					return resp, nil
318
				}
319
			}
320
		}
321
		// add more info for debugging
322
		if attr != nil {
323
			err = fmt.Errorf("task: %s\npeer: %s\nerror: %s",
324
				attr[config.HeaderDragonflyTask], attr[config.HeaderDragonflyPeer], err)
325
		}
326
		return nil, err
327
	}
328

329
	hdr := nethttp.MapToHeader(attr)
330
	log.Infof("download stream attribute: %v", hdr)
331

332
	var contentLength int64 = -1
333
	if l, ok := attr[headers.ContentLength]; ok {
334
		if i, e := strconv.ParseInt(l, 10, 64); e == nil {
335
			contentLength = i
336
		}
337
	}
338

339
	var status int
340
	if meta.Range == "" {
341
		status = http.StatusOK
342
	} else {
343
		status = http.StatusPartialContent
344
		if hdr.Get(headers.ContentRange) == "" && contentLength > 0 {
345
			value := fmt.Sprintf("bytes %d-%d/%d", rg.Start, rg.Start+contentLength-1, rg.Start+contentLength)
346
			hdr.Set(headers.ContentRange, value)
347
		}
348
	}
349
	resp := &http.Response{
350
		StatusCode:    status,
351
		Body:          body,
352
		Header:        hdr,
353
		ContentLength: contentLength,
354

355
		Proto:      req.Proto,
356
		ProtoMajor: req.ProtoMajor,
357
		ProtoMinor: req.ProtoMinor,
358
	}
359
	return resp, nil
360
}
361

362
// processDumpHTTPContent logs a dump of req's headers and, when resp is
// non-nil, of resp's headers, at debug level. Bodies are not included. Dump
// failures are logged as errors. It does nothing unless dumpHTTPContent is set.
func (rt *transport) processDumpHTTPContent(req *http.Request, resp *http.Response) {
	if !rt.dumpHTTPContent {
		return
	}

	reqDump, reqErr := httputil.DumpRequest(req, false)
	if reqErr != nil {
		logger.Errorf("dump request in transport error: %s", reqErr)
	} else {
		logger.Debugf("dump request in transport: %s", string(reqDump))
	}

	if resp == nil {
		return
	}

	respDump, respErr := httputil.DumpResponse(resp, false)
	if respErr != nil {
		logger.Errorf("dump response in transport error: %s", respErr)
		return
	}
	logger.Debugf("dump response in transport: %s", string(respDump))
}
380

381
// defaultHTTPTransport returns the *http.Transport used for requests that do
// not go through dragonfly.
//
// cfg becomes the TLS client configuration. When cfg is nil, a config with
// InsecureSkipVerify set is used, so server certificates are NOT verified.
//
// NOTE(review): Proxy is left nil, so proxy environment variables are ignored.
// Because DialContext and TLSClientConfig are customized without
// ForceAttemptHTTP2, net/http will not attempt HTTP/2. Confirm both are intended.
func defaultHTTPTransport(cfg *tls.Config) *http.Transport {
	tlsConfig := cfg
	if tlsConfig == nil {
		tlsConfig = &tls.Config{InsecureSkipVerify: true}
	}

	dialer := &net.Dialer{
		Timeout:   10 * time.Second,
		KeepAlive: 30 * time.Second,
	}

	return &http.Transport{
		DialContext:           dialer.DialContext,
		MaxIdleConns:          100,
		IdleConnTimeout:       90 * time.Second,
		TLSHandshakeTimeout:   10 * time.Second,
		ExpectContinueTimeout: time.Second,
		TLSClientConfig:       tlsConfig,
	}
}
398

399
// hopHeaders lists the headers that delHopHeaders strips from a request before
// its remaining headers are copied into the dragonfly task meta. Dragonfly
// generates the task id from those headers, so per-hop and client-specific
// headers must not leak into it.
//
// The first group mirrors the hop-by-hop list in net/http/httputil's
// reverseproxy.go. RFC 7230 requires hop-by-hop headers to be named in the
// Connection header field; these are the ones defined by the obsoleted
// RFC 2616 (section 13.5.1), kept for backward compatibility. The second group
// is removed additionally by dragonfly.
var hopHeaders = []string{
	// Hop-by-hop headers, copied from net/http/httputil/reverseproxy.go.
	"Connection",
	"Proxy-Connection", // non-standard but still sent by libcurl and rejected by e.g. google
	"Keep-Alive",
	"Proxy-Authenticate",
	"Proxy-Authorization",
	"Te",      // canonicalized version of "TE"
	"Trailer", // not Trailers; see https://www.rfc-editor.org/errata_search.php?eid=4522
	"Transfer-Encoding",
	"Upgrade",

	// Removed by dragonfly. "Accept" is deliberately kept, see
	// https://github.com/dragonflyoss/Dragonfly2/issues/1290.
	"User-Agent",
	"X-Forwarded-For",
}
423

424
// delHopHeaders delete hop-by-hop headers.
425
func delHopHeaders(header http.Header) {
426
	for _, h := range hopHeaders {
427
		header.Del(h)
428
	}
429
	// remove correlation with trace header
430
	for _, h := range traceContext.Fields() {
431
		header.Del(h)
432
	}
433
}
434

435
// compositeErrorHTTPResponse builds a synthetic response with the given status
// code and body as a plain-text payload. The protocol version is copied from
// req.
//
// The response always has a non-nil Header, so consumers may add headers
// without panicking, and a populated Status line such as "400 Bad Request".
// The error result is always nil; it exists so callers can return the pair
// directly.
func compositeErrorHTTPResponse(req *http.Request, status int, body string) (*http.Response, error) {
	header := make(http.Header)
	header.Set("Content-Type", "text/plain; charset=utf-8")

	resp := &http.Response{
		Status:        strconv.Itoa(status) + " " + http.StatusText(status),
		StatusCode:    status,
		Header:        header,
		Body:          io.NopCloser(bytes.NewBufferString(body)),
		ContentLength: int64(len(body)),

		Proto:      req.Proto,
		ProtoMajor: req.ProtoMajor,
		ProtoMinor: req.ProtoMinor,
	}
	return resp, nil
}
447

448
func badRequest(req *http.Request, body string) (*http.Response, error) {
449
	return compositeErrorHTTPResponse(req, http.StatusBadRequest, body)
450
}
451

452
func notImplemented(req *http.Request, body string) (*http.Response, error) {
453
	return compositeErrorHTTPResponse(req, http.StatusNotImplemented, body)
454
}
455

456
func requestedRangeNotSatisfiable(req *http.Request, body string) (*http.Response, error) {
457
	return compositeErrorHTTPResponse(req, http.StatusRequestedRangeNotSatisfiable, body)
458
}
459

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.