Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow River client to be created with nil database pool #30

Merged
merged 1 commit into from
Nov 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ jobs:
env:
TEST_DATABASE_URL: postgres://postgres:postgres@127.0.0.1:5432/river_testdb?sslmode=disable

- name: Test riverpgxv5
working-directory: ./riverdriver/riverpgxv5
run: go test -race ./...
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TIL go test ./... will not run submodule tests and even go test ./riverdriver/riverpgxv5 will refuse to work.


cli:
runs-on: ubuntu-latest
timeout-minutes: 3
Expand Down
19 changes: 17 additions & 2 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,8 +309,9 @@ func (ts *clientTestSignals) Init() {
}

var (
errMissingConfig = errors.New("missing config")
errMissingDriver = errors.New("missing database driver (try wrapping a Pgx pool with river/riverdriver/riverpgxv5.New)")
errMissingConfig = errors.New("missing config")
errMissingDatabasePoolWithQueues = errors.New("must have a non-nil database pool to execute jobs (either use a driver with database pool or don't configure Queues)")
errMissingDriver = errors.New("missing database driver (try wrapping a Pgx pool with river/riverdriver/riverpgxv5.New)")
)

// NewClient creates a new Client with the given database driver and
Expand Down Expand Up @@ -434,6 +435,10 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
// we're actually going to be working jobs (as opposed to just enqueueing
// them):
if config.willExecuteJobs() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we also make it a config error to specify Queues with no pool? Queues implies there will be workers working on those queues, which doesn't make sense if those workers could never be enabled.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hah, so that occurred to me as well, but I was 50/50 on the best option. The slight downside of erroring on NewClient is that it makes it slightly harder to share one test client between everything (e.g. in our suite, sending a nil database pool driver combined with a config from our general newTestConfig would be an error because the latter includes Queues.

Let's try it out though. Pushed.

if driver.GetDBPool() == nil {
return nil, errMissingDatabasePoolWithQueues
}

// TODO: for now we only support a single instance per database/schema.
// If we want to provide isolation within a single database/schema,
// we'll need to add a config for this.
Expand Down Expand Up @@ -967,6 +972,8 @@ func insertParamsFromArgsAndOptions(args JobArgs, insertOpts *InsertOpts) (*dbad
return insertParams, nil
}

var errInsertNoDriverDBPool = fmt.Errorf("driver must have non-nil database pool to use Insert and InsertMany (try InsertTx or InsertManyTx instead")

// Insert inserts a new job with the provided args. Job opts can be used to
// override any defaults that may have been provided by an implementation of
// JobArgsWithInsertOpts.InsertOpts, as well as any global defaults. The
Expand All @@ -978,6 +985,10 @@ func insertParamsFromArgsAndOptions(args JobArgs, insertOpts *InsertOpts) (*dbad
// // handle error
// }
func (c *Client[TTx]) Insert(ctx context.Context, args JobArgs, opts *InsertOpts) (*JobRow, error) {
if c.driver.GetDBPool() == nil {
return nil, errInsertNoDriverDBPool
}

if err := c.validateJobArgs(args); err != nil {
return nil, err
}
Expand Down Expand Up @@ -1053,6 +1064,10 @@ type InsertManyParams struct {
// // handle error
// }
func (c *Client[TTx]) InsertMany(ctx context.Context, params []InsertManyParams) (int64, error) {
if c.driver.GetDBPool() == nil {
return 0, errInsertNoDriverDBPool
}

insertParams, err := c.insertManyParams(params)
if err != nil {
return 0, err
Expand Down
75 changes: 74 additions & 1 deletion client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func newTestClient(ctx context.Context, t *testing.T, config *Config) *Client[pg

dbPool := riverinternaltest.TestDB(ctx, t)

client, err := NewClient[pgx.Tx](riverpgxv5.New(dbPool), config)
client, err := NewClient(riverpgxv5.New(dbPool), config)
require.NoError(t, err)

client.testSignals.Init()
Expand Down Expand Up @@ -610,6 +610,20 @@ func Test_Client_Insert(t *testing.T) {
require.Equal(t, []string{"custom"}, jobRow.Tags)
})

t.Run("ErrorsOnDriverWithoutPool", func(t *testing.T) {
t.Parallel()

_, _ = setup(t)

client, err := NewClient(riverpgxv5.New(nil), &Config{
Logger: riverinternaltest.Logger(t),
})
require.NoError(t, err)

_, err = client.Insert(ctx, &noOpArgs{}, nil)
require.ErrorIs(t, err, errInsertNoDriverDBPool)
})

t.Run("ErrorsOnUnknownJobKindWithWorkers", func(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -697,6 +711,22 @@ func Test_Client_InsertTx(t *testing.T) {
require.Equal(t, []string{"custom"}, jobRow.Tags)
})

// A client's allowed to send nil to their driver so they can, for example,
// easily use test transactions in their test suite.
t.Run("WithDriverWithoutPool", func(t *testing.T) {
t.Parallel()

_, bundle := setup(t)

client, err := NewClient(riverpgxv5.New(nil), &Config{
Logger: riverinternaltest.Logger(t),
})
require.NoError(t, err)

_, err = client.InsertTx(ctx, bundle.tx, &noOpArgs{}, nil)
require.NoError(t, err)
})

t.Run("ErrorsOnUnknownJobKindWithWorkers", func(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -757,6 +787,23 @@ func Test_Client_InsertMany(t *testing.T) {
require.Len(t, jobs, 2, fmt.Sprintf("Expected to find exactly two jobs of kind: %s", (noOpArgs{}).Kind()))
})

t.Run("ErrorsOnDriverWithoutPool", func(t *testing.T) {
t.Parallel()

_, _ = setup(t)

client, err := NewClient(riverpgxv5.New(nil), &Config{
Logger: riverinternaltest.Logger(t),
})
require.NoError(t, err)

count, err := client.InsertMany(ctx, []InsertManyParams{
{Args: noOpArgs{}},
})
require.ErrorIs(t, err, errInsertNoDriverDBPool)
require.Equal(t, int64(0), count)
})

t.Run("ErrorsWithZeroJobs", func(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -856,6 +903,25 @@ func Test_Client_InsertManyTx(t *testing.T) {
require.Len(t, jobs, 2, fmt.Sprintf("Expected to find exactly two jobs of kind: %s", (noOpArgs{}).Kind()))
})

// A client's allowed to send nil to their driver so they can, for example,
// easily use test transactions in their test suite.
t.Run("WithDriverWithoutPool", func(t *testing.T) {
t.Parallel()

_, bundle := setup(t)

client, err := NewClient(riverpgxv5.New(nil), &Config{
Logger: riverinternaltest.Logger(t),
})
require.NoError(t, err)

count, err := client.InsertManyTx(ctx, bundle.tx, []InsertManyParams{
{Args: noOpArgs{}},
})
require.NoError(t, err)
require.Equal(t, int64(1), count)
})

t.Run("ErrorsWithZeroJobs", func(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -2137,6 +2203,13 @@ func Test_NewClient_MissingParameters(t *testing.T) {
_, err := NewClient[pgx.Tx](riverpgxv5.New(nil), nil)
require.ErrorIs(t, err, errMissingConfig)
})

t.Run("ErrorOnDriverWithNoDatabasePoolAndQueues", func(t *testing.T) {
t.Parallel()

_, err := NewClient[pgx.Tx](riverpgxv5.New(nil), newTestConfig(t, nil))
require.ErrorIs(t, err, errMissingDatabasePoolWithQueues)
})
}

func Test_NewClient_Validations(t *testing.T) {
Expand Down
2 changes: 0 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,7 @@ require (
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
github.com/kr/text v0.2.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/rogpeppe/go-internal v1.10.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
golang.org/x/crypto v0.15.0 // indirect
golang.org/x/text v0.14.0 // indirect
Expand Down
5 changes: 2 additions & 3 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
github.com/cpuguy83/go-md2man/v2 v2.0.3/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand All @@ -26,8 +25,8 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ=
github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog=
github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDNXVBh4M=
github.com/rogpeppe/go-internal v1.11.0/go.mod h1:ddIwULY96R17DhadqLgMfk9H9tvdUzkipdSkR5nkCZA=
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/spf13/cobra v1.8.0 h1:7aJaZx1B85qltLMc546zn58BxxfZdR/W22ej9CFoEf0=
github.com/spf13/cobra v1.8.0/go.mod h1:WXLWApfZ71AjXPya3WOlMsY9yMs7YeiHhFVlvLyhcho=
Expand Down
10 changes: 9 additions & 1 deletion riverdriver/riverpgxv5/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,21 @@ module github.com/riverqueue/river/riverdriver/riverpgxv5

go 1.21.0

require github.com/jackc/pgx/v5 v5.5.0
require (
github.com/jackc/pgx/v5 v5.5.0
github.com/stretchr/testify v1.8.1
)

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
github.com/jackc/puddle/v2 v2.2.1 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/rogpeppe/go-internal v1.11.0 // indirect
golang.org/x/crypto v0.15.0 // indirect
golang.org/x/sync v0.5.0 // indirect
golang.org/x/text v0.14.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
13 changes: 13 additions & 0 deletions riverdriver/riverpgxv5/go.sum
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand All @@ -9,11 +10,21 @@ github.com/jackc/pgx/v5 v5.5.0 h1:NxstgwndsTRy7eq9/kqYc/BZh5w2hHJV86wjvO+1xPw=
github.com/jackc/pgx/v5 v5.5.0/go.mod h1:Ig06C2Vu0t5qXC60W8sqIthScaEnFvojjj9dSljmHRA=
github.com/jackc/puddle/v2 v2.2.1 h1:RhxXJtFG022u4ibrCSMSiu5aOq1i77R3OHKNJj77OAk=
github.com/jackc/puddle/v2 v2.2.1/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0=
github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDNXVBh4M=
github.com/rogpeppe/go-internal v1.11.0/go.mod h1:ddIwULY96R17DhadqLgMfk9H9tvdUzkipdSkR5nkCZA=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
golang.org/x/crypto v0.15.0 h1:frVn1TEaCEaZcn3Tmd7Y2b5KKPaZ+I32Q2OA3kYp5TA=
Expand All @@ -23,6 +34,8 @@ golang.org/x/sync v0.5.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
12 changes: 9 additions & 3 deletions riverdriver/riverpgxv5/river_pgx_v5_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,16 @@ type Driver struct {
// New returns a new Pgx v5 River driver for use with river.Client.
//
// It takes a pgxpool.Pool to use for use with River. The pool should already be
// configured to use the schema specified in the client's Schema field.
// configured to use the schema specified in the client's Schema field. The pool
// must not be closed while the associated client is running (not until graceful
// shutdown has completed).
//
// The pool must not be closed while the associated client is running (not until
// graceful shutdown has completed).
// The database pool may be nil. If it is, a client that it's sent into will not
// be able to start up (calls to Start will error) and the Insert and InsertMany
// functions will be disabled, but the transactional-variants InsertTx and
// InsertManyTx continue to function. This behavior may be particularly useful
// in testing so that inserts can be performed and verified on a test
// transaction that will be rolled back.
func New(dbPool *pgxpool.Pool) *Driver {
return &Driver{dbPool: dbPool}
}
Expand Down
21 changes: 21 additions & 0 deletions riverdriver/riverpgxv5/river_pgx_v5_driver_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package riverpgxv5

import (
"testing"

"github.com/jackc/pgx/v5/pgxpool"
"github.com/stretchr/testify/require"
)

func TestNew(t *testing.T) {
t.Run("AllowsNilDatabasePool", func(t *testing.T) {
dbPool := &pgxpool.Pool{}
driver := New(dbPool)
require.Equal(t, dbPool, driver.dbPool)
})

t.Run("AllowsNilDatabasePool", func(t *testing.T) {
driver := New(nil)
require.Nil(t, driver.dbPool)
})
}
Loading