-
Notifications
You must be signed in to change notification settings - Fork 1
/
db.go
131 lines (121 loc) · 3.48 KB
/
db.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
package dbtools
import (
"context"
"errors"
"fmt"
"runtime/debug"
"time"
"github.com/arsham/retry/v2"
"github.com/jackc/pgx/v5"
)
// PGX is a concurrent-safe object that can retry a transaction on a
// pgxpool.Pool connection until it succeeds.
//
// Transaction method will try the provided functions one-by-one until all of
// them return nil, then commits the transaction. If any of the functions
// return any error other than a *retry.StopError, it will retry the
// transaction until the retry count is exhausted. If a running function
// returns a *retry.StopError, the transaction will be rolled-back and stops
// retrying. Tryouts will be stopped when the passed contexts are cancelled.
//
// If all attempts return errors, the last error is returned. If a
// *retry.StopError is returned, transaction is rolled back and the Err inside
// the *retry.StopError is returned. There will be delays between tries defined
// by the retry.DelayMethod and Delay duration.
//
// Any panic in functions will be wrapped in an error and will be counted as an
// error.
type PGX struct {
pool Pool
loop retry.Retry
gracePeriod time.Duration
}
// NewPGX returns an error if conn is nil. It sets the retry attempts to 1 if
// the value is less than 1. The retry strategy can be set either by providing
// a retry.Retry method or the individual components. See the ConfigFunc
// helpers.
func NewPGX(conn Pool, conf ...ConfigFunc) (*PGX, error) {
if conn == nil {
return nil, ErrEmptyDatabase
}
t := &PGX{
pool: conn,
gracePeriod: 30 * time.Second,
loop: retry.Retry{
Attempts: 1,
Delay: 300 * time.Millisecond,
Method: retry.IncrementalDelay,
},
}
for _, fn := range conf {
fn(t)
}
if t.loop.Attempts < 1 {
t.loop.Attempts = 1
}
return t, nil
}
// Transaction returns an error if the connection is not set, or can't begin
// the transaction, or the after all retries, at least one of the fns returns
// an error, or the context is deadlined.
//
// It will wrap the commit/rollback methods if there are any. If in the last
// try any of the fns panics, it puts the stack trace of the panic in the error
// and returns.
//
// It stops retrying if any of the errors are wrapped in a *retry.StopError.
func (p *PGX) Transaction(ctx context.Context, fns ...func(pgx.Tx) error) error {
if p.pool == nil {
return ErrEmptyDatabase
}
return p.loop.Do(func() error {
tx, err := p.pool.Begin(ctx)
if err != nil {
return fmt.Errorf("starting transaction: %w", err)
}
for _, fn := range fns {
select {
case <-ctx.Done():
err := p.rollbackWithErr(tx, ctx.Err())
return &retry.StopError{Err: err}
default:
}
var err error
func() {
defer func() {
if r := recover(); r != nil {
switch e := r.(type) {
case error:
err = e
default:
err = fmt.Errorf("%v", e)
}
err = fmt.Errorf("%w: %w\n%s", errPanic, err, debug.Stack())
}
}()
err = fn(tx)
}()
if err == nil {
continue
}
if errors.Is(err, context.Canceled) {
err = &retry.StopError{Err: err}
}
return p.rollbackWithErr(tx, err)
}
err = tx.Commit(ctx)
if err != nil {
return fmt.Errorf("committing transaction: %w", err)
}
return nil
})
}
func (p *PGX) rollbackWithErr(tx pgx.Tx, err error) error {
ctx, cancel := context.WithTimeout(context.Background(), p.gracePeriod)
defer cancel()
er := tx.Rollback(ctx)
if er != nil {
er = fmt.Errorf("rolling back transaction: %w", er)
}
return errors.Join(er, err)
}