Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support/db: Support concurrent queries in a transaction #1838

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 13 additions & 3 deletions support/db/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,16 +102,26 @@ type UpdateBuilder struct {
// utilities such as automatic query logging and transaction management. NOTE:
// A Session is designed to be lightweight and temporarily lived (usually
// request scoped) which is one reason it is acceptable for it to store a
// context. It is not presently intended to cross goroutine boundaries and is
// not concurrency safe.
// context.
// When Session is not in a DB transaction all queries are made in separate
// database connection. If there is no idle connection query will wait until
// there is one.
// When Session is in a DB transaction it is using a single DB connection.
// To support running queries from multiple go routines it implements locking:
// can run arbitrary number of Exec but only a single Get/Select query (it's
// using sync.RWLock internally). This is done because Postgres protocol does
// not allow Exec if all data has not been read from previous Get/Select.
// Keep this in mind when using methods like `Query` that retuns `*sqlx.Rows`
// so there's not possible to implement locking inside Session.
type Session struct {
// DB is the database connection that queries should be executed against.
DB *sqlx.DB

// Ctx is the optional context in which the repo is operating under.
Ctx context.Context

tx *sqlx.Tx
txSingleQueryMutex wrMutex
tx *sqlx.Tx
}

// Table helps to build sql queries against a given table. It logically
Expand Down
45 changes: 43 additions & 2 deletions support/db/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@ func (s *Session) Get(dest interface{}, query sq.Sqlizer) error {
if err != nil {
return err
}

// Calls s.txSingleQueryMutex.queryLock() inside:
return s.GetRaw(dest, sql, args...)
}

Expand All @@ -121,7 +123,13 @@ func (s *Session) GetRaw(dest interface{}, query string, args ...interface{}) er
}

start := time.Now()
if s.inTransaction() {
s.txSingleQueryMutex.queryLock()
}
err = s.conn().Get(dest, query, args...)
if s.inTransaction() {
s.txSingleQueryMutex.queryUnlock()
}
s.log("get", start, query, args)

if err == nil {
Expand All @@ -145,7 +153,13 @@ func (s *Session) GetTable(name string) *Table {

func (s *Session) TruncateTables(tables []string) error {
truncateCmd := fmt.Sprintf("truncate %s restart identity cascade", strings.Join(tables[:], ","))
if s.inTransaction() {
s.txSingleQueryMutex.execLock()
}
_, err := s.ExecRaw(truncateCmd)
if s.inTransaction() {
s.txSingleQueryMutex.execUnlock()
}
return err
}

Expand All @@ -155,6 +169,7 @@ func (s *Session) Exec(query sq.Sqlizer) (sql.Result, error) {
if err != nil {
return nil, err
}
// Calls s.txSingleQueryMutex.execLock() inside:
return s.ExecRaw(sql, args...)
}

Expand Down Expand Up @@ -186,7 +201,13 @@ func (s *Session) ExecRaw(query string, args ...interface{}) (sql.Result, error)
}

start := time.Now()
if s.inTransaction() {
s.txSingleQueryMutex.execLock()
}
result, err := s.conn().Exec(query, args...)
if s.inTransaction() {
s.txSingleQueryMutex.execUnlock()
}
s.log("exec", start, query, args)

if err == nil {
Expand All @@ -206,23 +227,32 @@ func (s *Session) NoRows(err error) bool {
return err == sql.ErrNoRows
}

// Query runs `query`, returns a *sqlx.Rows instance
// Query runs `query`, returns a *sqlx.Rows instance.
// Please note that this method may not work in a transaction because in
// Postgres you can't send Exec if you did not read all the data from previous
// Query in a single DB connection.
func (s *Session) Query(query sq.Sqlizer) (*sqlx.Rows, error) {
sql, args, err := s.build(query)
if err != nil {
return nil, err
}

// Returns Rows so locking won't help
return s.QueryRaw(sql, args...)
}

// QueryRaw runs `query` with `args`
// QueryRaw runs `query` with `args`.
// Please note that this method may not work in a transaction because in
// Postgres you can't send Exec if you did not read all the data from previous
// Query in a single DB connection.
func (s *Session) QueryRaw(query string, args ...interface{}) (*sqlx.Rows, error) {
query, err := s.ReplacePlaceholders(query)
if err != nil {
return nil, errors.Wrap(err, "replace placeholders failed")
}

start := time.Now()
// Returns Rows so locking won't help
result, err := s.conn().Queryx(query, args...)
s.log("query", start, query, args)

Expand All @@ -249,6 +279,10 @@ func (s *Session) ReplacePlaceholders(query string) (string, error) {
return format.ReplacePlaceholders(query)
}

func (s *Session) inTransaction() bool {
return s.tx != nil
}

// Rollback rolls back the current transaction
func (s *Session) Rollback() error {
if s.tx == nil {
Expand All @@ -267,6 +301,7 @@ func (s *Session) Select(dest interface{}, query sq.Sqlizer) error {
if err != nil {
return err
}
// Calls s.txSingleQueryMutex.queryLock() inside:
return s.SelectRaw(dest, sql, args...)
}

Expand All @@ -283,7 +318,13 @@ func (s *Session) SelectRaw(
}

start := time.Now()
if s.inTransaction() {
s.txSingleQueryMutex.queryLock()
}
err = s.conn().Select(dest, query, args...)
if s.inTransaction() {
s.txSingleQueryMutex.queryUnlock()
}
s.log("select", start, query, args)

if err == nil {
Expand Down
39 changes: 39 additions & 0 deletions support/db/session_test.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,52 @@
package db

import (
"fmt"
"sync"
"testing"

"github.com/stellar/go/support/db/dbtest"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestConcurrentQueriesTransaction(t *testing.T) {
db := dbtest.Postgres(t).Load(testSchema)
defer db.Close()

sess := &Session{DB: db.Open()}
defer sess.DB.Close()

err := sess.Begin()
assert.NoError(t, err)

var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go func(i int) {
istr := fmt.Sprintf("%d", i)
var err error
if i%3 == 0 {
var names []string
err = sess.SelectRaw(&names, "SELECT name FROM people")
} else if i%3 == 1 {
var name string
err = sess.GetRaw(&name, "SELECT name FROM people LIMIT 1")
} else {
_, err = sess.ExecRaw(
"INSERT INTO people (name, hunger_level) VALUES ('bartek" + istr + "', " + istr + ")",
)
}
assert.NoError(t, err)
wg.Done()
}(i)
}

wg.Wait()
err = sess.Rollback()
assert.NoError(t, err)
}

func TestSession(t *testing.T) {
db := dbtest.Postgres(t).Load(testSchema)
defer db.Close()
Expand Down
25 changes: 25 additions & 0 deletions support/db/wrmutex.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package db

import "sync"

// wrMutex works exactly like `sync.RWMutex` except that lock can be held by an
// arbitrary number of writers (exec) or a single reader (query).
type wrMutex struct {
m sync.RWMutex
}

func (w *wrMutex) execLock() {
w.m.RLock()
}

func (w *wrMutex) execUnlock() {
w.m.RUnlock()
}

func (w *wrMutex) queryLock() {
w.m.Lock()
}

func (w *wrMutex) queryUnlock() {
w.m.Unlock()
}