Dragonfly2

Форк
0
386 строк · 9.7 Кб
1
/*
2
 *     Copyright 2020 The Dragonfly Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *      http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package dfget
18

19
import (
20
	"context"
21
	"errors"
22
	"fmt"
23
	"io"
24
	"net/url"
25
	"os"
26
	"path"
27
	"path/filepath"
28
	"regexp"
29
	"strings"
30
	"time"
31

32
	"github.com/gammazero/deque"
33
	"github.com/go-http-utils/headers"
34
	"github.com/schollz/progressbar/v3"
35

36
	commonv1 "d7y.io/api/v2/pkg/apis/common/v1"
37
	dfdaemonv1 "d7y.io/api/v2/pkg/apis/dfdaemon/v1"
38

39
	"d7y.io/dragonfly/v2/client/config"
40
	logger "d7y.io/dragonfly/v2/internal/dflog"
41
	"d7y.io/dragonfly/v2/pkg/digest"
42
	dfdaemonclient "d7y.io/dragonfly/v2/pkg/rpc/dfdaemon/client"
43
	"d7y.io/dragonfly/v2/pkg/source"
44
	pkgstrings "d7y.io/dragonfly/v2/pkg/strings"
45
)
46

47
func Download(cfg *config.DfgetConfig, client dfdaemonclient.V1) error {
48
	var (
49
		ctx       = context.Background()
50
		cancel    context.CancelFunc
51
		wLog      = logger.With("url", cfg.URL)
52
		downError error
53
	)
54

55
	wLog.Info("init success and start to download")
56
	fmt.Println("init success and start to download")
57

58
	if cfg.Timeout > 0 {
59
		ctx, cancel = context.WithTimeout(ctx, cfg.Timeout)
60
	} else {
61
		ctx, cancel = context.WithCancel(ctx)
62
	}
63

64
	go func() {
65
		downError = download(ctx, client, cfg, wLog)
66
		cancel()
67
	}()
68

69
	<-ctx.Done()
70

71
	if ctx.Err() == context.DeadlineExceeded {
72
		return fmt.Errorf("download timeout(%s)", cfg.Timeout)
73
	}
74
	return downError
75
}
76

77
// download dispatches to the single-file or the recursive implementation
// depending on cfg.Recursive.
func download(ctx context.Context, client dfdaemonclient.V1, cfg *config.DfgetConfig, wLog *logger.SugaredLoggerOnWith) error {
	if !cfg.Recursive {
		return singleDownload(ctx, client, cfg, wLog)
	}
	// Recursive mode walks the source tree; it builds its own per-file loggers.
	return recursiveDownload(ctx, client, cfg)
}
83

84
func singleDownload(ctx context.Context, client dfdaemonclient.V1, cfg *config.DfgetConfig, wLog *logger.SugaredLoggerOnWith) error {
85
	hdr := parseHeader(cfg.Header)
86

87
	if client == nil {
88
		return downloadFromSource(ctx, cfg, hdr)
89
	}
90

91
	var (
92
		start     = time.Now()
93
		stream    dfdaemonv1.Daemon_DownloadClient
94
		result    *dfdaemonv1.DownResult
95
		pb        *progressbar.ProgressBar
96
		request   = newDownRequest(cfg, hdr)
97
		downError error
98
	)
99

100
	if stream, downError = client.Download(ctx, request); downError != nil {
101
		goto processError
102
	}
103

104
	if cfg.ShowProgress {
105
		pb = newProgressBar(-1)
106
	}
107

108
	for {
109
		if result, downError = stream.Recv(); downError != nil {
110
			break
111
		}
112

113
		if result.CompletedLength > 0 && pb != nil {
114
			_ = pb.Set64(int64(result.CompletedLength))
115
		}
116

117
		// success
118
		if result.Done {
119
			if pb != nil {
120
				pb.Describe("Downloaded")
121
				_ = pb.Close()
122
			}
123

124
			wLog.Infof("download from daemon success, length: %d bytes, cost: %d ms", result.CompletedLength, time.Since(start).Milliseconds())
125
			fmt.Printf("finish total length %d bytes\n", result.CompletedLength)
126

127
			break
128
		}
129
	}
130

131
processError:
132
	if downError != nil && !cfg.KeepOriginalOffset {
133
		wLog.Warnf("daemon downloads file error: %v", downError)
134
		fmt.Printf("daemon downloads file error: %v\n", downError)
135
		downError = downloadFromSource(ctx, cfg, hdr)
136
	}
137

138
	return downError
139
}
140

141
func downloadFromSource(ctx context.Context, cfg *config.DfgetConfig, hdr map[string]string) (err error) {
142
	if cfg.DisableBackSource {
143
		return errors.New("try to download from source but back source is disabled")
144
	}
145

146
	var (
147
		wLog     = logger.With("url", cfg.URL)
148
		start    = time.Now()
149
		tempFile *os.File
150
		response *source.Response
151
		written  int64
152
		renameOK bool
153
	)
154

155
	wLog.Info("try to download from source and ignore rate limit")
156
	fmt.Println("try to download from source and ignore rate limit")
157

158
	if tempFile, err = os.CreateTemp(filepath.Dir(cfg.Output), ".df_"); err != nil {
159
		return err
160
	}
161
	defer func() {
162
		if !renameOK {
163
			tempPath := path.Join(filepath.Dir(cfg.Output), tempFile.Name())
164
			removeErr := os.Remove(tempPath)
165
			if removeErr != nil {
166
				wLog.Infof("remove temporary file %s error: %s", tempPath, removeErr)
167
				fmt.Printf("remove temporary file %s error: %s\n", tempPath, removeErr)
168
			}
169
		}
170
		if cerr := tempFile.Close(); cerr != nil {
171
			err = errors.Join(err, cerr)
172
		}
173
	}()
174

175
	downloadRequest, err := source.NewRequestWithContext(ctx, cfg.URL, hdr)
176
	if err != nil {
177
		return err
178
	}
179
	if response, err = source.Download(downloadRequest); err != nil {
180
		return err
181
	}
182
	defer func() {
183
		if cerr := response.Body.Close(); cerr != nil {
184
			err = errors.Join(err, cerr)
185
		}
186
	}()
187
	if err = response.Validate(); err != nil {
188
		return err
189
	}
190

191
	if written, err = io.Copy(tempFile, response.Body); err != nil {
192
		return err
193
	}
194

195
	if !pkgstrings.IsBlank(cfg.Digest) {
196
		d, err := digest.Parse(cfg.Digest)
197
		if err != nil {
198
			return err
199
		}
200

201
		encoded, err := digest.HashFile(tempFile.Name(), d.Algorithm)
202
		if err != nil {
203
			return err
204
		}
205

206
		if encoded != "" && encoded != d.Encoded {
207
			return fmt.Errorf("%s digest is not matched: real[%s] expected[%s]", d.Algorithm, encoded, d.Encoded)
208
		}
209
	}
210

211
	// change file owner
212
	if err = os.Chown(tempFile.Name(), os.Getuid(), os.Getgid()); err != nil {
213
		return fmt.Errorf("change file owner to uid[%d] gid[%d]: %w", os.Getuid(), os.Getgid(), err)
214
	}
215

216
	if err = os.Rename(tempFile.Name(), cfg.Output); err != nil {
217
		return err
218
	}
219
	renameOK = true
220

221
	wLog.Infof("download from source success, length: %d bytes, cost: %d ms", written, time.Since(start).Milliseconds())
222
	fmt.Printf("finish total length %d bytes\n", written)
223

224
	return nil
225
}
226

227
// parseHeader converts "Key: Value" strings into a header map.
//
// Each entry is split at its first colon, and both halves are
// whitespace-trimmed, so the value keeps any further colons. Entries with no
// colon, or whose colon is the first character, are ignored. A later
// duplicate key overwrites an earlier one. The returned map is never nil.
func parseHeader(s []string) map[string]string {
	hdr := make(map[string]string, len(s))
	for _, line := range s {
		name, val, found := strings.Cut(line, ":")
		if !found || name == "" {
			continue
		}
		hdr[strings.TrimSpace(name)] = strings.TrimSpace(val)
	}
	return hdr
}
241

242
func newDownRequest(cfg *config.DfgetConfig, hdr map[string]string) *dfdaemonv1.DownRequest {
243
	var rg string
244
	if r, ok := hdr[headers.Range]; ok {
245
		rg = strings.TrimPrefix(r, "bytes=")
246
	} else {
247
		rg = cfg.Range
248
	}
249
	request := &dfdaemonv1.DownRequest{
250
		Url:               cfg.URL,
251
		Output:            cfg.Output,
252
		Timeout:           uint64(cfg.Timeout),
253
		Limit:             float64(cfg.RateLimit.Limit),
254
		DisableBackSource: cfg.DisableBackSource,
255
		UrlMeta: &commonv1.UrlMeta{
256
			Digest:      cfg.Digest,
257
			Tag:         cfg.Tag,
258
			Range:       rg,
259
			Filter:      cfg.Filter,
260
			Header:      hdr,
261
			Application: cfg.Application,
262
			Priority:    commonv1.Priority(cfg.Priority),
263
		},
264
		Uid:                int64(os.Getuid()),
265
		Gid:                int64(os.Getgid()),
266
		KeepOriginalOffset: cfg.KeepOriginalOffset,
267
	}
268

269
	_url, err := url.Parse(cfg.URL)
270
	if err == nil {
271
		director, ok := source.HasDirector(_url.Scheme)
272
		if ok {
273
			err = director.Direct(_url, request.UrlMeta)
274
			if err == nil {
275
				// write back new url
276
				request.Url = _url.String()
277
			} else {
278
				logger.Errorf("direct resource error: %s", err)
279
			}
280
		}
281
	}
282

283
	return request
284
}
285

286
// newProgressBar returns a byte-counting progress bar described as
// "Downloading" for a transfer of maxBytes bytes. Callers pass -1 when the
// total length is not known in advance.
//
// The parameter used to be ignored (a hard-coded -1 was always passed) and
// was named after the builtin max; it is now forwarded as intended.
func newProgressBar(maxBytes int64) *progressbar.ProgressBar {
	return progressbar.DefaultBytes(maxBytes, "Downloading")
}
289

290
func accept(u string, accept, reject string) bool {
291
	if !acceptRegex(u, accept) {
292
		logger.Debugf("url %s not accept by regex: %s", u, accept)
293
		return false
294
	}
295
	if rejectRegex(u, reject) {
296
		logger.Debugf("url %s rejected by regex: %s", u, reject)
297
		return false
298
	}
299
	return true
300
}
301

302
// acceptRegex reports whether u matches the accept pattern. An empty pattern
// accepts every URL. The pattern is compiled with regexp.MustCompile, so an
// invalid pattern panics.
func acceptRegex(u string, accept string) bool {
	if len(accept) > 0 {
		return regexp.MustCompile(accept).MatchString(u)
	}
	return true
}
308

309
// rejectRegex reports whether u matches the reject pattern. An empty pattern
// rejects nothing. The pattern is compiled with regexp.MustCompile, so an
// invalid pattern panics.
func rejectRegex(u string, reject string) bool {
	if len(reject) > 0 {
		return regexp.MustCompile(reject).MatchString(u)
	}
	return false
}
315

316
// recursiveDownload breadth-first download all resources
317
func recursiveDownload(ctx context.Context, client dfdaemonclient.V1, cfg *config.DfgetConfig) error {
318
	// if recursive level is 0, skip recursive level check
319
	var skipLevel bool
320
	if cfg.RecursiveLevel == 0 {
321
		skipLevel = true
322
	}
323
	var queue deque.Deque[*config.DfgetConfig]
324
	queue.PushBack(cfg)
325
	downloadMap := map[url.URL]struct{}{}
326
	for {
327
		if queue.Len() == 0 {
328
			break
329
		}
330
		parentCfg := queue.PopFront()
331
		if !skipLevel {
332
			if parentCfg.RecursiveLevel == 0 {
333
				logger.Infof("%s recursive level reached, skip", parentCfg.URL)
334
				continue
335
			}
336
			parentCfg.RecursiveLevel--
337
		}
338
		request, err := source.NewRequestWithContext(ctx, parentCfg.URL, parseHeader(parentCfg.Header))
339
		if err != nil {
340
			return err
341
		}
342
		// prevent loop downloading
343
		if _, exist := downloadMap[*request.URL]; exist {
344
			continue
345
		}
346
		downloadMap[*request.URL] = struct{}{}
347

348
		urlEntries, err := source.List(request)
349
		if err != nil {
350
			logger.Errorf("url [%v] source lister error: %v", request.URL, err)
351
		}
352
		for _, urlEntry := range urlEntries {
353
			childCfg := *parentCfg //create new cfg
354
			childCfg.Output = path.Join(parentCfg.Output, urlEntry.Name)
355
			fmt.Printf("%s\n", strings.TrimPrefix(childCfg.Output, cfg.Output))
356
			u := urlEntry.URL
357
			childCfg.URL = u.String()
358

359
			if !accept(childCfg.URL, childCfg.RecursiveAcceptRegex, childCfg.RecursiveRejectRegex) {
360
				logger.Infof("url %s is not accepted, skip", childCfg.URL)
361
				continue
362
			}
363

364
			if urlEntry.IsDir {
365
				logger.Infof("download directory %s to %s", childCfg.URL, childCfg.Output)
366
				queue.PushBack(&childCfg)
367
				continue
368
			}
369

370
			if childCfg.RecursiveList {
371
				continue
372
			}
373
			childCfg.Recursive = false
374
			// validate new dfget config
375
			if err = childCfg.Validate(); err != nil {
376
				logger.Errorf("validate failed: %s", err)
377
				return err
378
			}
379
			logger.Infof("download file %s to %s", childCfg.URL, childCfg.Output)
380
			if err = singleDownload(ctx, client, &childCfg, logger.With("url", childCfg.URL)); err != nil {
381
				return err
382
			}
383
		}
384
	}
385
	return nil
386
}
387

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.