wal-g
75 строк · 2.0 Кб
1package splitmerge
2
3import (
4"io"
5
6"github.com/wal-g/wal-g/utility"
7
8"github.com/wal-g/tracelog"
9)
10
11// MergeWriter returns list of WriteCloser-s
12// Then it reads data from each of n=`parts` WriteClosers in blocks of `blockSize` and writes data to `sink` writer.
13// MergeWriter gets ownership over sink and will close it.
14func MergeWriter(sink io.WriteCloser, parts int, blockSize int) []io.WriteCloser {
15result := make([]io.WriteCloser, 0)
16channels := make([]chan []byte, 0)
17writeResults := make([]chan writeResult, 0)
18sink = &utility.CloseOnce{WriteCloser: sink}
19
20for i := 0; i < parts; i++ {
21channels = append(channels, make(chan []byte))
22writeResults = append(writeResults, make(chan writeResult))
23cw := newChannelWriter(channels[i], writeResults[i])
24fbsw := newFixedBlockSizeWriter(cw, blockSize)
25result = append(result, fbsw)
26}
27
28// start MergeWriter:
29go func() {
30defer (func() {
31for _, wrch := range writeResults {
32close(wrch)
33}
34})()
35
36for {
37closed := 0
38for i, ch := range channels {
39block, ok := <-ch
40if !ok {
41tracelog.DebugLogger.Printf("MergeWriter. #%d closed", i)
42closed++
43continue
44}
45rbytes := len(block)
46wbytes, err := sink.Write(block)
47writeResults[i] <- writeResult{n: wbytes, err: err}
48if wbytes != rbytes {
49tracelog.DebugLogger.Printf("%d / %d bytes written due to %v", wbytes, rbytes, err)
50}
51if err != nil {
52tracelog.ErrorLogger.Printf("MergeWriter error: %v", err)
53// It is unrecoverable error - close sink. All consequent writes will return error.
54// This will ensure that all channels will be gracefully closed
55err = sink.Close()
56if err != nil {
57tracelog.ErrorLogger.Printf("MergeWriter error on sink close: %v", err)
58}
59continue
60}
61}
62
63if closed == len(channels) {
64tracelog.DebugLogger.Printf("MergeWriter: finished")
65err := sink.Close()
66if err != nil {
67tracelog.ErrorLogger.Printf("MergeWriter error on sink close: %v", err)
68}
69return
70}
71}
72}()
73
74return result
75}
76