Dragonfly2
458 строк · 13.3 Кб
1/*
2* Copyright 2020 The Dragonfly Authors
3*
4* Licensed under the Apache License, Version 2.0 (the "License");
5* you may not use this file except in compliance with the License.
6* You may obtain a copy of the License at
7*
8* http://www.apache.org/licenses/LICENSE-2.0
9*
10* Unless required by applicable law or agreed to in writing, software
11* distributed under the License is distributed on an "AS IS" BASIS,
12* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13* See the License for the specific language governing permissions and
14* limitations under the License.
15*/
16
17package transport
18
19import (
20"bytes"
21"context"
22"crypto/tls"
23"fmt"
24"io"
25"math"
26"net"
27"net/http"
28"net/http/httputil"
29"regexp"
30"strconv"
31"strings"
32"time"
33
34"github.com/go-http-utils/headers"
35"go.opentelemetry.io/otel"
36"go.opentelemetry.io/otel/propagation"
37"go.opentelemetry.io/otel/trace"
38"google.golang.org/grpc/status"
39
40commonv1 "d7y.io/api/v2/pkg/apis/common/v1"
41errordetailsv1 "d7y.io/api/v2/pkg/apis/errordetails/v1"
42
43"d7y.io/dragonfly/v2/client/config"
44"d7y.io/dragonfly/v2/client/daemon/metrics"
45"d7y.io/dragonfly/v2/client/daemon/peer"
46logger "d7y.io/dragonfly/v2/internal/dflog"
47nethttp "d7y.io/dragonfly/v2/pkg/net/http"
48)
49
var _ *logger.SugaredLoggerOnWith // pin this package for no log code generation

var (
	// layerReg matches request paths that contain "/blobs/sha256" after at
	// least one leading character; NeedUseDragonfly uses it to recognize
	// image layer downloads.
	layerReg = regexp.MustCompile("^.+/blobs/sha256.*$")
	// traceContext extracts trace information from request headers and
	// lists the trace header fields that are removed before a download.
	traceContext = propagation.TraceContext{}
)
57
// transport implements http.RoundTripper for dragonfly.
// Requests selected by shouldUseDragonfly are downloaded through the peer
// task manager; all other requests are forwarded to baseRoundTripper.
type transport struct {
	// baseRoundTripper serves the requests that do not go through dragonfly.
	baseRoundTripper http.RoundTripper

	// shouldUseDragonfly decides whether a request is downloaded with or
	// without dragonfly.
	shouldUseDragonfly func(req *http.Request) bool

	// peerTaskManager starts the stream tasks that back dragonfly downloads.
	peerTaskManager peer.TaskManager

	// defaultFilter is used when the request has no X-Dragonfly-Filter header.
	defaultFilter string

	// defaultTag is used when the request has no X-Dragonfly-Tag header.
	defaultTag string

	// defaultApplication is used when the request has no
	// X-Dragonfly-Application header.
	defaultApplication string

	// defaultPriority is used when the request has no X-Dragonfly-Priority
	// header, or when that header cannot be parsed as an integer.
	defaultPriority commonv1.Priority

	// dumpHTTPContent enables dumping of the request and response headers
	// to the log.
	dumpHTTPContent bool

	// peerIDGenerator produces the peer ID used for each dragonfly download.
	peerIDGenerator peer.IDGenerator
}
88
// Option is a functional option for transport. It receives the transport
// being configured, applies one setting to it and returns that transport.
type Option func(rt *transport) *transport
91
92// WithPeerIDGenerator sets the peerIDGenerator for transport
93func WithPeerIDGenerator(peerIDGenerator peer.IDGenerator) Option {
94return func(rt *transport) *transport {
95rt.peerIDGenerator = peerIDGenerator
96return rt
97}
98}
99
100// WithPeerTaskManager sets the peerTaskManager for transport
101func WithPeerTaskManager(peerTaskManager peer.TaskManager) Option {
102return func(rt *transport) *transport {
103rt.peerTaskManager = peerTaskManager
104return rt
105}
106}
107
108// WithTLS configures TLS config used for http transport.
109func WithTLS(cfg *tls.Config) Option {
110return func(rt *transport) *transport {
111rt.baseRoundTripper = defaultHTTPTransport(cfg)
112return rt
113}
114}
115
116// WithCondition configures how to decide whether to use dragonfly or not.
117func WithCondition(c func(r *http.Request) bool) Option {
118return func(rt *transport) *transport {
119rt.shouldUseDragonfly = c
120return rt
121}
122}
123
124// WithDefaultFilter sets default filter for http requests with X-Dragonfly-Filter Header
125func WithDefaultFilter(f string) Option {
126return func(rt *transport) *transport {
127rt.defaultFilter = f
128return rt
129}
130}
131
132// WithDefaultTag sets default tag for http requests with X-Dragonfly-Tag Header
133func WithDefaultTag(b string) Option {
134return func(rt *transport) *transport {
135rt.defaultTag = b
136return rt
137}
138}
139
140// WithDefaultApplication sets default Application for http requests with X-Dragonfly-Application Header
141func WithDefaultApplication(b string) Option {
142return func(rt *transport) *transport {
143rt.defaultApplication = b
144return rt
145}
146}
147
148// WithDefaultPriority sets default Priority for http requests with X-Dragonfly-Priority Header
149func WithDefaultPriority(p commonv1.Priority) Option {
150return func(rt *transport) *transport {
151rt.defaultPriority = p
152return rt
153}
154
155}
156
157func WithDumpHTTPContent(b bool) Option {
158return func(rt *transport) *transport {
159rt.dumpHTTPContent = b
160return rt
161}
162}
163
// tracer starts the spans created for dragonfly downloads. It is assigned
// in init.
var tracer trace.Tracer

// init binds tracer to the otel tracer named "dfget-transport".
func init() {
	tracer = otel.Tracer("dfget-transport")
}
169
170// New constructs a new instance of a RoundTripper with additional options.
171func New(options ...Option) (http.RoundTripper, error) {
172rt := &transport{
173baseRoundTripper: defaultHTTPTransport(nil),
174shouldUseDragonfly: NeedUseDragonfly,
175}
176
177for _, opt := range options {
178opt(rt)
179}
180
181return rt, nil
182}
183
184// RoundTrip only process first redirect at present
185func (rt *transport) RoundTrip(req *http.Request) (resp *http.Response, err error) {
186if rt.shouldUseDragonfly(req) {
187// delete the Accept-Encoding header to avoid returning the same cached
188// result for different requests
189req.Header.Del("Accept-Encoding")
190
191ctx := req.Context()
192if req.URL.Scheme == "https" {
193// for https, the trace info is in request header
194ctx = traceContext.Extract(req.Context(), propagation.HeaderCarrier(req.Header))
195}
196
197logger.Debugf("round trip with dragonfly: %s", req.URL.String())
198metrics.ProxyRequestViaDragonflyCount.Add(1)
199resp, err = rt.download(ctx, req)
200} else {
201logger.Debugf("round trip directly, method: %s, url: %s", req.Method, req.URL.String())
202req.Host = req.URL.Host
203req.Header.Set("Host", req.Host)
204metrics.ProxyRequestNotViaDragonflyCount.Add(1)
205resp, err = rt.baseRoundTripper.RoundTrip(req)
206}
207
208if err != nil {
209logger.With("method", req.Method, "url", req.URL.String()).Errorf("round trip error: %s", err)
210return resp, err
211}
212
213if resp.ContentLength > 0 {
214metrics.ProxyRequestBytesCount.WithLabelValues(req.Method).Add(float64(resp.ContentLength))
215}
216
217rt.processDumpHTTPContent(req, resp)
218return resp, err
219}
220
221// NeedUseDragonfly is the default value for shouldUseDragonfly, which downloads all
222// images layers with dragonfly.
223func NeedUseDragonfly(req *http.Request) bool {
224return req.Method == http.MethodGet && layerReg.MatchString(req.URL.Path)
225}
226
227// download uses dragonfly to download.
228// the ctx has span info from transport, did not use the ctx from request
229func (rt *transport) download(ctx context.Context, req *http.Request) (*http.Response, error) {
230url := req.URL.String()
231peerID := rt.peerIDGenerator.PeerID()
232
233ctx, span := tracer.Start(ctx, config.SpanTransport, trace.WithSpanKind(trace.SpanKindClient))
234defer span.End()
235logKV := []any{
236"peer", peerID, "component", "transport",
237}
238if span.SpanContext().TraceID().IsValid() {
239logKV = append(logKV, "trace", span.SpanContext().TraceID().String())
240}
241log := logger.With(logKV...)
242
243log.Infof("start download with url: %s", url)
244log.Debugf("request url: %s, with header: %#v", url, req.Header)
245
246// Init meta value
247meta := &commonv1.UrlMeta{Header: map[string]string{}}
248var rg *nethttp.Range
249
250// Set meta range's value
251if rangeHeader := req.Header.Get("Range"); len(rangeHeader) > 0 {
252rgs, err := nethttp.ParseRange(rangeHeader, math.MaxInt64)
253if err != nil {
254span.RecordError(err)
255return badRequest(req, err.Error())
256}
257if len(rgs) > 1 {
258// TODO support multiple range request
259return notImplemented(req, "multiple range is not supported")
260} else if len(rgs) == 0 {
261return requestedRangeNotSatisfiable(req, "zero range is not supported")
262}
263rg = &rgs[0]
264// range in dragonfly is without "bytes="
265meta.Range = strings.TrimPrefix(rangeHeader, "bytes=")
266}
267
268// Pick header's parameters
269filter := nethttp.PickHeader(req.Header, config.HeaderDragonflyFilter, rt.defaultFilter)
270tag := nethttp.PickHeader(req.Header, config.HeaderDragonflyTag, rt.defaultTag)
271application := nethttp.PickHeader(req.Header, config.HeaderDragonflyApplication, rt.defaultApplication)
272var priority = rt.defaultPriority
273priorityString := nethttp.PickHeader(req.Header, config.HeaderDragonflyPriority, fmt.Sprintf("%d", rt.defaultPriority))
274priorityInt, err := strconv.ParseInt(priorityString, 10, 32)
275if err == nil {
276priority = commonv1.Priority(priorityInt)
277}
278
279// Delete hop-by-hop headers
280delHopHeaders(req.Header)
281
282meta.Header = nethttp.HeaderToMap(req.Header)
283meta.Tag = tag
284meta.Filter = filter
285meta.Application = application
286meta.Priority = priority
287
288body, attr, err := rt.peerTaskManager.StartStreamTask(
289ctx,
290&peer.StreamTaskRequest{
291URL: url,
292URLMeta: meta,
293Range: rg,
294PeerID: peerID,
295},
296)
297if err != nil {
298log.Errorf("start stream task error: %v", err)
299// check underlay status code
300if st, ok := status.FromError(err); ok {
301for _, detail := range st.Details() {
302switch d := detail.(type) {
303case *errordetailsv1.SourceError:
304hdr := nethttp.MapToHeader(attr)
305for k, v := range d.Metadata.Header {
306hdr.Set(k, v)
307}
308resp := &http.Response{
309StatusCode: int(d.Metadata.StatusCode),
310Body: io.NopCloser(bytes.NewBufferString(d.Metadata.Status)),
311Header: hdr,
312Proto: req.Proto,
313ProtoMajor: req.ProtoMajor,
314ProtoMinor: req.ProtoMinor,
315}
316log.Errorf("underlay response code: %d", d.Metadata.StatusCode)
317return resp, nil
318}
319}
320}
321// add more info for debugging
322if attr != nil {
323err = fmt.Errorf("task: %s\npeer: %s\nerror: %s",
324attr[config.HeaderDragonflyTask], attr[config.HeaderDragonflyPeer], err)
325}
326return nil, err
327}
328
329hdr := nethttp.MapToHeader(attr)
330log.Infof("download stream attribute: %v", hdr)
331
332var contentLength int64 = -1
333if l, ok := attr[headers.ContentLength]; ok {
334if i, e := strconv.ParseInt(l, 10, 64); e == nil {
335contentLength = i
336}
337}
338
339var status int
340if meta.Range == "" {
341status = http.StatusOK
342} else {
343status = http.StatusPartialContent
344if hdr.Get(headers.ContentRange) == "" && contentLength > 0 {
345value := fmt.Sprintf("bytes %d-%d/%d", rg.Start, rg.Start+contentLength-1, rg.Start+contentLength)
346hdr.Set(headers.ContentRange, value)
347}
348}
349resp := &http.Response{
350StatusCode: status,
351Body: body,
352Header: hdr,
353ContentLength: contentLength,
354
355Proto: req.Proto,
356ProtoMajor: req.ProtoMajor,
357ProtoMinor: req.ProtoMinor,
358}
359return resp, nil
360}
361
362func (rt *transport) processDumpHTTPContent(req *http.Request, resp *http.Response) {
363if !rt.dumpHTTPContent {
364return
365}
366if out, e := httputil.DumpRequest(req, false); e == nil {
367logger.Debugf("dump request in transport: %s", string(out))
368} else {
369logger.Errorf("dump request in transport error: %s", e)
370}
371if resp == nil {
372return
373}
374if out, e := httputil.DumpResponse(resp, false); e == nil {
375logger.Debugf("dump response in transport: %s", string(out))
376} else {
377logger.Errorf("dump response in transport error: %s", e)
378}
379}
380
// defaultHTTPTransport builds the *http.Transport used for requests that are
// not served through dragonfly. When cfg is nil, a TLS config that skips
// certificate verification is used; otherwise cfg is used as is.
func defaultHTTPTransport(cfg *tls.Config) *http.Transport {
	tlsConfig := cfg
	if tlsConfig == nil {
		tlsConfig = &tls.Config{InsecureSkipVerify: true}
	}

	dialer := &net.Dialer{
		Timeout:   10 * time.Second,
		KeepAlive: 30 * time.Second,
	}

	return &http.Transport{
		DialContext:           dialer.DialContext,
		TLSClientConfig:       tlsConfig,
		MaxIdleConns:          100,
		IdleConnTimeout:       90 * time.Second,
		TLSHandshakeTimeout:   10 * time.Second,
		ExpectContinueTimeout: 1 * time.Second,
	}
}
398
// Hop-by-hop headers. These are removed when sent to the backend.
// As of RFC 7230, hop-by-hop headers are required to appear in the
// Connection header field. These are the headers defined by the
// obsoleted RFC 2616 (section 13.5.1) and are used for backward
// compatibility.
// Copied from net/http/httputil/reverseproxy.go.
//
// Dragonfly generates the task ID from the request headers, so a few
// additional headers are removed as well (see the end of the list).
var hopHeaders = []string{
	"Connection",
	"Proxy-Connection", // non-standard but still sent by libcurl and rejected by e.g. google
	"Keep-Alive",
	"Proxy-Authenticate",
	"Proxy-Authorization",
	"Te",      // canonicalized version of "TE"
	"Trailer", // not Trailers per URL above; https://www.rfc-editor.org/errata_search.php?eid=4522
	"Transfer-Encoding",
	"Upgrade",

	// removed by dragonfly
	// "Accept", Accept header should not be removed, issue: https://github.com/dragonflyoss/Dragonfly2/issues/1290
	"User-Agent",
	"X-Forwarded-For",
}
423
424// delHopHeaders delete hop-by-hop headers.
425func delHopHeaders(header http.Header) {
426for _, h := range hopHeaders {
427header.Del(h)
428}
429// remove correlation with trace header
430for _, h := range traceContext.Fields() {
431header.Del(h)
432}
433}
434
// compositeErrorHTTPResponse builds a synthetic response that carries the
// given status code and uses body as its payload. The protocol fields are
// copied from req, ContentLength is the byte length of body, and the returned
// error is always nil.
//
// Header is initialized to an empty, non-nil map so that later stages can add
// headers without panicking on a nil map write.
func compositeErrorHTTPResponse(req *http.Request, status int, body string) (*http.Response, error) {
	resp := &http.Response{
		StatusCode:    status,
		Header:        make(http.Header),
		Body:          io.NopCloser(bytes.NewBufferString(body)),
		ContentLength: int64(len(body)),

		Proto:      req.Proto,
		ProtoMajor: req.ProtoMajor,
		ProtoMinor: req.ProtoMinor,
	}
	return resp, nil
}
447
448func badRequest(req *http.Request, body string) (*http.Response, error) {
449return compositeErrorHTTPResponse(req, http.StatusBadRequest, body)
450}
451
452func notImplemented(req *http.Request, body string) (*http.Response, error) {
453return compositeErrorHTTPResponse(req, http.StatusNotImplemented, body)
454}
455
456func requestedRangeNotSatisfiable(req *http.Request, body string) (*http.Response, error) {
457return compositeErrorHTTPResponse(req, http.StatusRequestedRangeNotSatisfiable, body)
458}
459