From c239588238f00f33c3167fe6fd78f566396771bf Mon Sep 17 00:00:00 2001 From: Bartek Nowotarski Date: Tue, 15 Oct 2019 19:02:59 +0200 Subject: [PATCH 1/2] support/db: Support concurrent queries in a transaction --- support/db/main.go | 16 +++++++++++--- support/db/session.go | 45 ++++++++++++++++++++++++++++++++++++-- support/db/session_test.go | 39 +++++++++++++++++++++++++++++++++ 3 files changed, 95 insertions(+), 5 deletions(-) diff --git a/support/db/main.go b/support/db/main.go index d1a8a76f85..a363476339 100644 --- a/support/db/main.go +++ b/support/db/main.go @@ -102,8 +102,17 @@ type UpdateBuilder struct { // utilities such as automatic query logging and transaction management. NOTE: // A Session is designed to be lightweight and temporarily lived (usually // request scoped) which is one reason it is acceptable for it to store a -// context. It is not presently intended to cross goroutine boundaries and is -// not concurrency safe. +// context. +// When Session is not in a DB transaction all queries are made in separate +// database connection. If there is no idle connection query will wait until +// there is one. +// When Session is in a DB transaction it is using a single DB connection. +// To support running queries from multiple go routines it implements locking: +// can run arbitrary number of Exec but only a single Get/Select query (it's +// using sync.RWLock internally). This is done because Postgres protocol does +// not allow Exec if all data has not been read from previous Get/Select. +// Keep this in mind when using methods like `Query` that retuns `*sqlx.Rows` +// so there's not possible to implement locking inside Session. type Session struct { // DB is the database connection that queries should be executed against. DB *sqlx.DB @@ -111,7 +120,8 @@ type Session struct { // Ctx is the optional context in which the repo is operating under. Ctx context.Context - tx *sqlx.Tx + txSingleQueryMutex wrMutex + tx *sqlx.Tx } // Table helps to build sql queries against a given table. It logically diff --git a/support/db/session.go b/support/db/session.go index 82fb395edd..c563345eef 100644 --- a/support/db/session.go +++ b/support/db/session.go @@ -109,6 +109,8 @@ func (s *Session) Get(dest interface{}, query sq.Sqlizer) error { if err != nil { return err } + + // Calls s.txSingleQueryMutex.queryLock() inside: return s.GetRaw(dest, sql, args...) } @@ -121,7 +123,13 @@ func (s *Session) GetRaw(dest interface{}, query string, args ...interface{}) er } start := time.Now() + if s.inTransaction() { + s.txSingleQueryMutex.queryLock() + } err = s.conn().Get(dest, query, args...) + if s.inTransaction() { + s.txSingleQueryMutex.queryUnlock() + } s.log("get", start, query, args) if err == nil { @@ -145,7 +153,13 @@ func (s *Session) GetTable(name string) *Table { func (s *Session) TruncateTables(tables []string) error { truncateCmd := fmt.Sprintf("truncate %s restart identity cascade", strings.Join(tables[:], ",")) + if s.inTransaction() { + s.txSingleQueryMutex.execLock() + } _, err := s.ExecRaw(truncateCmd) + if s.inTransaction() { + s.txSingleQueryMutex.execUnlock() + } return err } @@ -155,6 +169,7 @@ func (s *Session) Exec(query sq.Sqlizer) (sql.Result, error) { if err != nil { return nil, err } + // Calls s.txSingleQueryMutex.execLock() inside: return s.ExecRaw(sql, args...) } @@ -186,7 +201,13 @@ func (s *Session) ExecRaw(query string, args ...interface{}) (sql.Result, error) } start := time.Now() + if s.inTransaction() { + s.txSingleQueryMutex.execLock() + } result, err := s.conn().Exec(query, args...) + if s.inTransaction() { + s.txSingleQueryMutex.execUnlock() + } s.log("exec", start, query, args) if err == nil { @@ -206,16 +227,24 @@ func (s *Session) NoRows(err error) bool { return err == sql.ErrNoRows } -// Query runs `query`, returns a *sqlx.Rows instance +// Query runs `query`, returns a *sqlx.Rows instance. +// Please note that this method may not work in a transaction because in +// Postgres you can't send Exec if you did not read all the data from previous +// Query in a single DB connection. func (s *Session) Query(query sq.Sqlizer) (*sqlx.Rows, error) { sql, args, err := s.build(query) if err != nil { return nil, err } + + // Returns Rows so locking won't help return s.QueryRaw(sql, args...) } -// QueryRaw runs `query` with `args` +// QueryRaw runs `query` with `args`. +// Please note that this method may not work in a transaction because in +// Postgres you can't send Exec if you did not read all the data from previous +// Query in a single DB connection. func (s *Session) QueryRaw(query string, args ...interface{}) (*sqlx.Rows, error) { query, err := s.ReplacePlaceholders(query) if err != nil { @@ -223,6 +252,7 @@ func (s *Session) QueryRaw(query string, args ...interface{}) (*sqlx.Rows, error } start := time.Now() + // Returns Rows so locking won't help result, err := s.conn().Queryx(query, args...) s.log("query", start, query, args) @@ -249,6 +279,10 @@ func (s *Session) ReplacePlaceholders(query string) (string, error) { return format.ReplacePlaceholders(query) } +func (s *Session) inTransaction() bool { + return s.tx != nil +} + // Rollback rolls back the current transaction func (s *Session) Rollback() error { if s.tx == nil { @@ -267,6 +301,7 @@ func (s *Session) Select(dest interface{}, query sq.Sqlizer) error { if err != nil { return err } + // Calls s.txSingleQueryMutex.queryLock() inside: return s.SelectRaw(dest, sql, args...) } @@ -283,7 +318,13 @@ func (s *Session) SelectRaw( } start := time.Now() + if s.inTransaction() { + s.txSingleQueryMutex.queryLock() + } err = s.conn().Select(dest, query, args...) + if s.inTransaction() { + s.txSingleQueryMutex.queryUnlock() + } s.log("select", start, query, args) if err == nil { diff --git a/support/db/session_test.go b/support/db/session_test.go index 37bf2b89e5..99a2bd32f7 100644 --- a/support/db/session_test.go +++ b/support/db/session_test.go @@ -1,6 +1,8 @@ package db import ( + "fmt" + "sync" "testing" "github.com/stellar/go/support/db/dbtest" @@ -8,6 +10,43 @@ import ( "github.com/stretchr/testify/require" ) +func TestConcurrentQueriesTransaction(t *testing.T) { + db := dbtest.Postgres(t).Load(testSchema) + defer db.Close() + + sess := &Session{DB: db.Open()} + defer sess.DB.Close() + + err := sess.Begin() + assert.NoError(t, err) + + var wg sync.WaitGroup + for i := 0; i < 1000; i++ { + wg.Add(1) + go func(i int) { + istr := fmt.Sprintf("%d", i) + var err error + if i%3 == 0 { + var names []string + err = sess.SelectRaw(&names, "SELECT name FROM people") + } else if i%3 == 1 { + var name string + err = sess.GetRaw(&name, "SELECT name FROM people LIMIT 1") + } else { + _, err = sess.ExecRaw( + "INSERT INTO people (name, hunger_level) VALUES ('bartek" + istr + "', " + istr + ")", + ) + } + assert.NoError(t, err) + wg.Done() + }(i) + } + + wg.Wait() + err = sess.Rollback() + assert.NoError(t, err) +} + func TestSession(t *testing.T) { db := dbtest.Postgres(t).Load(testSchema) defer db.Close() From a1235607cb6269cf954c9961b1e90e8ee3248a88 Mon Sep 17 00:00:00 2001 From: Bartek Nowotarski Date: Tue, 15 Oct 2019 19:06:35 +0200 Subject: [PATCH 2/2] Add wrMutex --- support/db/wrmutex.go | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) create mode 100644 support/db/wrmutex.go diff --git a/support/db/wrmutex.go b/support/db/wrmutex.go new file mode 100644 index 0000000000..83d1a88453 --- /dev/null +++ b/support/db/wrmutex.go @@ -0,0 +1,25 @@ +package db + +import "sync" + +// wrMutex works exactly like `sync.RWMutex` except that lock can be held by an +// arbitrary number of writers (exec) or a single reader (query). +type wrMutex struct { + m sync.RWMutex +} + +func (w *wrMutex) execLock() { + w.m.RLock() +} + +func (w *wrMutex) execUnlock() { + w.m.RUnlock() +} + +func (w *wrMutex) queryLock() { + w.m.Lock() +} + +func (w *wrMutex) queryUnlock() { + w.m.Unlock() +}