Move backfill options to start and migrate command
kvch committed Feb 6, 2025
1 parent bbdb2f8 commit 4dcd097
Showing 15 changed files with 169 additions and 156 deletions.
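
With this change, the --backfill-batch-size and --backfill-batch-delay flags are removed from the root command and registered only on the start and migrate subcommands, the two commands that actually run a backfill. A rough usage sketch (the file and directory names and the values are illustrative, not taken from this commit):

    pgroll start migrations/02_change_column.json --backfill-batch-size 500 --backfill-batch-delay 250ms
    pgroll migrate ./migrations --complete --backfill-batch-size 5000

The defaults are unchanged: 1000 rows per batch and no delay between batches, now exported as backfill.DefaultBatchSize and backfill.DefaultDelay.
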
10 changes: 0 additions & 10 deletions cli-definition.json
@@ -141,16 +141,6 @@
}
],
"flags": [
{
"name": "backfill-batch-delay",
"description": "Duration of delay between batch backfills (eg. 1s, 1000ms)",
"default": "0s"
},
{
"name": "backfill-batch-size",
"description": "Number of rows backfilled in each batch",
"default": "1000"
},
{
"name": "lock-timeout",
"description": "Postgres lock timeout in milliseconds for pgroll DDL operations",
18 changes: 16 additions & 2 deletions cmd/migrate.go
@@ -7,6 +7,10 @@ import (
"os"

"github.com/spf13/cobra"
"github.com/spf13/viper"

"github.com/xataio/pgroll/cmd/flags"
"github.com/xataio/pgroll/pkg/backfill"
)

func migrateCmd() *cobra.Command {
@@ -59,20 +63,30 @@ func migrateCmd() *cobra.Command {
return nil
}

backfillConfig := backfill.NewConfig(
backfill.WithBatchSize(flags.BackfillBatchSize()),
backfill.WithBatchDelay(flags.BackfillBatchDelay()),
)

// Run all migrations after the latest version up to the final migration,
// completing each one.
for _, mig := range migs[:len(migs)-1] {
if err := runMigration(ctx, m, mig, true); err != nil {
if err := runMigration(ctx, m, mig, true, backfillConfig); err != nil {
return fmt.Errorf("failed to run migration file %q: %w", mig.Name, err)
}
}

// Run the final migration, completing it only if requested.
return runMigration(ctx, m, migs[len(migs)-1], complete)
return runMigration(ctx, m, migs[len(migs)-1], complete, backfillConfig)
},
}

migrateCmd.PersistentFlags().Int("backfill-batch-size", backfill.DefaultBatchSize, "Number of rows backfilled in each batch")
migrateCmd.PersistentFlags().Duration("backfill-batch-delay", backfill.DefaultDelay, "Duration of delay between batch backfills (eg. 1s, 1000ms)")
migrateCmd.Flags().BoolVarP(&complete, "complete", "c", false, "complete the final migration rather than leaving it active")

viper.BindPFlag("BACKFILL_BATCH_SIZE", migrateCmd.PersistentFlags().Lookup("backfill-batch-size"))
viper.BindPFlag("BACKFILL_BATCH_DELAY", migrateCmd.PersistentFlags().Lookup("backfill-batch-delay"))

return migrateCmd
}
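
The new flags are also bound through viper (BACKFILL_BATCH_SIZE and BACKFILL_BATCH_DELAY), so they can come from the environment or a config file as well as the command line. Assuming pgroll applies its usual PGROLL_ environment prefix to viper keys (the prefix is not shown in this diff), a hypothetical invocation would look like:

    PGROLL_BACKFILL_BATCH_SIZE=5000 PGROLL_BACKFILL_BATCH_DELAY=1s pgroll migrate ./migrations --complete
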
8 changes: 0 additions & 8 deletions cmd/root.go
@@ -22,8 +22,6 @@ func NewRoll(ctx context.Context) (*roll.Roll, error) {
stateSchema := flags.StateSchema()
lockTimeout := flags.LockTimeout()
role := flags.Role()
backfillBatchSize := flags.BackfillBatchSize()
backfillBatchDelay := flags.BackfillBatchDelay()
skipValidation := flags.SkipValidation()

state, err := state.New(ctx, pgURL, stateSchema)
@@ -34,8 +32,6 @@
return roll.New(ctx, pgURL, schema, state,
roll.WithLockTimeoutMs(lockTimeout),
roll.WithRole(role),
roll.WithBackfillBatchSize(backfillBatchSize),
roll.WithBackfillBatchDelay(backfillBatchDelay),
roll.WithSkipValidation(skipValidation),
)
}
@@ -54,16 +50,12 @@ func Prepare() *cobra.Command {
rootCmd.PersistentFlags().String("schema", "public", "Postgres schema to use for the migration")
rootCmd.PersistentFlags().String("pgroll-schema", "pgroll", "Postgres schema to use for pgroll internal state")
rootCmd.PersistentFlags().Int("lock-timeout", 500, "Postgres lock timeout in milliseconds for pgroll DDL operations")
rootCmd.PersistentFlags().Int("backfill-batch-size", roll.DefaultBackfillBatchSize, "Number of rows backfilled in each batch")
rootCmd.PersistentFlags().Duration("backfill-batch-delay", roll.DefaultBackfillDelay, "Duration of delay between batch backfills (eg. 1s, 1000ms)")
rootCmd.PersistentFlags().String("role", "", "Optional postgres role to set when executing migrations")

viper.BindPFlag("PG_URL", rootCmd.PersistentFlags().Lookup("postgres-url"))
viper.BindPFlag("SCHEMA", rootCmd.PersistentFlags().Lookup("schema"))
viper.BindPFlag("STATE_SCHEMA", rootCmd.PersistentFlags().Lookup("pgroll-schema"))
viper.BindPFlag("LOCK_TIMEOUT", rootCmd.PersistentFlags().Lookup("lock-timeout"))
viper.BindPFlag("BACKFILL_BATCH_SIZE", rootCmd.PersistentFlags().Lookup("backfill-batch-size"))
viper.BindPFlag("BACKFILL_BATCH_DELAY", rootCmd.PersistentFlags().Lookup("backfill-batch-delay"))
viper.BindPFlag("ROLE", rootCmd.PersistentFlags().Lookup("role"))

// register subcommands
26 changes: 18 additions & 8 deletions cmd/start.go
@@ -13,6 +13,7 @@ import (
"github.com/spf13/viper"

"github.com/xataio/pgroll/cmd/flags"
"github.com/xataio/pgroll/pkg/backfill"
"github.com/xataio/pgroll/pkg/migrations"
"github.com/xataio/pgroll/pkg/roll"
)
@@ -34,30 +35,39 @@ func startCmd() *cobra.Command {
}
defer m.Close()

return runMigrationFromFile(cmd.Context(), m, fileName, complete)
c := backfill.NewConfig(
backfill.WithBatchSize(flags.BackfillBatchSize()),
backfill.WithBatchDelay(flags.BackfillBatchDelay()),
)

return runMigrationFromFile(cmd.Context(), m, fileName, complete, c)
},
}

startCmd.PersistentFlags().Int("backfill-batch-size", backfill.DefaultBatchSize, "Number of rows backfilled in each batch")
startCmd.PersistentFlags().Duration("backfill-batch-delay", backfill.DefaultDelay, "Duration of delay between batch backfills (eg. 1s, 1000ms)")
startCmd.Flags().BoolVarP(&complete, "complete", "c", false, "Mark the migration as complete")

startCmd.Flags().BoolP("skip-validation", "s", false, "skip migration validation")

viper.BindPFlag("BACKFILL_BATCH_SIZE", startCmd.PersistentFlags().Lookup("backfill-batch-size"))
viper.BindPFlag("BACKFILL_BATCH_DELAY", startCmd.PersistentFlags().Lookup("backfill-batch-delay"))
viper.BindPFlag("SKIP_VALIDATION", startCmd.Flags().Lookup("skip-validation"))

return startCmd
}

func runMigrationFromFile(ctx context.Context, m *roll.Roll, fileName string, complete bool) error {
func runMigrationFromFile(ctx context.Context, m *roll.Roll, fileName string, complete bool, c *backfill.Config) error {
migration, err := readMigration(fileName)
if err != nil {
return err
}

return runMigration(ctx, m, migration, complete)
return runMigration(ctx, m, migration, complete, c)
}

func runMigration(ctx context.Context, m *roll.Roll, migration *migrations.Migration, complete bool) error {
func runMigration(ctx context.Context, m *roll.Roll, migration *migrations.Migration, complete bool, c *backfill.Config) error {
sp, _ := pterm.DefaultSpinner.WithText("Starting migration...").Start()
cb := func(n int64, total int64) {
c.AddCallback(func(n int64, total int64) {
if total > 0 {
percent := float64(n) / float64(total) * 100
// Percent can be > 100 if we're on the last batch in which case we still want to display 100.
@@ -66,9 +76,9 @@ func runMigration(ctx context.Context, m *roll.Roll, migration *migrations.Migra
} else {
sp.UpdateText(fmt.Sprintf("%d records complete...", n))
}
}
})

err := m.Start(ctx, migration, cb)
err := m.Start(ctx, migration, c)
if err != nil {
sp.Fail(fmt.Sprintf("Failed to start migration: %s", err))
return err
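
For callers of the roll package, the progress callback is no longer passed to Start directly; it is attached to a backfill.Config, which Start now accepts as its third argument. A minimal sketch of the new call pattern, assuming ctx, an initialised *roll.Roll (m) and a parsed *migrations.Migration (migration) already exist:

    cfg := backfill.NewConfig(
        backfill.WithBatchSize(500),
        backfill.WithBatchDelay(time.Second),
    )
    cfg.AddCallback(func(done, total int64) {
        fmt.Printf("%d of %d rows backfilled\n", done, total)
    })

    if err := m.Start(ctx, migration, cfg); err != nil {
        return fmt.Errorf("starting migration: %w", err)
    }
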
7 changes: 4 additions & 3 deletions internal/benchmarks/benchmarks_test.go
@@ -17,6 +17,7 @@ import (
"github.com/stretchr/testify/require"

"github.com/xataio/pgroll/internal/testutils"
"github.com/xataio/pgroll/pkg/backfill"
"github.com/xataio/pgroll/pkg/migrations"
"github.com/xataio/pgroll/pkg/roll"
)
@@ -71,7 +72,7 @@ func BenchmarkBackfill(b *testing.B) {

// Backfill
b.StartTimer()
require.NoError(b, mig.Start(ctx, &migAlterColumn))
require.NoError(b, mig.Start(ctx, &migAlterColumn, backfill.NewConfig()))
require.NoError(b, mig.Complete(ctx))
b.StopTimer()
b.Logf("Backfilled %d rows in %s", rowCount, b.Elapsed())
@@ -132,7 +133,7 @@ func BenchmarkWriteAmplification(b *testing.B) {
setupInitialTable(b, ctx, testSchema, mig, db, rowCount)

// Start the migration
require.NoError(b, mig.Start(ctx, &migAlterColumn))
require.NoError(b, mig.Start(ctx, &migAlterColumn, backfill.NewConfig()))
b.Cleanup(func() {
// Finish the migration
require.NoError(b, mig.Complete(ctx))
@@ -211,7 +212,7 @@ func setupInitialTable(tb testing.TB, ctx context.Context, testSchema string, mi
}

// Setup
require.NoError(tb, mig.Start(ctx, &migCreateTable))
require.NoError(tb, mig.Start(ctx, &migCreateTable, backfill.NewConfig()))
require.NoError(tb, mig.Complete(ctx))
seed(tb, rowCount, db)
}
24 changes: 9 additions & 15 deletions pkg/backfill/backfill.go
@@ -15,24 +15,18 @@
)

type Backfill struct {
conn db.DB
batchSize int
batchDelay time.Duration
callbacks []CallbackFn
conn db.DB
config *Config
}

type CallbackFn func(done int64, total int64)

// New creates a new backfill operation with the given options. The backfill is
// not started until `Start` is invoked.
func New(conn db.DB, opts ...OptionFn) *Backfill {
func New(conn db.DB, c *Config) *Backfill {
b := &Backfill{
conn: conn,
batchSize: 1000,
}

for _, opt := range opts {
opt(b)
conn: conn,
config: c,
}

return b
@@ -61,14 +55,14 @@ func (bf *Backfill) Start(ctx context.Context, table *schema.Table) error {
BatchConfig: templates.BatchConfig{
TableName: table.Name,
PrimaryKey: identityColumns,
BatchSize: bf.batchSize,
BatchSize: bf.config.batchSize,
},
}

// Update each batch of rows, invoking callbacks for each one.
for batch := 0; ; batch++ {
for _, cb := range bf.callbacks {
cb(int64(batch*bf.batchSize), total)
for _, cb := range bf.config.callbacks {
cb(int64(batch*bf.config.batchSize), total)
}

if err := b.updateBatch(ctx, bf.conn); err != nil {
@@ -81,7 +75,7 @@ func (bf *Backfill) Start(ctx context.Context, table *schema.Table) error {
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(bf.batchDelay):
case <-time.After(bf.config.batchDelay):
}
}

66 changes: 66 additions & 0 deletions pkg/backfill/config.go
@@ -0,0 +1,66 @@
// SPDX-License-Identifier: Apache-2.0

package backfill

import (
"time"
)

type Config struct {
batchSize int
batchDelay time.Duration
callbacks []CallbackFn
}

const (
DefaultBatchSize int = 1000
DefaultDelay time.Duration = 0
)

type OptionFn func(*Config)

func NewConfig(opts ...OptionFn) *Config {
c := &Config{
batchSize: DefaultBatchSize,
batchDelay: DefaultDelay,
callbacks: make([]CallbackFn, 0),
}

for _, opt := range opts {
opt(c)
}
return c
}

// WithBatchSize sets the batch size for the backfill operation.
func WithBatchSize(batchSize int) OptionFn {
return func(o *Config) {
o.batchSize = batchSize
}
}

// WithBatchDelay sets the delay between batches for the backfill operation.
func WithBatchDelay(delay time.Duration) OptionFn {
return func(o *Config) {
o.batchDelay = delay
}
}

// WithCallbacks sets the callbacks for the backfill operation.
// Callbacks are invoked after each batch is processed.
func WithCallbacks(cbs ...CallbackFn) OptionFn {
return func(o *Config) {
o.callbacks = cbs
}
}

func (c *Config) AddCallback(fn CallbackFn) {
if c.callbacks == nil {
c.callbacks = make([]CallbackFn, 0)
}
c.callbacks = append(c.callbacks, fn)
}

func (c *Config) Callbacks() []CallbackFn {
return c.callbacks
}
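
The Config type above replaces the option functions that previously configured the Backfill struct directly (pkg/backfill/options.go, deleted below). A short, self-contained example using only the names defined in this new file; the values are arbitrary:

    package main

    import (
        "fmt"
        "time"

        "github.com/xataio/pgroll/pkg/backfill"
    )

    func main() {
        // Functional options override the defaults of 1000 rows per batch and no delay.
        cfg := backfill.NewConfig(
            backfill.WithBatchSize(500),
            backfill.WithBatchDelay(250*time.Millisecond),
            backfill.WithCallbacks(func(done, total int64) {
                fmt.Printf("backfilled %d of %d rows\n", done, total)
            }),
        )

        // Callbacks can also be appended after construction, as the CLI does via AddCallback.
        cfg.AddCallback(func(done, total int64) {})

        fmt.Println("registered callbacks:", len(cfg.Callbacks()))
    }
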
29 changes: 0 additions & 29 deletions pkg/backfill/options.go

This file was deleted.

8 changes: 5 additions & 3 deletions pkg/migrations/op_common_test.go
@@ -18,6 +18,7 @@ import (
"github.com/stretchr/testify/assert"

"github.com/xataio/pgroll/internal/testutils"
"github.com/xataio/pgroll/pkg/backfill"
"github.com/xataio/pgroll/pkg/migrations"
"github.com/xataio/pgroll/pkg/roll"
)
@@ -51,10 +52,11 @@ func ExecuteTests(t *testing.T, tests TestCases, opts ...roll.Option) {
t.Run(tt.name, func(t *testing.T) {
testutils.WithMigratorInSchemaAndConnectionToContainerWithOptions(t, testSchema, opts, func(mig *roll.Roll, db *sql.DB) {
ctx := context.Background()
config := backfill.NewConfig()

// run all migrations except the last one
for i := 0; i < len(tt.migrations)-1; i++ {
if err := mig.Start(ctx, &tt.migrations[i]); err != nil {
if err := mig.Start(ctx, &tt.migrations[i], config); err != nil {
t.Fatalf("Failed to start migration: %v", err)
}

@@ -64,7 +66,7 @@
}

// start the last migration
err := mig.Start(ctx, &tt.migrations[len(tt.migrations)-1])
err := mig.Start(ctx, &tt.migrations[len(tt.migrations)-1], config)
if tt.wantStartErr != nil {
if !errors.Is(err, tt.wantStartErr) {
t.Fatalf("Expected error %q, got %q", tt.wantStartErr, err)
@@ -98,7 +100,7 @@ func ExecuteTests(t *testing.T, tests TestCases, opts ...roll.Option) {
}

// re-start the last migration
if err := mig.Start(ctx, &tt.migrations[len(tt.migrations)-1]); err != nil {
if err := mig.Start(ctx, &tt.migrations[len(tt.migrations)-1], config); err != nil {
t.Fatalf("Failed to start migration: %v", err)
}

Expand Down
(The remaining file diffs were not loaded in this view.)

