Skip to content

Commit

Permalink
Allow River client to be created with nil database pool
Browse files Browse the repository at this point in the history
I realized as I was working on test helpers and thinking about
transactions that having to send a River client a database pool in a
test suite would be _very_ inconvenient. We make a database pool on a
test database relatively easily available in River (I say "relatively"
because the test suite still cannot support `-p > 1`), but for many
applications that won't be the case. At my work for example, sending
River a database pool in tests would create the risk of tainting the
test database which would be very undesirable.

Here, we add the capacity for a River client/driver to be initialized
with a `nil` database pool. It works like this:

* `Start` is not available and errors if call.
* The non-transactional variants of `Insert`/`InsertMany` are not
  available and error if called.
* `InsertTx`/`InsertManyTx` will continue to work happily and insert
  jobs as they always would.

So when testing something like an API endpoint, users would create and
inject a test transaction, make sure to always use `InsertTx` and
`InsertManyTx`, then reuse the same test transaction with the test
helpers to verify the inserts. They assert only that the jobs are
inserted and not that the worker work is performed. That's enough
assuming that River works (and we hope it does) and will help keep their
test suite much faster because not every job needs to actually round
trip through its worker.

There could even be cases where this would be useful in prod. On the API
side you might make sure to initialize the client with a `nil` database
pool so that an accidental job insertion outside of a transaction (i.e.
accidental call to `Insert` instead of `InsertTx`) will fail, and
hopefully be caught in the test suite. If I was using River in a
project, that's how I'd do it.
  • Loading branch information
brandur committed Nov 15, 2023
1 parent 03f52d0 commit f2a7103
Show file tree
Hide file tree
Showing 9 changed files with 149 additions and 11 deletions.
4 changes: 4 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ jobs:
env:
TEST_DATABASE_URL: postgres://postgres:postgres@127.0.0.1:5432/river_testdb?sslmode=disable

- name: Test riverpgxv5
working-directory: ./riverdriver/riverpgxv5
run: go test -race ./...

cli:
runs-on: ubuntu-latest
timeout-minutes: 3
Expand Down
15 changes: 14 additions & 1 deletion client.go
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
// There are a number of internal components that are only needed/desired if
// we're actually going to be working jobs (as opposed to just enqueueing
// them):
if config.willExecuteJobs() {
if driver.GetDBPool() != nil && config.willExecuteJobs() {
// TODO: for now we only support a single instance per database/schema.
// If we want to provide isolation within a single database/schema,
// we'll need to add a config for this.
Expand Down Expand Up @@ -539,6 +539,9 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
// jobs, but will also cancel the context for any currently-running jobs. If
// using StopAndCancel, there's no need to also call Stop.
func (c *Client[TTx]) Start(ctx context.Context) error {
if c.driver.GetDBPool() == nil {
return fmt.Errorf("driver must wrap a non-nil database pool for a client to start working")
}
if !c.config.willExecuteJobs() {
return fmt.Errorf("client Queues and Workers must be configured for a client to start working")
}
Expand Down Expand Up @@ -967,6 +970,8 @@ func insertParamsFromArgsAndOptions(args JobArgs, insertOpts *InsertOpts) (*dbad
return insertParams, nil
}

var errInsertNoDriverDBPool = fmt.Errorf("driver must have non-nil database pool to use Insert and InsertMany (try InsertTx or InsertManyTx instead")

// Insert inserts a new job with the provided args. Job opts can be used to
// override any defaults that may have been provided by an implementation of
// JobArgsWithInsertOpts.InsertOpts, as well as any global defaults. The
Expand All @@ -978,6 +983,10 @@ func insertParamsFromArgsAndOptions(args JobArgs, insertOpts *InsertOpts) (*dbad
// // handle error
// }
func (c *Client[TTx]) Insert(ctx context.Context, args JobArgs, opts *InsertOpts) (*JobRow, error) {
if c.driver.GetDBPool() == nil {
return nil, errInsertNoDriverDBPool
}

if err := c.validateJobArgs(args); err != nil {
return nil, err
}
Expand Down Expand Up @@ -1053,6 +1062,10 @@ type InsertManyParams struct {
// // handle error
// }
func (c *Client[TTx]) InsertMany(ctx context.Context, params []InsertManyParams) (int64, error) {
if c.driver.GetDBPool() == nil {
return 0, errInsertNoDriverDBPool
}

insertParams, err := c.insertManyParams(params)
if err != nil {
return 0, err
Expand Down
78 changes: 77 additions & 1 deletion client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func newTestClient(ctx context.Context, t *testing.T, config *Config) *Client[pg

dbPool := riverinternaltest.TestDB(ctx, t)

client, err := NewClient[pgx.Tx](riverpgxv5.New(dbPool), config)
client, err := NewClient(riverpgxv5.New(dbPool), config)
require.NoError(t, err)

client.testSignals.Init()
Expand Down Expand Up @@ -610,6 +610,19 @@ func Test_Client_Insert(t *testing.T) {
require.Equal(t, []string{"custom"}, jobRow.Tags)
})

t.Run("ErrorsOnDriverWithoutPool", func(t *testing.T) {
t.Parallel()

_, _ = setup(t)

config := newTestConfig(t, nil)
client, err := NewClient(riverpgxv5.New(nil), config)
require.NoError(t, err)

_, err = client.Insert(ctx, &noOpArgs{}, nil)
require.ErrorIs(t, err, errInsertNoDriverDBPool)
})

t.Run("ErrorsOnUnknownJobKindWithWorkers", func(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -697,6 +710,21 @@ func Test_Client_InsertTx(t *testing.T) {
require.Equal(t, []string{"custom"}, jobRow.Tags)
})

// A client's allowed to send nil to their drive so they can, for example,
// easily use test transactions in their test suite.
t.Run("WithDriverWithoutPool", func(t *testing.T) {
t.Parallel()

_, bundle := setup(t)

config := newTestConfig(t, nil)
client, err := NewClient(riverpgxv5.New(nil), config)
require.NoError(t, err)

_, err = client.InsertTx(ctx, bundle.tx, &noOpArgs{}, nil)
require.NoError(t, err)
})

t.Run("ErrorsOnUnknownJobKindWithWorkers", func(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -757,6 +785,22 @@ func Test_Client_InsertMany(t *testing.T) {
require.Len(t, jobs, 2, fmt.Sprintf("Expected to find exactly two jobs of kind: %s", (noOpArgs{}).Kind()))
})

t.Run("ErrorsOnDriverWithoutPool", func(t *testing.T) {
t.Parallel()

_, _ = setup(t)

config := newTestConfig(t, nil)
client, err := NewClient(riverpgxv5.New(nil), config)
require.NoError(t, err)

count, err := client.InsertMany(ctx, []InsertManyParams{
{Args: noOpArgs{}},
})
require.ErrorIs(t, err, errInsertNoDriverDBPool)
require.Equal(t, int64(0), count)
})

t.Run("ErrorsWithZeroJobs", func(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -856,6 +900,24 @@ func Test_Client_InsertManyTx(t *testing.T) {
require.Len(t, jobs, 2, fmt.Sprintf("Expected to find exactly two jobs of kind: %s", (noOpArgs{}).Kind()))
})

// A client's allowed to send nil to their drive so they can, for example,
// easily use test transactions in their test suite.
t.Run("WithDriverWithoutPool", func(t *testing.T) {
t.Parallel()

_, bundle := setup(t)

config := newTestConfig(t, nil)
client, err := NewClient(riverpgxv5.New(nil), config)
require.NoError(t, err)

count, err := client.InsertManyTx(ctx, bundle.tx, []InsertManyParams{
{Args: noOpArgs{}},
})
require.NoError(t, err)
require.Equal(t, int64(1), count)
})

t.Run("ErrorsWithZeroJobs", func(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -1972,6 +2034,20 @@ func Test_Client_Start_Error(t *testing.T) {

ctx := context.Background()

t.Run("NoDriverPool", func(t *testing.T) {
t.Parallel()

config := newTestConfig(t, nil)
config.Queues = nil
config.Workers = nil

client, err := NewClient(riverpgxv5.New(nil), config)
require.NoError(t, err)

err = client.Start(ctx)
require.EqualError(t, err, "driver must wrap a non-nil database pool for a client to start working")
})

t.Run("NoQueueConfiguration", func(t *testing.T) {
t.Parallel()

Expand Down
2 changes: 0 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,7 @@ require (
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
github.com/kr/text v0.2.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/rogpeppe/go-internal v1.10.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
golang.org/x/crypto v0.15.0 // indirect
golang.org/x/text v0.14.0 // indirect
Expand Down
5 changes: 2 additions & 3 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
github.com/cpuguy83/go-md2man/v2 v2.0.3/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand All @@ -26,8 +25,8 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ=
github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog=
github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDNXVBh4M=
github.com/rogpeppe/go-internal v1.11.0/go.mod h1:ddIwULY96R17DhadqLgMfk9H9tvdUzkipdSkR5nkCZA=
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/spf13/cobra v1.8.0 h1:7aJaZx1B85qltLMc546zn58BxxfZdR/W22ej9CFoEf0=
github.com/spf13/cobra v1.8.0/go.mod h1:WXLWApfZ71AjXPya3WOlMsY9yMs7YeiHhFVlvLyhcho=
Expand Down
10 changes: 9 additions & 1 deletion riverdriver/riverpgxv5/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,21 @@ module github.com/riverqueue/river/riverdriver/riverpgxv5

go 1.21.0

require github.com/jackc/pgx/v5 v5.5.0
require (
github.com/jackc/pgx/v5 v5.5.0
github.com/stretchr/testify v1.8.1
)

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
github.com/jackc/puddle/v2 v2.2.1 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/rogpeppe/go-internal v1.11.0 // indirect
golang.org/x/crypto v0.15.0 // indirect
golang.org/x/sync v0.5.0 // indirect
golang.org/x/text v0.14.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
13 changes: 13 additions & 0 deletions riverdriver/riverpgxv5/go.sum
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand All @@ -9,11 +10,21 @@ github.com/jackc/pgx/v5 v5.5.0 h1:NxstgwndsTRy7eq9/kqYc/BZh5w2hHJV86wjvO+1xPw=
github.com/jackc/pgx/v5 v5.5.0/go.mod h1:Ig06C2Vu0t5qXC60W8sqIthScaEnFvojjj9dSljmHRA=
github.com/jackc/puddle/v2 v2.2.1 h1:RhxXJtFG022u4ibrCSMSiu5aOq1i77R3OHKNJj77OAk=
github.com/jackc/puddle/v2 v2.2.1/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0=
github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDNXVBh4M=
github.com/rogpeppe/go-internal v1.11.0/go.mod h1:ddIwULY96R17DhadqLgMfk9H9tvdUzkipdSkR5nkCZA=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
golang.org/x/crypto v0.15.0 h1:frVn1TEaCEaZcn3Tmd7Y2b5KKPaZ+I32Q2OA3kYp5TA=
Expand All @@ -23,6 +34,8 @@ golang.org/x/sync v0.5.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
12 changes: 9 additions & 3 deletions riverdriver/riverpgxv5/river_pgx_v5_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,16 @@ type Driver struct {
// New returns a new Pgx v5 River driver for use with river.Client.
//
// It takes a pgxpool.Pool to use for use with River. The pool should already be
// configured to use the schema specified in the client's Schema field.
// configured to use the schema specified in the client's Schema field. The pool
// must not be closed while the associated client is running (not until graceful
// shutdown has completed).
//
// The pool must not be closed while the associated client is running (not until
// graceful shutdown has completed).
// The database pool may be nil. If it is, a client that it's sent into will not
// be able to start up (calls to Start will error) and the Insert and InsertMany
// functions will be disabled, but the transactional-variants InsertTx and
// InsertManyTx continue to function. This behavior may be particularly useful
// in testing so that inserts can be performed and verified on a test
// transaction that will be rolled back.
func New(dbPool *pgxpool.Pool) *Driver {
return &Driver{dbPool: dbPool}
}
Expand Down
21 changes: 21 additions & 0 deletions riverdriver/riverpgxv5/river_pgx_v5_driver_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package riverpgxv5

import (
"testing"

"github.com/jackc/pgx/v5/pgxpool"
"github.com/stretchr/testify/require"
)

func TestNew(t *testing.T) {
t.Run("AllowsNilDatabasePool", func(t *testing.T) {
dbPool := &pgxpool.Pool{}
driver := New(dbPool)
require.Equal(t, dbPool, driver.dbPool)
})

t.Run("AllowsNilDatabasePool", func(t *testing.T) {
driver := New(nil)
require.Nil(t, driver.dbPool)
})
}

0 comments on commit f2a7103

Please sign in to comment.