12
"github.com/pkg/errors"
13
"github.com/wal-g/tracelog"
14
"github.com/wal-g/wal-g/internal/compression"
15
conf "github.com/wal-g/wal-g/internal/config"
16
"github.com/wal-g/wal-g/internal/crypto"
17
"github.com/wal-g/wal-g/utility"
18
"golang.org/x/sync/semaphore"
21
// MinExtractRetryWait and MaxExtractRetryWait bound the sleep between
// retry rounds of ExtractAll (see ExtractAllWithSleeper, which builds an
// exponential sleeper from these two values).
var MinExtractRetryWait = time.Minute
22
var MaxExtractRetryWait = 5 * time.Minute
24
// NoFilesToExtractError is returned by ExtractAll / ExtractAllWithSleeper
// when the caller passes an empty list of files to extract.
type NoFilesToExtractError struct {
28
// newNoFilesToExtractError constructs the error with its fixed message.
func newNoFilesToExtractError() NoFilesToExtractError {
29
return NoFilesToExtractError{errors.New("ExtractAll: did not provide files to extract")}
32
// Error renders the wrapped error through the project-wide tracelog formatter.
func (err NoFilesToExtractError) Error() string {
33
return fmt.Sprintf(tracelog.GetErrorFormatter(), err.error)
36
// UnsupportedFileTypeError is used to signal file types
37
// that are unsupported by WAL-G.
38
type UnsupportedFileTypeError struct {
42
// newUnsupportedFileTypeError records both the file path and the
// unrecognized format so the operator can locate the offending object.
func newUnsupportedFileTypeError(path string, fileFormat string) UnsupportedFileTypeError {
43
return UnsupportedFileTypeError{errors.Errorf("WAL-G does not support the file format '%s' in '%s'", fileFormat, path)}
46
// Error renders the wrapped error through the project-wide tracelog formatter.
func (err UnsupportedFileTypeError) Error() string {
47
return fmt.Sprintf(tracelog.GetErrorFormatter(), err.error)
50
// TarInterpreter behaves differently
51
// for different file types.
52
// Implementations decide how a single tar entry (header + payload stream)
// is materialized on disk or consumed.
type TarInterpreter interface {
53
Interpret(reader io.Reader, header *tar.Header) error
56
// DevNullWriter discards everything written to it while keeping a running
// byte count for progress reporting. (Fields are elided in this excerpt;
// statPrinter appears to be a once/rate-limited printer — TODO confirm.)
type DevNullWriter struct {
62
// Write counts the incoming bytes and drops them. It also arranges a
// periodic log line reporting the total discarded so far.
func (e *DevNullWriter) Write(p []byte) (int, error) {
63
e.statPrinter.Do(func() {
66
// NOTE(review): sleep before printing — presumably to rate-limit the
// progress message; confirm against the elided surrounding loop.
time.Sleep(1 * time.Second)
67
tracelog.ErrorLogger.Printf("/dev/null size %d", e.totalBytes)
71
e.totalBytes += int64(len(p))
75
// Compile-time check that DevNullWriter satisfies io.Writer.
var _ io.Writer = &DevNullWriter{}
78
// Extract exactly one tar bundle.
79
// extractOneTar walks the entries of a single tar stream and hands each
// header plus its payload to the TarInterpreter. (The iteration loop and
// io.EOF termination check are elided in this excerpt.)
func extractOneTar(tarInterpreter TarInterpreter, source io.Reader) error {
80
tarReader := tar.NewReader(source)
83
header, err := tarReader.Next()
88
return errors.Wrap(err, "extractOne: tar extract failed")
91
err = tarInterpreter.Interpret(tarReader, header)
93
return errors.Wrap(err, "extractOne: Interpret failed")
99
// extractNonTar stores a plain (non-tar) file or directory through the
// TarInterpreter by synthesizing a tar.Header for it. Used for backups
// whose objects are regular files rather than tar bundles.
func extractNonTar(tarInterpreter TarInterpreter, source io.Reader, path string, fileType FileType, mode int64) error {
101
if fileType == RegularFileType {
102
typeFlag = tar.TypeReg
104
// Non-regular input is treated as a directory entry here.
typeFlag = tar.TypeDir
106
return tarInterpreter.Interpret(source, &tar.Header{
113
// DecryptAndDecompressTar decrypts file and checks its extension.
114
// If it's tar, a decompression is not needed.
115
// Otherwise it uses corresponding decompressor. If none found an error will be returned.
116
func DecryptAndDecompressTar(reader io.Reader, filePath string, crypter crypto.Crypter) (io.ReadCloser, error) {
120
// Decryption is layered first; the decompressor (if any) reads plaintext.
reader, err = crypter.Decrypt(reader)
122
return nil, errors.Wrap(err, "DecryptAndDecompressTar: decrypt failed")
126
fileExtension := utility.GetFileExtension(filePath)
128
// A bare ".tar" (or extension-less) object needs no decompression.
if fileExtension == "tar" || fileExtension == "" {
129
return io.NopCloser(reader), nil
132
decompressor := compression.FindDecompressor(fileExtension)
133
if decompressor == nil {
134
return nil, newUnsupportedFileTypeError(filePath, fileExtension)
137
return decompressor.Decompress(reader)
140
// ExtractAll Handles all files passed in. Supports `.lzo`, `.lz4`, `.lzma`, and `.tar`.
141
// File type `.nop` is used for testing purposes. Each file is extracted
142
// in its own goroutine and ExtractAll will wait for all goroutines to finish.
143
// Retries unsuccessful attempts log2(MaxConcurrency) times, dividing concurrency by two each time.
144
func ExtractAll(tarInterpreter TarInterpreter, files []ReaderMaker) error {
145
// Delegates to ExtractAllWithSleeper with an exponential backoff bounded
// by the package-level Min/MaxExtractRetryWait.
return ExtractAllWithSleeper(tarInterpreter, files, NewExponentialSleeper(MinExtractRetryWait, MaxExtractRetryWait))
148
// ExtractAllWithSleeper runs repeated extraction rounds over the given
// files, sleeping between rounds, until every file succeeds or retries are
// exhausted. Returns NoFilesToExtractError for an empty input list.
// (Parts of the retry bookkeeping are elided in this excerpt.)
func ExtractAllWithSleeper(tarInterpreter TarInterpreter, files []ReaderMaker, sleeper Sleeper) error {
150
return newNoFilesToExtractError()
153
// Set maximum number of goroutines spun off by ExtractAll
154
downloadingConcurrency, err := conf.GetMaxDownloadConcurrency()
158
retries := conf.GetFetchRetries()
160
// Each round re-attempts only the files that failed in the previous one.
for currentRun := files; len(currentRun) > 0; {
161
failed := tryExtractFiles(currentRun, tarInterpreter, downloadingConcurrency)
162
// Halve concurrency after a round with failures — presumably to ease
// pressure on the storage backend; confirm against the elided branch.
if downloadingConcurrency > 1 {
163
downloadingConcurrency /= 2
164
// Give up only when concurrency is already 1, nothing succeeded this
// round, and no retries remain.
} else if len(failed) == len(currentRun) && retries <= 0 {
165
return errors.Errorf("failed to extract files:\n%s\n",
166
strings.Join(readerMakersToFilePaths(failed), "\n"))
171
tracelog.WarningLogger.Printf("%d files failed to download: %s. Going to sleep and retry downloading them.\n",
172
len(failed), readerMakersToFilePaths(failed))
173
tracelog.WarningLogger.Printf("retries left: %d", retries)
181
// Extract single file from backup
182
// If it is .tar file unpack it and store internal files (there will be .tar file if you work with wal-g backup)
183
// Otherwise store this file (there will be regular file if you work with pgbackrest backup)
184
func extractFile(tarInterpreter TarInterpreter, extractingReader io.Reader, fileClosure ReaderMaker) error {
185
// Dispatch on the backup object's declared type.
switch fileClosure.FileType() {
187
err := extractOneTar(tarInterpreter, extractingReader)
189
// Drain the zero-padding that follows the last tar entry so the
// underlying stream is fully consumed.
err = readTrailingZeros(extractingReader)
192
case RegularFileType:
193
return extractNonTar(tarInterpreter, extractingReader, fileClosure.LocalPath(), fileClosure.FileType(), fileClosure.Mode())
195
tracelog.InfoLogger.Print()
196
// Unknown file types are a hard error.
return errors.New("Unknown fileType " + string(fileClosure.FileType()))
201
// tryExtractFiles downloads and extracts the given files concurrently,
// bounded by downloadingConcurrency via a weighted semaphore, and returns
// the subset that failed. (The per-file goroutine wrapper and several
// error checks are elided in this excerpt.)
func tryExtractFiles(files []ReaderMaker,
202
tarInterpreter TarInterpreter,
203
downloadingConcurrency int) (failed []ReaderMaker) {
204
downloadingContext := context.TODO()
205
downloadingSemaphore := semaphore.NewWeighted(int64(downloadingConcurrency))
206
crypter := ConfigureCrypter()
207
// Concurrent goroutines record failures here; collected at the end.
isFailed := sync.Map{}
209
for _, file := range files {
210
// Acquire one semaphore slot per file before spawning its worker.
err := downloadingSemaphore.Acquire(downloadingContext, 1)
212
tracelog.ErrorLogger.Println(err)
213
return files //Should never happen, but if we are asked to cancel - consider all files unfinished
218
defer downloadingSemaphore.Release(1)
220
readCloser, err := fileClosure.Reader()
222
defer utility.LoggedClose(readCloser, "")
224
filePath := fileClosure.StoragePath()
225
var extractingReader io.ReadCloser
226
// Layer decryption/decompression over the raw storage stream.
extractingReader, err = DecryptAndDecompressTar(readCloser, filePath, crypter)
228
defer extractingReader.Close()
229
err = extractFile(tarInterpreter, extractingReader, fileClosure)
230
err = errors.Wrapf(err, "Extraction error in %s", filePath)
231
tracelog.InfoLogger.Printf("Finished extraction of %s", filePath)
236
isFailed.Store(fileClosure, true)
237
tracelog.ErrorLogger.Println(err)
242
// Acquiring the full weight waits for every in-flight worker to finish.
err := downloadingSemaphore.Acquire(downloadingContext, int64(downloadingConcurrency))
244
tracelog.ErrorLogger.Println(err)
245
return files //Should never happen, but if we are asked to cancel - consider all files unfinished
248
// Collect the failed files recorded by the workers.
isFailed.Range(func(failedFile, _ interface{}) bool {
249
failed = append(failed, failedFile.(ReaderMaker))
255
// readTrailingZeros consumes the remainder of r and verifies it contains
// only zero bytes (tar end-of-archive padding); any non-zero byte is
// reported as io.ErrClosedPipe. (The read loop continues past this
// excerpt.)
func readTrailingZeros(r io.Reader) error {
256
// on first iteration we read small chunk
257
// in most cases we will return fast without memory allocation
258
b := make([]byte, 1024)
262
if !utility.AllZero(b[:n]) {
263
return io.ErrClosedPipe
272
if len(b) < utility.CompressedBlockMaxSize {
273
// but if we found zeroes, allocate large buffer to speed up reading
274
b = make([]byte, utility.CompressedBlockMaxSize)