Skip to content

Commit

Permalink
Add example test for River's Go migration API
Browse files Browse the repository at this point in the history
Follow up #67 by adding an example test for the new Go migration API.
Helps provide a more copy/pastable example for River's godoc, and
something we can link to from our other docs.
  • Loading branch information
brandur committed Dec 7, 2023
1 parent 0bbcd0c commit 02e3d70
Show file tree
Hide file tree
Showing 3 changed files with 113 additions and 1 deletion.
105 changes: 105 additions & 0 deletions rivermigrate/example_migrate_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package rivermigrate_test

import (
"context"
"fmt"
"sort"
"strings"

"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"

"github.com/riverqueue/river"
"github.com/riverqueue/river/internal/riverinternaltest"
"github.com/riverqueue/river/riverdriver/riverpgxv5"
"github.com/riverqueue/river/rivermigrate"
)

type SortArgs struct {
// Strings is a slice of strings to sort.
Strings []string `json:"strings"`
}

func (SortArgs) Kind() string { return "sort" }

type SortWorker struct {
river.WorkerDefaults[SortArgs]
}

func (w *SortWorker) Work(ctx context.Context, job *river.Job[SortArgs]) error {
sort.Strings(job.Args.Strings)
fmt.Printf("Sorted strings: %+v\n", job.Args.Strings)
return nil
}

// Example_migrate demonstrates the use of River's Go migration API by migrating
// up and down.
func Example_migrate() {
ctx := context.Background()

dbPool, err := pgxpool.NewWithConfig(ctx, riverinternaltest.DatabaseConfig("river_testdb_example"))
if err != nil {
panic(err)
}
defer dbPool.Close()

tx, err := dbPool.Begin(ctx)
if err != nil {
panic(err)
}
defer tx.Rollback(ctx)

migrator := rivermigrate.New(riverpgxv5.New(dbPool), nil)

// Our test database starts with a full River schema. Drop it so that we can
// demonstrate working migrations. This isn't necessary outside this test.
dropRiverSchema(ctx, migrator, tx)

printVersions := func(res *rivermigrate.MigrateResult) {
for _, version := range res.Versions {
fmt.Printf("Migrated [%s] version %d\n", strings.ToUpper(string(res.Direction)), version.Version)
}
}

// Migrate to version 3. An actual call may want to omit all MigrateOpts,
// which will default to applying all available up migrations.
res, err := migrator.MigrateTx(ctx, tx, rivermigrate.DirectionUp, &rivermigrate.MigrateOpts{
TargetVersion: 3,
})
if err != nil {
panic(err)
}
printVersions(res)

// Migrate down by three steps. Down migrating defaults to running only one
// step unless overridden by an option like MaxSteps or TargetVersion.
res, err = migrator.MigrateTx(ctx, tx, rivermigrate.DirectionDown, &rivermigrate.MigrateOpts{
MaxSteps: 3,
})
if err != nil {
panic(err)
}
printVersions(res)

// Roll back all changes applied so our test database is left unaffected.
if err := tx.Rollback(ctx); err != nil {
panic(err)
}

// Output:
// Migrated [UP] version 1
// Migrated [UP] version 2
// Migrated [UP] version 3
// Migrated [DOWN] version 3
// Migrated [DOWN] version 2
// Migrated [DOWN] version 1
}

func dropRiverSchema(ctx context.Context, migrator *rivermigrate.Migrator[pgx.Tx], tx pgx.Tx) {
_, err := migrator.MigrateTx(ctx, tx, rivermigrate.DirectionDown, &rivermigrate.MigrateOpts{
TargetVersion: -1,
})
if err != nil {
panic(err)
}
}
5 changes: 4 additions & 1 deletion rivermigrate/river_migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,9 @@ type MigrateOpts struct {

// MigrateResult is the result of a migrate operation.
type MigrateResult struct {
// Direction is the direction that migration occurred (up or down).
Direction Direction

// Versions are migration versions that were added (for up migrations) or
// removed (for down migrations) for this run.
Versions []MigrateVersion
Expand Down Expand Up @@ -333,7 +336,7 @@ func (m *Migrator[TTx]) applyMigrations(ctx context.Context, tx pgx.Tx, directio
}
}

res := &MigrateResult{Versions: make([]MigrateVersion, 0, len(sortedTargetMigrations))}
res := &MigrateResult{Direction: direction, Versions: make([]MigrateVersion, 0, len(sortedTargetMigrations))}

// Short circuit early if there's nothing to do.
if len(sortedTargetMigrations) < 1 {
Expand Down
4 changes: 4 additions & 0 deletions rivermigrate/river_migrate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ func TestMigrator(t *testing.T) {
{
res, err := migrator.MigrateTx(ctx, bundle.tx, DirectionDown, &MigrateOpts{})
require.NoError(t, err)
require.Equal(t, DirectionDown, res.Direction)
require.Equal(t, []int{3}, sliceutil.Map(res.Versions, migrateVersionToInt))

err = dbExecError(ctx, bundle.tx, "SELECT * FROM river_job")
Expand All @@ -100,6 +101,7 @@ func TestMigrator(t *testing.T) {
{
res, err := migrator.MigrateTx(ctx, bundle.tx, DirectionDown, &MigrateOpts{})
require.NoError(t, err)
require.Equal(t, DirectionDown, res.Direction)
require.Equal(t, []int{2}, sliceutil.Map(res.Versions, migrateVersionToInt))

err = dbExecError(ctx, bundle.tx, "SELECT * FROM river_job")
Expand Down Expand Up @@ -236,6 +238,7 @@ func TestMigrator(t *testing.T) {
{
res, err := migrator.MigrateTx(ctx, bundle.tx, DirectionUp, &MigrateOpts{})
require.NoError(t, err)
require.Equal(t, DirectionUp, res.Direction)
require.Equal(t, []int{riverMigrationsWithTestVersionsMaxVersion - 1, riverMigrationsWithTestVersionsMaxVersion},
sliceutil.Map(res.Versions, migrateVersionToInt))

Expand All @@ -252,6 +255,7 @@ func TestMigrator(t *testing.T) {
{
res, err := migrator.MigrateTx(ctx, bundle.tx, DirectionUp, &MigrateOpts{})
require.NoError(t, err)
require.Equal(t, DirectionUp, res.Direction)
require.Equal(t, []int{}, sliceutil.Map(res.Versions, migrateVersionToInt))

migrations, err := queries.RiverMigrationGetAll(ctx, bundle.tx)
Expand Down

0 comments on commit 02e3d70

Please sign in to comment.