wal-g

Форк
0
/
stream_push_helper.go 
116 строк · 4.0 Кб
1
package internal
2

3
import (
4
	"context"
5
	"fmt"
6
	"io"
7
	"path"
8

9
	"github.com/wal-g/tracelog"
10
	"golang.org/x/sync/errgroup"
11

12
	"github.com/wal-g/wal-g/internal/splitmerge"
13
	"github.com/wal-g/wal-g/utility"
14
)
15

16
const (
17
	StreamPrefix           = "stream_"
18
	StreamBackupNameLength = 23 // len(StreamPrefix) + len(utility.BackupTimeFormat)
19
)
20

21
// TODO : unit tests
22
// PushStream compresses a stream and push it
23
func (uploader *RegularUploader) PushStream(ctx context.Context, stream io.Reader) (string, error) {
24
	backupName := StreamPrefix + utility.TimeNowCrossPlatformUTC().Format(utility.BackupTimeFormat)
25
	dstPath := GetStreamName(backupName, uploader.Compressor.FileExtension())
26
	err := uploader.PushStreamToDestination(ctx, stream, dstPath)
27

28
	return backupName, err
29
}
30

31
// TODO : unit tests
32
// returns backup_prefix
33
// (Note: individual parition names are built by adding '_0000.br' or '_0000_0000.br' suffix)
34
func (uploader *SplitStreamUploader) PushStream(ctx context.Context, stream io.Reader) (string, error) {
35
	backupName := StreamPrefix + utility.TimeNowCrossPlatformUTC().Format(utility.BackupTimeFormat)
36

37
	// Upload Stream:
38
	errGroup, ctx := errgroup.WithContext(ctx)
39
	var readers = splitmerge.SplitReader(ctx, stream, uploader.partitions, uploader.blockSize)
40
	for partNumber := 0; partNumber < uploader.partitions; partNumber++ {
41
		reader := readers[partNumber]
42
		if uploader.maxFileSize != 0 {
43
			currentPartNumber := partNumber
44
			errGroup.Go(func() error {
45
				idx := 0
46
				for {
47
					fileReader := io.LimitReader(reader, int64(uploader.maxFileSize))
48
					read := int64(0)
49
					fileReader = utility.NewWithSizeReader(fileReader, &read)
50

51
					tracelog.DebugLogger.Printf("Get file reader %d of part %d\n", idx, currentPartNumber)
52
					dstPath := GetPartitionedSteamMultipartName(backupName, uploader.Compression().FileExtension(), currentPartNumber, idx)
53
					err := uploader.PushStreamToDestination(ctx, fileReader, dstPath)
54
					if err != nil {
55
						return err
56
					}
57
					if read == 0 {
58
						err = uploader.Folder().DeleteObjects([]string{dstPath})
59
						return err
60
					}
61
					idx++
62
				}
63
			})
64
		} else {
65
			dstPath := GetPartitionedStreamName(backupName, uploader.Compression().FileExtension(), partNumber)
66
			errGroup.Go(func() error {
67
				return uploader.PushStreamToDestination(ctx, reader, dstPath)
68
			})
69
		}
70
	}
71

72
	// Wait for upload finished:
73
	if err := errGroup.Wait(); err != nil {
74
		tracelog.WarningLogger.Printf("Failed to upload part of backup: %v", err)
75
		return backupName, err
76
	}
77

78
	// Upload StreamMetadata
79
	meta := BackupStreamMetadata{
80
		Type:        SplitMergeStreamBackup,
81
		Partitions:  uint(uploader.partitions),
82
		BlockSize:   uint(uploader.blockSize),
83
		Compression: uploader.Compression().FileExtension(),
84
	}
85
	uploaderClone := uploader.Clone()
86
	uploaderClone.DisableSizeTracking() // don't count metadata.json in backup size
87
	err := UploadBackupStreamMetadata(uploader, meta, backupName)
88

89
	return backupName, err
90
}
91

92
// TODO : unit tests
93
// PushStreamToDestination compresses a stream and push it to specifyed destination
94
func (uploader *RegularUploader) PushStreamToDestination(ctx context.Context, stream io.Reader, dstPath string) error {
95
	if uploader.dataSize != nil {
96
		stream = utility.NewWithSizeReader(stream, uploader.dataSize)
97
	}
98
	compressed := CompressAndEncrypt(stream, uploader.Compressor, ConfigureCrypter())
99
	err := uploader.Upload(ctx, dstPath, compressed)
100
	tracelog.InfoLogger.Println("FILE PATH:", dstPath)
101

102
	return err
103
}
104

105
func GetStreamName(backupName string, extension string) string {
106
	return utility.SanitizePath(path.Join(backupName, "stream.")) + extension
107
}
108

109
func GetPartitionedStreamName(backupName string, extension string, partIdx int) string {
110
	return fmt.Sprintf("%s_%04d.%s", utility.SanitizePath(path.Join(backupName, "part")), partIdx, extension)
111
}
112

113
func GetPartitionedSteamMultipartName(backupName string, extension string, partIdx int, fileNumber int) string {
114
	return fmt.Sprintf("%s_%04d_%04d.%s", utility.SanitizePath(path.Join(backupName, "part")),
115
		partIdx, fileNumber, extension)
116
}
117

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.