go-transaction-manager

Форк
0
138 строк · 2.8 Кб
1
package sql
2

3
import (
4
	"context"
5
	"database/sql"
6
	"fmt"
7
	"sync/atomic"
8

9
	"go.uber.org/multierr"
10

11
	"github.com/avito-tech/go-transaction-manager/trm/v2"
12
	"github.com/avito-tech/go-transaction-manager/trm/v2/drivers"
13
)
14

15
// Transaction is trm.Transaction for sql.Tx.
16
// trm.NestedTrFactory returns IsActive as true while trm.Transaction is opened.
17
type Transaction struct {
18
	tx        *sql.Tx
19
	savePoint SavePoint
20
	saves     int64
21
	isClosed  *drivers.IsClosed
22
}
23

24
// NewTransaction creates trm.Transaction for sql.Tx.
25
func NewTransaction(
26
	ctx context.Context,
27
	sp SavePoint,
28
	opts *sql.TxOptions,
29
	db *sql.DB,
30
) (context.Context, *Transaction, error) {
31
	tx, err := db.BeginTx(ctx, opts)
32
	if err != nil {
33
		return ctx, nil, err
34
	}
35

36
	tr := &Transaction{
37
		tx:        tx,
38
		savePoint: sp,
39
		saves:     0,
40
		isClosed:  drivers.NewIsClosed(),
41
	}
42

43
	go tr.awaitDone(ctx)
44

45
	return ctx, tr, nil
46
}
47

48
func (t *Transaction) awaitDone(ctx context.Context) {
49
	if ctx.Done() == nil {
50
		return
51
	}
52

53
	select {
54
	case <-ctx.Done():
55
		t.isClosed.Close()
56
	case <-t.isClosed.Closed():
57
	}
58
}
59

60
// Transaction returns the real transaction sqlx.Tx.
61
func (t *Transaction) Transaction() interface{} {
62
	return t.tx
63
}
64

65
// Begin nested transaction by save point.
66
func (t *Transaction) Begin(ctx context.Context, _ trm.Settings) (context.Context, trm.Transaction, error) {
67
	_, err := t.tx.ExecContext(ctx, t.savePoint.Create(t.incrementID()))
68
	if err != nil {
69
		// decrement save point ID after error
70
		t.decrementID()
71

72
		return ctx, nil, err
73
	}
74

75
	return ctx, t, nil
76
}
77

78
// Commit the trm.Transaction.
79
func (t *Transaction) Commit(ctx context.Context) error {
80
	if t.hasSavePoint() {
81
		_, err := t.tx.ExecContext(ctx, t.savePoint.Release(t.decrementID()))
82
		if err != nil {
83
			return multierr.Combine(trm.ErrNestedCommit, err)
84
		}
85

86
		return nil
87
	}
88

89
	defer t.isClosed.Close()
90

91
	return t.tx.Commit()
92
}
93

94
// Rollback the trm.Transaction.
95
func (t *Transaction) Rollback(ctx context.Context) error {
96
	if t.hasSavePoint() {
97
		_, err := t.tx.ExecContext(ctx, t.savePoint.Rollback(t.decrementID()))
98
		if err != nil {
99
			return multierr.Combine(trm.ErrNestedRollback, err)
100
		}
101

102
		return nil
103
	}
104

105
	defer t.isClosed.Close()
106

107
	return t.tx.Rollback()
108
}
109

110
// IsActive returns true if the transaction started but not committed or rolled back.
111
func (t *Transaction) IsActive() bool {
112
	return t.isClosed.IsActive()
113
}
114

115
// Closed returns a channel that's closed when transaction committed or rolled back.
116
func (t *Transaction) Closed() <-chan struct{} {
117
	return t.isClosed.Closed()
118
}
119

120
func (t *Transaction) hasSavePoint() bool {
121
	return atomic.LoadInt64(&t.saves) > 0
122
}
123

124
func (t *Transaction) incrementID() string {
125
	atomic.AddInt64(&t.saves, 1)
126

127
	return t.id()
128
}
129

130
func (t *Transaction) decrementID() string {
131
	defer atomic.AddInt64(&t.saves, -1)
132

133
	return t.id()
134
}
135

136
func (t *Transaction) id() string {
137
	return fmt.Sprintf("tx_%d", atomic.LoadInt64(&t.saves))
138
}
139

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.