go-transaction-manager
158 строк · 3.2 Кб
1// Package gorm is an implementation of trm.Transaction interface by Transaction for *gorm.DB.
2package gorm3
4import (5"context"6"database/sql"7"errors"8"sync"9
10"gorm.io/gorm"11
12"github.com/avito-tech/go-transaction-manager/trm/v2"13"github.com/avito-tech/go-transaction-manager/trm/v2/drivers"14)
15
16// Transaction is trm.Transaction for sqlx.Tx.
17type Transaction struct {18tx *gorm.DB19txMutex sync.Mutex20isClosed *drivers.IsClosed21isClosedClosure *drivers.IsClosed22}
23
24// NewTransaction creates trm.Transaction for sqlx.Tx.
25func NewTransaction(26ctx context.Context,27opts *sql.TxOptions,28db *gorm.DB,29) (context.Context, *Transaction, error) {30t := &Transaction{31tx: nil,32txMutex: sync.Mutex{},33isClosed: drivers.NewIsClosed(),34isClosedClosure: drivers.NewIsClosed(),35}36
37var err error38
39wg := sync.WaitGroup{}40wg.Add(1)41
42go func() {43db = db.WithContext(ctx)44// Used closure to avoid implementing nested transactions.45err = db.Transaction(func(tx *gorm.DB) error {46t.tx = tx47
48wg.Done()49
50<-t.isClosedClosure.Closed()51
52return t.isClosedClosure.Err()53}, opts)54
55t.txMutex.Lock()56defer t.txMutex.Unlock()57tx := t.tx58
59if tx != nil {60// Return error from transaction rollback61// Error from commit returns from db.Transaction closure62if errors.Is(err, drivers.ErrRollbackTr) &&63tx.Error != nil {64err = t.tx.Error65}66
67t.isClosed.CloseWithCause(err)68} else {69wg.Done()70}71}()72
73wg.Wait()74
75if err != nil {76return ctx, nil, err77}78
79go t.awaitDone(ctx)80
81return ctx, t, nil82}
83
84func (t *Transaction) awaitDone(ctx context.Context) {85if ctx.Done() == nil {86return87}88
89select {90case <-ctx.Done():91// Rollback will be called by context.Err()92t.isClosedClosure.Close()93case <-t.isClosed.Closed():94}95}
96
97// Transaction returns the real transaction sqlx.Tx.
98// trm.NestedTrFactory returns IsActive as true while trm.Transaction is opened.
99func (t *Transaction) Transaction() interface{} {100return t.tx101}
102
103// Begin nested transaction by save point.
104func (t *Transaction) Begin(ctx context.Context, s trm.Settings) (context.Context, trm.Transaction, error) {105t.txMutex.Lock()106defer t.txMutex.Unlock()107
108return NewDefaultFactory(t.tx)(ctx, s)109}
110
111// Commit closes the trm.Transaction.
112func (t *Transaction) Commit(_ context.Context) error {113select {114case <-t.isClosed.Closed():115t.txMutex.Lock()116defer t.txMutex.Unlock()117
118return t.tx.Commit().Error119default:120t.isClosedClosure.Close()121
122<-t.isClosed.Closed()123
124return t.isClosed.Err()125}126}
127
128// Rollback the trm.Transaction.
129func (t *Transaction) Rollback(_ context.Context) error {130select {131case <-t.isClosed.Closed():132t.txMutex.Lock()133defer t.txMutex.Unlock()134
135return t.tx.Rollback().Error136default:137t.isClosedClosure.CloseWithCause(drivers.ErrRollbackTr)138
139<-t.isClosed.Closed()140
141err := t.isClosed.Err()142if errors.Is(err, drivers.ErrRollbackTr) {143return nil144}145
146return err147}148}
149
150// IsActive returns true if the transaction started but not committed or rolled back.
151func (t *Transaction) IsActive() bool {152return t.isClosed.IsActive()153}
154
155// Closed returns a channel that's closed when transaction committed or rolled back.
156func (t *Transaction) Closed() <-chan struct{} {157return t.isClosed.Closed()158}
159