db-migrator.go
156 строк · 3.4 Кб
1package connection
2
3import (
4"context"
5"database/sql"
6"fmt"
7"strings"
8
9"github.com/pkg/errors"
10)
11
// ContextKey is the type of the keys this package stores in a
// context.Context. Using a dedicated type keeps these keys from colliding
// with plain string keys set by other packages.
type ContextKey string

// contextKeyTX is the context key under which Transaction stores the active
// *sql.Tx.
const contextKeyTX = ContextKey("tx")
15
// Connection bundles a database/sql handle with the driver and DSN it was
// opened with.
type Connection struct {
	// driver identifies the database backend; returned by Driver.
	driver Driver
	// dsn is the connection string kept for this connection; returned by DSN.
	dsn string
	// db is the underlying database/sql handle used for all queries.
	db *sql.DB
	// ping is set to true after the first successful Ping so later calls
	// can skip the check.
	ping bool
}
22
23func New(dsn string) (*Connection, error) {
24switch {
25case strings.HasPrefix(dsn, "clickhouse://"):
26return clickhouse(dsn)
27case strings.HasPrefix(dsn, "postgres://"):
28return postgres(dsn)
29case strings.HasPrefix(dsn, "mysql://"):
30return mysql(dsn)
31default:
32return nil, fmt.Errorf("driver \"%s\" doesn't support", dsn)
33}
34}
35
36// DSN returns the connection string.
37func (c *Connection) DSN() string {
38return c.dsn
39}
40
41// Driver returns the driver name used to connect to the database.
42func (c *Connection) Driver() Driver {
43return c.driver
44}
45
46// Ping checks connection
47func (c *Connection) Ping() error {
48if c.ping {
49return nil
50}
51if err := c.db.Ping(); err != nil {
52return errors.Wrapf(err, "ping %v connection: %v", c.Driver(), c.dsn)
53}
54c.ping = true
55return nil
56}
57
58// QueryContext executes a query that returns rows, typically a SELECT.
59// The args are for any placeholder parameters in the query.
60func (c *Connection) QueryContext(ctx context.Context, query string, args ...any) (*sql.Rows, error) {
61if err := c.Ping(); err != nil {
62return nil, err
63}
64return c.db.QueryContext(ctx, query, args...)
65}
66
67// ExecContext executes a query without returning any rows.
68// The args are for any placeholder parameters in the query.
69func (c *Connection) ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error) {
70if err := c.Ping(); err != nil {
71return nil, err
72}
73v := ctx.Value(contextKeyTX)
74if v != nil {
75if tx, ok := v.(*sql.Tx); ok {
76stmt, err := tx.PrepareContext(ctx, query)
77if err != nil {
78return nil, err
79}
80return stmt.ExecContext(ctx, args...)
81}
82}
83return c.db.ExecContext(ctx, query, args...)
84}
85
86// Transaction executes body in func txFn into transaction.
87func (c *Connection) Transaction(ctx context.Context, txFn func(ctx context.Context) error) error {
88if err := c.Ping(); err != nil {
89return err
90}
91if v := ctx.Value(contextKeyTX); v != nil {
92return errors.New("active transaction does not close")
93}
94
95tx, err := c.db.BeginTx(ctx, nil)
96if err != nil {
97return err
98}
99
100ctxWithTX := context.WithValue(ctx, contextKeyTX, tx)
101
102if err := txFn(ctxWithTX); err != nil {
103if err2 := tx.Rollback(); err2 != nil {
104return errors.Wrap(err, err2.Error())
105}
106return err
107}
108
109return tx.Commit()
110}
111
112// clickhouse returns repository with clickhouse configuration.
113func clickhouse(dsn string) (*Connection, error) {
114dsn, err := NormalizeClickhouseDSN(dsn)
115if err != nil {
116return nil, err
117}
118db, err := sql.Open("clickhouse", dsn)
119if err != nil {
120return nil, err
121}
122
123return &Connection{
124driver: DriverClickhouse,
125dsn: dsn,
126db: db,
127}, nil
128}
129
130// postgres returns repository with postgres configuration.
131func postgres(dsn string) (*Connection, error) {
132db, err := sql.Open(DriverPostgres.String(), dsn)
133if err != nil {
134return nil, errors.Wrap(err, "open postgres connection")
135}
136
137return &Connection{
138driver: DriverPostgres,
139dsn: dsn,
140db: db,
141}, nil
142}
143
144// mysql returns repository with mysql configuration.
145func mysql(dsn string) (*Connection, error) {
146db, err := sql.Open(DriverMySQL.String(), dsn[8:])
147if err != nil {
148return nil, errors.Wrap(err, "open mysql connection")
149}
150
151return &Connection{
152driver: DriverMySQL,
153dsn: dsn,
154db: db,
155}, nil
156}
157