Dragonfly2
226 строк · 5.9 Кб
1/*
2* Copyright 2020 The Dragonfly Authors
3*
4* Licensed under the Apache License, Version 2.0 (the "License");
5* you may not use this file except in compliance with the License.
6* You may obtain a copy of the License at
7*
8* http://www.apache.org/licenses/LICENSE-2.0
9*
10* Unless required by applicable law or agreed to in writing, software
11* distributed under the License is distributed on an "AS IS" BASIS,
12* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13* See the License for the specific language governing permissions and
14* limitations under the License.
15*/
16
17//go:generate mockgen -destination piece_downloader_mock.go -source piece_downloader.go -package peer
18
19package peer
20
import (
	"context"
	"crypto/tls"
	"crypto/x509"
	"errors"
	"fmt"
	"io"
	"net"
	"net/http"
	"net/url"
	"time"

	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/propagation"
	"google.golang.org/grpc/status"

	commonv1 "d7y.io/api/v2/pkg/apis/common/v1"

	"d7y.io/dragonfly/v2/client/daemon/storage"
	logger "d7y.io/dragonfly/v2/internal/dflog"
	"d7y.io/dragonfly/v2/pkg/digest"
	"d7y.io/dragonfly/v2/pkg/source"
)
43
44type DownloadPieceRequest struct {
45piece *commonv1.PieceInfo
46log *logger.SugaredLoggerOnWith
47storage storage.TaskStorageDriver
48TaskID string
49PeerID string
50DstPid string
51DstAddr string
52CalcDigest bool
53}
54
55type DownloadPieceResult struct {
56// Size of piece
57Size int64
58// BeginTime nanosecond
59BeginTime int64
60// FinishTime nanosecond
61FinishTime int64
62DstPeerID string
63Fail bool
64pieceInfo *commonv1.PieceInfo
65}
66
67type PieceDownloader interface {
68DownloadPiece(context.Context, *DownloadPieceRequest) (io.Reader, io.Closer, error)
69}
70
71type PieceDownloaderOption func(*pieceDownloader) error
72
// pieceDownloader is the HTTP based implementation of PieceDownloader.
type pieceDownloader struct {
	// scheme is the URL scheme of download requests; NewPieceDownloader sets
	// it to "http", or to "https" when a CA pool is supplied.
	scheme string
	// httpClient executes the piece download requests.
	httpClient *http.Client
}
77
// pieceDownloadError reports a failed piece download, either because the HTTP
// round trip itself failed or because the peer answered with an error status.
type pieceDownloadError struct {
	// connectionError is true when the HTTP request could not be completed.
	connectionError bool
	// status is the HTTP status line of an error response.
	status string
	// statusCode is the HTTP status code of an error response.
	statusCode int
	// target is the URL that was requested.
	target string
	// err is the underlying error of a failed round trip.
	err error
}
85
86type backSourceError struct {
87err error
88st *status.Status
89}
90
91func isConnectionError(err error) bool {
92if e, ok := err.(*pieceDownloadError); ok {
93return e.connectionError
94}
95return false
96}
97
98func isPieceNotFound(err error) bool {
99if e, ok := err.(*pieceDownloadError); ok {
100return e.statusCode == http.StatusNotFound
101}
102return false
103}
104
105func isBackSourceError(err error) bool {
106if _, ok := err.(*backSourceError); ok {
107return true
108}
109if _, ok := err.(*source.UnexpectedStatusCodeError); ok {
110return true
111}
112return false
113}
114
115func (e *pieceDownloadError) Error() string {
116if e.connectionError {
117return fmt.Sprintf("connect with %s with error: %s", e.target, e.err)
118}
119return fmt.Sprintf("download %s with error status: %s", e.target, e.status)
120}
121
122func (e *backSourceError) Error() string {
123if e.st != nil {
124return e.st.Err().Error()
125}
126return e.err.Error()
127}
128
// Compile-time check that *pieceDownloader satisfies PieceDownloader.
var _ PieceDownloader = (*pieceDownloader)(nil)
130
// defaultTransport is the round tripper that NewPieceDownloader installs into
// the HTTP clients it creates. It honours the proxy environment variables and
// uses short dial and response-header timeouts (2s each).
var defaultTransport http.RoundTripper = &http.Transport{
	Proxy: http.ProxyFromEnvironment,
	// net.Dialer.DualStack is deliberately not set: the field is deprecated
	// and no longer influences dialing.
	DialContext: (&net.Dialer{
		Timeout:   2 * time.Second,
		KeepAlive: 30 * time.Second,
	}).DialContext,
	MaxIdleConns:          100,
	IdleConnTimeout:       90 * time.Second,
	ResponseHeaderTimeout: 2 * time.Second,
	TLSHandshakeTimeout:   10 * time.Second,
	ExpectContinueTimeout: 2 * time.Second,
}
144
145func NewPieceDownloader(timeout time.Duration, caCertPool *x509.CertPool) PieceDownloader {
146pd := &pieceDownloader{
147scheme: "http",
148httpClient: &http.Client{
149Transport: defaultTransport,
150Timeout: timeout,
151},
152}
153
154if caCertPool != nil {
155pd.scheme = "https"
156defaultTransport.(*http.Transport).TLSClientConfig = &tls.Config{
157ClientCAs: caCertPool,
158RootCAs: caCertPool,
159}
160}
161
162return pd
163}
164
165func (p *pieceDownloader) DownloadPiece(ctx context.Context, req *DownloadPieceRequest) (io.Reader, io.Closer, error) {
166httpRequest, err := p.buildDownloadPieceHTTPRequest(ctx, req)
167if err != nil {
168return nil, nil, err
169}
170resp, err := p.httpClient.Do(httpRequest)
171if err != nil {
172logger.Errorf("task id: %s, piece num: %d, dst: %s, download piece failed: %s",
173req.TaskID, req.piece.PieceNum, req.DstAddr, err)
174return nil, nil, &pieceDownloadError{
175target: httpRequest.URL.String(),
176err: err,
177connectionError: true,
178}
179}
180if resp.StatusCode > 299 {
181_, _ = io.Copy(io.Discard, resp.Body)
182_ = resp.Body.Close()
183return nil, nil, &pieceDownloadError{
184target: httpRequest.URL.String(),
185err: err,
186connectionError: false,
187status: resp.Status,
188statusCode: resp.StatusCode,
189}
190}
191reader, closer := resp.Body.(io.Reader), resp.Body.(io.Closer)
192if req.CalcDigest {
193req.log.Debugf("calculate digest for piece %d, digest: %s", req.piece.PieceNum, req.piece.PieceMd5)
194reader, err = digest.NewReader(digest.AlgorithmMD5, io.LimitReader(resp.Body, int64(req.piece.RangeSize)), digest.WithEncoded(req.piece.PieceMd5), digest.WithLogger(req.log))
195if err != nil {
196_ = closer.Close()
197req.log.Errorf("init digest reader error: %s", err.Error())
198return nil, nil, err
199}
200}
201return reader, closer, nil
202}
203
204func (p *pieceDownloader) buildDownloadPieceHTTPRequest(ctx context.Context, d *DownloadPieceRequest) (*http.Request, error) {
205if len(d.TaskID) <= 3 {
206return nil, fmt.Errorf("invalid task id")
207}
208// FIXME switch to https when tls enabled
209targetURL := url.URL{
210Scheme: p.scheme,
211Host: d.DstAddr,
212Path: fmt.Sprintf("download/%s/%s", d.TaskID[:3], d.TaskID),
213RawQuery: fmt.Sprintf("peerId=%s", d.DstPid),
214}
215
216logger.Debugf("built request url: %s", targetURL.String())
217req, _ := http.NewRequestWithContext(ctx, http.MethodGet, targetURL.String(), nil)
218
219// TODO use string.Builder
220req.Header.Add("Range", fmt.Sprintf("bytes=%d-%d",
221d.piece.RangeStart, d.piece.RangeStart+uint64(d.piece.RangeSize)-1))
222
223// inject trace id into request header
224otel.GetTextMapPropagator().Inject(ctx, propagation.HeaderCarrier(req.Header))
225return req, nil
226}
227