diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 447e3877..c9b4f511 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -67,6 +67,10 @@ jobs: env: TEST_DATABASE_URL: postgres://postgres:postgres@127.0.0.1:5432/river_testdb?sslmode=disable + - name: Test riverpgxv5 + working-directory: ./riverdriver/riverpgxv5 + run: go test -race ./... + cli: runs-on: ubuntu-latest timeout-minutes: 3 diff --git a/client.go b/client.go index a355f5d5..62f1bd5f 100644 --- a/client.go +++ b/client.go @@ -309,8 +309,9 @@ func (ts *clientTestSignals) Init() { } var ( - errMissingConfig = errors.New("missing config") - errMissingDriver = errors.New("missing database driver (try wrapping a Pgx pool with river/riverdriver/riverpgxv5.New)") + errMissingConfig = errors.New("missing config") + errMissingDatabasePoolWithQueues = errors.New("must have a non-nil database pool to execute jobs (either use a driver with database pool or don't configure Queues)") + errMissingDriver = errors.New("missing database driver (try wrapping a Pgx pool with river/riverdriver/riverpgxv5.New)") ) // NewClient creates a new Client with the given database driver and @@ -434,6 +435,10 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client // we're actually going to be working jobs (as opposed to just enqueueing // them): if config.willExecuteJobs() { + if driver.GetDBPool() == nil { + return nil, errMissingDatabasePoolWithQueues + } + // TODO: for now we only support a single instance per database/schema. // If we want to provide isolation within a single database/schema, // we'll need to add a config for this. @@ -967,6 +972,8 @@ func insertParamsFromArgsAndOptions(args JobArgs, insertOpts *InsertOpts) (*dbad return insertParams, nil } +var errInsertNoDriverDBPool = fmt.Errorf("driver must have non-nil database pool to use Insert and InsertMany (try InsertTx or InsertManyTx instead") + // Insert inserts a new job with the provided args. Job opts can be used to // override any defaults that may have been provided by an implementation of // JobArgsWithInsertOpts.InsertOpts, as well as any global defaults. The @@ -978,6 +985,10 @@ func insertParamsFromArgsAndOptions(args JobArgs, insertOpts *InsertOpts) (*dbad // // handle error // } func (c *Client[TTx]) Insert(ctx context.Context, args JobArgs, opts *InsertOpts) (*JobRow, error) { + if c.driver.GetDBPool() == nil { + return nil, errInsertNoDriverDBPool + } + if err := c.validateJobArgs(args); err != nil { return nil, err } @@ -1053,6 +1064,10 @@ type InsertManyParams struct { // // handle error // } func (c *Client[TTx]) InsertMany(ctx context.Context, params []InsertManyParams) (int64, error) { + if c.driver.GetDBPool() == nil { + return 0, errInsertNoDriverDBPool + } + insertParams, err := c.insertManyParams(params) if err != nil { return 0, err diff --git a/client_test.go b/client_test.go index 7f6a4f23..475f65d1 100644 --- a/client_test.go +++ b/client_test.go @@ -134,7 +134,7 @@ func newTestClient(ctx context.Context, t *testing.T, config *Config) *Client[pg dbPool := riverinternaltest.TestDB(ctx, t) - client, err := NewClient[pgx.Tx](riverpgxv5.New(dbPool), config) + client, err := NewClient(riverpgxv5.New(dbPool), config) require.NoError(t, err) client.testSignals.Init() @@ -610,6 +610,20 @@ func Test_Client_Insert(t *testing.T) { require.Equal(t, []string{"custom"}, jobRow.Tags) }) + t.Run("ErrorsOnDriverWithoutPool", func(t *testing.T) { + t.Parallel() + + _, _ = setup(t) + + client, err := NewClient(riverpgxv5.New(nil), &Config{ + Logger: riverinternaltest.Logger(t), + }) + require.NoError(t, err) + + _, err = client.Insert(ctx, &noOpArgs{}, nil) + require.ErrorIs(t, err, errInsertNoDriverDBPool) + }) + t.Run("ErrorsOnUnknownJobKindWithWorkers", func(t *testing.T) { t.Parallel() @@ -697,6 +711,22 @@ func Test_Client_InsertTx(t *testing.T) { require.Equal(t, []string{"custom"}, jobRow.Tags) }) + // A client's allowed to send nil to their driver so they can, for example, + // easily use test transactions in their test suite. + t.Run("WithDriverWithoutPool", func(t *testing.T) { + t.Parallel() + + _, bundle := setup(t) + + client, err := NewClient(riverpgxv5.New(nil), &Config{ + Logger: riverinternaltest.Logger(t), + }) + require.NoError(t, err) + + _, err = client.InsertTx(ctx, bundle.tx, &noOpArgs{}, nil) + require.NoError(t, err) + }) + t.Run("ErrorsOnUnknownJobKindWithWorkers", func(t *testing.T) { t.Parallel() @@ -757,6 +787,23 @@ func Test_Client_InsertMany(t *testing.T) { require.Len(t, jobs, 2, fmt.Sprintf("Expected to find exactly two jobs of kind: %s", (noOpArgs{}).Kind())) }) + t.Run("ErrorsOnDriverWithoutPool", func(t *testing.T) { + t.Parallel() + + _, _ = setup(t) + + client, err := NewClient(riverpgxv5.New(nil), &Config{ + Logger: riverinternaltest.Logger(t), + }) + require.NoError(t, err) + + count, err := client.InsertMany(ctx, []InsertManyParams{ + {Args: noOpArgs{}}, + }) + require.ErrorIs(t, err, errInsertNoDriverDBPool) + require.Equal(t, int64(0), count) + }) + t.Run("ErrorsWithZeroJobs", func(t *testing.T) { t.Parallel() @@ -856,6 +903,25 @@ func Test_Client_InsertManyTx(t *testing.T) { require.Len(t, jobs, 2, fmt.Sprintf("Expected to find exactly two jobs of kind: %s", (noOpArgs{}).Kind())) }) + // A client's allowed to send nil to their driver so they can, for example, + // easily use test transactions in their test suite. + t.Run("WithDriverWithoutPool", func(t *testing.T) { + t.Parallel() + + _, bundle := setup(t) + + client, err := NewClient(riverpgxv5.New(nil), &Config{ + Logger: riverinternaltest.Logger(t), + }) + require.NoError(t, err) + + count, err := client.InsertManyTx(ctx, bundle.tx, []InsertManyParams{ + {Args: noOpArgs{}}, + }) + require.NoError(t, err) + require.Equal(t, int64(1), count) + }) + t.Run("ErrorsWithZeroJobs", func(t *testing.T) { t.Parallel() @@ -2137,6 +2203,13 @@ func Test_NewClient_MissingParameters(t *testing.T) { _, err := NewClient[pgx.Tx](riverpgxv5.New(nil), nil) require.ErrorIs(t, err, errMissingConfig) }) + + t.Run("ErrorOnDriverWithNoDatabasePoolAndQueues", func(t *testing.T) { + t.Parallel() + + _, err := NewClient[pgx.Tx](riverpgxv5.New(nil), newTestConfig(t, nil)) + require.ErrorIs(t, err, errMissingDatabasePoolWithQueues) + }) } func Test_NewClient_Validations(t *testing.T) { diff --git a/go.mod b/go.mod index 7d6a1439..755dd9dc 100644 --- a/go.mod +++ b/go.mod @@ -23,9 +23,7 @@ require ( github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/jackc/pgpassfile v1.0.0 // indirect github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect - github.com/kr/text v0.2.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect - github.com/rogpeppe/go-internal v1.10.0 // indirect github.com/spf13/pflag v1.0.5 // indirect golang.org/x/crypto v0.15.0 // indirect golang.org/x/text v0.14.0 // indirect diff --git a/go.sum b/go.sum index 0b76528e..0b8d209b 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,4 @@ github.com/cpuguy83/go-md2man/v2 v2.0.3/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= -github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -26,8 +25,8 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= -github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= -github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog= +github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDNXVBh4M= +github.com/rogpeppe/go-internal v1.11.0/go.mod h1:ddIwULY96R17DhadqLgMfk9H9tvdUzkipdSkR5nkCZA= github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/spf13/cobra v1.8.0 h1:7aJaZx1B85qltLMc546zn58BxxfZdR/W22ej9CFoEf0= github.com/spf13/cobra v1.8.0/go.mod h1:WXLWApfZ71AjXPya3WOlMsY9yMs7YeiHhFVlvLyhcho= diff --git a/riverdriver/riverpgxv5/go.mod b/riverdriver/riverpgxv5/go.mod index e791133e..e6027196 100644 --- a/riverdriver/riverpgxv5/go.mod +++ b/riverdriver/riverpgxv5/go.mod @@ -2,13 +2,21 @@ module github.com/riverqueue/river/riverdriver/riverpgxv5 go 1.21.0 -require github.com/jackc/pgx/v5 v5.5.0 +require ( + github.com/jackc/pgx/v5 v5.5.0 + github.com/stretchr/testify v1.8.1 +) require ( + github.com/davecgh/go-spew v1.1.1 // indirect github.com/jackc/pgpassfile v1.0.0 // indirect github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect github.com/jackc/puddle/v2 v2.2.1 // indirect + github.com/kr/text v0.2.0 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/rogpeppe/go-internal v1.11.0 // indirect golang.org/x/crypto v0.15.0 // indirect golang.org/x/sync v0.5.0 // indirect golang.org/x/text v0.14.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/riverdriver/riverpgxv5/go.sum b/riverdriver/riverpgxv5/go.sum index b9c08498..d4f10657 100644 --- a/riverdriver/riverpgxv5/go.sum +++ b/riverdriver/riverpgxv5/go.sum @@ -1,3 +1,4 @@ +github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -9,11 +10,21 @@ github.com/jackc/pgx/v5 v5.5.0 h1:NxstgwndsTRy7eq9/kqYc/BZh5w2hHJV86wjvO+1xPw= github.com/jackc/pgx/v5 v5.5.0/go.mod h1:Ig06C2Vu0t5qXC60W8sqIthScaEnFvojjj9dSljmHRA= github.com/jackc/puddle/v2 v2.2.1 h1:RhxXJtFG022u4ibrCSMSiu5aOq1i77R3OHKNJj77OAk= github.com/jackc/puddle/v2 v2.2.1/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= +github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0= +github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDNXVBh4M= +github.com/rogpeppe/go-internal v1.11.0/go.mod h1:ddIwULY96R17DhadqLgMfk9H9tvdUzkipdSkR5nkCZA= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= golang.org/x/crypto v0.15.0 h1:frVn1TEaCEaZcn3Tmd7Y2b5KKPaZ+I32Q2OA3kYp5TA= @@ -23,6 +34,8 @@ golang.org/x/sync v0.5.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/riverdriver/riverpgxv5/river_pgx_v5_driver.go b/riverdriver/riverpgxv5/river_pgx_v5_driver.go index fac0dbd2..9c803088 100644 --- a/riverdriver/riverpgxv5/river_pgx_v5_driver.go +++ b/riverdriver/riverpgxv5/river_pgx_v5_driver.go @@ -18,10 +18,16 @@ type Driver struct { // New returns a new Pgx v5 River driver for use with river.Client. // // It takes a pgxpool.Pool to use for use with River. The pool should already be -// configured to use the schema specified in the client's Schema field. +// configured to use the schema specified in the client's Schema field. The pool +// must not be closed while the associated client is running (not until graceful +// shutdown has completed). // -// The pool must not be closed while the associated client is running (not until -// graceful shutdown has completed). +// The database pool may be nil. If it is, a client that it's sent into will not +// be able to start up (calls to Start will error) and the Insert and InsertMany +// functions will be disabled, but the transactional-variants InsertTx and +// InsertManyTx continue to function. This behavior may be particularly useful +// in testing so that inserts can be performed and verified on a test +// transaction that will be rolled back. func New(dbPool *pgxpool.Pool) *Driver { return &Driver{dbPool: dbPool} } diff --git a/riverdriver/riverpgxv5/river_pgx_v5_driver_test.go b/riverdriver/riverpgxv5/river_pgx_v5_driver_test.go new file mode 100644 index 00000000..16e5556c --- /dev/null +++ b/riverdriver/riverpgxv5/river_pgx_v5_driver_test.go @@ -0,0 +1,21 @@ +package riverpgxv5 + +import ( + "testing" + + "github.com/jackc/pgx/v5/pgxpool" + "github.com/stretchr/testify/require" +) + +func TestNew(t *testing.T) { + t.Run("AllowsNilDatabasePool", func(t *testing.T) { + dbPool := &pgxpool.Pool{} + driver := New(dbPool) + require.Equal(t, dbPool, driver.dbPool) + }) + + t.Run("AllowsNilDatabasePool", func(t *testing.T) { + driver := New(nil) + require.Nil(t, driver.dbPool) + }) +}