diff --git a/rivermigrate/example_migrate_test.go b/rivermigrate/example_migrate_test.go new file mode 100644 index 00000000..8b8bee0e --- /dev/null +++ b/rivermigrate/example_migrate_test.go @@ -0,0 +1,105 @@ +package rivermigrate_test + +import ( + "context" + "fmt" + "sort" + "strings" + + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgxpool" + + "github.com/riverqueue/river" + "github.com/riverqueue/river/internal/riverinternaltest" + "github.com/riverqueue/river/riverdriver/riverpgxv5" + "github.com/riverqueue/river/rivermigrate" +) + +type SortArgs struct { + // Strings is a slice of strings to sort. + Strings []string `json:"strings"` +} + +func (SortArgs) Kind() string { return "sort" } + +type SortWorker struct { + river.WorkerDefaults[SortArgs] +} + +func (w *SortWorker) Work(ctx context.Context, job *river.Job[SortArgs]) error { + sort.Strings(job.Args.Strings) + fmt.Printf("Sorted strings: %+v\n", job.Args.Strings) + return nil +} + +// Example_migrate demonstrates the use of River's Go migration API by migrating +// up and down. +func Example_migrate() { + ctx := context.Background() + + dbPool, err := pgxpool.NewWithConfig(ctx, riverinternaltest.DatabaseConfig("river_testdb_example")) + if err != nil { + panic(err) + } + defer dbPool.Close() + + tx, err := dbPool.Begin(ctx) + if err != nil { + panic(err) + } + defer tx.Rollback(ctx) + + migrator := rivermigrate.New(riverpgxv5.New(dbPool), nil) + + // Our test database starts with a full River schema. Drop it so that we can + // demonstrate working migrations. This isn't necessary outside this test. + dropRiverSchema(ctx, migrator, tx) + + printVersions := func(res *rivermigrate.MigrateResult) { + for _, version := range res.Versions { + fmt.Printf("Migrated [%s] version %d\n", strings.ToUpper(string(res.Direction)), version.Version) + } + } + + // Migrate to version 3. An actual call may want to omit all MigrateOpts, + // which will default to applying all available up migrations. + res, err := migrator.MigrateTx(ctx, tx, rivermigrate.DirectionUp, &rivermigrate.MigrateOpts{ + TargetVersion: 3, + }) + if err != nil { + panic(err) + } + printVersions(res) + + // Migrate down by three steps. Down migrating defaults to running only one + // step unless overridden by an option like MaxSteps or TargetVersion. + res, err = migrator.MigrateTx(ctx, tx, rivermigrate.DirectionDown, &rivermigrate.MigrateOpts{ + MaxSteps: 3, + }) + if err != nil { + panic(err) + } + printVersions(res) + + // Roll back all changes applied so our test database is left unaffected. + if err := tx.Rollback(ctx); err != nil { + panic(err) + } + + // Output: + // Migrated [UP] version 1 + // Migrated [UP] version 2 + // Migrated [UP] version 3 + // Migrated [DOWN] version 3 + // Migrated [DOWN] version 2 + // Migrated [DOWN] version 1 +} + +func dropRiverSchema(ctx context.Context, migrator *rivermigrate.Migrator[pgx.Tx], tx pgx.Tx) { + _, err := migrator.MigrateTx(ctx, tx, rivermigrate.DirectionDown, &rivermigrate.MigrateOpts{ + TargetVersion: -1, + }) + if err != nil { + panic(err) + } +} diff --git a/rivermigrate/river_migrate.go b/rivermigrate/river_migrate.go index 6d3a570a..d3d54151 100644 --- a/rivermigrate/river_migrate.go +++ b/rivermigrate/river_migrate.go @@ -141,6 +141,9 @@ type MigrateOpts struct { // MigrateResult is the result of a migrate operation. type MigrateResult struct { + // Direction is the direction that migration occurred (up or down). + Direction Direction + // Versions are migration versions that were added (for up migrations) or // removed (for down migrations) for this run. Versions []MigrateVersion @@ -333,7 +336,7 @@ func (m *Migrator[TTx]) applyMigrations(ctx context.Context, tx pgx.Tx, directio } } - res := &MigrateResult{Versions: make([]MigrateVersion, 0, len(sortedTargetMigrations))} + res := &MigrateResult{Direction: direction, Versions: make([]MigrateVersion, 0, len(sortedTargetMigrations))} // Short circuit early if there's nothing to do. if len(sortedTargetMigrations) < 1 { diff --git a/rivermigrate/river_migrate_test.go b/rivermigrate/river_migrate_test.go index 2ddfc948..7cdb1ac9 100644 --- a/rivermigrate/river_migrate_test.go +++ b/rivermigrate/river_migrate_test.go @@ -90,6 +90,7 @@ func TestMigrator(t *testing.T) { { res, err := migrator.MigrateTx(ctx, bundle.tx, DirectionDown, &MigrateOpts{}) require.NoError(t, err) + require.Equal(t, DirectionDown, res.Direction) require.Equal(t, []int{3}, sliceutil.Map(res.Versions, migrateVersionToInt)) err = dbExecError(ctx, bundle.tx, "SELECT * FROM river_job") @@ -100,6 +101,7 @@ func TestMigrator(t *testing.T) { { res, err := migrator.MigrateTx(ctx, bundle.tx, DirectionDown, &MigrateOpts{}) require.NoError(t, err) + require.Equal(t, DirectionDown, res.Direction) require.Equal(t, []int{2}, sliceutil.Map(res.Versions, migrateVersionToInt)) err = dbExecError(ctx, bundle.tx, "SELECT * FROM river_job") @@ -236,6 +238,7 @@ func TestMigrator(t *testing.T) { { res, err := migrator.MigrateTx(ctx, bundle.tx, DirectionUp, &MigrateOpts{}) require.NoError(t, err) + require.Equal(t, DirectionUp, res.Direction) require.Equal(t, []int{riverMigrationsWithTestVersionsMaxVersion - 1, riverMigrationsWithTestVersionsMaxVersion}, sliceutil.Map(res.Versions, migrateVersionToInt)) @@ -252,6 +255,7 @@ func TestMigrator(t *testing.T) { { res, err := migrator.MigrateTx(ctx, bundle.tx, DirectionUp, &MigrateOpts{}) require.NoError(t, err) + require.Equal(t, DirectionUp, res.Direction) require.Equal(t, []int{}, sliceutil.Map(res.Versions, migrateVersionToInt)) migrations, err := queries.RiverMigrationGetAll(ctx, bundle.tx)