Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move backfill options to start and migrate command #660

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 20 additions & 10 deletions cli-definition.json
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,16 @@
"use": "migrate <directory>",
"example": "migrate ./migrations",
"flags": [
{
"name": "backfill-batch-delay",
"description": "Duration of delay between batch backfills (eg. 1s, 1000ms)",
"default": "0s"
},
{
"name": "backfill-batch-size",
"description": "Number of rows backfilled in each batch",
"default": "1000"
},
{
"name": "complete",
"shorthand": "c",
Expand Down Expand Up @@ -112,6 +122,16 @@
"use": "start <file>",
"example": "",
"flags": [
{
"name": "backfill-batch-delay",
"description": "Duration of delay between batch backfills (eg. 1s, 1000ms)",
"default": "0s"
},
{
"name": "backfill-batch-size",
"description": "Number of rows backfilled in each batch",
"default": "1000"
},
{
"name": "complete",
"shorthand": "c",
Expand Down Expand Up @@ -141,16 +161,6 @@
}
],
"flags": [
{
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd have expected these flags to have moved, not just removed.

I wonder if the tool that generates this CLI description is working correctly.

"name": "backfill-batch-delay",
"description": "Duration of delay between batch backfills (eg. 1s, 1000ms)",
"default": "0s"
},
{
"name": "backfill-batch-size",
"description": "Number of rows backfilled in each batch",
"default": "1000"
},
{
"name": "lock-timeout",
"description": "Postgres lock timeout in milliseconds for pgroll DDL operations",
Expand Down
6 changes: 0 additions & 6 deletions cmd/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@
package flags

import (
"time"

"github.com/spf13/viper"
)

Expand All @@ -24,10 +22,6 @@ func LockTimeout() int {
return viper.GetInt("LOCK_TIMEOUT")
}

func BackfillBatchSize() int { return viper.GetInt("BACKFILL_BATCH_SIZE") }

func BackfillBatchDelay() time.Duration { return viper.GetDuration("BACKFILL_BATCH_DELAY") }

// SkipValidation reports the boolean stored in viper under the
// SKIP_VALIDATION key.
func SkipValidation() bool {
	skip := viper.GetBool("SKIP_VALIDATION")
	return skip
}

func Role() string {
Expand Down
16 changes: 14 additions & 2 deletions cmd/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,17 @@ package cmd
import (
"fmt"
"os"
"time"

"github.com/spf13/cobra"

"github.com/xataio/pgroll/pkg/backfill"
)

func migrateCmd() *cobra.Command {
var complete bool
var batchSize int
var batchDelay time.Duration

migrateCmd := &cobra.Command{
Use: "migrate <directory>",
Expand Down Expand Up @@ -59,19 +64,26 @@ func migrateCmd() *cobra.Command {
return nil
}

backfillConfig := backfill.NewConfig(
backfill.WithBatchSize(batchSize),
backfill.WithBatchDelay(batchDelay),
)

// Run all migrations after the latest version up to the final migration,
// completing each one.
for _, mig := range migs[:len(migs)-1] {
if err := runMigration(ctx, m, mig, true); err != nil {
if err := runMigration(ctx, m, mig, true, backfillConfig); err != nil {
return fmt.Errorf("failed to run migration file %q: %w", mig.Name, err)
}
}

// Run the final migration, completing it only if requested.
return runMigration(ctx, m, migs[len(migs)-1], complete)
return runMigration(ctx, m, migs[len(migs)-1], complete, backfillConfig)
},
}

migrateCmd.Flags().IntVar(&batchSize, "backfill-batch-size", backfill.DefaultBatchSize, "Number of rows backfilled in each batch")
migrateCmd.Flags().DurationVar(&batchDelay, "backfill-batch-delay", backfill.DefaultDelay, "Duration of delay between batch backfills (eg. 1s, 1000ms)")
migrateCmd.Flags().BoolVarP(&complete, "complete", "c", false, "complete the final migration rather than leaving it active")

return migrateCmd
Expand Down
8 changes: 0 additions & 8 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@ func NewRoll(ctx context.Context) (*roll.Roll, error) {
stateSchema := flags.StateSchema()
lockTimeout := flags.LockTimeout()
role := flags.Role()
backfillBatchSize := flags.BackfillBatchSize()
backfillBatchDelay := flags.BackfillBatchDelay()
skipValidation := flags.SkipValidation()

state, err := state.New(ctx, pgURL, stateSchema)
Expand All @@ -34,8 +32,6 @@ func NewRoll(ctx context.Context) (*roll.Roll, error) {
return roll.New(ctx, pgURL, schema, state,
roll.WithLockTimeoutMs(lockTimeout),
roll.WithRole(role),
roll.WithBackfillBatchSize(backfillBatchSize),
roll.WithBackfillBatchDelay(backfillBatchDelay),
roll.WithSkipValidation(skipValidation),
)
}
Expand All @@ -54,16 +50,12 @@ func Prepare() *cobra.Command {
rootCmd.PersistentFlags().String("schema", "public", "Postgres schema to use for the migration")
rootCmd.PersistentFlags().String("pgroll-schema", "pgroll", "Postgres schema to use for pgroll internal state")
rootCmd.PersistentFlags().Int("lock-timeout", 500, "Postgres lock timeout in milliseconds for pgroll DDL operations")
rootCmd.PersistentFlags().Int("backfill-batch-size", roll.DefaultBackfillBatchSize, "Number of rows backfilled in each batch")
rootCmd.PersistentFlags().Duration("backfill-batch-delay", roll.DefaultBackfillDelay, "Duration of delay between batch backfills (eg. 1s, 1000ms)")
rootCmd.PersistentFlags().String("role", "", "Optional postgres role to set when executing migrations")

viper.BindPFlag("PG_URL", rootCmd.PersistentFlags().Lookup("postgres-url"))
viper.BindPFlag("SCHEMA", rootCmd.PersistentFlags().Lookup("schema"))
viper.BindPFlag("STATE_SCHEMA", rootCmd.PersistentFlags().Lookup("pgroll-schema"))
viper.BindPFlag("LOCK_TIMEOUT", rootCmd.PersistentFlags().Lookup("lock-timeout"))
viper.BindPFlag("BACKFILL_BATCH_SIZE", rootCmd.PersistentFlags().Lookup("backfill-batch-size"))
viper.BindPFlag("BACKFILL_BATCH_DELAY", rootCmd.PersistentFlags().Lookup("backfill-batch-delay"))
viper.BindPFlag("ROLE", rootCmd.PersistentFlags().Lookup("role"))

// register subcommands
Expand Down
27 changes: 19 additions & 8 deletions cmd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,22 @@ import (
"fmt"
"math"
"os"
"time"

"github.com/pterm/pterm"
"github.com/spf13/cobra"
"github.com/spf13/viper"

"github.com/xataio/pgroll/cmd/flags"
"github.com/xataio/pgroll/pkg/backfill"
"github.com/xataio/pgroll/pkg/migrations"
"github.com/xataio/pgroll/pkg/roll"
)

func startCmd() *cobra.Command {
var complete bool
var batchSize int
var batchDelay time.Duration

startCmd := &cobra.Command{
Use: "start <file>",
Expand All @@ -34,30 +38,37 @@ func startCmd() *cobra.Command {
}
defer m.Close()

return runMigrationFromFile(cmd.Context(), m, fileName, complete)
c := backfill.NewConfig(
backfill.WithBatchSize(batchSize),
backfill.WithBatchDelay(batchDelay),
)

return runMigrationFromFile(cmd.Context(), m, fileName, complete, c)
},
}

startCmd.Flags().IntVar(&batchSize, "backfill-batch-size", backfill.DefaultBatchSize, "Number of rows backfilled in each batch")
startCmd.Flags().DurationVar(&batchDelay, "backfill-batch-delay", backfill.DefaultDelay, "Duration of delay between batch backfills (eg. 1s, 1000ms)")
startCmd.Flags().BoolVarP(&complete, "complete", "c", false, "Mark the migration as complete")

startCmd.Flags().BoolP("skip-validation", "s", false, "skip migration validation")

viper.BindPFlag("SKIP_VALIDATION", startCmd.Flags().Lookup("skip-validation"))

return startCmd
}

func runMigrationFromFile(ctx context.Context, m *roll.Roll, fileName string, complete bool) error {
func runMigrationFromFile(ctx context.Context, m *roll.Roll, fileName string, complete bool, c *backfill.Config) error {
migration, err := readMigration(fileName)
if err != nil {
return err
}

return runMigration(ctx, m, migration, complete)
return runMigration(ctx, m, migration, complete, c)
}

func runMigration(ctx context.Context, m *roll.Roll, migration *migrations.Migration, complete bool) error {
func runMigration(ctx context.Context, m *roll.Roll, migration *migrations.Migration, complete bool, c *backfill.Config) error {
sp, _ := pterm.DefaultSpinner.WithText("Starting migration...").Start()
cb := func(n int64, total int64) {
c.AddCallback(func(n int64, total int64) {
if total > 0 {
percent := float64(n) / float64(total) * 100
// Percent can be > 100 if we're on the last batch in which case we still want to display 100.
Expand All @@ -66,9 +77,9 @@ func runMigration(ctx context.Context, m *roll.Roll, migration *migrations.Migra
} else {
sp.UpdateText(fmt.Sprintf("%d records complete...", n))
}
}
})

err := m.Start(ctx, migration, cb)
err := m.Start(ctx, migration, c)
if err != nil {
sp.Fail(fmt.Sprintf("Failed to start migration: %s", err))
return err
Expand Down
7 changes: 4 additions & 3 deletions internal/benchmarks/benchmarks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/stretchr/testify/require"

"github.com/xataio/pgroll/internal/testutils"
"github.com/xataio/pgroll/pkg/backfill"
"github.com/xataio/pgroll/pkg/migrations"
"github.com/xataio/pgroll/pkg/roll"
)
Expand Down Expand Up @@ -71,7 +72,7 @@ func BenchmarkBackfill(b *testing.B) {

// Backfill
b.StartTimer()
require.NoError(b, mig.Start(ctx, &migAlterColumn))
require.NoError(b, mig.Start(ctx, &migAlterColumn, backfill.NewConfig()))
require.NoError(b, mig.Complete(ctx))
b.StopTimer()
b.Logf("Backfilled %d rows in %s", rowCount, b.Elapsed())
Expand Down Expand Up @@ -132,7 +133,7 @@ func BenchmarkWriteAmplification(b *testing.B) {
setupInitialTable(b, ctx, testSchema, mig, db, rowCount)

// Start the migration
require.NoError(b, mig.Start(ctx, &migAlterColumn))
require.NoError(b, mig.Start(ctx, &migAlterColumn, backfill.NewConfig()))
b.Cleanup(func() {
// Finish the migration
require.NoError(b, mig.Complete(ctx))
Expand Down Expand Up @@ -211,7 +212,7 @@ func setupInitialTable(tb testing.TB, ctx context.Context, testSchema string, mi
}

// Setup
require.NoError(tb, mig.Start(ctx, &migCreateTable))
require.NoError(tb, mig.Start(ctx, &migCreateTable, backfill.NewConfig()))
require.NoError(tb, mig.Complete(ctx))
seed(tb, rowCount, db)
}
Expand Down
16 changes: 5 additions & 11 deletions pkg/backfill/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,24 +15,18 @@ import (
)

type Backfill struct {
conn db.DB
batchSize int
batchDelay time.Duration
callbacks []CallbackFn
conn db.DB
*Config
}

// CallbackFn is the signature of a backfill progress callback. It is handed
// two counters, done and total.
// NOTE(review): presumably these are row counts, and total may be
// non-positive when no total is available (the CLI callback only computes a
// percentage when total > 0) — confirm against the code that invokes it.
type CallbackFn func(done int64, total int64)

// New creates a new backfill operation with the given options. The backfill is
// not started until `Start` is invoked.
func New(conn db.DB, opts ...OptionFn) *Backfill {
func New(conn db.DB, c *Config) *Backfill {
b := &Backfill{
conn: conn,
batchSize: 1000,
}

for _, opt := range opts {
opt(b)
conn: conn,
Config: c,
}

return b
Expand Down
51 changes: 51 additions & 0 deletions pkg/backfill/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// SPDX-License-Identifier: Apache-2.0

package backfill

import (
"time"
)

// Config holds the tunable settings of a backfill operation. Build one with
// NewConfig; the fields are unexported and are set through OptionFn values
// and AddCallback.
type Config struct {
// batchSize is the number of rows backfilled in each batch.
batchSize int
// batchDelay is the delay between batch backfills.
batchDelay time.Duration
// callbacks collects the functions registered through AddCallback.
callbacks []CallbackFn
}

// Defaults that NewConfig applies before running any options.
const (
// DefaultBatchSize is the batch size used when WithBatchSize is not supplied.
DefaultBatchSize int = 1000
// DefaultDelay is the delay between batches used when WithBatchDelay is not
// supplied; zero means no delay is configured.
DefaultDelay time.Duration = 0
)

// OptionFn is a functional option: it receives the Config under construction
// and modifies it in place. NewConfig applies options in the order given.
type OptionFn func(*Config)

// NewConfig returns a Config seeded with DefaultBatchSize, DefaultDelay and an
// empty callback list, then applies opts in the order they were passed so that
// later options override earlier ones.
func NewConfig(opts ...OptionFn) *Config {
	cfg := new(Config)
	cfg.batchSize = DefaultBatchSize
	cfg.batchDelay = DefaultDelay
	cfg.callbacks = make([]CallbackFn, 0)

	for i := range opts {
		opts[i](cfg)
	}

	return cfg
}

// WithBatchSize returns an option that overwrites the Config's batch size
// with batchSize. The value is stored as given; no validation is performed.
func WithBatchSize(batchSize int) OptionFn {
	apply := func(cfg *Config) { cfg.batchSize = batchSize }
	return apply
}

// WithBatchDelay returns an option that overwrites the Config's delay between
// batches with delay. The value is stored as given; no validation is
// performed.
func WithBatchDelay(delay time.Duration) OptionFn {
	apply := func(cfg *Config) { cfg.batchDelay = delay }
	return apply
}

// AddCallback appends fn to the Config's list of callbacks.
//
// Callbacks are only ever appended, never replaced or removed, so a Config
// that is reused for several runs keeps every callback registered for the
// earlier ones. Build a fresh Config per run when that stacking is not wanted.
//
// A nil fn is ignored: storing it would postpone the failure until the
// callbacks are invoked, where calling a nil function value panics.
func (c *Config) AddCallback(fn CallbackFn) {
	if fn == nil {
		return
	}
	c.callbacks = append(c.callbacks, fn)
}
29 changes: 0 additions & 29 deletions pkg/backfill/options.go

This file was deleted.

Loading