Skip to content

Commit

Permalink
sql: add Database.WithConnection (#6445)
Browse files Browse the repository at this point in the history
## Motivation

In some situations (most notably, syncv2) a lot of database queries
need to be made with no need for a read or write transaction.
Database connection pool adds noticeable delays to the queries.
Using read transactions instead of simple connections has the side
effect of blocking WAL checkpoints.
  • Loading branch information
ivan4th committed Nov 15, 2024
1 parent c4e59b6 commit 9a0f4e8
Show file tree
Hide file tree
Showing 4 changed files with 123 additions and 21 deletions.
109 changes: 91 additions & 18 deletions sql/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -572,21 +572,68 @@ type Interceptor func(query string) error
type Database interface {
Executor
QueryCache
// Close closes the database.
Close() error
// QueryCount returns the number of queries executed on the database.
QueryCount() int
// QueryCache returns the query cache for this database, if it's present,
// or nil otherwise.
QueryCache() QueryCache
// Tx creates deferred sqlite transaction.
//
// Deferred transactions are not started until the first statement.
// Transaction may be started in read mode and automatically upgraded to write mode
// after one of the write statements.
//
// https://www.sqlite.org/lang_transaction.html
Tx(ctx context.Context) (Transaction, error)
// WithTx starts a new transaction and passes it to the exec function.
// It then commits the transaction if the exec function doesn't return an error,
// and rolls it back otherwise.
// If the context is canceled, the currently running SQL statement is interrupted.
WithTx(ctx context.Context, exec func(Transaction) error) error
// TxImmediate begins a new immediate transaction on the database, that is,
// a transaction that starts a write immediately without waiting for a write
// statement.
// The transaction returned from this function must always be released by calling
// its Release method. Release rolls back the transaction if it hasn't been
// committed.
// If the context is canceled, the currently running SQL statement is interrupted.
TxImmediate(ctx context.Context) (Transaction, error)
// WithTxImmediate starts a new immediate transaction and passes it to the exec
// function.
// An immediate transaction is started immediately, without waiting for a write
// statement.
// It then commits the transaction if the exec function doesn't return an error,
// and rolls it back otherwise.
// If the context is canceled, the currently running SQL statement is interrupted.
WithTxImmediate(ctx context.Context, exec func(Transaction) error) error
// WithConnection executes the provided function with a connection from the
// database pool.
// If many queries are to be executed in a row, but there's no need for an
// explicit transaction which may be long-running and thus block
// WAL checkpointing, it may be preferable to use a single connection for
// it to avoid database pool overhead.
// The connection is released back to the pool after the function returns.
// If the context is canceled, the currently running SQL statement is interrupted.
WithConnection(ctx context.Context, exec func(Executor) error) error
// Intercept adds an interceptor function to the database. The interceptor
// functions are invoked upon each query on the database, including queries
// executed within transactions.
// The query will fail if the interceptor returns an error.
// The interceptor can later be removed using RemoveInterceptor with the same key.
Intercept(key string, fn Interceptor)
// RemoveInterceptor removes the interceptor function with specified key from the database.
RemoveInterceptor(key string)
}

// Transaction represents a transaction.
type Transaction interface {
Executor
// Commit commits the transaction.
Commit() error
// Release releases the transaction. If the transaction hasn't been committed,
// it's rolled back.
Release() error
}

Expand Down Expand Up @@ -684,34 +731,22 @@ func (db *sqliteDatabase) startExclusive() error {
return nil
}

// Tx creates deferred sqlite transaction.
//
// Deferred transactions are not started until the first statement.
// Transaction may be started in read mode and automatically upgraded to write mode
// after one of the write statements.
//
// https://www.sqlite.org/lang_transaction.html
// Tx implements Database.
func (db *sqliteDatabase) Tx(ctx context.Context) (Transaction, error) {
return db.getTx(ctx, beginDefault)
}

// WithTx will pass initialized deferred transaction to exec callback.
// Will commit only if error is nil.
// WithTx implements Database.
func (db *sqliteDatabase) WithTx(ctx context.Context, exec func(Transaction) error) error {
return db.withTx(ctx, beginDefault, exec)
}

// TxImmediate creates immediate transaction.
//
// IMMEDIATE cause the database connection to start a new write immediately, without waiting
// for a write statement. The BEGIN IMMEDIATE might fail with SQLITE_BUSY if another write
// transaction is already active on another database connection.
// TxImmediate implements Database.
func (db *sqliteDatabase) TxImmediate(ctx context.Context) (Transaction, error) {
return db.getTx(ctx, beginImmediate)
}

// WithTxImmediate will pass initialized immediate transaction to exec callback.
// Will commit only if error is nil.
// WithTxImmediate implements Database.
func (db *sqliteDatabase) WithTxImmediate(ctx context.Context, exec func(Transaction) error) error {
return db.withTx(ctx, beginImmediate, exec)
}
Expand All @@ -727,7 +762,7 @@ func (db *sqliteDatabase) runInterceptors(query string) error {
return nil
}

// Exec statement using one of the connection from the pool.
// Exec implements Executor.
//
// If you care about atomicity of the operation (for example writing rewards to multiple accounts)
// Tx should be used. Otherwise sqlite will not guarantee that all side-effects of operations are
Expand Down Expand Up @@ -758,7 +793,7 @@ func (db *sqliteDatabase) Exec(query string, encoder Encoder, decoder Decoder) (
return exec(conn, query, encoder, decoder)
}

// Close closes all pooled connections.
// Close implements Database.
func (db *sqliteDatabase) Close() error {
db.closeMux.Lock()
defer db.closeMux.Unlock()
Expand All @@ -772,6 +807,23 @@ func (db *sqliteDatabase) Close() error {
return nil
}

// WithConnection implements Database.
func (db *sqliteDatabase) WithConnection(ctx context.Context, exec func(Executor) error) error {
if db.closed {
return ErrClosed
}
conCtx, cancel := context.WithCancel(ctx)
conn := db.getConn(conCtx)
defer func() {
cancel()
db.pool.Put(conn)
}()
if conn == nil {
return ErrNoConnection
}
return exec(&sqliteConn{queryCache: db.queryCache, db: db, conn: conn})
}

// Intercept adds an interceptor function to the database. The interceptor functions
// are invoked upon each query. The query will fail if the interceptor returns an error.
// The interceptor can later be removed using RemoveInterceptor with the same key.
Expand Down Expand Up @@ -1093,6 +1145,27 @@ func (tx *sqliteTx) Exec(query string, encoder Encoder, decoder Decoder) (int, e
return exec(tx.conn, query, encoder, decoder)
}

type sqliteConn struct {
*queryCache
db *sqliteDatabase
conn *sqlite.Conn
}

func (c *sqliteConn) Exec(query string, encoder Encoder, decoder Decoder) (int, error) {
if err := c.db.runInterceptors(query); err != nil {
return 0, fmt.Errorf("running query interceptors: %w", err)
}

c.db.queryCount.Add(1)
if c.db.latency != nil {
start := time.Now()
defer func() {
c.db.latency.WithLabelValues(query).Observe(float64(time.Since(start)))
}()
}
return exec(c.conn, query, encoder, decoder)
}

func mapSqliteError(err error) error {
switch sqlite.ErrCode(err) {
case sqlite.SQLITE_CONSTRAINT_PRIMARYKEY, sqlite.SQLITE_CONSTRAINT_UNIQUE:
Expand Down
21 changes: 21 additions & 0 deletions sql/database_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -638,3 +638,24 @@ func TestExclusive(t *testing.T) {
})
}
}

func TestConnection(t *testing.T) {
db := InMemoryTest(t)
var r int
require.NoError(t, db.WithConnection(context.Background(), func(ex Executor) error {
n, err := ex.Exec("select ?", func(stmt *Statement) {
stmt.BindInt64(1, 42)
}, func(stmt *Statement) bool {
r = stmt.ColumnInt(0)
return true
})
require.NoError(t, err)
require.Equal(t, 1, n)
require.Equal(t, 42, r)
return nil
}))

require.Error(t, db.WithConnection(context.Background(), func(Executor) error {
return errors.New("error")
}))
}
5 changes: 5 additions & 0 deletions sql/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,18 @@ import "go.uber.org/zap"

// Executor is an interface for executing raw statement.
type Executor interface {
// Exec executes a statement.
Exec(string, Encoder, Decoder) (int, error)
}

// Migration is interface for migrations provider.
type Migration interface {
// Apply applies the migration.
Apply(db Executor, logger *zap.Logger) error
// Rollback rolls back the migration.
Rollback() error
// Name returns the name of the migration.
Name() string
// Order returns the sequential number of the migration.
Order() int
}
9 changes: 6 additions & 3 deletions sql/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,17 @@ func LoadDBSchemaScript(db Executor) (string, error) {
return "", err
}
fmt.Fprintf(&sb, "PRAGMA user_version = %d;\n", version)
// The following SQL query ensures that tables are listed first,
// ordered by name, and then all other objects, ordered by their table name
// and then by their own name.
if _, err = db.Exec(`
SELECT tbl_name, sql || ';'
FROM sqlite_master
WHERE sql IS NOT NULL AND tbl_name NOT LIKE 'sqlite_%'
ORDER BY
CASE WHEN type = 'table' THEN 1 ELSE 2 END, -- ensures tables are first
tbl_name, -- tables are sorted by name, then all other objects
name -- (indexes, triggers, etc.) also by name
CASE WHEN type = 'table' THEN 1 ELSE 2 END,
tbl_name,
name
`, nil, func(st *Statement) bool {
fmt.Fprintln(&sb, st.ColumnText(1))
return true
Expand Down

0 comments on commit 9a0f4e8

Please sign in to comment.