wal-g

Форк
0
/
extract.go 
277 строк · 7.9 Кб
1
package internal
2

3
import (
	"archive/tar"
	"context"
	"fmt"
	"io"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"github.com/pkg/errors"
	"github.com/wal-g/tracelog"
	"github.com/wal-g/wal-g/internal/compression"
	conf "github.com/wal-g/wal-g/internal/config"
	"github.com/wal-g/wal-g/internal/crypto"
	"github.com/wal-g/wal-g/utility"
	"golang.org/x/sync/semaphore"
)
20

21
var MinExtractRetryWait = time.Minute
22
var MaxExtractRetryWait = 5 * time.Minute
23

24
type NoFilesToExtractError struct {
25
	error
26
}
27

28
func newNoFilesToExtractError() NoFilesToExtractError {
29
	return NoFilesToExtractError{errors.New("ExtractAll: did not provide files to extract")}
30
}
31

32
func (err NoFilesToExtractError) Error() string {
33
	return fmt.Sprintf(tracelog.GetErrorFormatter(), err.error)
34
}
35

36
// UnsupportedFileTypeError is used to signal file types
37
// that are unsupported by WAL-G.
38
type UnsupportedFileTypeError struct {
39
	error
40
}
41

42
func newUnsupportedFileTypeError(path string, fileFormat string) UnsupportedFileTypeError {
43
	return UnsupportedFileTypeError{errors.Errorf("WAL-G does not support the file format '%s' in '%s'", fileFormat, path)}
44
}
45

46
func (err UnsupportedFileTypeError) Error() string {
47
	return fmt.Sprintf(tracelog.GetErrorFormatter(), err.error)
48
}
49

50
// TarInterpreter handles one entry of a tar stream at a time.
// Implementations behave differently for different file types
// (regular files, directories, links, ...) described by the header.
type TarInterpreter interface {
	Interpret(reader io.Reader, header *tar.Header) error
}
55

56
type DevNullWriter struct {
57
	io.WriteCloser
58
	statPrinter sync.Once
59
	totalBytes  int64
60
}
61

62
func (e *DevNullWriter) Write(p []byte) (int, error) {
63
	e.statPrinter.Do(func() {
64
		go func() {
65
			for {
66
				time.Sleep(1 * time.Second)
67
				tracelog.ErrorLogger.Printf("/dev/null size %d", e.totalBytes)
68
			}
69
		}()
70
	})
71
	e.totalBytes += int64(len(p))
72
	return len(p), nil
73
}
74

75
var _ io.Writer = &DevNullWriter{}
76

77
// TODO : unit tests
78
// Extract exactly one tar bundle.
79
func extractOneTar(tarInterpreter TarInterpreter, source io.Reader) error {
80
	tarReader := tar.NewReader(source)
81

82
	for {
83
		header, err := tarReader.Next()
84
		if err == io.EOF {
85
			break
86
		}
87
		if err != nil {
88
			return errors.Wrap(err, "extractOne: tar extract failed")
89
		}
90

91
		err = tarInterpreter.Interpret(tarReader, header)
92
		if err != nil {
93
			return errors.Wrap(err, "extractOne: Interpret failed")
94
		}
95
	}
96
	return nil
97
}
98

99
func extractNonTar(tarInterpreter TarInterpreter, source io.Reader, path string, fileType FileType, mode int64) error {
100
	var typeFlag byte
101
	if fileType == RegularFileType {
102
		typeFlag = tar.TypeReg
103
	} else {
104
		typeFlag = tar.TypeDir
105
	}
106
	return tarInterpreter.Interpret(source, &tar.Header{
107
		Name:     path,
108
		Mode:     mode,
109
		Typeflag: typeFlag,
110
	})
111
}
112

113
// DecryptAndDecompressTar decrypts file and checks its extension.
114
// If it's tar, a decompression is not needed.
115
// Otherwise it uses corresponding decompressor. If none found an error will be returned.
116
func DecryptAndDecompressTar(reader io.Reader, filePath string, crypter crypto.Crypter) (io.ReadCloser, error) {
117
	var err error
118

119
	if crypter != nil {
120
		reader, err = crypter.Decrypt(reader)
121
		if err != nil {
122
			return nil, errors.Wrap(err, "DecryptAndDecompressTar: decrypt failed")
123
		}
124
	}
125

126
	fileExtension := utility.GetFileExtension(filePath)
127

128
	if fileExtension == "tar" || fileExtension == "" {
129
		return io.NopCloser(reader), nil
130
	}
131

132
	decompressor := compression.FindDecompressor(fileExtension)
133
	if decompressor == nil {
134
		return nil, newUnsupportedFileTypeError(filePath, fileExtension)
135
	}
136

137
	return decompressor.Decompress(reader)
138
}
139

140
// ExtractAll Handles all files passed in. Supports `.lzo`, `.lz4`, `.lzma`, and `.tar`.
141
// File type `.nop` is used for testing purposes. Each file is extracted
142
// in its own goroutine and ExtractAll will wait for all goroutines to finish.
143
// Retries unsuccessful attempts log2(MaxConcurrency) times, dividing concurrency by two each time.
144
func ExtractAll(tarInterpreter TarInterpreter, files []ReaderMaker) error {
145
	return ExtractAllWithSleeper(tarInterpreter, files, NewExponentialSleeper(MinExtractRetryWait, MaxExtractRetryWait))
146
}
147

148
func ExtractAllWithSleeper(tarInterpreter TarInterpreter, files []ReaderMaker, sleeper Sleeper) error {
149
	if len(files) == 0 {
150
		return newNoFilesToExtractError()
151
	}
152

153
	// Set maximum number of goroutines spun off by ExtractAll
154
	downloadingConcurrency, err := conf.GetMaxDownloadConcurrency()
155
	if err != nil {
156
		return err
157
	}
158
	retries := conf.GetFetchRetries()
159

160
	for currentRun := files; len(currentRun) > 0; {
161
		failed := tryExtractFiles(currentRun, tarInterpreter, downloadingConcurrency)
162
		if downloadingConcurrency > 1 {
163
			downloadingConcurrency /= 2
164
		} else if len(failed) == len(currentRun) && retries <= 0 {
165
			return errors.Errorf("failed to extract files:\n%s\n",
166
				strings.Join(readerMakersToFilePaths(failed), "\n"))
167
		}
168
		retries--
169
		currentRun = failed
170
		if len(failed) > 0 {
171
			tracelog.WarningLogger.Printf("%d files failed to download: %s. Going to sleep and retry downloading them.\n",
172
				len(failed), readerMakersToFilePaths(failed))
173
			tracelog.WarningLogger.Printf("retries left: %d", retries)
174
			sleeper.Sleep()
175
		}
176
	}
177

178
	return nil
179
}
180

181
// Extract single file from backup
182
// If it is .tar file unpack it and store internal files (there will be .tar file if you work with wal-g backup)
183
// Otherwise store this file (there will be regular file if you work with pgbackrest backup)
184
func extractFile(tarInterpreter TarInterpreter, extractingReader io.Reader, fileClosure ReaderMaker) error {
185
	switch fileClosure.FileType() {
186
	case TarFileType:
187
		err := extractOneTar(tarInterpreter, extractingReader)
188
		if err == nil {
189
			err = readTrailingZeros(extractingReader)
190
		}
191
		return err
192
	case RegularFileType:
193
		return extractNonTar(tarInterpreter, extractingReader, fileClosure.LocalPath(), fileClosure.FileType(), fileClosure.Mode())
194
	default:
195
		tracelog.InfoLogger.Print()
196
		return errors.New("Unknown fileType " + string(fileClosure.FileType()))
197
	}
198
}
199

200
// TODO : unit tests
// tryExtractFiles downloads and extracts the given files with at most
// downloadingConcurrency goroutines in flight, gated by a weighted
// semaphore. It returns the subset of files whose download, decryption,
// decompression, or extraction failed; successes are only logged.
func tryExtractFiles(files []ReaderMaker,
	tarInterpreter TarInterpreter,
	downloadingConcurrency int) (failed []ReaderMaker) {
	downloadingContext := context.TODO()
	downloadingSemaphore := semaphore.NewWeighted(int64(downloadingConcurrency))
	crypter := ConfigureCrypter()
	isFailed := sync.Map{}

	for _, file := range files {
		// Blocks until a slot is free, so no more than
		// downloadingConcurrency workers run at once.
		err := downloadingSemaphore.Acquire(downloadingContext, 1)
		if err != nil {
			tracelog.ErrorLogger.Println(err)
			return files //Should never happen, but if we are asked to cancel - consider all files unfinished
		}
		fileClosure := file

		go func() {
			defer downloadingSemaphore.Release(1)

			readCloser, err := fileClosure.Reader()
			if err == nil {
				defer utility.LoggedClose(readCloser, "")

				filePath := fileClosure.StoragePath()
				var extractingReader io.ReadCloser
				extractingReader, err = DecryptAndDecompressTar(readCloser, filePath, crypter)
				if err == nil {
					defer extractingReader.Close()
					err = extractFile(tarInterpreter, extractingReader, fileClosure)
					// Wrapf on a nil error keeps err nil, so this is safe
					// on the success path.
					err = errors.Wrapf(err, "Extraction error in %s", filePath)
					tracelog.InfoLogger.Printf("Finished extraction of %s", filePath)
				}
			}

			// Any error from the pipeline above marks this file as failed.
			if err != nil {
				isFailed.Store(fileClosure, true)
				tracelog.ErrorLogger.Println(err)
			}
		}()
	}

	// Acquiring the full semaphore weight waits for every worker to
	// release its slot, i.e. for all extractions to finish.
	err := downloadingSemaphore.Acquire(downloadingContext, int64(downloadingConcurrency))
	if err != nil {
		tracelog.ErrorLogger.Println(err)
		return files //Should never happen, but if we are asked to cancel - consider all files unfinished
	}

	isFailed.Range(func(failedFile, _ interface{}) bool {
		failed = append(failed, failedFile.(ReaderMaker))
		return true
	})
	return failed
}
254

255
func readTrailingZeros(r io.Reader) error {
256
	// on first iteration we read small chunk
257
	// in most cases we will return fast without memory allocation
258
	b := make([]byte, 1024)
259
	for {
260
		n, err := r.Read(b)
261
		if n > 0 {
262
			if !utility.AllZero(b[:n]) {
263
				return io.ErrClosedPipe
264
			}
265
		}
266
		if err != nil {
267
			if err == io.EOF {
268
				return nil
269
			}
270
			return err
271
		}
272
		if len(b) < utility.CompressedBlockMaxSize {
273
			// but if we found zeroes, allocate large buffer to speed up reading
274
			b = make([]byte, utility.CompressedBlockMaxSize)
275
		}
276
	}
277
}
278

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.