-
Notifications
You must be signed in to change notification settings - Fork 0
/
tx.go
211 lines (177 loc) · 4.72 KB
/
tx.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
package anystore
import (
"context"
"strconv"
"sync"
"sync/atomic"
"unsafe"
"github.com/anyproto/any-store/internal/driver"
)
// WriteTx represents a read-write transaction.
type WriteTx interface {
// ReadTx is embedded to provide read-only transaction methods.
ReadTx
// Rollback rolls back the transaction.
// Returns an error if the rollback fails.
Rollback() error
}
// ReadTx represents a read-only transaction.
type ReadTx interface {
// Context returns the context associated with the transaction.
Context() context.Context
// Commit commits the transaction.
// Returns an error if the commit fails.
Commit() error
// Done returns true if the transaction is completed (committed or rolled back).
Done() bool
conn() *driver.Conn
instanceId() string
}
type commonTx struct {
db *db
ctx context.Context
initialCtx context.Context
con *driver.Conn
done atomic.Bool
}
func (tx *commonTx) conn() *driver.Conn {
return tx.con
}
func (tx *commonTx) reset() {
tx.done.Store(false)
}
func (tx *commonTx) instanceId() string {
return tx.db.instanceId
}
var readTxPool = &sync.Pool{
New: func() any {
return &readTx{}
},
}
type readTx struct {
commonTx
}
func (r *readTx) Context() context.Context {
return r.ctx
}
func (r *readTx) Commit() error {
if r.done.CompareAndSwap(false, true) {
defer r.db.cm.ReleaseRead(r.con)
defer readTxPool.Put(r)
return r.con.Commit(context.Background())
}
return nil
}
func (r *readTx) Done() bool {
return r.done.Load()
}
var writeTxPool = &sync.Pool{
New: func() any {
return &writeTx{}
},
}
type writeTx struct {
readTx
}
func (w *writeTx) Context() context.Context {
return w.ctx
}
func (w *writeTx) Rollback() error {
if w.done.CompareAndSwap(false, true) {
defer w.db.cm.ReleaseWrite(w.con)
defer writeTxPool.Put(w)
return w.con.Rollback(context.Background())
}
return nil
}
func (w *writeTx) Commit() error {
if w.done.CompareAndSwap(false, true) {
defer w.db.cm.ReleaseWrite(w.con)
defer writeTxPool.Put(w)
return w.con.Commit(context.Background())
}
return nil
}
var savepointIds atomic.Uint64
var savepointPool = &sync.Pool{
New: func() any {
return &savepointTx{}
},
}
func newSavepointTx(ctx context.Context, wrTx WriteTx) (WriteTx, error) {
tx := savepointPool.Get().(*savepointTx)
tx.reset(wrTx)
if err := tx.conn().Exec(ctx, unsafe.String(unsafe.SliceData(tx.createQuery), len(tx.createQuery)), nil, driver.StmtExecNoResults); err != nil {
return nil, err
}
return tx, nil
}
const (
savepointCreateQuery = "SAVEPOINT sp"
savepointReleaseQuery = "RELEASE SAVEPOINT sp"
savepointRollbackQuery = "ROLLBACK TO SAVEPOINT sp"
)
type savepointTx struct {
WriteTx
id uint64
createQuery []byte
releaseQuery []byte
rollbackQuery []byte
done atomic.Bool
}
func (tx *savepointTx) reset(wtx WriteTx) {
tx.id = savepointIds.Add(1)
tx.WriteTx = wtx
tx.done.Store(false)
if len(tx.createQuery) == 0 {
tx.createQuery = make([]byte, 0, len(savepointCreateQuery)+10)
tx.createQuery = append(tx.createQuery, []byte(savepointCreateQuery)...)
tx.createQuery = strconv.AppendUint(tx.createQuery, tx.id, 10)
} else {
tx.createQuery = strconv.AppendUint(tx.createQuery[:len(savepointCreateQuery)], tx.id, 10)
}
if len(tx.releaseQuery) == 0 {
tx.releaseQuery = make([]byte, 0, len(savepointReleaseQuery)+10)
tx.releaseQuery = append(tx.releaseQuery, []byte(savepointReleaseQuery)...)
tx.releaseQuery = strconv.AppendUint(tx.releaseQuery, tx.id, 10)
} else {
tx.releaseQuery = strconv.AppendUint(tx.releaseQuery[:len(savepointReleaseQuery)], tx.id, 10)
}
if len(tx.rollbackQuery) == 0 {
tx.rollbackQuery = make([]byte, 0, len(savepointRollbackQuery)+10)
tx.rollbackQuery = append(tx.rollbackQuery, []byte(savepointRollbackQuery)...)
tx.rollbackQuery = strconv.AppendUint(tx.rollbackQuery, tx.id, 10)
} else {
tx.rollbackQuery = strconv.AppendUint(tx.rollbackQuery[:len(savepointRollbackQuery)], tx.id, 10)
}
}
func (tx *savepointTx) Commit() error {
if tx.done.CompareAndSwap(false, true) {
if err := tx.conn().Exec(context.TODO(), unsafe.String(unsafe.SliceData(tx.releaseQuery), len(tx.releaseQuery)), nil, driver.StmtExecNoResults); err != nil {
return err
}
savepointPool.Put(tx)
}
return nil
}
func (tx *savepointTx) Rollback() error {
if tx.done.CompareAndSwap(false, true) {
if err := tx.conn().Exec(context.TODO(), unsafe.String(unsafe.SliceData(tx.rollbackQuery), len(tx.rollbackQuery)), nil, driver.StmtExecNoResults); err != nil {
return err
}
savepointPool.Put(tx)
}
return nil
}
func (tx *savepointTx) Done() bool {
return tx.done.Load()
}
type noOpTx struct {
ReadTx
}
func (noOpTx) Commit() error {
return nil
}
func (noOpTx) Rollback() error {
return nil
}