Dragonfly2
386 строк · 9.7 Кб
1/*
2* Copyright 2020 The Dragonfly Authors
3*
4* Licensed under the Apache License, Version 2.0 (the "License");
5* you may not use this file except in compliance with the License.
6* You may obtain a copy of the License at
7*
8* http://www.apache.org/licenses/LICENSE-2.0
9*
10* Unless required by applicable law or agreed to in writing, software
11* distributed under the License is distributed on an "AS IS" BASIS,
12* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13* See the License for the specific language governing permissions and
14* limitations under the License.
15*/
16
17package dfget
18
19import (
20"context"
21"errors"
22"fmt"
23"io"
24"net/url"
25"os"
26"path"
27"path/filepath"
28"regexp"
29"strings"
30"time"
31
32"github.com/gammazero/deque"
33"github.com/go-http-utils/headers"
34"github.com/schollz/progressbar/v3"
35
36commonv1 "d7y.io/api/v2/pkg/apis/common/v1"
37dfdaemonv1 "d7y.io/api/v2/pkg/apis/dfdaemon/v1"
38
39"d7y.io/dragonfly/v2/client/config"
40logger "d7y.io/dragonfly/v2/internal/dflog"
41"d7y.io/dragonfly/v2/pkg/digest"
42dfdaemonclient "d7y.io/dragonfly/v2/pkg/rpc/dfdaemon/client"
43"d7y.io/dragonfly/v2/pkg/source"
44pkgstrings "d7y.io/dragonfly/v2/pkg/strings"
45)
46
47func Download(cfg *config.DfgetConfig, client dfdaemonclient.V1) error {
48var (
49ctx = context.Background()
50cancel context.CancelFunc
51wLog = logger.With("url", cfg.URL)
52downError error
53)
54
55wLog.Info("init success and start to download")
56fmt.Println("init success and start to download")
57
58if cfg.Timeout > 0 {
59ctx, cancel = context.WithTimeout(ctx, cfg.Timeout)
60} else {
61ctx, cancel = context.WithCancel(ctx)
62}
63
64go func() {
65downError = download(ctx, client, cfg, wLog)
66cancel()
67}()
68
69<-ctx.Done()
70
71if ctx.Err() == context.DeadlineExceeded {
72return fmt.Errorf("download timeout(%s)", cfg.Timeout)
73}
74return downError
75}
76
77func download(ctx context.Context, client dfdaemonclient.V1, cfg *config.DfgetConfig, wLog *logger.SugaredLoggerOnWith) error {
78if cfg.Recursive {
79return recursiveDownload(ctx, client, cfg)
80}
81return singleDownload(ctx, client, cfg, wLog)
82}
83
84func singleDownload(ctx context.Context, client dfdaemonclient.V1, cfg *config.DfgetConfig, wLog *logger.SugaredLoggerOnWith) error {
85hdr := parseHeader(cfg.Header)
86
87if client == nil {
88return downloadFromSource(ctx, cfg, hdr)
89}
90
91var (
92start = time.Now()
93stream dfdaemonv1.Daemon_DownloadClient
94result *dfdaemonv1.DownResult
95pb *progressbar.ProgressBar
96request = newDownRequest(cfg, hdr)
97downError error
98)
99
100if stream, downError = client.Download(ctx, request); downError != nil {
101goto processError
102}
103
104if cfg.ShowProgress {
105pb = newProgressBar(-1)
106}
107
108for {
109if result, downError = stream.Recv(); downError != nil {
110break
111}
112
113if result.CompletedLength > 0 && pb != nil {
114_ = pb.Set64(int64(result.CompletedLength))
115}
116
117// success
118if result.Done {
119if pb != nil {
120pb.Describe("Downloaded")
121_ = pb.Close()
122}
123
124wLog.Infof("download from daemon success, length: %d bytes, cost: %d ms", result.CompletedLength, time.Since(start).Milliseconds())
125fmt.Printf("finish total length %d bytes\n", result.CompletedLength)
126
127break
128}
129}
130
131processError:
132if downError != nil && !cfg.KeepOriginalOffset {
133wLog.Warnf("daemon downloads file error: %v", downError)
134fmt.Printf("daemon downloads file error: %v\n", downError)
135downError = downloadFromSource(ctx, cfg, hdr)
136}
137
138return downError
139}
140
141func downloadFromSource(ctx context.Context, cfg *config.DfgetConfig, hdr map[string]string) (err error) {
142if cfg.DisableBackSource {
143return errors.New("try to download from source but back source is disabled")
144}
145
146var (
147wLog = logger.With("url", cfg.URL)
148start = time.Now()
149tempFile *os.File
150response *source.Response
151written int64
152renameOK bool
153)
154
155wLog.Info("try to download from source and ignore rate limit")
156fmt.Println("try to download from source and ignore rate limit")
157
158if tempFile, err = os.CreateTemp(filepath.Dir(cfg.Output), ".df_"); err != nil {
159return err
160}
161defer func() {
162if !renameOK {
163tempPath := path.Join(filepath.Dir(cfg.Output), tempFile.Name())
164removeErr := os.Remove(tempPath)
165if removeErr != nil {
166wLog.Infof("remove temporary file %s error: %s", tempPath, removeErr)
167fmt.Printf("remove temporary file %s error: %s\n", tempPath, removeErr)
168}
169}
170if cerr := tempFile.Close(); cerr != nil {
171err = errors.Join(err, cerr)
172}
173}()
174
175downloadRequest, err := source.NewRequestWithContext(ctx, cfg.URL, hdr)
176if err != nil {
177return err
178}
179if response, err = source.Download(downloadRequest); err != nil {
180return err
181}
182defer func() {
183if cerr := response.Body.Close(); cerr != nil {
184err = errors.Join(err, cerr)
185}
186}()
187if err = response.Validate(); err != nil {
188return err
189}
190
191if written, err = io.Copy(tempFile, response.Body); err != nil {
192return err
193}
194
195if !pkgstrings.IsBlank(cfg.Digest) {
196d, err := digest.Parse(cfg.Digest)
197if err != nil {
198return err
199}
200
201encoded, err := digest.HashFile(tempFile.Name(), d.Algorithm)
202if err != nil {
203return err
204}
205
206if encoded != "" && encoded != d.Encoded {
207return fmt.Errorf("%s digest is not matched: real[%s] expected[%s]", d.Algorithm, encoded, d.Encoded)
208}
209}
210
211// change file owner
212if err = os.Chown(tempFile.Name(), os.Getuid(), os.Getgid()); err != nil {
213return fmt.Errorf("change file owner to uid[%d] gid[%d]: %w", os.Getuid(), os.Getgid(), err)
214}
215
216if err = os.Rename(tempFile.Name(), cfg.Output); err != nil {
217return err
218}
219renameOK = true
220
221wLog.Infof("download from source success, length: %d bytes, cost: %d ms", written, time.Since(start).Milliseconds())
222fmt.Printf("finish total length %d bytes\n", written)
223
224return nil
225}
226
// parseHeader converts "Key: Value" strings into a header map.
//
// Each entry is split at its first colon and both sides are trimmed of
// surrounding whitespace. Entries without a colon, or whose first byte is the
// colon, are ignored. When a key occurs more than once the last value wins.
// The returned map is never nil.
func parseHeader(s []string) map[string]string {
	parsed := make(map[string]string)
	for _, line := range s {
		name, val, found := strings.Cut(line, ":")
		// An empty raw name means the colon was the first byte; skip it.
		if !found || name == "" {
			continue
		}
		parsed[strings.TrimSpace(name)] = strings.TrimSpace(val)
	}

	return parsed
}
241
242func newDownRequest(cfg *config.DfgetConfig, hdr map[string]string) *dfdaemonv1.DownRequest {
243var rg string
244if r, ok := hdr[headers.Range]; ok {
245rg = strings.TrimPrefix(r, "bytes=")
246} else {
247rg = cfg.Range
248}
249request := &dfdaemonv1.DownRequest{
250Url: cfg.URL,
251Output: cfg.Output,
252Timeout: uint64(cfg.Timeout),
253Limit: float64(cfg.RateLimit.Limit),
254DisableBackSource: cfg.DisableBackSource,
255UrlMeta: &commonv1.UrlMeta{
256Digest: cfg.Digest,
257Tag: cfg.Tag,
258Range: rg,
259Filter: cfg.Filter,
260Header: hdr,
261Application: cfg.Application,
262Priority: commonv1.Priority(cfg.Priority),
263},
264Uid: int64(os.Getuid()),
265Gid: int64(os.Getgid()),
266KeepOriginalOffset: cfg.KeepOriginalOffset,
267}
268
269_url, err := url.Parse(cfg.URL)
270if err == nil {
271director, ok := source.HasDirector(_url.Scheme)
272if ok {
273err = director.Direct(_url, request.UrlMeta)
274if err == nil {
275// write back new url
276request.Url = _url.String()
277} else {
278logger.Errorf("direct resource error: %s", err)
279}
280}
281}
282
283return request
284}
285
286func newProgressBar(max int64) *progressbar.ProgressBar {
287return progressbar.DefaultBytes(-1, "Downloading")
288}
289
290func accept(u string, accept, reject string) bool {
291if !acceptRegex(u, accept) {
292logger.Debugf("url %s not accept by regex: %s", u, accept)
293return false
294}
295if rejectRegex(u, reject) {
296logger.Debugf("url %s rejected by regex: %s", u, reject)
297return false
298}
299return true
300}
301
// acceptRegex reports whether u is allowed by the accept pattern. An empty
// pattern allows everything. The pattern is compiled with regexp.MustCompile,
// which panics if it is not a valid regular expression.
func acceptRegex(u string, accept string) bool {
	return accept == "" || regexp.MustCompile(accept).MatchString(u)
}
308
// rejectRegex reports whether u is excluded by the reject pattern. An empty
// pattern excludes nothing. The pattern is compiled with regexp.MustCompile,
// which panics if it is not a valid regular expression.
func rejectRegex(u string, reject string) bool {
	return reject != "" && regexp.MustCompile(reject).MatchString(u)
}
315
316// recursiveDownload breadth-first download all resources
317func recursiveDownload(ctx context.Context, client dfdaemonclient.V1, cfg *config.DfgetConfig) error {
318// if recursive level is 0, skip recursive level check
319var skipLevel bool
320if cfg.RecursiveLevel == 0 {
321skipLevel = true
322}
323var queue deque.Deque[*config.DfgetConfig]
324queue.PushBack(cfg)
325downloadMap := map[url.URL]struct{}{}
326for {
327if queue.Len() == 0 {
328break
329}
330parentCfg := queue.PopFront()
331if !skipLevel {
332if parentCfg.RecursiveLevel == 0 {
333logger.Infof("%s recursive level reached, skip", parentCfg.URL)
334continue
335}
336parentCfg.RecursiveLevel--
337}
338request, err := source.NewRequestWithContext(ctx, parentCfg.URL, parseHeader(parentCfg.Header))
339if err != nil {
340return err
341}
342// prevent loop downloading
343if _, exist := downloadMap[*request.URL]; exist {
344continue
345}
346downloadMap[*request.URL] = struct{}{}
347
348urlEntries, err := source.List(request)
349if err != nil {
350logger.Errorf("url [%v] source lister error: %v", request.URL, err)
351}
352for _, urlEntry := range urlEntries {
353childCfg := *parentCfg //create new cfg
354childCfg.Output = path.Join(parentCfg.Output, urlEntry.Name)
355fmt.Printf("%s\n", strings.TrimPrefix(childCfg.Output, cfg.Output))
356u := urlEntry.URL
357childCfg.URL = u.String()
358
359if !accept(childCfg.URL, childCfg.RecursiveAcceptRegex, childCfg.RecursiveRejectRegex) {
360logger.Infof("url %s is not accepted, skip", childCfg.URL)
361continue
362}
363
364if urlEntry.IsDir {
365logger.Infof("download directory %s to %s", childCfg.URL, childCfg.Output)
366queue.PushBack(&childCfg)
367continue
368}
369
370if childCfg.RecursiveList {
371continue
372}
373childCfg.Recursive = false
374// validate new dfget config
375if err = childCfg.Validate(); err != nil {
376logger.Errorf("validate failed: %s", err)
377return err
378}
379logger.Infof("download file %s to %s", childCfg.URL, childCfg.Output)
380if err = singleDownload(ctx, client, &childCfg, logger.With("url", childCfg.URL)); err != nil {
381return err
382}
383}
384}
385return nil
386}
387