11
"github.com/wal-g/wal-g/internal/abool"
12
"github.com/wal-g/wal-g/internal/statistics"
14
"github.com/wal-g/tracelog"
15
"github.com/wal-g/wal-g/internal/compression"
16
"github.com/wal-g/wal-g/internal/ioextensions"
17
"github.com/wal-g/wal-g/pkg/storages/storage"
18
"github.com/wal-g/wal-g/utility"
21
// ErrorSizeTrackingDisabled is returned by UploadedDataSize and RawDataSize
// when the corresponding size counter is nil, which is the state that
// DisableSizeTracking puts a RegularUploader in.
var ErrorSizeTrackingDisabled = fmt.Errorf("size tracking disabled by DisableSizeTracking method")
23
// Uploader is the set of upload operations that both *RegularUploader and
// *SplitStreamUploader are asserted to implement further down in this file.
// NOTE(review): this listing omits some source lines, so more methods than the
// ones visible here may be declared — confirm against the full file.
type Uploader interface {
24
// Upload sends content to the given path.
Upload(ctx context.Context, path string, content io.Reader) error
25
// UploadFile uploads a named reader; the destination name is chosen by the implementation.
UploadFile(ctx context.Context, file ioextensions.NamedReader) error
26
PushStream(ctx context.Context, stream io.Reader) (string, error)
27
PushStreamToDestination(ctx context.Context, stream io.Reader, dstPath string) error
28
// Compression reports the compressor in use.
Compression() compression.Compressor
30
// UploadedDataSize and RawDataSize report size counters; RegularUploader
// returns ErrorSizeTrackingDisabled from them when tracking is off.
UploadedDataSize() (int64, error)
31
RawDataSize() (int64, error)
32
// ChangeDirectory re-targets the uploader at the sub-folder named by relativePath.
ChangeDirectory(relativePath string)
33
// Folder returns the storage folder that uploads go to.
Folder() storage.Folder
41
// RegularUploader puts objects into UploadingFolder and hands Compressor to
// CompressAndEncrypt when uploading files.
// NOTE(review): methods below also use tarSize and dataSize fields whose
// declarations are not visible in this listing.
type RegularUploader struct {
42
UploadingFolder storage.Folder // destination folder; replaced by ChangeDirectory
43
Compressor compression.Compressor // used by UploadFile and returned by Compression
44
waitGroup *sync.WaitGroup // Upload registers itself here; Finish waits on it
45
failed *abool.AtomicBool // raised by Upload's failure handling; read by Failed and Finish
50
// Compile-time assertion that *RegularUploader satisfies Uploader.
var _ Uploader = &RegularUploader{}
55
// SplitStreamUploader holds an Uploader together with partitions, blockSize
// and maxFileSize settings, as seen in its constructor and Clone method.
// NOTE(review): the struct body is not visible in this listing — confirm the
// field declarations against the full file.
type SplitStreamUploader struct {
62
// Compile-time assertion that *SplitStreamUploader satisfies Uploader.
var _ Uploader = &SplitStreamUploader{}
65
// UploadObject is one item handed to UploadMultiple, which reads its Path and
// Content fields and passes them to Upload.
// NOTE(review): the field declarations are not visible in this listing.
type UploadObject struct {
70
// NewRegularUploader builds a RegularUploader that targets uploadingLocation,
// uses compressor, and starts with a fresh WaitGroup.
// NOTE(review): the remaining field initializers and the return statement are
// not visible in this listing.
func NewRegularUploader(
71
compressor compression.Compressor,
72
uploadingLocation storage.Folder,
74
uploader := &RegularUploader{
75
UploadingFolder: uploadingLocation,
76
Compressor: compressor,
77
// Each uploader gets its own WaitGroup (Clone does the same).
waitGroup: &sync.WaitGroup{},
85
// NewSplitStreamUploader constructs a SplitStreamUploader carrying the given
// partitions and maxFileSize values.
// NOTE(review): the parameter list, the body of the check below and the other
// struct fields are not visible in this listing.
func NewSplitStreamUploader(
91
// Special case: at most one partition and maxFileSize equal to zero.
// Presumably no stream splitting is needed then — confirm what this branch returns.
if partitions <= 1 && maxFileSize == 0 {
96
return &SplitStreamUploader{
98
partitions: partitions,
100
maxFileSize: maxFileSize,
105
// UploadedDataSize returns the current value of the tarSize counter, the one
// Upload attaches to the content it sends. It returns 0 and
// ErrorSizeTrackingDisabled when the counter is nil (see DisableSizeTracking).
func (uploader *RegularUploader) UploadedDataSize() (int64, error) {
106
if uploader.tarSize == nil {
107
return 0, ErrorSizeTrackingDisabled
109
// Atomic load: the counter is reached through a pointer that Clone copies,
// so several uploaders may share it.
return atomic.LoadInt64(uploader.tarSize), nil
113
// RawDataSize returns the current value of the dataSize counter, the one
// UploadFile attaches to the file reader before calling CompressAndEncrypt.
// It returns 0 and ErrorSizeTrackingDisabled when the counter is nil
// (see DisableSizeTracking).
func (uploader *RegularUploader) RawDataSize() (int64, error) {
114
if uploader.dataSize == nil {
115
return 0, ErrorSizeTrackingDisabled
117
// Atomic load: the counter is reached through a pointer that Clone copies,
// so several uploaders may share it.
return atomic.LoadInt64(uploader.dataSize), nil
122
// Finish blocks until every Upload call registered on this uploader's wait
// group has returned, then logs an error message if the failed flag is set.
func (uploader *RegularUploader) Finish() {
123
uploader.waitGroup.Wait()
124
if uploader.failed.IsSet() {
125
tracelog.ErrorLogger.Printf("WAL-G could not complete upload.\n")
130
// Clone returns a new RegularUploader with the same folder and compressor.
// The clone gets its own WaitGroup and its own failed flag, initialized from
// the current value of Failed(). tarSize and dataSize are copied as-is; they
// are pointers (they are nil-checked and atomically loaded elsewhere), so the
// clone shares the size counters with the original.
func (uploader *RegularUploader) Clone() Uploader {
131
return &RegularUploader{
132
UploadingFolder: uploader.UploadingFolder,
133
Compressor: uploader.Compressor,
134
waitGroup: &sync.WaitGroup{},
135
// Snapshot of the flag: later failures of the original do not reach the clone.
failed: abool.NewBool(uploader.Failed()),
136
tarSize: uploader.tarSize,
137
dataSize: uploader.dataSize,
143
// UploadFile passes file through CompressAndEncrypt and uploads the result via
// Upload. The destination name is the base name of file.Name() followed by "."
// and the compressor's file extension, run through utility.SanitizePath.
// NOTE(review): the return statement is not visible in this listing;
// presumably the error from Upload is returned — confirm.
func (uploader *RegularUploader) UploadFile(ctx context.Context, file ioextensions.NamedReader) error {
144
filename := file.Name()
146
fileReader := file.(io.Reader)
147
// When size tracking is on, wrap the reader with the dataSize counter
// before it is handed to CompressAndEncrypt.
if uploader.dataSize != nil {
148
fileReader = utility.NewWithSizeReader(fileReader, uploader.dataSize)
150
compressedFile := CompressAndEncrypt(fileReader, uploader.Compressor, ConfigureCrypter())
151
dstPath := utility.SanitizePath(filepath.Base(filename) + "." + uploader.Compressor.FileExtension())
153
err := uploader.Upload(ctx, dstPath, compressedFile)
154
// The destination path is logged after the Upload call.
tracelog.InfoLogger.Println("FILE PATH:", dstPath)
159
// DisableSizeTracking drops both size counters by setting tarSize and dataSize
// to nil. Afterwards UploadedDataSize and RawDataSize return
// ErrorSizeTrackingDisabled, and Upload and UploadFile skip their
// size-counting reader wrappers.
func (uploader *RegularUploader) DisableSizeTracking() {
160
uploader.tarSize = nil
161
uploader.dataSize = nil
165
// Compression returns the uploader's Compressor field.
func (uploader *RegularUploader) Compression() compression.Compressor {
166
return uploader.Compressor
170
// Upload writes content to path inside UploadingFolder via
// PutObjectWithContext. The call is tracked in the wait group for its whole
// duration, so Finish waits for it, and it increments the UploadedFilesTotal metric.
// NOTE(review): the condition guarding the failure handling and the return
// statements are not visible in this listing.
func (uploader *RegularUploader) Upload(ctx context.Context, path string, content io.Reader) error {
171
uploader.waitGroup.Add(1)
172
defer uploader.waitGroup.Done()
174
statistics.WalgMetrics.UploadedFilesTotal.Inc()
175
// When size tracking is on, wrap content with the tarSize counter.
if uploader.tarSize != nil {
176
content = utility.NewWithSizeReader(content, uploader.tarSize)
178
err := uploader.UploadingFolder.PutObjectWithContext(ctx, path, content)
180
// Failure handling (presumably under an err != nil check — confirm):
// count the failure, raise the failed flag and log the error.
statistics.WalgMetrics.UploadedFilesFailedTotal.Inc()
181
uploader.failed.Set()
182
tracelog.ErrorLogger.Printf(tracelog.GetErrorFormatter()+"\n", err)
191
// UploadMultiple calls Upload for each object in slice order, passing the
// object's Path and Content.
// NOTE(review): the handling of err and the final return are not visible in
// this listing.
func (uploader *RegularUploader) UploadMultiple(ctx context.Context, objects []UploadObject) error {
192
for _, object := range objects {
193
err := uploader.Upload(ctx, object.Path, object.Content)
202
// ChangeDirectory replaces UploadingFolder with its sub-folder at
// relativePath. It mutates the receiver, so later uploads go to the sub-folder.
func (uploader *RegularUploader) ChangeDirectory(relativePath string) {
203
uploader.UploadingFolder = uploader.UploadingFolder.GetSubFolder(relativePath)
206
// Folder returns the uploader's current UploadingFolder.
func (uploader *RegularUploader) Folder() storage.Folder {
207
return uploader.UploadingFolder
210
// Failed reports whether the failed flag is set, i.e. whether Upload's
// failure handling has run on this uploader (or the flag was already set when
// it was cloned).
func (uploader *RegularUploader) Failed() bool {
211
return uploader.failed.IsSet()
214
func (uploader *SplitStreamUploader) Clone() Uploader {
215
return &SplitStreamUploader{
216
Uploader: uploader.Uploader.Clone(),
217
partitions: uploader.partitions,
218
blockSize: uploader.blockSize,