wal-g

Форк
0
/
mergewriter.go 
75 строк · 2.0 Кб
1
package splitmerge
2

3
import (
4
	"io"
5

6
	"github.com/wal-g/wal-g/utility"
7

8
	"github.com/wal-g/tracelog"
9
)
10

11
// MergeWriter returns list of WriteCloser-s
12
// Then it reads data from each of n=`parts` WriteClosers in blocks of `blockSize` and writes data to `sink` writer.
13
// MergeWriter gets ownership over sink and will close it.
14
func MergeWriter(sink io.WriteCloser, parts int, blockSize int) []io.WriteCloser {
15
	result := make([]io.WriteCloser, 0)
16
	channels := make([]chan []byte, 0)
17
	writeResults := make([]chan writeResult, 0)
18
	sink = &utility.CloseOnce{WriteCloser: sink}
19

20
	for i := 0; i < parts; i++ {
21
		channels = append(channels, make(chan []byte))
22
		writeResults = append(writeResults, make(chan writeResult))
23
		cw := newChannelWriter(channels[i], writeResults[i])
24
		fbsw := newFixedBlockSizeWriter(cw, blockSize)
25
		result = append(result, fbsw)
26
	}
27

28
	// start MergeWriter:
29
	go func() {
30
		defer (func() {
31
			for _, wrch := range writeResults {
32
				close(wrch)
33
			}
34
		})()
35

36
		for {
37
			closed := 0
38
			for i, ch := range channels {
39
				block, ok := <-ch
40
				if !ok {
41
					tracelog.DebugLogger.Printf("MergeWriter. #%d closed", i)
42
					closed++
43
					continue
44
				}
45
				rbytes := len(block)
46
				wbytes, err := sink.Write(block)
47
				writeResults[i] <- writeResult{n: wbytes, err: err}
48
				if wbytes != rbytes {
49
					tracelog.DebugLogger.Printf("%d / %d bytes written due to %v", wbytes, rbytes, err)
50
				}
51
				if err != nil {
52
					tracelog.ErrorLogger.Printf("MergeWriter error: %v", err)
53
					// It is unrecoverable error - close sink. All consequent writes will return error.
54
					// This will ensure that all channels will be gracefully closed
55
					err = sink.Close()
56
					if err != nil {
57
						tracelog.ErrorLogger.Printf("MergeWriter error on sink close: %v", err)
58
					}
59
					continue
60
				}
61
			}
62

63
			if closed == len(channels) {
64
				tracelog.DebugLogger.Printf("MergeWriter: finished")
65
				err := sink.Close()
66
				if err != nil {
67
					tracelog.ErrorLogger.Printf("MergeWriter error on sink close: %v", err)
68
				}
69
				return
70
			}
71
		}
72
	}()
73

74
	return result
75
}
76

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.