go-transaction-manager
101 строка · 2.0 Кб
1package pgxv42
3import (4"context"5"sync"6
7"github.com/jackc/pgx/v4"8
9"github.com/avito-tech/go-transaction-manager/trm/v2"10"github.com/avito-tech/go-transaction-manager/trm/v2/drivers"11)
12
13// Transaction is trm.Transaction for pgx.Tx.
14type Transaction struct {15mu sync.Mutex16tx pgx.Tx17isClosed *drivers.IsClosed18}
19
20func newDefaultTransaction(tx pgx.Tx) *Transaction {21return &Transaction{22mu: sync.Mutex{},23tx: tx,24isClosed: drivers.NewIsClosed(),25}26}
27
28// NewTransaction creates trm.Transaction for pgx.Tx.
29func NewTransaction(30ctx context.Context,31opts pgx.TxOptions,32db Transactional,33) (context.Context, *Transaction, error) {34tx, err := db.BeginTx(ctx, opts)35if err != nil {36return ctx, nil, err37}38
39tr := newDefaultTransaction(tx)40
41go tr.awaitDone(ctx)42
43return ctx, tr, nil44}
45
46func (t *Transaction) awaitDone(ctx context.Context) {47if ctx.Done() == nil {48return49}50
51select {52case <-ctx.Done():53_ = t.Rollback(ctx)54case <-t.isClosed.Closed():55}56}
57
58// Transaction returns the real transaction pgx.Tx.
59func (t *Transaction) Transaction() interface{} {60return t.tx61}
62
63// Begin nested transaction by save point.
64func (t *Transaction) Begin(ctx context.Context, _ trm.Settings) (context.Context, trm.Transaction, error) {65tx, err := t.tx.Begin(ctx)66if err != nil {67return ctx, nil, err68}69
70tr := newDefaultTransaction(tx)71
72return ctx, tr, nil73}
74
75// Commit the trm.Transaction.
76func (t *Transaction) Commit(ctx context.Context) error {77t.mu.Lock()78defer t.mu.Unlock()79defer t.isClosed.Close()80
81return t.tx.Commit(ctx)82}
83
84// Rollback the trm.Transaction.
85func (t *Transaction) Rollback(ctx context.Context) error {86t.mu.Lock()87defer t.mu.Unlock()88defer t.isClosed.Close()89
90return t.tx.Rollback(ctx)91}
92
93// IsActive returns true if the transaction started but not committed or rolled back.
94func (t *Transaction) IsActive() bool {95return t.isClosed.IsActive()96}
97
98// Closed returns a channel that's closed when transaction committed or rolled back.
99func (t *Transaction) Closed() <-chan struct{} {100return t.isClosed.Closed()101}
102