uploader.go 

package internal

import (
	"context"
	"fmt"
	"io"
	"path/filepath"
	"sync"
	"sync/atomic"

	"github.com/wal-g/wal-g/internal/abool"
	"github.com/wal-g/wal-g/internal/statistics"

	"github.com/wal-g/tracelog"
	"github.com/wal-g/wal-g/internal/compression"
	"github.com/wal-g/wal-g/internal/ioextensions"
	"github.com/wal-g/wal-g/pkg/storages/storage"
	"github.com/wal-g/wal-g/utility"
)

var ErrorSizeTrackingDisabled = fmt.Errorf("size tracking disabled by DisableSizeTracking method")

type Uploader interface {
	Upload(ctx context.Context, path string, content io.Reader) error
	UploadFile(ctx context.Context, file ioextensions.NamedReader) error
	PushStream(ctx context.Context, stream io.Reader) (string, error)
	PushStreamToDestination(ctx context.Context, stream io.Reader, dstPath string) error
	Compression() compression.Compressor
	DisableSizeTracking()
	UploadedDataSize() (int64, error)
	RawDataSize() (int64, error)
	ChangeDirectory(relativePath string)
	Folder() storage.Folder
	Clone() Uploader
	Failed() bool
	Finish()
}
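
// Usage sketch (illustrative, not part of the original file): the typical
// lifecycle of an Uploader. The ctx, folder, compressor, and payload values
// are assumed to come from the caller's configuration.
//
//	uploader := NewRegularUploader(compressor, folder)
//	if err := uploader.Upload(ctx, "some/key", bytes.NewReader(payload)); err != nil {
//		// handle the error; Upload has already marked the uploader as failed
//	}
//	uploader.Finish() // waits for in-flight uploads and logs if any failed
//	if uploader.Failed() {
//		// at least one upload did not complete
//	}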

// RegularUploader contains fields associated with uploading tarballs.
// Multiple tarballs can share one uploader.
type RegularUploader struct {
	UploadingFolder storage.Folder
	Compressor      compression.Compressor
	waitGroup       *sync.WaitGroup
	failed          *abool.AtomicBool
	tarSize         *int64
	dataSize        *int64
}

var _ Uploader = &RegularUploader{}

// SplitStreamUploader is an Uploader implementation that splits an upload
// stream into blocks of blockSize bytes and distributes them across at most
// `partitions` streams, each of which is compressed and pushed to storage.
type SplitStreamUploader struct {
	Uploader
	partitions  int
	blockSize   int
	maxFileSize int
}

var _ Uploader = &SplitStreamUploader{}

// UploadObject holds the destination path and content of a single object to upload.
type UploadObject struct {
	Path    string
	Content io.Reader
}

func NewRegularUploader(
	compressor compression.Compressor,
	uploadingLocation storage.Folder,
) *RegularUploader {
	uploader := &RegularUploader{
		UploadingFolder: uploadingLocation,
		Compressor:      compressor,
		waitGroup:       &sync.WaitGroup{},
		tarSize:         new(int64),
		dataSize:        new(int64),
		failed:          abool.New(),
	}
	return uploader
}

func NewSplitStreamUploader(
	uploader Uploader,
	partitions int,
	blockSize int,
	maxFileSize int,
) Uploader {
	if partitions <= 1 && maxFileSize == 0 {
		// Fall back to the plain implementation to skip unneeded splitting steps:
		return uploader
	}

	return &SplitStreamUploader{
		Uploader:    uploader,
		partitions:  partitions,
		blockSize:   blockSize,
		maxFileSize: maxFileSize,
	}
}
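
// Construction sketch (illustrative, not part of the original file): wrapping
// a base uploader; the partition count, block size, and size limit shown are
// arbitrary example values.
//
//	base := NewRegularUploader(compressor, folder)
//	split := NewSplitStreamUploader(base, 4, 1<<20, 0)
//	// With partitions <= 1 and maxFileSize == 0, NewSplitStreamUploader
//	// returns the base uploader unchanged, so callers can use the result
//	// uniformly either way.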

// UploadedDataSize returns the number of bytes sent to storage so far.
// It returns 0 and an error when size tracking is disabled (see DisableSizeTracking).
func (uploader *RegularUploader) UploadedDataSize() (int64, error) {
	if uploader.tarSize == nil {
		return 0, ErrorSizeTrackingDisabled
	}
	return atomic.LoadInt64(uploader.tarSize), nil
}

// RawDataSize returns the number of bytes read before compression.
// It returns 0 and an error when size tracking is disabled (see DisableSizeTracking).
func (uploader *RegularUploader) RawDataSize() (int64, error) {
	if uploader.dataSize == nil {
		return 0, ErrorSizeTrackingDisabled
	}
	return atomic.LoadInt64(uploader.dataSize), nil
}
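
// Size-tracking sketch (illustrative, not part of the original file): reading
// both counters after uploads complete, including the disabled case.
//
//	uploaded, err := uploader.UploadedDataSize()
//	if err != nil {
//		// err == ErrorSizeTrackingDisabled after DisableSizeTracking()
//	}
//	raw, _ := uploader.RawDataSize()
//	tracelog.InfoLogger.Printf("uploaded %d bytes from %d raw bytes", uploaded, raw)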

// Finish waits for all waiting parts to be uploaded. If any upload failed,
// it prints an alert to stderr.
func (uploader *RegularUploader) Finish() {
	uploader.waitGroup.Wait()
	if uploader.failed.IsSet() {
		tracelog.ErrorLogger.Printf("WAL-G could not complete upload.\n")
	}
}

// Clone creates a similar Uploader with a new WaitGroup. The size counters
// are shared with the original, so totals accumulate across clones.
func (uploader *RegularUploader) Clone() Uploader {
	return &RegularUploader{
		UploadingFolder: uploader.UploadingFolder,
		Compressor:      uploader.Compressor,
		waitGroup:       &sync.WaitGroup{},
		failed:          abool.NewBool(uploader.Failed()),
		tarSize:         uploader.tarSize,
		dataSize:        uploader.dataSize,
	}
}
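
// Concurrency sketch (illustrative, not part of the original file): giving
// each worker its own clone so ChangeDirectory and Finish stay independent,
// while the shared size counters still accumulate combined totals. The
// directory names and reader are example values.
//
//	for _, dir := range []string{"wal_005", "basebackups_005"} {
//		worker := uploader.Clone()
//		worker.ChangeDirectory(dir) // does not affect the original uploader
//		go func(u Uploader) {
//			defer u.Finish()
//			_ = u.Upload(ctx, "object", content)
//		}(worker)
//	}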

// TODO : unit tests
// UploadFile compresses a file and uploads it.
func (uploader *RegularUploader) UploadFile(ctx context.Context, file ioextensions.NamedReader) error {
	filename := file.Name()

	fileReader := file.(io.Reader)
	if uploader.dataSize != nil {
		fileReader = utility.NewWithSizeReader(fileReader, uploader.dataSize)
	}
	compressedFile := CompressAndEncrypt(fileReader, uploader.Compressor, ConfigureCrypter())
	dstPath := utility.SanitizePath(filepath.Base(filename) + "." + uploader.Compressor.FileExtension())

	err := uploader.Upload(ctx, dstPath, compressedFile)
	tracelog.InfoLogger.Println("FILE PATH:", dstPath)
	return err
}
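
// File sketch (illustrative, not part of the original file; assumes *os.File
// satisfies ioextensions.NamedReader through its Name and Read methods, and
// the path is an example). The object name becomes the file's base name plus
// the compressor's file extension.
//
//	f, err := os.Open("/pgdata/pg_wal/000000010000000000000001")
//	if err != nil {
//		return err
//	}
//	defer f.Close()
//	if err := uploader.UploadFile(ctx, f); err != nil {
//		return err
//	}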

// DisableSizeTracking stops tracking uploaded and raw data sizes; after this,
// UploadedDataSize and RawDataSize return ErrorSizeTrackingDisabled.
func (uploader *RegularUploader) DisableSizeTracking() {
	uploader.tarSize = nil
	uploader.dataSize = nil
}

// Compression returns the configured compressor.
func (uploader *RegularUploader) Compression() compression.Compressor {
	return uploader.Compressor
}

// TODO : unit tests
// Upload registers the upload in the wait group, counts the uploaded bytes
// when size tracking is enabled, and puts the object into the uploading
// folder, marking the uploader as failed on error.
func (uploader *RegularUploader) Upload(ctx context.Context, path string, content io.Reader) error {
	uploader.waitGroup.Add(1)
	defer uploader.waitGroup.Done()

	statistics.WalgMetrics.UploadedFilesTotal.Inc()
	if uploader.tarSize != nil {
		content = utility.NewWithSizeReader(content, uploader.tarSize)
	}
	err := uploader.UploadingFolder.PutObjectWithContext(ctx, path, content)
	if err != nil {
		statistics.WalgMetrics.UploadedFilesFailedTotal.Inc()
		uploader.failed.Set()
		tracelog.ErrorLogger.Printf(tracelog.GetErrorFormatter()+"\n", err)
		return err
	}
	return nil
}

// UploadMultiple uploads multiple objects from the start of the slice,
// returning the first error, if any. Note that this operation is not atomic.
// TODO : unit tests / is it used?
func (uploader *RegularUploader) UploadMultiple(ctx context.Context, objects []UploadObject) error {
	for _, object := range objects {
		err := uploader.Upload(ctx, object.Path, object.Content)
		if err != nil {
			// possibly do a retry here
			return err
		}
	}
	return nil
}
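
// Batch sketch (illustrative, not part of the original file): since
// UploadMultiple is not atomic, a failure can leave earlier objects already
// in storage; callers needing all-or-nothing semantics must clean up
// themselves. The paths and contents are example values.
//
//	objects := []UploadObject{
//		{Path: "a.json", Content: strings.NewReader("{}")},
//		{Path: "b.json", Content: strings.NewReader("{}")},
//	}
//	if err := uploader.UploadMultiple(ctx, objects); err != nil {
//		// objects before the failing one may already exist in storage
//	}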

// ChangeDirectory makes the given subfolder of the current uploading folder
// the destination for subsequent uploads.
func (uploader *RegularUploader) ChangeDirectory(relativePath string) {
	uploader.UploadingFolder = uploader.UploadingFolder.GetSubFolder(relativePath)
}

// Folder returns the current uploading folder.
func (uploader *RegularUploader) Folder() storage.Folder {
	return uploader.UploadingFolder
}

// Failed reports whether any upload has failed.
func (uploader *RegularUploader) Failed() bool {
	return uploader.failed.IsSet()
}

// Clone creates a similar SplitStreamUploader backed by a clone of the
// wrapped Uploader.
func (uploader *SplitStreamUploader) Clone() Uploader {
	return &SplitStreamUploader{
		Uploader:    uploader.Uploader.Clone(),
		partitions:  uploader.partitions,
		blockSize:   uploader.blockSize,
		maxFileSize: uploader.maxFileSize,
	}
}