Skip to content

Commit

Permalink
New rivermigrate Go API for running migrations from code + target v…
Browse files Browse the repository at this point in the history
…ersion support

Here, as requested by several users, add a Go API that's able to run
migrations as an alternative to migrating via the CLI. We add a new
package `rivermigrate` to mirror the conventions of `riverdriver` and
`rivertest`.

`rivermigrate` is largely just a refactor of the existing internal
package `internal/dbmigrate` with a couple important tweaks:

* We give it the ability to take a driver the same way as we do for
  `river.Client` and `rivertest`.

* To mirror client and `rivertest` we add a transaction-specific variant
  that takes a `TTx` (`MigrateTx`).

* Because we now have the non-transaction and transaction variants, I
  take away the `Down`/`Up` function distinctions, and make up/down a
  parameter to `Migrate`/`MigrateTx` instead (so we only need two
  functions instead of four).

I also added a new option for `TargetVersion` so that it's possible to
do a limited migration to a specific version instead of going all the
way up or down. This is a pretty standard feature in most migration
frameworks.

This still can't support Goose because it takes an `sql.DB`, but I think
we can get there by adding a minimal driver for the `sql` package that
can run migrations and nothing else. More of this to be explored later.
  • Loading branch information
brandur committed Nov 26, 2023
1 parent 42cf318 commit 4f5d988
Show file tree
Hide file tree
Showing 14 changed files with 814 additions and 560 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

## [0.0.10] - 2023-11-26

### Added

- Added `river/rivermigrate` package to enable migrations from Go code as an alternative to using the CLI.

## [0.0.9] - 2023-11-23

### Fixed
Expand Down
27 changes: 9 additions & 18 deletions cmd/river/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,15 @@ package main
import (
"context"
"fmt"
"log/slog"
"os"
"strconv"
"time"

"github.com/jackc/pgx/v5/pgxpool"
"github.com/spf13/cobra"

"github.com/riverqueue/river/internal/baseservice"
"github.com/riverqueue/river/internal/dbmigrate"
"github.com/riverqueue/river/internal/util/slogutil"
"github.com/riverqueue/river/riverdriver/riverpgxv5"
"github.com/riverqueue/river/rivermigrate"
)

func main() {
Expand Down Expand Up @@ -87,21 +85,14 @@ restricted with --max-steps.
},
}
cmd.Flags().StringVar(&opts.DatabaseURL, "database-url", "", "URL of the database to migrate (should look like `postgres://...`")
cmd.Flags().IntVar(&opts.MaxSteps, "max-steps", -1, "Maximum number of steps to migrate")
cmd.Flags().IntVar(&opts.MaxSteps, "max-steps", 0, "Maximum number of steps to migrate")
mustMarkFlagRequired(cmd, "database-url")
rootCmd.AddCommand(cmd)
}

execHandlingError(rootCmd.Execute)
}

func baseServiceArchetype() *baseservice.Archetype {
return &baseservice.Archetype{
Logger: slog.New(&slogutil.SlogMessageOnlyHandler{Level: slog.LevelInfo}),
TimeNowUTC: func() time.Time { return time.Now().UTC() },
}
}

func openDBPool(ctx context.Context, databaseURL string) (*pgxpool.Pool, error) {
const (
defaultIdleInTransactionSessionTimeout = 11 * time.Second // should be greater than statement timeout because statements count towards idle-in-transaction
Expand Down Expand Up @@ -159,10 +150,10 @@ func migrateDown(ctx context.Context, opts *migrateDownOpts) error {
}
defer dbPool.Close()

migrator := dbmigrate.NewMigrator(baseServiceArchetype())
migrator := rivermigrate.New(riverpgxv5.New(dbPool), nil)

_, err = migrator.Down(ctx, dbPool, &dbmigrate.MigrateOptions{
MaxSteps: &opts.MaxSteps,
_, err = migrator.Migrate(ctx, rivermigrate.DirectionDown, &rivermigrate.MigrateOpts{
MaxSteps: opts.MaxSteps,
})
return err
}
Expand Down Expand Up @@ -191,10 +182,10 @@ func migrateUp(ctx context.Context, opts *migrateUpOpts) error {
}
defer dbPool.Close()

migrator := dbmigrate.NewMigrator(baseServiceArchetype())
migrator := rivermigrate.New(riverpgxv5.New(dbPool), nil)

_, err = migrator.Up(ctx, dbPool, &dbmigrate.MigrateOptions{
MaxSteps: &opts.MaxSteps,
_, err = migrator.Migrate(ctx, rivermigrate.DirectionUp, &rivermigrate.MigrateOpts{
MaxSteps: opts.MaxSteps,
})
return err
}
16 changes: 4 additions & 12 deletions internal/cmd/testdbman/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package main
import (
"context"
"fmt"
"log/slog"
"os"
"runtime"
"time"
Expand All @@ -12,8 +11,8 @@ import (
"github.com/jackc/pgx/v5/pgxpool"
"github.com/spf13/cobra"

"github.com/riverqueue/river/internal/baseservice"
"github.com/riverqueue/river/internal/dbmigrate"
"github.com/riverqueue/river/riverdriver/riverpgxv5"
"github.com/riverqueue/river/rivermigrate"
)

func init() { //nolint:gochecknoinits
Expand Down Expand Up @@ -57,16 +56,9 @@ runtime.NumCPU() (a choice that comes from pgx's default connection pool size).
}
defer dbPool.Close()

archetype := &baseservice.Archetype{
Logger: slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{
Level: slog.LevelWarn,
})),
TimeNowUTC: func() time.Time { return time.Now().UTC() },
}

migrator := dbmigrate.NewMigrator(archetype)
migrator := rivermigrate.New(riverpgxv5.New(dbPool), nil)

if _, err = migrator.Up(ctx, dbPool, &dbmigrate.MigrateOptions{}); err != nil {
if _, err = migrator.Migrate(ctx, rivermigrate.DirectionUp, &rivermigrate.MigrateOpts{}); err != nil {
return err
}
fmt.Printf("Loaded schema in %s.\n", dbName)
Expand Down
Loading

0 comments on commit 4f5d988

Please sign in to comment.