wal-g

Форк
0
/
splitmerge_test.go 
106 строк · 2.4 Кб
1
package splitmerge
2

3
import (
4
	"bytes"
5
	"context"
6
	"fmt"
7
	"io"
8
	"math/rand"
9
	"testing"
10

11
	"github.com/stretchr/testify/assert"
12
	"golang.org/x/sync/errgroup"
13

14
	"github.com/wal-g/wal-g/internal/abool"
15
)
16

17
type BufferCloser struct {
18
	bytes.Buffer
19
	io.Closer
20
	closed abool.AtomicBool
21
}
22

23
func (b *BufferCloser) Write(p []byte) (n int, err error) {
24
	if b.closed.IsSet() {
25
		return 0, io.ErrClosedPipe
26
	}
27
	return b.Buffer.Write(p)
28
}
29

30
func (b *BufferCloser) Close() error {
31
	if !b.closed.SetToIf(false, true) {
32
		// The behavior of Close after the first call is undefined.
33
		// Specific implementations may document their own behavior.
34
		panic("UB")
35
	}
36
	return nil
37
}
38

39
//	┌─> copy data per 1 byte    ─>┐
40
//
41
// data ─> split ├─> copy data per ... bytes ─>├─> merge
42
//
43
//	└─> copy data per 42 bytes  ─>┘
44
func TestSplitMerge(t *testing.T) {
45
	const blockSize = 128
46
	const dataSize = 115249 // some prime number
47
	bufSizes := []int{1, blockSize + 1, blockSize - 1, 2*blockSize + 1, 4, 8, 15, 16, 23, 42}
48
	partitions := len(bufSizes)
49

50
	// in:
51
	inputData := generateDataset(dataSize)
52
	dataReader := bytes.NewReader(inputData)
53
	readers := SplitReader(context.Background(), dataReader, partitions, blockSize)
54

55
	// out:
56
	var sink BufferCloser
57
	writers := MergeWriter(&sink, partitions, blockSize)
58

59
	errGroup := new(errgroup.Group)
60
	for i := 0; i < partitions; i++ {
61
		// idx := i
62
		reader := readers[i]
63
		writer := writers[i]
64
		buffSize := bufSizes[i%len(bufSizes)]
65

66
		errGroup.Go(func() error {
67
			defer writer.Close()
68
			// read _all_ data first and only then send it to MergeWriter:
69
			allData, err := io.ReadAll(reader)
70
			if err != nil {
71
				return err
72
			}
73

74
			offset := 0
75
			for {
76
				data := make([]byte, buffSize, buffSize)
77
				rbytes := copy(data, allData[offset:])
78
				offset += rbytes
79
				// tracelog.InfoLogger.Printf("goroutine #%d: %d bytes fetched, err=%v", idx, rbytes, rerr)
80
				if rbytes == 0 {
81
					return nil
82
				}
83
				_, werr := writer.Write(data[:rbytes])
84
				if werr != nil {
85
					return werr
86
				} else {
87
					// tracelog.InfoLogger.Printf("goroutine #%d: %d bytes copied", idx, rbytes)
88
				}
89
			}
90
		})
91
	}
92

93
	// Wait for upload finished:
94
	assert.NoError(t, errGroup.Wait())
95

96
	fmt.Printf("%d\n", len(inputData))
97
	fmt.Printf("%d\n", sink.Len())
98

99
	assert.ElementsMatch(t, inputData, sink.Bytes())
100
}
101

102
func generateDataset(size int) []byte {
103
	result := make([]byte, size, size)
104
	rand.Read(result)
105
	return result
106
}
107

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.