Skip to content

Commit

Permalink
Fully functional database/sql driver (#351)
Browse files Browse the repository at this point in the history
Here, implement the rest of driver functionality on `riverdatabasesql`,
the existing driver for Go's built-in `database/sql` package. Previously
it only supported a minimal interface allowing it to run migrations, but
nothing more sophisticated like inserting jobs.

The benefit of a fully functional driver is that it will allow River to
be integrated with with other Go database packages that aren't built
around Pgx like Bun (requested in #302) and GORM (requested in #58).
I'll need to write up some documentation, but this change should make
both of those integrations possible immediately.

It also lays the groundwork for future non-Postgres drivers. It's going
to be a little more still, but I want to take a stab at SQLite, and this
change will get us a lot of the way there.

There's no way with `database/sql` to support listen/notify, so here we
introduce the idea of a poll only driver. River's client checks whether
a driver can support listen/notify on initialization, and if not, it
enters poll only mode the same way as if configured with `PollOnly`.

An intuitive idiosyncrasy of this set up is that even when using the
`database/sql` driver bundled here, regardless of whether they're
working with Bun, GORM, or whatever,  users will generally still be
using Pgx under the hood since it's the only maintained and fully
functional Postgres driver in the Go ecosystem. With that said, the
driver still has to bundle in `lib/pq` for various constructs like
`pq.Array` because we're using sqlc, and sqlc's `database/sql` driver
always uses `lib/pq`. I tried to find a way around this, but came out
fairly convinced that there is none. To rid ourselves of `lib/pq`
completely we'd need sqlc to ship an alternative Pgx driver that used
Pgx internally, but exposed a `database/sql` interface using `*sql.Tx`
instead of `pgx.Tx`.
  • Loading branch information
brandur authored Jul 6, 2024
1 parent 7bb1c41 commit 91275aa
Show file tree
Hide file tree
Showing 24 changed files with 1,604 additions and 506 deletions.
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Added

- Fully functional driver for `database/sql` for use with packages like Bun and GORM. [PR #351](https://github.com/riverqueue/river/pull/351).

### Changed

- Tags are now limited to 255 characters in length, and should match the regex `\A[\w][\w\-]+[\w]\z` (importantly, they can't contain commas). [PR #351](https://github.com/riverqueue/river/pull/351).

## [0.9.0] - 2024-07-04

### Added
Expand Down
29 changes: 21 additions & 8 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -498,12 +498,16 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
client.subscriptionManager = newSubscriptionManager(archetype, nil)
client.services = append(client.services, client.completer, client.subscriptionManager)

// In poll only mode, we don't try to initialize a notifier that uses
// listen/notify. Instead, each service polls for changes it's
// interested in. e.g. Elector polls to see if leader has expired.
if !config.PollOnly {
client.notifier = notifier.New(archetype, driver.GetListener(), client.monitor.SetNotifierStatus)
client.services = append(client.services, client.notifier)
if driver.SupportsListener() {
// In poll only mode, we don't try to initialize a notifier that
// uses listen/notify. Instead, each service polls for changes it's
// interested in. e.g. Elector polls to see if leader has expired.
if !config.PollOnly {
client.notifier = notifier.New(archetype, driver.GetListener(), client.monitor.SetNotifierStatus)
client.services = append(client.services, client.notifier)
}
} else {
logger.Info("Driver does not support listener; entering poll only mode")
}

client.elector = leadership.NewElector(archetype, driver.GetExecutor(), client.notifier, &leadership.Config{
Expand Down Expand Up @@ -1171,6 +1175,15 @@ func insertParamsFromConfigArgsAndOptions(archetype *baseservice.Archetype, conf
}
if tags == nil {
tags = []string{}
} else {
for _, tag := range tags {
if len(tag) > 255 {
return nil, nil, errors.New("tags should be a maximum of 255 characters long")
}
if !tagRE.MatchString(tag) {
return nil, nil, errors.New("tags should match regex " + tagRE.String())
}
}
}

if priority > 4 {
Expand All @@ -1192,10 +1205,10 @@ func insertParamsFromConfigArgsAndOptions(archetype *baseservice.Archetype, conf

insertParams := &riverdriver.JobInsertFastParams{
CreatedAt: createdAt,
EncodedArgs: encodedArgs,
EncodedArgs: json.RawMessage(encodedArgs),
Kind: args.Kind(),
MaxAttempts: maxAttempts,
Metadata: metadata,
Metadata: json.RawMessage(metadata),
Priority: priority,
Queue: queue,
State: rivertype.JobStateAvailable,
Expand Down
87 changes: 70 additions & 17 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgconn"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/jackc/pgx/v5/stdlib"
"github.com/robfig/cron/v3"
"github.com/stretchr/testify/require"

Expand All @@ -31,6 +32,7 @@ import (
"github.com/riverqueue/river/internal/util/ptrutil"
"github.com/riverqueue/river/internal/util/sliceutil"
"github.com/riverqueue/river/riverdriver"
"github.com/riverqueue/river/riverdriver/riverdatabasesql"
"github.com/riverqueue/river/riverdriver/riverpgxv5"
"github.com/riverqueue/river/rivertype"
)
Expand Down Expand Up @@ -164,7 +166,7 @@ func newTestClient(t *testing.T, dbPool *pgxpool.Pool, config *Config) *Client[p
return client
}

func startClient(ctx context.Context, t *testing.T, client *Client[pgx.Tx]) {
func startClient[TTx any](ctx context.Context, t *testing.T, client *Client[TTx]) {
t.Helper()

if err := client.Start(ctx); err != nil {
Expand All @@ -187,6 +189,21 @@ func runNewTestClient(ctx context.Context, t *testing.T, config *Config) *Client
return client
}

func subscribe[TTx any](t *testing.T, client *Client[TTx]) <-chan *Event {
t.Helper()

subscribeChan, cancel := client.Subscribe(
EventKindJobCancelled,
EventKindJobCompleted,
EventKindJobFailed,
EventKindJobSnoozed,
EventKindQueuePaused,
EventKindQueueResumed,
)
t.Cleanup(cancel)
return subscribeChan
}

func Test_Client(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -217,21 +234,6 @@ func Test_Client(t *testing.T) {
return newTestClient(t, bundle.dbPool, config), bundle
}

subscribe := func(t *testing.T, client *Client[pgx.Tx]) <-chan *Event {
t.Helper()

subscribeChan, cancel := client.Subscribe(
EventKindJobCancelled,
EventKindJobCompleted,
EventKindJobFailed,
EventKindJobSnoozed,
EventKindQueuePaused,
EventKindQueueResumed,
)
t.Cleanup(cancel)
return subscribeChan
}

t.Run("StartInsertAndWork", func(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -640,7 +642,40 @@ func Test_Client(t *testing.T) {
}
})

t.Run("PollOnly", func(t *testing.T) {
t.Run("PollOnlyDriver", func(t *testing.T) {
t.Parallel()

config, bundle := setupConfig(t)
bundle.config.PollOnly = true

stdPool := stdlib.OpenDBFromPool(bundle.dbPool)
t.Cleanup(func() { require.NoError(t, stdPool.Close()) })

client, err := NewClient(riverdatabasesql.New(stdPool), config)
require.NoError(t, err)

client.testSignals.Init()

// Notifier should not have been initialized at all.
require.Nil(t, client.notifier)

insertRes, err := client.Insert(ctx, &noOpArgs{}, nil)
require.NoError(t, err)

subscribeChan := subscribe(t, client)
startClient(ctx, t, client)

// Despite no notifier, the client should still be able to elect itself
// leader.
client.testSignals.electedLeader.WaitOrTimeout()

event := riverinternaltest.WaitOrTimeout(t, subscribeChan)
require.Equal(t, EventKindJobCompleted, event.Kind)
require.Equal(t, insertRes.Job.ID, event.Job.ID)
require.Equal(t, rivertype.JobStateCompleted, event.Job.State)
})

t.Run("PollOnlyOption", func(t *testing.T) {
t.Parallel()

config, bundle := setupConfig(t)
Expand Down Expand Up @@ -4492,6 +4527,24 @@ func TestInsertParamsFromJobArgsAndOptions(t *testing.T) {
require.Equal(t, []string{"tag1", "tag2"}, insertParams.Tags)
})

t.Run("TagFormatValidated", func(t *testing.T) {
t.Parallel()

{
_, _, err := insertParamsFromConfigArgsAndOptions(archetype, config, &customInsertOptsJobArgs{}, &InsertOpts{
Tags: []string{strings.Repeat("h", 256)},
})
require.EqualError(t, err, "tags should be a maximum of 255 characters long")
}

{
_, _, err := insertParamsFromConfigArgsAndOptions(archetype, config, &customInsertOptsJobArgs{}, &InsertOpts{
Tags: []string{"tag,with,comma"},
})
require.EqualError(t, err, "tags should match regex "+tagRE.String())
}
})

t.Run("UniqueOpts", func(t *testing.T) {
t.Parallel()

Expand Down
53 changes: 26 additions & 27 deletions driver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
"github.com/riverqueue/river/rivertype"
)

func TestDriverDatabaseSQL_Executor(t *testing.T) {
func TestDriverDatabaseSQL(t *testing.T) {
t.Parallel()

ctx := context.Background()
Expand All @@ -29,42 +29,41 @@ func TestDriverDatabaseSQL_Executor(t *testing.T) {
stdPool := stdlib.OpenDBFromPool(dbPool)
t.Cleanup(func() { require.NoError(t, stdPool.Close()) })

driver := riverdatabasesql.New(nil)
riverdrivertest.ExerciseExecutorMigrationOnly(ctx, t, driver, func(ctx context.Context, t *testing.T) *sql.Tx {
t.Helper()
riverdrivertest.Exercise(ctx, t,
func(ctx context.Context, t *testing.T) riverdriver.Driver[*sql.Tx] {
t.Helper()

tx, err := stdPool.BeginTx(ctx, nil)
require.NoError(t, err)
t.Cleanup(func() { _ = tx.Rollback() })
return riverdatabasesql.New(stdPool)
},
func(ctx context.Context, t *testing.T) riverdriver.Executor {
t.Helper()

return tx
})
}

func TestDriverRiverPgxV5_Executor(t *testing.T) {
t.Parallel()

ctx := context.Background()
tx, err := stdPool.BeginTx(ctx, nil)
require.NoError(t, err)
t.Cleanup(func() { _ = tx.Rollback() })

driver := riverpgxv5.New(nil)
riverdrivertest.ExerciseExecutorFull(ctx, t, driver, func(ctx context.Context, t *testing.T) pgx.Tx {
t.Helper()

return riverinternaltest.TestTx(ctx, t)
})
return riverdatabasesql.New(nil).UnwrapExecutor(tx)
})
}

func TestDriverRiverPgxV5_Listener(t *testing.T) {
func TestDriverRiverPgxV5(t *testing.T) {
t.Parallel()

ctx := context.Background()

riverdrivertest.ExerciseListener(ctx, t, func(ctx context.Context, t *testing.T) riverdriver.Driver[pgx.Tx] {
t.Helper()
riverdrivertest.Exercise(ctx, t,
func(ctx context.Context, t *testing.T) riverdriver.Driver[pgx.Tx] {
t.Helper()

dbPool := riverinternaltest.TestDB(ctx, t)
return riverpgxv5.New(dbPool)
})
dbPool := riverinternaltest.TestDB(ctx, t)
return riverpgxv5.New(dbPool)
},
func(ctx context.Context, t *testing.T) riverdriver.Executor {
t.Helper()

tx := riverinternaltest.TestTx(ctx, t)
return riverpgxv5.New(nil).UnwrapExecutor(tx)
})
}

func BenchmarkDriverRiverPgxV5_Executor(b *testing.B) {
Expand Down
12 changes: 12 additions & 0 deletions insert_opts.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,21 @@ package river
import (
"errors"
"fmt"
"regexp"
"slices"
"time"

"github.com/riverqueue/river/rivertype"
)

// Regular expression to which the format of tags must comply. Mainly, no
// special characters, and with hyphens in the middle.
//
// A key property here (in case this is relaxed in the future) is that commas
// must never be allowed because they're used as a delimiter during batch job
// insertion for the `riverdatabasesql` driver.
var tagRE = regexp.MustCompile(`\A[\w][\w\-]+[\w]\z`)

// InsertOpts are optional settings for a new job which can be provided at job
// insertion time. These will override any default InsertOpts settings provided
// by JobArgsWithInsertOpts, as well as any global defaults.
Expand Down Expand Up @@ -58,6 +67,9 @@ type InsertOpts struct {
// functional behavior and are meant entirely as a user-specified construct
// to help group and categorize jobs.
//
// Tags should conform to the regex `\A[\w][\w\-]+[\w]\z` and be a maximum
// of 255 characters long. No special characters are allowed.
//
// If tags are specified from both a job args override and from options on
// Insert, the latter takes precedence. Tags are not merged.
Tags []string
Expand Down
23 changes: 23 additions & 0 deletions insert_opts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,29 @@ import (
"github.com/riverqueue/river/rivertype"
)

func TestTagRE(t *testing.T) {
t.Parallel()

require.Regexp(t, tagRE, "aaa")
require.Regexp(t, tagRE, "_aaa")
require.Regexp(t, tagRE, "aaa_")
require.Regexp(t, tagRE, "777")
require.Regexp(t, tagRE, "my-tag")
require.Regexp(t, tagRE, "my_tag")
require.Regexp(t, tagRE, "my-longer-tag")
require.Regexp(t, tagRE, "my_longer_tag")
require.Regexp(t, tagRE, "My_Capitalized_Tag")
require.Regexp(t, tagRE, "ALL_CAPS")
require.Regexp(t, tagRE, "1_2_3")

require.NotRegexp(t, tagRE, "a")
require.NotRegexp(t, tagRE, "aa")
require.NotRegexp(t, tagRE, "-aaa")
require.NotRegexp(t, tagRE, "aaa-")
require.NotRegexp(t, tagRE, "special@characters$banned")
require.NotRegexp(t, tagRE, "commas,never,allowed")
}

func TestJobUniqueOpts_validate(t *testing.T) {
t.Parallel()

Expand Down
Loading

0 comments on commit 91275aa

Please sign in to comment.