Skip to content

Commit

Permalink
MoveTables: add flag to specify that routing rules should not be crea…
Browse files Browse the repository at this point in the history
…ted when a movetables workflow is created (#13858)

Signed-off-by: Rohit Nayak <rohit@planetscale.com>
  • Loading branch information
rohit-nayak-ps committed Aug 30, 2023
1 parent 1e82c0c commit 8745291
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 40 deletions.
4 changes: 3 additions & 1 deletion go/vt/vtctl/vtctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -444,7 +444,7 @@ var commands = []commandGroup{
{
name: "MoveTables",
method: commandMoveTables,
params: "[--source=<sourceKs>] [--tables=<tableSpecs>] [--cells=<cells>] [--tablet_types=<source_tablet_types>] [--all] [--exclude=<tables>] [--auto_start] [--stop_after_copy] [--defer-secondary-keys] [--on-ddl=<ddl-action>] [--source_shards=<source_shards>] [--source_time_zone=<mysql_time_zone>] [--initialize-target-sequences] <action> 'action must be one of the following: Create, Complete, Cancel, SwitchTraffic, ReverseTrafffic, Show, or Progress' <targetKs.workflow>",
params: "[--source=<sourceKs>] [--tables=<tableSpecs>] [--cells=<cells>] [--tablet_types=<source_tablet_types>] [--all] [--exclude=<tables>] [--auto_start] [--stop_after_copy] [--defer-secondary-keys] [--on-ddl=<ddl-action>] [--source_shards=<source_shards>] [--source_time_zone=<mysql_time_zone>] [--initialize-target-sequences] [--no-routing-rules] <action> 'action must be one of the following: Create, Complete, Cancel, SwitchTraffic, ReverseTrafffic, Show, or Progress' <targetKs.workflow>",
help: `Move table(s) to another keyspace, table_specs is a list of tables or the tables section of the vschema for the target keyspace. Example: '{"t1":{"column_vindexes": [{"column": "id1", "name": "hash"}]}, "t2":{"column_vindexes": [{"column": "id2", "name": "hash"}]}}'. In the case of an unsharded target keyspace the vschema for each table may be empty. Example: '{"t1":{}, "t2":{}}'.`,
},
{
Expand Down Expand Up @@ -2116,6 +2116,7 @@ func commandVReplicationWorkflow(ctx context.Context, wr *wrangler.Wrangler, sub

// MoveTables-only params
renameTables := subFlags.Bool("rename_tables", false, "MoveTables only. Rename tables instead of dropping them. --rename_tables is only supported for Complete.")
noRoutingRules := subFlags.Bool("no-routing-rules", false, "(Advanced) MoveTables Create only. Do not create routing rules while creating the workflow. See the reference documentation for limitations if you use this flag.")

// MoveTables and Reshard params
sourceShards := subFlags.String("source_shards", "", "Source shards")
Expand Down Expand Up @@ -2260,6 +2261,7 @@ func commandVReplicationWorkflow(ctx context.Context, wr *wrangler.Wrangler, sub
vrwp.ExternalCluster = externalClusterName
vrwp.SourceTimeZone = *sourceTimeZone
vrwp.DropForeignKeys = *dropForeignKeys
vrwp.NoRoutingRules = *noRoutingRules
if *sourceShards != "" {
vrwp.SourceShards = strings.Split(*sourceShards, ",")
}
Expand Down
59 changes: 29 additions & 30 deletions go/vt/wrangler/materializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,8 @@ func shouldInclude(table string, excludes []string) bool {
// MoveTables initiates moving table(s) over to another keyspace
func (wr *Wrangler) MoveTables(ctx context.Context, workflow, sourceKeyspace, targetKeyspace, tableSpecs,
cell, tabletTypesStr string, allTables bool, excludeTables string, autoStart, stopAfterCopy bool,
externalCluster string, dropForeignKeys, deferSecondaryKeys bool, sourceTimeZone, onDDL string, sourceShards []string) (err error) {
externalCluster string, dropForeignKeys, deferSecondaryKeys bool, sourceTimeZone, onDDL string,
sourceShards []string, noRoutingRules bool) (err error) {
//FIXME validate tableSpecs, allTables, excludeTables
var tables []string
var externalTopo *topo.Server
Expand Down Expand Up @@ -283,39 +284,37 @@ func (wr *Wrangler) MoveTables(ctx context.Context, workflow, sourceKeyspace, ta
// Now that the streams have been successfully created, let's put the associated
// routing rules in place.
if externalTopo == nil {
// Save routing rules before vschema. If we save vschema first, and routing
// rules fails to save, we may generate duplicate table errors.
if mz.isPartial {
if err := wr.createDefaultShardRoutingRules(ctx, ms); err != nil {
if noRoutingRules {
log.Warningf("Found --no-routing-rules flag, not creating routing rules for workflow %s.%s", targetKeyspace, workflow)
} else {
// Save routing rules before vschema. If we save vschema first, and routing rules
// fails to save, we may generate duplicate table errors.
rules, err := topotools.GetRoutingRules(ctx, wr.ts)
if err != nil {
return err
}
}

rules, err := topotools.GetRoutingRules(ctx, wr.ts)
if err != nil {
return err
}
for _, table := range tables {
toSource := []string{sourceKeyspace + "." + table}
rules[table] = toSource
rules[table+"@replica"] = toSource
rules[table+"@rdonly"] = toSource
rules[targetKeyspace+"."+table] = toSource
rules[targetKeyspace+"."+table+"@replica"] = toSource
rules[targetKeyspace+"."+table+"@rdonly"] = toSource
rules[targetKeyspace+"."+table] = toSource
rules[sourceKeyspace+"."+table+"@replica"] = toSource
rules[sourceKeyspace+"."+table+"@rdonly"] = toSource
}
if err := topotools.SaveRoutingRules(ctx, wr.ts, rules); err != nil {
return err
}

if vschema != nil {
// We added to the vschema.
if err := wr.ts.SaveVSchema(ctx, targetKeyspace, vschema); err != nil {
for _, table := range tables {
toSource := []string{sourceKeyspace + "." + table}
rules[table] = toSource
rules[table+"@replica"] = toSource
rules[table+"@rdonly"] = toSource
rules[targetKeyspace+"."+table] = toSource
rules[targetKeyspace+"."+table+"@replica"] = toSource
rules[targetKeyspace+"."+table+"@rdonly"] = toSource
rules[targetKeyspace+"."+table] = toSource
rules[sourceKeyspace+"."+table+"@replica"] = toSource
rules[sourceKeyspace+"."+table+"@rdonly"] = toSource
}
if err := topotools.SaveRoutingRules(ctx, wr.ts, rules); err != nil {
return err
}

if vschema != nil {
// We added to the vschema.
if err := wr.ts.SaveVSchema(ctx, targetKeyspace, vschema); err != nil {
return err
}
}
}
}
if err := wr.ts.RebuildSrvVSchema(ctx, nil); err != nil {
Expand Down
45 changes: 37 additions & 8 deletions go/vt/wrangler/materializer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,35 @@ const mzCheckJournal = "/select val from _vt.resharding_journal where id="

var defaultOnDDL = binlogdatapb.OnDDLAction_IGNORE.String()

// TestMoveTablesNoRoutingRules confirms that MoveTables does not create routing rules if --no-routing-rules is specified.
func TestMoveTablesNoRoutingRules(t *testing.T) {
ms := &vtctldatapb.MaterializeSettings{
Workflow: "workflow",
SourceKeyspace: "sourceks",
TargetKeyspace: "targetks",
TableSettings: []*vtctldatapb.TableMaterializeSettings{{
TargetTable: "t1",
SourceExpression: "select * from t1",
}},
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
env := newTestMaterializerEnv(t, ctx, ms, []string{"0"}, []string{"0"})
defer env.close()

env.tmc.expectVRQuery(100, mzCheckJournal, &sqltypes.Result{})
env.tmc.expectVRQuery(200, mzSelectFrozenQuery, &sqltypes.Result{})
env.tmc.expectVRQuery(200, insertPrefix, &sqltypes.Result{})
env.tmc.expectVRQuery(200, mzSelectIDQuery, &sqltypes.Result{})
env.tmc.expectVRQuery(200, mzUpdateQuery, &sqltypes.Result{})

err := env.wr.MoveTables(ctx, "workflow", "sourceks", "targetks", "t1", "", "", false, "", true, false, "", false, false, "", defaultOnDDL, nil, true)
require.NoError(t, err)
rr, err := env.wr.ts.GetRoutingRules(ctx)
require.NoError(t, err)
require.Equal(t, 0, len(rr.Rules))
}

func TestMigrateTables(t *testing.T) {
ms := &vtctldatapb.MaterializeSettings{
Workflow: "workflow",
Expand All @@ -70,7 +99,7 @@ func TestMigrateTables(t *testing.T) {
env.tmc.expectVRQuery(200, mzSelectIDQuery, &sqltypes.Result{})
env.tmc.expectVRQuery(200, mzUpdateQuery, &sqltypes.Result{})

err := env.wr.MoveTables(ctx, "workflow", "sourceks", "targetks", "t1", "", "", false, "", true, false, "", false, false, "", defaultOnDDL, nil)
err := env.wr.MoveTables(ctx, "workflow", "sourceks", "targetks", "t1", "", "", false, "", true, false, "", false, false, "", defaultOnDDL, nil, false)
require.NoError(t, err)
vschema, err := env.wr.ts.GetSrvVSchema(ctx, env.cell)
require.NoError(t, err)
Expand Down Expand Up @@ -113,11 +142,11 @@ func TestMissingTables(t *testing.T) {
env.tmc.expectVRQuery(200, mzSelectIDQuery, &sqltypes.Result{})
env.tmc.expectVRQuery(200, mzUpdateQuery, &sqltypes.Result{})

err := env.wr.MoveTables(ctx, "workflow", "sourceks", "targetks", "t1,tyt", "", "", false, "", true, false, "", false, false, "", defaultOnDDL, nil)
err := env.wr.MoveTables(ctx, "workflow", "sourceks", "targetks", "t1,tyt", "", "", false, "", true, false, "", false, false, "", defaultOnDDL, nil, false)
require.EqualError(t, err, "table(s) not found in source keyspace sourceks: tyt")
err = env.wr.MoveTables(ctx, "workflow", "sourceks", "targetks", "t1,tyt,t2,txt", "", "", false, "", true, false, "", false, false, "", defaultOnDDL, nil)
err = env.wr.MoveTables(ctx, "workflow", "sourceks", "targetks", "t1,tyt,t2,txt", "", "", false, "", true, false, "", false, false, "", defaultOnDDL, nil, false)
require.EqualError(t, err, "table(s) not found in source keyspace sourceks: tyt,txt")
err = env.wr.MoveTables(ctx, "workflow", "sourceks", "targetks", "t1", "", "", false, "", true, false, "", false, false, "", defaultOnDDL, nil)
err = env.wr.MoveTables(ctx, "workflow", "sourceks", "targetks", "t1", "", "", false, "", true, false, "", false, false, "", defaultOnDDL, nil, false)
require.NoError(t, err)
}

Expand Down Expand Up @@ -175,7 +204,7 @@ func TestMoveTablesAllAndExclude(t *testing.T) {
env.tmc.expectVRQuery(200, insertPrefix, &sqltypes.Result{})
env.tmc.expectVRQuery(200, mzSelectIDQuery, &sqltypes.Result{})
env.tmc.expectVRQuery(200, mzUpdateQuery, &sqltypes.Result{})
err = env.wr.MoveTables(ctx, "workflow", "sourceks", "targetks", "", "", "", tcase.allTables, tcase.excludeTables, true, false, "", false, false, "", defaultOnDDL, nil)
err = env.wr.MoveTables(ctx, "workflow", "sourceks", "targetks", "", "", "", tcase.allTables, tcase.excludeTables, true, false, "", false, false, "", defaultOnDDL, nil, false)
require.NoError(t, err)
require.EqualValues(t, tcase.want, targetTables(ctx, env))
})
Expand Down Expand Up @@ -211,7 +240,7 @@ func TestMoveTablesStopFlags(t *testing.T) {
env.tmc.expectVRQuery(200, mzSelectIDQuery, &sqltypes.Result{})
// -auto_start=false is tested by NOT expecting the update query which sets state to RUNNING
err = env.wr.MoveTables(ctx, "workflow", "sourceks", "targetks", "t1", "",
"", false, "", false, true, "", false, false, "", defaultOnDDL, nil)
"", false, "", false, true, "", false, false, "", defaultOnDDL, nil, false)
require.NoError(t, err)
env.tmc.verifyQueries(t)
})
Expand Down Expand Up @@ -239,7 +268,7 @@ func TestMigrateVSchema(t *testing.T) {
env.tmc.expectVRQuery(200, mzSelectIDQuery, &sqltypes.Result{})
env.tmc.expectVRQuery(200, mzUpdateQuery, &sqltypes.Result{})

err := env.wr.MoveTables(ctx, "workflow", "sourceks", "targetks", `{"t1":{}}`, "", "", false, "", true, false, "", false, false, "", defaultOnDDL, nil)
err := env.wr.MoveTables(ctx, "workflow", "sourceks", "targetks", `{"t1":{}}`, "", "", false, "", true, false, "", false, false, "", defaultOnDDL, nil, false)
require.NoError(t, err)
vschema, err := env.wr.ts.GetSrvVSchema(ctx, env.cell)
require.NoError(t, err)
Expand Down Expand Up @@ -2942,7 +2971,7 @@ func TestMoveTablesDDLFlag(t *testing.T) {
env.tmc.expectVRQuery(200, mzUpdateQuery, &sqltypes.Result{})

err := env.wr.MoveTables(ctx, "workflow", "sourceks", "targetks", "t1", "",
"", false, "", false, true, "", false, false, "", onDDLAction, nil)
"", false, "", false, true, "", false, false, "", onDDLAction, nil, false)
require.NoError(t, err)
})
}
Expand Down
5 changes: 4 additions & 1 deletion go/vt/wrangler/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ type VReplicationWorkflowParams struct {

// Migrate specific
ExternalCluster string

// MoveTables only
NoRoutingRules bool
}

// VReplicationWorkflow stores various internal objects for a workflow
Expand Down Expand Up @@ -433,7 +436,7 @@ func (vrw *VReplicationWorkflow) initMoveTables() error {
return vrw.wr.MoveTables(vrw.ctx, vrw.params.Workflow, vrw.params.SourceKeyspace, vrw.params.TargetKeyspace,
vrw.params.Tables, vrw.params.Cells, vrw.params.TabletTypes, vrw.params.AllTables, vrw.params.ExcludeTables,
vrw.params.AutoStart, vrw.params.StopAfterCopy, vrw.params.ExternalCluster, vrw.params.DropForeignKeys,
vrw.params.DeferSecondaryKeys, vrw.params.SourceTimeZone, vrw.params.OnDDL, vrw.params.SourceShards)
vrw.params.DeferSecondaryKeys, vrw.params.SourceTimeZone, vrw.params.OnDDL, vrw.params.SourceShards, vrw.params.NoRoutingRules)
}

func (vrw *VReplicationWorkflow) initReshard() error {
Expand Down

0 comments on commit 8745291

Please sign in to comment.