Skip to content

Commit

Permalink
Migrate only one step down by default + TargetVersion set to -1 for…
Browse files Browse the repository at this point in the history
… unlimited

A few tweaks to the originally proposed migration API:

* When migrating down, defaults to a maximum of one step by default.
  This is meant as a safety feature so that it's harder to remove the
  River schema completely by accident.

* When migrating down, `TargetVersion` can be set to `-1` explicitly to
  move an unlimited number of steps. This adds a way to remove the River
  schema completely by migrating to the equivalent of version zero (we
  can't use `0` for this because that's the default value of `int`).
  • Loading branch information
brandur committed Dec 3, 2023
1 parent 3e6da84 commit 29b1f00
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 35 deletions.
51 changes: 32 additions & 19 deletions rivermigrate/river_migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,21 +116,26 @@ func New[TTx any](driver riverdriver.Driver[TTx], config *Config) *Migrator[TTx]
// MigrateOpts are options for a migrate operation.
type MigrateOpts struct {
// MaxSteps is the maximum number of migrations to apply either up or down.
// Leave zero for an unlimited number. Set to -1 to apply no migrations (for
// testing/checking purposes).
// When migrating in the up direction, migrates an unlimited number of steps
// by default. When migrating in the down direction, migrates only a single
// step by default (set TargetVersion to -1 to apply unlimited steps down).
// Set to -1 to apply no migrations (for testing/checking purposes).
MaxSteps int

// TargetVersion is a specific migration version to apply migrations to. The
// version must exist and it must be in the possible list of migrations to
// apply. e.g. If requesting an up migration with version 3, version 3 not
// already be applied.
// apply. e.g. If requesting an up migration with version 3, version 3 must
// not already be applied.
//
// When applying migrations up, migrations are applied including the target
// version, so when starting at version 0 and requesting version 3, versions
// 1, 2, and 3 would be applied. When applying migrations down, down
// migrations are applied excluding the target version, so when starting at
// version 5 an requesting version 3, down migrations for versions 5 and 4
// would be applied, leaving the final schema at version 3.
//
// When migrating down, TargetVersion can be set to the special value of -1
// to apply all down migrations (i.e. River schema is removed completely).
TargetVersion int
}

Expand Down Expand Up @@ -160,12 +165,12 @@ const (
// Migrate migrates the database in the given direction (up or down). The opts
// parameter may be omitted for convenience.
//
// By default, applies all outstanding migrations possible in either direction.
// When migrating up all outstanding migrations are applied, and when migrating
// down all existing migrations are unapplied.
//
// When migrating down, use with caution. MigrateOpts.MaxSteps should be set to
// 1 to only migrate down one step.
// By default, applies all outstanding migrations when moving in the up
// direction, but for safety, only one step when moving in the down direction.
// To migrate more than one step down, MigrateOpts.MaxSteps or
// MigrateOpts.TargetVersion are available. Setting MigrateOpts.TargetVersion to
// -1 will apply every available downstep so that River's schema is removed
// completely.
//
// res, err := migrator.Migrate(ctx, rivermigrate.DirectionUp, nil)
// if err != nil {
Expand All @@ -187,12 +192,12 @@ func (m *Migrator[TTx]) Migrate(ctx context.Context, direction Direction, opts *
// Migrate migrates the database in the given direction (up or down). The opts
// parameter may be omitted for convenience.
//
// By default, applies all outstanding migrations possible in either direction.
// When migrating up all outstanding migrations are applied, and when migrating
// down all existing migrations are unapplied.
//
// When migrating down, use with caution. MigrateOpts.MaxSteps should be set to
// 1 to only migrate down one step.
// By default, applies all outstanding migrations when moving in the up
// direction, but for safety, only one step when moving in the down direction.
// To migrate more than one step down, MigrateOpts.MaxSteps or
// MigrateOpts.TargetVersion are available. Setting MigrateOpts.TargetVersion to
// -1 will apply every available downstep so that River's schema is removed
// completely.
//
// res, err := migrator.MigrateTx(ctx, tx, rivermigrate.DirectionUp, nil)
// if err != nil {
Expand Down Expand Up @@ -292,11 +297,19 @@ func (m *Migrator[TTx]) applyMigrations(ctx context.Context, tx pgx.Tx, directio
opts = &MigrateOpts{}
}

var maxSteps int
switch {
case opts.MaxSteps != 0:
maxSteps = opts.MaxSteps
case direction == DirectionDown && opts.TargetVersion == 0:
maxSteps = 1
}

switch {
case opts.MaxSteps < 0:
case maxSteps < 0:
sortedTargetMigrations = []*migrationBundle{}
case opts.MaxSteps > 0:
sortedTargetMigrations = sortedTargetMigrations[0:min(opts.MaxSteps, len(sortedTargetMigrations))]
case maxSteps > 0:
sortedTargetMigrations = sortedTargetMigrations[0:min(maxSteps, len(sortedTargetMigrations))]
}

if opts.TargetVersion > 0 {
Expand Down
47 changes: 31 additions & 16 deletions rivermigrate/river_migrate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,28 +80,29 @@ func TestMigrator(t *testing.T) {
return migrator, bundle
}

t.Run("MigrateDown", func(t *testing.T) {
t.Run("MigrateDownDefault", func(t *testing.T) {
t.Parallel()

migrator, bundle := setup(t)

// Run an initial time
// Run an initial time. Defaults to only running one step when moving in
// the down direction.
{
res, err := migrator.MigrateTx(ctx, bundle.tx, DirectionDown, &MigrateOpts{})
require.NoError(t, err)
require.Equal(t, seqToOne(3), sliceutil.Map(res.Versions, migrateVersionToInt))
require.Equal(t, []int{3}, sliceutil.Map(res.Versions, migrateVersionToInt))

err = dbExecError(ctx, bundle.tx, "SELECT * FROM river_migration")
require.Error(t, err)
err = dbExecError(ctx, bundle.tx, "SELECT * FROM river_job")
require.NoError(t, err)
}

// Run once more to verify idempotency
// Run once more to go down one more step
{
res, err := migrator.MigrateTx(ctx, bundle.tx, DirectionDown, &MigrateOpts{})
require.NoError(t, err)
require.Equal(t, []int{}, sliceutil.Map(res.Versions, migrateVersionToInt))
require.Equal(t, []int{2}, sliceutil.Map(res.Versions, migrateVersionToInt))

err = dbExecError(ctx, bundle.tx, "SELECT * FROM river_migration")
err = dbExecError(ctx, bundle.tx, "SELECT * FROM river_job")
require.Error(t, err)
}
})
Expand All @@ -116,10 +117,7 @@ func TestMigrator(t *testing.T) {

res, err := migrator.MigrateTx(ctx, bundle.tx, DirectionDown, &MigrateOpts{})
require.NoError(t, err)
require.Equal(t, seqToOne(riverMigrationsWithTestVersionsMaxVersion), sliceutil.Map(res.Versions, migrateVersionToInt))

err = dbExecError(ctx, bundle.tx, "SELECT * FROM river_migration")
require.Error(t, err)
require.Equal(t, []int{riverMigrationsWithTestVersionsMaxVersion}, sliceutil.Map(res.Versions, migrateVersionToInt))
})

t.Run("MigrateDownWithMaxSteps", func(t *testing.T) {
Expand All @@ -130,14 +128,14 @@ func TestMigrator(t *testing.T) {
_, err := migrator.MigrateTx(ctx, bundle.tx, DirectionUp, &MigrateOpts{})
require.NoError(t, err)

res, err := migrator.MigrateTx(ctx, bundle.tx, DirectionDown, &MigrateOpts{MaxSteps: 1})
res, err := migrator.MigrateTx(ctx, bundle.tx, DirectionDown, &MigrateOpts{MaxSteps: 2})
require.NoError(t, err)
require.Equal(t, []int{riverMigrationsWithTestVersionsMaxVersion},
require.Equal(t, []int{riverMigrationsWithTestVersionsMaxVersion, riverMigrationsWithTestVersionsMaxVersion - 1},
sliceutil.Map(res.Versions, migrateVersionToInt))

migrations, err := queries.RiverMigrationGetAll(ctx, bundle.tx)
require.NoError(t, err)
require.Equal(t, seqOneTo(riverMigrationsWithTestVersionsMaxVersion-1),
require.Equal(t, seqOneTo(riverMigrationsWithTestVersionsMaxVersion-2),
sliceutil.Map(migrations, riverMigrationToInt))

err = dbExecError(ctx, bundle.tx, "SELECT name FROM test_table")
Expand Down Expand Up @@ -184,6 +182,23 @@ func TestMigrator(t *testing.T) {
require.Error(t, err)
})

t.Run("MigrateDownWithTargetVersionMinusOne", func(t *testing.T) {
t.Parallel()

migrator, bundle := setup(t)

_, err := migrator.MigrateTx(ctx, bundle.tx, DirectionUp, &MigrateOpts{})
require.NoError(t, err)

res, err := migrator.MigrateTx(ctx, bundle.tx, DirectionDown, &MigrateOpts{TargetVersion: -1})
require.NoError(t, err)
require.Equal(t, seqToOne(5),
sliceutil.Map(res.Versions, migrateVersionToInt))

err = dbExecError(ctx, bundle.tx, "SELECT name FROM river_migrate")
require.Error(t, err)
})

t.Run("MigrateDownWithTargetVersionInvalid", func(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -212,7 +227,7 @@ func TestMigrator(t *testing.T) {
require.Equal(t, []int{4, 5}, sliceutil.Map(res.Versions, migrateVersionToInt))
})

t.Run("MigrateUp", func(t *testing.T) {
t.Run("MigrateUpDefault", func(t *testing.T) {
t.Parallel()

migrator, bundle := setup(t)
Expand Down

0 comments on commit 29b1f00

Please sign in to comment.