Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for database/sql in migrations + framework for multi-driver River #98

Merged
merged 1 commit into from
Dec 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 37 additions & 7 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,15 @@ jobs:
env:
TEST_DATABASE_URL: postgres://postgres:postgres@127.0.0.1:5432/river_testdb?sslmode=disable

- name: Test riverpgxv5
- name: Test riverdriver
working-directory: ./riverdriver
run: go test -race ./...

- name: Test riverdriver/riverdatabasesql
working-directory: ./riverdriver/riverdatabasesql
run: go test -race ./...

- name: Test riverdriver/riverpgxv5
working-directory: ./riverdriver/riverpgxv5
run: go test -race ./...

Expand Down Expand Up @@ -117,10 +125,13 @@ jobs:
golangci:
name: lint
runs-on: ubuntu-latest
env:
GOLANGCI_LINT_VERSION: v1.55.2
permissions:
contents: read
# allow read access to pull request. Use with `only-new-issues` option.
pull-requests: read

steps:
- uses: actions/setup-go@v4
with:
Expand All @@ -130,13 +141,33 @@ jobs:
- name: Checkout
uses: actions/checkout@v3

- name: golangci-lint
- name: Lint
uses: golangci/golangci-lint-action@v3
with:
# Optional: show only new issues if it's a pull request. The default value is `false`.
only-new-issues: true
only-new-issues: true # Optional: show only new issues if it's a pull request. The default value is `false`.
version: ${{ env.GOLANGCI_LINT_VERSION }}
working-directory: .

version: v1.55.2
- name: Lint riverdriver
uses: golangci/golangci-lint-action@v3
with:
only-new-issues: true # Optional: show only new issues if it's a pull request. The default value is `false`.
version: ${{ env.GOLANGCI_LINT_VERSION }}
working-directory: ./riverdriver

- name: Lint riverdriver/riverdatabasesql
uses: golangci/golangci-lint-action@v3
with:
only-new-issues: true # Optional: show only new issues if it's a pull request. The default value is `false`.
version: ${{ env.GOLANGCI_LINT_VERSION }}
working-directory: ./riverdriver/riverdatabasesql

- name: Lint riverdriver/riverpgxv5
uses: golangci/golangci-lint-action@v3
with:
only-new-issues: true # Optional: show only new issues if it's a pull request. The default value is `false`.
version: ${{ env.GOLANGCI_LINT_VERSION }}
working-directory: ./riverdriver/riverpgxv5

producer_sample:
runs-on: ubuntu-latest
Expand Down Expand Up @@ -204,7 +235,6 @@ jobs:
sqlc-version: "1.22.0"

- name: Run sqlc diff
working-directory: ./internal/dbsqlc
run: |
echo "Please make sure that all sqlc changes are checked in!"
sqlc diff
make verify
3 changes: 3 additions & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ linters-settings:
- Default
- Prefix(github.com/riverqueue)

gomoddirectives:
replace-local: true

gosec:
excludes:
- G404 # use of non-crypto random; overly broad for our use case
Expand Down
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Added

- Added `riverdriver/riverdatabasesql` driver to enable River Go migrations through Go's built in `database/sql` package. [PR #98](https://github.com/riverqueue/river/pull/98).

### Changed

- Errored jobs that have a very short duration before their next retry (<5 seconds) are set to `available` immediately instead of being made `scheduled` and having to wait for the scheduler to make a pass to make them workable. [PR #105](https://github.com/riverqueue/river/pull/105).
- `riverdriver` becomes its own submodule. It contains types that `riverdriver/riverdatabasesql` and `riverdriver/riverpgxv5` need to reference. [PR #98](https://github.com/riverqueue/river/pull/98).

## [0.0.12] - 2023-12-02

Expand Down
14 changes: 13 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,16 @@ generate: generate/sqlc

.PHONY: generate/sqlc
generate/sqlc:
cd internal/dbsqlc && sqlc generate
cd internal/dbsqlc && sqlc generate
cd riverdriver/riverdatabasesql/internal/dbsqlc && sqlc generate
cd riverdriver/riverpgxv5/internal/dbsqlc && sqlc generate

.PHONY: verify
verify:
verify: verify/sqlc

.PHONY: verify/sqlc
verify/sqlc:
cd internal/dbsqlc && sqlc diff
cd riverdriver/riverdatabasesql/internal/dbsqlc && sqlc diff
cd riverdriver/riverpgxv5/internal/dbsqlc && sqlc diff
2 changes: 2 additions & 0 deletions docs/development.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ queries. After changing an sqlc `.sql` file, generate Go with:
```shell
git checkout master && git pull --rebase
VERSION=v0.0.x
git tag riverdriver/VERSION -m "release riverdriver/VERSION"
git tag riverdriver/riverpgxv5/$VERSION -m "release riverdriver/riverpgxv5/$VERSION"
git tag riverdriver/riverdatabasesql/$VERSION -m "release riverdriver/riverdatabasesql/$VERSION"
git tag $VERSION
git push --tags
```
11 changes: 9 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,15 +1,21 @@
module github.com/riverqueue/river

go 1.21.0
go 1.21.4

// replace github.com/riverqueue/river/riverdriver/riverpgxv5 => ./riverdriver/riverpgxv5
replace github.com/riverqueue/river/riverdriver => ./riverdriver

replace github.com/riverqueue/river/riverdriver/riverpgxv5 => ./riverdriver/riverpgxv5

replace github.com/riverqueue/river/riverdriver/riverdatabasesql => ./riverdriver/riverdatabasesql

require (
github.com/jackc/pgerrcode v0.0.0-20220416144525-469b46aa5efa
github.com/jackc/pgx/v5 v5.5.1
github.com/jackc/puddle/v2 v2.2.1
github.com/oklog/ulid/v2 v2.1.0
github.com/riverqueue/river/riverdriver v0.0.0-00010101000000-000000000000
github.com/riverqueue/river/riverdriver/riverpgxv5 v0.0.12
github.com/riverqueue/river/riverdriver/riverdatabasesql v0.0.0-00010101000000-000000000000
github.com/robfig/cron/v3 v3.0.1
github.com/spf13/cobra v1.8.0
github.com/stretchr/testify v1.8.4
Expand All @@ -23,6 +29,7 @@ require (
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
github.com/lib/pq v1.10.9 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
golang.org/x/crypto v0.15.0 // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@ github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0=
github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw=
github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
github.com/oklog/ulid/v2 v2.1.0 h1:+9lhoxAP56we25tyYETBBY1YLA2SaoLvUFgrP2miPJU=
github.com/oklog/ulid/v2 v2.1.0/go.mod h1:rcEKHmBBKfef9DhnvX7y1HZBYxjXb0cP5ExxNsTT1QQ=
github.com/pborman/getopt v0.0.0-20170112200414-7148bc3a4c30/go.mod h1:85jBQOZwpVEaDAr341tbn15RS4fCAsIst0qp7i8ex1o=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/riverqueue/river/riverdriver/riverpgxv5 v0.0.12 h1:mcDBnqwzEXY9WDOwbkd8xmFdSr/H6oHb1F3NCNCmLDY=
github.com/riverqueue/river/riverdriver/riverpgxv5 v0.0.12/go.mod h1:k6hsPkW9Fl3qURzyLHbvxUCqWDpit0WrZ3oEaKezD3E=
github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDNXVBh4M=
Expand Down
6 changes: 0 additions & 6 deletions internal/dbsqlc/models.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 0 additions & 2 deletions internal/dbsqlc/sqlc.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,9 @@ sql:
queries:
- river_job.sql
- river_leader.sql
- river_migration.sql
schema:
- river_job.sql
- river_leader.sql
- river_migration.sql
gen:
go:
package: "dbsqlc"
Expand Down
29 changes: 25 additions & 4 deletions internal/riverinternaltest/riverinternaltest.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"fmt"
"log"
"log/slog"
"net/url"
"os"
"runtime"
"sync"
Expand Down Expand Up @@ -62,19 +63,39 @@ func BaseServiceArchetype(tb testing.TB) *baseservice.Archetype {
}

func DatabaseConfig(databaseName string) *pgxpool.Config {
databaseURL := valutil.ValOrDefault(os.Getenv("TEST_DATABASE_URL"), "postgres:///river_testdb?sslmode=disable")

config, err := pgxpool.ParseConfig(databaseURL)
config, err := pgxpool.ParseConfig(DatabaseURL(databaseName))
if err != nil {
panic(fmt.Sprintf("error parsing database URL: %v", err))
}
config.MaxConns = dbPoolMaxConns
config.ConnConfig.ConnectTimeout = 10 * time.Second
config.ConnConfig.Database = databaseName
config.ConnConfig.RuntimeParams["timezone"] = "UTC"
return config
}

// DatabaseURL gets a test database URL from TEST_DATABASE_URL or falls back on
// a default pointing to `river_testdb`. If databaseName is set, it replaces the
// database in the URL, although the host and other parameters are preserved.
//
// Most of the time DatabaseConfig should be used instead of this function, but
// it may be useful in non-pgx situations like for examples showing the use of
// `database/sql`.
func DatabaseURL(databaseName string) string {
u, err := url.Parse(valutil.ValOrDefault(
os.Getenv("TEST_DATABASE_URL"),
"postgres://localhost/river_testdb?sslmode=disable"),
)
if err != nil {
panic(err)
}

if databaseName != "" {
u.Path = databaseName
}

return u.String()
}

// DiscardContinuously drains continuously out of the given channel and discards
// anything that comes out of it. Returns a stop function that should be invoked
// to stop draining. Stop must be invoked before tests finish to stop an
Expand Down
33 changes: 33 additions & 0 deletions internal/util/dbutil/db_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/jackc/pgx/v5"

"github.com/riverqueue/river/internal/dbsqlc"
"github.com/riverqueue/river/riverdriver"
)

// Executor is an interface for a type that can begin a transaction and also
Expand Down Expand Up @@ -56,3 +57,35 @@ func WithTxV[T any](ctx context.Context, txBeginner TxBeginner, innerFunc func(c

return res, nil
}

// WithExecutorTx starts and commits a transaction on a driver executor around
// the given function, allowing the return of a generic value.
func WithExecutorTx(ctx context.Context, exec riverdriver.Executor, innerFunc func(ctx context.Context, tx riverdriver.ExecutorTx) error) error {
_, err := WithExecutorTxV(ctx, exec, func(ctx context.Context, tx riverdriver.ExecutorTx) (struct{}, error) {
return struct{}{}, innerFunc(ctx, tx)
})
return err
}

// WithExecutorTxV starts and commits a transaction on a driver executor around
// the given function, allowing the return of a generic value.
func WithExecutorTxV[T any](ctx context.Context, exec riverdriver.Executor, innerFunc func(ctx context.Context, tx riverdriver.ExecutorTx) (T, error)) (T, error) {
var defaultRes T

tx, err := exec.Begin(ctx)
if err != nil {
return defaultRes, fmt.Errorf("error beginning transaction: %w", err)
}
defer tx.Rollback(ctx)

res, err := innerFunc(ctx, tx)
if err != nil {
return defaultRes, err
}

if err := tx.Commit(ctx); err != nil {
return defaultRes, fmt.Errorf("error committing transaction: %w", err)
}

return res, nil
}
35 changes: 35 additions & 0 deletions internal/util/dbutil/db_util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"github.com/stretchr/testify/require"

"github.com/riverqueue/river/internal/riverinternaltest"
"github.com/riverqueue/river/riverdriver"
"github.com/riverqueue/river/riverdriver/riverpgxv5"
)

func TestWithTx(t *testing.T) {
Expand Down Expand Up @@ -40,3 +42,36 @@ func TestWithTxV(t *testing.T) {
require.NoError(t, err)
require.Equal(t, 7, ret)
}

func TestWithExecutorTx(t *testing.T) {
t.Parallel()

ctx := context.Background()
dbPool := riverinternaltest.TestDB(ctx, t)
driver := riverpgxv5.New(dbPool)

err := WithExecutorTx(ctx, driver.GetExecutor(), func(ctx context.Context, tx riverdriver.ExecutorTx) error {
_, err := tx.Exec(ctx, "SELECT 1")
require.NoError(t, err)

return nil
})
require.NoError(t, err)
}

func TestWithExecutorTxV(t *testing.T) {
t.Parallel()

ctx := context.Background()
dbPool := riverinternaltest.TestDB(ctx, t)
driver := riverpgxv5.New(dbPool)

ret, err := WithExecutorTxV(ctx, driver.GetExecutor(), func(ctx context.Context, tx riverdriver.ExecutorTx) (int, error) {
_, err := tx.Exec(ctx, "SELECT 1")
require.NoError(t, err)

return 7, nil
})
require.NoError(t, err)
require.Equal(t, 7, ret)
}
14 changes: 14 additions & 0 deletions riverdriver/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
module github.com/riverqueue/river/riverdriver

go 1.21.4

require github.com/jackc/pgx/v5 v5.5.0

require (
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
github.com/jackc/puddle/v2 v2.2.1 // indirect
golang.org/x/crypto v0.15.0 // indirect
golang.org/x/sync v0.5.0 // indirect
golang.org/x/text v0.14.0 // indirect
)
28 changes: 28 additions & 0 deletions riverdriver/go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a h1:bbPeKD0xmW/Y25WS6cokEszi5g+S0QxI/d45PkRi7Nk=
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM=
github.com/jackc/pgx/v5 v5.5.0 h1:NxstgwndsTRy7eq9/kqYc/BZh5w2hHJV86wjvO+1xPw=
github.com/jackc/pgx/v5 v5.5.0/go.mod h1:Ig06C2Vu0t5qXC60W8sqIthScaEnFvojjj9dSljmHRA=
github.com/jackc/puddle/v2 v2.2.1 h1:RhxXJtFG022u4ibrCSMSiu5aOq1i77R3OHKNJj77OAk=
github.com/jackc/puddle/v2 v2.2.1/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
golang.org/x/crypto v0.15.0 h1:frVn1TEaCEaZcn3Tmd7Y2b5KKPaZ+I32Q2OA3kYp5TA=
golang.org/x/crypto v0.15.0/go.mod h1:4ChreQoLWfG3xLDer1WdlH5NdlQ3+mwnQq1YTKY+72g=
golang.org/x/sync v0.5.0 h1:60k92dhOjHxJkrqnwsfl8KuaHbn/5dl0lUPUklKo3qE=
golang.org/x/sync v0.5.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
Loading
Loading