Skip to content

Commit

Permalink
Merge pull request #7449 from planetscale/rn-vr-mt-stop-flags
Browse files Browse the repository at this point in the history
MoveTables/Reshard: add flags to start workflows in a stopped state and to stop workflow once copy is completed
  • Loading branch information
deepthi authored Feb 11, 2021
2 parents 27c07af + b0ac683 commit 31a43f5
Show file tree
Hide file tree
Showing 6 changed files with 155 additions and 38 deletions.
25 changes: 23 additions & 2 deletions go/vt/vtctl/vtctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -1883,6 +1883,10 @@ func commandReshard(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.F
cells := subFlags.String("cells", "", "Cell(s) or CellAlias(es) (comma-separated) to replicate from.")
tabletTypes := subFlags.String("tablet_types", "", "Source tablet types to replicate from.")
skipSchemaCopy := subFlags.Bool("skip_schema_copy", false, "Skip copying of schema to targets")

autoStart := subFlags.Bool("auto_start", true, "If false, streams will start in the Stopped state and will need to be explicitly started")
stopAfterCopy := subFlags.Bool("stop_after_copy", false, "Streams will be stopped once the copy phase is completed")

if err := subFlags.Parse(args); err != nil {
return err
}
Expand All @@ -1895,7 +1899,8 @@ func commandReshard(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.F
}
source := strings.Split(subFlags.Arg(1), ",")
target := strings.Split(subFlags.Arg(2), ",")
return wr.Reshard(ctx, keyspace, workflow, source, target, *skipSchemaCopy, *cells, *tabletTypes)
return wr.Reshard(ctx, keyspace, workflow, source, target, *skipSchemaCopy, *cells,
*tabletTypes, *autoStart, *stopAfterCopy)
}

func commandMoveTables(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error {
Expand All @@ -1911,6 +1916,9 @@ func commandMoveTables(ctx context.Context, wr *wrangler.Wrangler, subFlags *fla
allTables := subFlags.Bool("all", false, "Move all tables from the source keyspace")
excludes := subFlags.String("exclude", "", "Tables to exclude (comma-separated) if -all is specified")

autoStart := subFlags.Bool("auto_start", true, "If false, streams will start in the Stopped state and will need to be explicitly started")
stopAfterCopy := subFlags.Bool("stop_after_copy", false, "Streams will be stopped once the copy phase is completed")

if err := subFlags.Parse(args); err != nil {
return err
}
Expand All @@ -1933,7 +1941,8 @@ func commandMoveTables(ctx context.Context, wr *wrangler.Wrangler, subFlags *fla
source := subFlags.Arg(0)
target := subFlags.Arg(1)
tableSpecs := subFlags.Arg(2)
return wr.MoveTables(ctx, *workflow, source, target, tableSpecs, *cells, *tabletTypes, *allTables, *excludes)
return wr.MoveTables(ctx, *workflow, source, target, tableSpecs, *cells, *tabletTypes, *allTables,
*excludes, *autoStart, *stopAfterCopy)
}

// VReplicationWorkflowAction defines subcommands passed to vtctl for movetables or reshard
Expand All @@ -1960,12 +1969,17 @@ func commandVRWorkflow(ctx context.Context, wr *wrangler.Wrangler, subFlags *fla
reverseReplication := subFlags.Bool("reverse_replication", true, "Also reverse the replication")
keepData := subFlags.Bool("keep_data", false, "Do not drop tables or shards (if true, only vreplication artifacts are cleaned up)")

autoStart := subFlags.Bool("auto_start", true, "If false, streams will start in the Stopped state and will need to be explicitly started")
stopAfterCopy := subFlags.Bool("stop_after_copy", false, "Streams will be stopped once the copy phase is completed")

// MoveTables-only params
sourceKeyspace := subFlags.String("source", "", "Source keyspace")
tables := subFlags.String("tables", "", "A table spec or a list of tables")
allTables := subFlags.Bool("all", false, "Move all tables from the source keyspace")
excludes := subFlags.String("exclude", "", "Tables to exclude (comma-separated) if -all is specified")
renameTables := subFlags.Bool("rename_tables", false, "Rename tables instead of dropping them")

// Reshard-only params
sourceShards := subFlags.String("source_shards", "", "Source shards")
targetShards := subFlags.String("target_shards", "", "Target shards")
skipSchemaCopy := subFlags.Bool("skip_schema_copy", false, "Skip copying of schema to target shards")
Expand Down Expand Up @@ -1995,6 +2009,8 @@ func commandVRWorkflow(ctx context.Context, wr *wrangler.Wrangler, subFlags *fla
TargetKeyspace: target,
Workflow: workflow,
DryRun: *dryRun,
AutoStart: *autoStart,
StopAfterCopy: *stopAfterCopy,
}

printDetails := func() error {
Expand Down Expand Up @@ -2154,7 +2170,12 @@ func commandVRWorkflow(ctx context.Context, wr *wrangler.Wrangler, subFlags *fla
if err != nil {
return err
}
if !*autoStart {
wr.Logger().Printf("Workflow has been created in Stopped state\n")
break
}
wr.Logger().Printf("Waiting for workflow to start:\n")

type streamCount struct {
total, running int64
}
Expand Down
10 changes: 8 additions & 2 deletions go/vt/wrangler/materializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ const (

// MoveTables initiates moving table(s) over to another keyspace
func (wr *Wrangler) MoveTables(ctx context.Context, workflow, sourceKeyspace, targetKeyspace, tableSpecs,
cell, tabletTypes string, allTables bool, excludeTables string) error {
cell, tabletTypes string, allTables bool, excludeTables string, autoStart, stopAfterCopy bool) error {
//FIXME validate tableSpecs, allTables, excludeTables
var tables []string
var err error
Expand Down Expand Up @@ -186,6 +186,7 @@ func (wr *Wrangler) MoveTables(ctx context.Context, workflow, sourceKeyspace, ta
TargetKeyspace: targetKeyspace,
Cell: cell,
TabletTypes: tabletTypes,
StopAfterCopy: stopAfterCopy,
}
for _, table := range tables {
buf := sqlparser.NewTrackedBuffer(nil)
Expand Down Expand Up @@ -222,7 +223,12 @@ func (wr *Wrangler) MoveTables(ctx context.Context, workflow, sourceKeyspace, ta
workflow, targetKeyspace)
return fmt.Errorf(msg)
}
return mz.startStreams(ctx)
if autoStart {
return mz.startStreams(ctx)
}
wr.Logger().Infof("Streams will not be started since -auto_start is set to false")

return nil
}

func (wr *Wrangler) validateSourceTablesExist(ctx context.Context, sourceKeyspace string, ksTables, tables []string) error {
Expand Down
46 changes: 38 additions & 8 deletions go/vt/wrangler/materializer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,12 @@ limitations under the License.
package wrangler

import (
"context"
"fmt"
"sort"
"strings"
"testing"

"context"

"github.com/golang/protobuf/proto"
"github.com/stretchr/testify/require"

Expand Down Expand Up @@ -63,7 +62,7 @@ func TestMigrateTables(t *testing.T) {
env.tmc.expectVRQuery(200, mzUpdateQuery, &sqltypes.Result{})

ctx := context.Background()
err := env.wr.MoveTables(ctx, "workflow", "sourceks", "targetks", "t1", "", "", false, "")
err := env.wr.MoveTables(ctx, "workflow", "sourceks", "targetks", "t1", "", "", false, "", true, false)
require.NoError(t, err)
vschema, err := env.wr.ts.GetSrvVSchema(ctx, env.cell)
require.NoError(t, err)
Expand Down Expand Up @@ -104,11 +103,11 @@ func TestMissingTables(t *testing.T) {
env.tmc.expectVRQuery(200, mzUpdateQuery, &sqltypes.Result{})

ctx := context.Background()
err := env.wr.MoveTables(ctx, "workflow", "sourceks", "targetks", "t1,tyt", "", "", false, "")
err := env.wr.MoveTables(ctx, "workflow", "sourceks", "targetks", "t1,tyt", "", "", false, "", true, false)
require.EqualError(t, err, "table(s) not found in source keyspace sourceks: tyt")
err = env.wr.MoveTables(ctx, "workflow", "sourceks", "targetks", "t1,tyt,t2,txt", "", "", false, "")
err = env.wr.MoveTables(ctx, "workflow", "sourceks", "targetks", "t1,tyt,t2,txt", "", "", false, "", true, false)
require.EqualError(t, err, "table(s) not found in source keyspace sourceks: tyt,txt")
err = env.wr.MoveTables(ctx, "workflow", "sourceks", "targetks", "t1", "", "", false, "")
err = env.wr.MoveTables(ctx, "workflow", "sourceks", "targetks", "t1", "", "", false, "", true, false)
require.NoError(t, err)
}

Expand Down Expand Up @@ -164,7 +163,7 @@ func TestMoveTablesAllAndExclude(t *testing.T) {
env.tmc.expectVRQuery(200, insertPrefix, &sqltypes.Result{})
env.tmc.expectVRQuery(200, mzSelectIDQuery, &sqltypes.Result{})
env.tmc.expectVRQuery(200, mzUpdateQuery, &sqltypes.Result{})
err = env.wr.MoveTables(ctx, "workflow", "sourceks", "targetks", "", "", "", tcase.allTables, tcase.excludeTables)
err = env.wr.MoveTables(ctx, "workflow", "sourceks", "targetks", "", "", "", tcase.allTables, tcase.excludeTables, true, false)
require.NoError(t, err)
require.EqualValues(t, tcase.want, targetTables(env))
})
Expand All @@ -173,6 +172,37 @@ func TestMoveTablesAllAndExclude(t *testing.T) {

}

func TestMoveTablesStopFlags(t *testing.T) {
ms := &vtctldatapb.MaterializeSettings{
Workflow: "workflow",
SourceKeyspace: "sourceks",
TargetKeyspace: "targetks",
TableSettings: []*vtctldatapb.TableMaterializeSettings{{
TargetTable: "t1",
SourceExpression: "select * from t1",
}},
}

ctx := context.Background()
var err error
t.Run("StopStartedAndStopAfterCopyFlags", func(t *testing.T) {
env := newTestMaterializerEnv(t, ms, []string{"0"}, []string{"0"})
defer env.close()
env.tmc.expectVRQuery(100, mzCheckJournal, &sqltypes.Result{})
env.tmc.expectVRQuery(200, mzSelectFrozenQuery, &sqltypes.Result{})
// insert expects flag stop_after_copy to be true
insert := `/insert into _vt.vreplication\(workflow, source, pos, max_tps, max_replication_lag, cell, tablet_types, time_updated, transaction_timestamp, state, db_name\) values .*stop_after_copy:true.*`

env.tmc.expectVRQuery(200, insert, &sqltypes.Result{})
env.tmc.expectVRQuery(200, mzSelectIDQuery, &sqltypes.Result{})
// -auto_start=false is tested by NOT expecting the update query which sets state to RUNNING
err = env.wr.MoveTables(ctx, "workflow", "sourceks", "targetks", "t1", "",
"", false, "", false, true)
require.NoError(t, err)
env.tmc.verifyQueries(t)
})
}

func TestMigrateVSchema(t *testing.T) {
ms := &vtctldatapb.MaterializeSettings{
Workflow: "workflow",
Expand All @@ -193,7 +223,7 @@ func TestMigrateVSchema(t *testing.T) {
env.tmc.expectVRQuery(200, mzUpdateQuery, &sqltypes.Result{})

ctx := context.Background()
err := env.wr.MoveTables(ctx, "workflow", "sourceks", "targetks", `{"t1":{}}`, "", "", false, "")
err := env.wr.MoveTables(ctx, "workflow", "sourceks", "targetks", `{"t1":{}}`, "", "", false, "", true, false)
require.NoError(t, err)
vschema, err := env.wr.ts.GetSrvVSchema(ctx, env.cell)
require.NoError(t, err)
Expand Down
22 changes: 15 additions & 7 deletions go/vt/wrangler/resharder.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ type resharder struct {
refStreams map[string]*refStream
cell string //single cell or cellsAlias or comma-separated list of cells/cellsAliases
tabletTypes string
stopAfterCopy bool
}

type refStream struct {
Expand All @@ -61,14 +62,16 @@ type refStream struct {
}

// Reshard initiates a resharding workflow.
func (wr *Wrangler) Reshard(ctx context.Context, keyspace, workflow string, sources, targets []string, skipSchemaCopy bool, cell, tabletTypes string) error {
func (wr *Wrangler) Reshard(ctx context.Context, keyspace, workflow string, sources, targets []string,
skipSchemaCopy bool, cell, tabletTypes string, autoStart, stopAfterCopy bool) error {
if err := wr.validateNewWorkflow(ctx, keyspace, workflow); err != nil {
return err
}
rs, err := wr.buildResharder(ctx, keyspace, workflow, sources, targets, cell, tabletTypes)
if err != nil {
return vterrors.Wrap(err, "buildResharder")
}
rs.stopAfterCopy = stopAfterCopy
if !skipSchemaCopy {
if err := rs.copySchema(ctx); err != nil {
return vterrors.Wrap(err, "copySchema")
Expand All @@ -77,10 +80,14 @@ func (wr *Wrangler) Reshard(ctx context.Context, keyspace, workflow string, sour
if err := rs.createStreams(ctx); err != nil {
return vterrors.Wrap(err, "createStreams")
}
if err := rs.startStreams(ctx); err != nil {
return vterrors.Wrap(err, "startStream")
}

if autoStart {
if err := rs.startStreams(ctx); err != nil {
return vterrors.Wrap(err, "startStreams")
}
} else {
wr.Logger().Infof("Streams will not be started since -auto_start is set to false")
}
return nil
}

Expand Down Expand Up @@ -301,9 +308,10 @@ func (rs *resharder) createStreams(ctx context.Context) error {
}),
}
bls := &binlogdatapb.BinlogSource{
Keyspace: rs.keyspace,
Shard: source.ShardName(),
Filter: filter,
Keyspace: rs.keyspace,
Shard: source.ShardName(),
Filter: filter,
StopAfterCopy: rs.stopAfterCopy,
}
ig.AddRow(rs.workflow, bls, "", rs.cell, rs.tabletTypes)
}
Expand Down
Loading

0 comments on commit 31a43f5

Please sign in to comment.