go-transaction-manager

Форк
0
101 строка · 2.0 Кб
1
package pgxv4
2

3
import (
4
	"context"
5
	"sync"
6

7
	"github.com/jackc/pgx/v4"
8

9
	"github.com/avito-tech/go-transaction-manager/trm/v2"
10
	"github.com/avito-tech/go-transaction-manager/trm/v2/drivers"
11
)
12

13
// Transaction is trm.Transaction for pgx.Tx.
14
type Transaction struct {
15
	mu       sync.Mutex
16
	tx       pgx.Tx
17
	isClosed *drivers.IsClosed
18
}
19

20
func newDefaultTransaction(tx pgx.Tx) *Transaction {
21
	return &Transaction{
22
		mu:       sync.Mutex{},
23
		tx:       tx,
24
		isClosed: drivers.NewIsClosed(),
25
	}
26
}
27

28
// NewTransaction creates trm.Transaction for pgx.Tx.
29
func NewTransaction(
30
	ctx context.Context,
31
	opts pgx.TxOptions,
32
	db Transactional,
33
) (context.Context, *Transaction, error) {
34
	tx, err := db.BeginTx(ctx, opts)
35
	if err != nil {
36
		return ctx, nil, err
37
	}
38

39
	tr := newDefaultTransaction(tx)
40

41
	go tr.awaitDone(ctx)
42

43
	return ctx, tr, nil
44
}
45

46
func (t *Transaction) awaitDone(ctx context.Context) {
47
	if ctx.Done() == nil {
48
		return
49
	}
50

51
	select {
52
	case <-ctx.Done():
53
		_ = t.Rollback(ctx)
54
	case <-t.isClosed.Closed():
55
	}
56
}
57

58
// Transaction returns the real transaction pgx.Tx.
59
func (t *Transaction) Transaction() interface{} {
60
	return t.tx
61
}
62

63
// Begin nested transaction by save point.
64
func (t *Transaction) Begin(ctx context.Context, _ trm.Settings) (context.Context, trm.Transaction, error) {
65
	tx, err := t.tx.Begin(ctx)
66
	if err != nil {
67
		return ctx, nil, err
68
	}
69

70
	tr := newDefaultTransaction(tx)
71

72
	return ctx, tr, nil
73
}
74

75
// Commit the trm.Transaction.
76
func (t *Transaction) Commit(ctx context.Context) error {
77
	t.mu.Lock()
78
	defer t.mu.Unlock()
79
	defer t.isClosed.Close()
80

81
	return t.tx.Commit(ctx)
82
}
83

84
// Rollback the trm.Transaction.
85
func (t *Transaction) Rollback(ctx context.Context) error {
86
	t.mu.Lock()
87
	defer t.mu.Unlock()
88
	defer t.isClosed.Close()
89

90
	return t.tx.Rollback(ctx)
91
}
92

93
// IsActive returns true if the transaction started but not committed or rolled back.
94
func (t *Transaction) IsActive() bool {
95
	return t.isClosed.IsActive()
96
}
97

98
// Closed returns a channel that's closed when transaction committed or rolled back.
99
func (t *Transaction) Closed() <-chan struct{} {
100
	return t.isClosed.Closed()
101
}
102

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.