9
"github.com/wal-g/tracelog"
10
"golang.org/x/sync/errgroup"
12
"github.com/wal-g/wal-g/internal/splitmerge"
13
"github.com/wal-g/wal-g/utility"
17
// StreamPrefix is prepended to the formatted current time when a stream
// backup name is built.
StreamPrefix = "stream_"
18
// StreamBackupNameLength is the length of a stream backup name.
StreamBackupNameLength = 23 // len(StreamPrefix) + len(utility.BackupTimeFormat)
22
// PushStream compresses a stream and push it
23
func (uploader *RegularUploader) PushStream(ctx context.Context, stream io.Reader) (string, error) {
24
backupName := StreamPrefix + utility.TimeNowCrossPlatformUTC().Format(utility.BackupTimeFormat)
25
dstPath := GetStreamName(backupName, uploader.Compressor.FileExtension())
26
err := uploader.PushStreamToDestination(ctx, stream, dstPath)
28
return backupName, err
32
// returns backup_prefix
33
// (Note: individual parition names are built by adding '_0000.br' or '_0000_0000.br' suffix)
34
func (uploader *SplitStreamUploader) PushStream(ctx context.Context, stream io.Reader) (string, error) {
35
backupName := StreamPrefix + utility.TimeNowCrossPlatformUTC().Format(utility.BackupTimeFormat)
38
errGroup, ctx := errgroup.WithContext(ctx)
39
var readers = splitmerge.SplitReader(ctx, stream, uploader.partitions, uploader.blockSize)
40
for partNumber := 0; partNumber < uploader.partitions; partNumber++ {
41
reader := readers[partNumber]
42
if uploader.maxFileSize != 0 {
43
currentPartNumber := partNumber
44
errGroup.Go(func() error {
47
fileReader := io.LimitReader(reader, int64(uploader.maxFileSize))
49
fileReader = utility.NewWithSizeReader(fileReader, &read)
51
tracelog.DebugLogger.Printf("Get file reader %d of part %d\n", idx, currentPartNumber)
52
dstPath := GetPartitionedSteamMultipartName(backupName, uploader.Compression().FileExtension(), currentPartNumber, idx)
53
err := uploader.PushStreamToDestination(ctx, fileReader, dstPath)
58
err = uploader.Folder().DeleteObjects([]string{dstPath})
65
dstPath := GetPartitionedStreamName(backupName, uploader.Compression().FileExtension(), partNumber)
66
errGroup.Go(func() error {
67
return uploader.PushStreamToDestination(ctx, reader, dstPath)
72
// Wait for upload finished:
73
if err := errGroup.Wait(); err != nil {
74
tracelog.WarningLogger.Printf("Failed to upload part of backup: %v", err)
75
return backupName, err
78
// Upload StreamMetadata
79
meta := BackupStreamMetadata{
80
Type: SplitMergeStreamBackup,
81
Partitions: uint(uploader.partitions),
82
BlockSize: uint(uploader.blockSize),
83
Compression: uploader.Compression().FileExtension(),
85
uploaderClone := uploader.Clone()
86
uploaderClone.DisableSizeTracking() // don't count metadata.json in backup size
87
err := UploadBackupStreamMetadata(uploader, meta, backupName)
89
return backupName, err
93
// PushStreamToDestination compresses a stream and push it to specifyed destination
94
func (uploader *RegularUploader) PushStreamToDestination(ctx context.Context, stream io.Reader, dstPath string) error {
95
if uploader.dataSize != nil {
96
stream = utility.NewWithSizeReader(stream, uploader.dataSize)
98
compressed := CompressAndEncrypt(stream, uploader.Compressor, ConfigureCrypter())
99
err := uploader.Upload(ctx, dstPath, compressed)
100
tracelog.InfoLogger.Println("FILE PATH:", dstPath)
105
func GetStreamName(backupName string, extension string) string {
106
return utility.SanitizePath(path.Join(backupName, "stream.")) + extension
109
func GetPartitionedStreamName(backupName string, extension string, partIdx int) string {
110
return fmt.Sprintf("%s_%04d.%s", utility.SanitizePath(path.Join(backupName, "part")), partIdx, extension)
113
func GetPartitionedSteamMultipartName(backupName string, extension string, partIdx int, fileNumber int) string {
114
return fmt.Sprintf("%s_%04d_%04d.%s", utility.SanitizePath(path.Join(backupName, "part")),
115
partIdx, fileNumber, extension)