From a4ed879100b18d5760ecb44f8f19921127f9b3e2 Mon Sep 17 00:00:00 2001 From: Brandur Date: Sat, 2 Dec 2023 15:54:48 -0800 Subject: [PATCH] Migrate only one step down by default + `TargetVersion` set to -1 for unlimited A few tweaks to the originally proposed migration API: * When migrating down, defaults to a maximum of one step by default. This is meant as a safety feature so that it's harder to remove the River schema completely by accident. * When migrating down, `TargetVersion` can be set to `-1` explicitly to move an unlimited number of steps. This adds a way to remove the River schema completely by migrating to the equivalent of version zero (we can't use `0` for this because that's the default value of `int`). --- rivermigrate/river_migrate.go | 27 ++++++++++++----- rivermigrate/river_migrate_test.go | 47 ++++++++++++++++++++---------- 2 files changed, 51 insertions(+), 23 deletions(-) diff --git a/rivermigrate/river_migrate.go b/rivermigrate/river_migrate.go index 007c111e..9341d9ab 100644 --- a/rivermigrate/river_migrate.go +++ b/rivermigrate/river_migrate.go @@ -116,14 +116,16 @@ func New[TTx any](driver riverdriver.Driver[TTx], config *Config) *Migrator[TTx] // MigrateOpts are options for a migrate operation. type MigrateOpts struct { // MaxSteps is the maximum number of migrations to apply either up or down. - // Leave zero for an unlimited number. Set to -1 to apply no migrations (for - // testing/checking purposes). + // When migrating in the up direction, migrates an unlimited number of steps + // by default. When migrating in the down direction, migrates only a single + // step by default (set TargetVersion to -1 to apply unlimited steps down). + // Set to -1 to apply no migrations (for testing/checking purposes). MaxSteps int // TargetVersion is a specific migration version to apply migrations to. The // version must exist and it must be in the possible list of migrations to - // apply. e.g. If requesting an up migration with version 3, version 3 not - // already be applied. + // apply. e.g. If requesting an up migration with version 3, version 3 must + // not already be applied. // // When applying migrations up, migrations are applied including the target // version, so when starting at version 0 and requesting version 3, versions @@ -131,6 +133,9 @@ type MigrateOpts struct { // migrations are applied excluding the target version, so when starting at // version 5 an requesting version 3, down migrations for versions 5 and 4 // would be applied, leaving the final schema at version 3. + // + // When migrating down, TargetVersion can be set to the special value of -1 + // to apply all down migrations (i.e. River schema is removed completely). TargetVersion int } @@ -292,11 +297,19 @@ func (m *Migrator[TTx]) applyMigrations(ctx context.Context, tx pgx.Tx, directio opts = &MigrateOpts{} } + var maxSteps int + switch { + case opts.MaxSteps != 0: + maxSteps = opts.MaxSteps + case direction == DirectionDown && opts.TargetVersion == 0: + maxSteps = 1 + } + switch { - case opts.MaxSteps < 0: + case maxSteps < 0: sortedTargetMigrations = []*migrationBundle{} - case opts.MaxSteps > 0: - sortedTargetMigrations = sortedTargetMigrations[0:min(opts.MaxSteps, len(sortedTargetMigrations))] + case maxSteps > 0: + sortedTargetMigrations = sortedTargetMigrations[0:min(maxSteps, len(sortedTargetMigrations))] } if opts.TargetVersion > 0 { diff --git a/rivermigrate/river_migrate_test.go b/rivermigrate/river_migrate_test.go index a0200eef..2ddfc948 100644 --- a/rivermigrate/river_migrate_test.go +++ b/rivermigrate/river_migrate_test.go @@ -80,28 +80,29 @@ func TestMigrator(t *testing.T) { return migrator, bundle } - t.Run("MigrateDown", func(t *testing.T) { + t.Run("MigrateDownDefault", func(t *testing.T) { t.Parallel() migrator, bundle := setup(t) - // Run an initial time + // Run an initial time. Defaults to only running one step when moving in + // the down direction. { res, err := migrator.MigrateTx(ctx, bundle.tx, DirectionDown, &MigrateOpts{}) require.NoError(t, err) - require.Equal(t, seqToOne(3), sliceutil.Map(res.Versions, migrateVersionToInt)) + require.Equal(t, []int{3}, sliceutil.Map(res.Versions, migrateVersionToInt)) - err = dbExecError(ctx, bundle.tx, "SELECT * FROM river_migration") - require.Error(t, err) + err = dbExecError(ctx, bundle.tx, "SELECT * FROM river_job") + require.NoError(t, err) } - // Run once more to verify idempotency + // Run once more to go down one more step { res, err := migrator.MigrateTx(ctx, bundle.tx, DirectionDown, &MigrateOpts{}) require.NoError(t, err) - require.Equal(t, []int{}, sliceutil.Map(res.Versions, migrateVersionToInt)) + require.Equal(t, []int{2}, sliceutil.Map(res.Versions, migrateVersionToInt)) - err = dbExecError(ctx, bundle.tx, "SELECT * FROM river_migration") + err = dbExecError(ctx, bundle.tx, "SELECT * FROM river_job") require.Error(t, err) } }) @@ -116,10 +117,7 @@ func TestMigrator(t *testing.T) { res, err := migrator.MigrateTx(ctx, bundle.tx, DirectionDown, &MigrateOpts{}) require.NoError(t, err) - require.Equal(t, seqToOne(riverMigrationsWithTestVersionsMaxVersion), sliceutil.Map(res.Versions, migrateVersionToInt)) - - err = dbExecError(ctx, bundle.tx, "SELECT * FROM river_migration") - require.Error(t, err) + require.Equal(t, []int{riverMigrationsWithTestVersionsMaxVersion}, sliceutil.Map(res.Versions, migrateVersionToInt)) }) t.Run("MigrateDownWithMaxSteps", func(t *testing.T) { @@ -130,14 +128,14 @@ func TestMigrator(t *testing.T) { _, err := migrator.MigrateTx(ctx, bundle.tx, DirectionUp, &MigrateOpts{}) require.NoError(t, err) - res, err := migrator.MigrateTx(ctx, bundle.tx, DirectionDown, &MigrateOpts{MaxSteps: 1}) + res, err := migrator.MigrateTx(ctx, bundle.tx, DirectionDown, &MigrateOpts{MaxSteps: 2}) require.NoError(t, err) - require.Equal(t, []int{riverMigrationsWithTestVersionsMaxVersion}, + require.Equal(t, []int{riverMigrationsWithTestVersionsMaxVersion, riverMigrationsWithTestVersionsMaxVersion - 1}, sliceutil.Map(res.Versions, migrateVersionToInt)) migrations, err := queries.RiverMigrationGetAll(ctx, bundle.tx) require.NoError(t, err) - require.Equal(t, seqOneTo(riverMigrationsWithTestVersionsMaxVersion-1), + require.Equal(t, seqOneTo(riverMigrationsWithTestVersionsMaxVersion-2), sliceutil.Map(migrations, riverMigrationToInt)) err = dbExecError(ctx, bundle.tx, "SELECT name FROM test_table") @@ -184,6 +182,23 @@ func TestMigrator(t *testing.T) { require.Error(t, err) }) + t.Run("MigrateDownWithTargetVersionMinusOne", func(t *testing.T) { + t.Parallel() + + migrator, bundle := setup(t) + + _, err := migrator.MigrateTx(ctx, bundle.tx, DirectionUp, &MigrateOpts{}) + require.NoError(t, err) + + res, err := migrator.MigrateTx(ctx, bundle.tx, DirectionDown, &MigrateOpts{TargetVersion: -1}) + require.NoError(t, err) + require.Equal(t, seqToOne(5), + sliceutil.Map(res.Versions, migrateVersionToInt)) + + err = dbExecError(ctx, bundle.tx, "SELECT name FROM river_migrate") + require.Error(t, err) + }) + t.Run("MigrateDownWithTargetVersionInvalid", func(t *testing.T) { t.Parallel() @@ -212,7 +227,7 @@ func TestMigrator(t *testing.T) { require.Equal(t, []int{4, 5}, sliceutil.Map(res.Versions, migrateVersionToInt)) }) - t.Run("MigrateUp", func(t *testing.T) { + t.Run("MigrateUpDefault", func(t *testing.T) { t.Parallel() migrator, bundle := setup(t)