From 874529196ed14f4848d7d3feca14d359f66fd348 Mon Sep 17 00:00:00 2001 From: Rohit Nayak <57520317+rohit-nayak-ps@users.noreply.github.com> Date: Mon, 28 Aug 2023 17:40:51 +0200 Subject: [PATCH] MoveTables: add flag to specify that routing rules should not be created when a movetables workflow is created (#13858) Signed-off-by: Rohit Nayak --- go/vt/vtctl/vtctl.go | 4 +- go/vt/wrangler/materializer.go | 59 ++++++++++++++--------------- go/vt/wrangler/materializer_test.go | 45 ++++++++++++++++++---- go/vt/wrangler/workflow.go | 5 ++- 4 files changed, 73 insertions(+), 40 deletions(-) diff --git a/go/vt/vtctl/vtctl.go b/go/vt/vtctl/vtctl.go index 640e6841bda..36953be2dc5 100644 --- a/go/vt/vtctl/vtctl.go +++ b/go/vt/vtctl/vtctl.go @@ -444,7 +444,7 @@ var commands = []commandGroup{ { name: "MoveTables", method: commandMoveTables, - params: "[--source=] [--tables=] [--cells=] [--tablet_types=] [--all] [--exclude=] [--auto_start] [--stop_after_copy] [--defer-secondary-keys] [--on-ddl=] [--source_shards=] [--source_time_zone=] [--initialize-target-sequences] 'action must be one of the following: Create, Complete, Cancel, SwitchTraffic, ReverseTrafffic, Show, or Progress' ", + params: "[--source=] [--tables=] [--cells=] [--tablet_types=] [--all] [--exclude=] [--auto_start] [--stop_after_copy] [--defer-secondary-keys] [--on-ddl=] [--source_shards=] [--source_time_zone=] [--initialize-target-sequences] [--no-routing-rules] 'action must be one of the following: Create, Complete, Cancel, SwitchTraffic, ReverseTrafffic, Show, or Progress' ", help: `Move table(s) to another keyspace, table_specs is a list of tables or the tables section of the vschema for the target keyspace. Example: '{"t1":{"column_vindexes": [{"column": "id1", "name": "hash"}]}, "t2":{"column_vindexes": [{"column": "id2", "name": "hash"}]}}'. In the case of an unsharded target keyspace the vschema for each table may be empty. Example: '{"t1":{}, "t2":{}}'.`, }, { @@ -2116,6 +2116,7 @@ func commandVReplicationWorkflow(ctx context.Context, wr *wrangler.Wrangler, sub // MoveTables-only params renameTables := subFlags.Bool("rename_tables", false, "MoveTables only. Rename tables instead of dropping them. --rename_tables is only supported for Complete.") + noRoutingRules := subFlags.Bool("no-routing-rules", false, "(Advanced) MoveTables Create only. Do not create routing rules while creating the workflow. See the reference documentation for limitations if you use this flag.") // MoveTables and Reshard params sourceShards := subFlags.String("source_shards", "", "Source shards") @@ -2260,6 +2261,7 @@ func commandVReplicationWorkflow(ctx context.Context, wr *wrangler.Wrangler, sub vrwp.ExternalCluster = externalClusterName vrwp.SourceTimeZone = *sourceTimeZone vrwp.DropForeignKeys = *dropForeignKeys + vrwp.NoRoutingRules = *noRoutingRules if *sourceShards != "" { vrwp.SourceShards = strings.Split(*sourceShards, ",") } diff --git a/go/vt/wrangler/materializer.go b/go/vt/wrangler/materializer.go index 28a48f344ac..cc4e3030322 100644 --- a/go/vt/wrangler/materializer.go +++ b/go/vt/wrangler/materializer.go @@ -129,7 +129,8 @@ func shouldInclude(table string, excludes []string) bool { // MoveTables initiates moving table(s) over to another keyspace func (wr *Wrangler) MoveTables(ctx context.Context, workflow, sourceKeyspace, targetKeyspace, tableSpecs, cell, tabletTypesStr string, allTables bool, excludeTables string, autoStart, stopAfterCopy bool, - externalCluster string, dropForeignKeys, deferSecondaryKeys bool, sourceTimeZone, onDDL string, sourceShards []string) (err error) { + externalCluster string, dropForeignKeys, deferSecondaryKeys bool, sourceTimeZone, onDDL string, + sourceShards []string, noRoutingRules bool) (err error) { //FIXME validate tableSpecs, allTables, excludeTables var tables []string var externalTopo *topo.Server @@ -283,39 +284,37 @@ func (wr *Wrangler) MoveTables(ctx context.Context, workflow, sourceKeyspace, ta // Now that the streams have been successfully created, let's put the associated // routing rules in place. if externalTopo == nil { - // Save routing rules before vschema. If we save vschema first, and routing - // rules fails to save, we may generate duplicate table errors. - if mz.isPartial { - if err := wr.createDefaultShardRoutingRules(ctx, ms); err != nil { + if noRoutingRules { + log.Warningf("Found --no-routing-rules flag, not creating routing rules for workflow %s.%s", targetKeyspace, workflow) + } else { + // Save routing rules before vschema. If we save vschema first, and routing rules + // fails to save, we may generate duplicate table errors. + rules, err := topotools.GetRoutingRules(ctx, wr.ts) + if err != nil { return err } - } - - rules, err := topotools.GetRoutingRules(ctx, wr.ts) - if err != nil { - return err - } - for _, table := range tables { - toSource := []string{sourceKeyspace + "." + table} - rules[table] = toSource - rules[table+"@replica"] = toSource - rules[table+"@rdonly"] = toSource - rules[targetKeyspace+"."+table] = toSource - rules[targetKeyspace+"."+table+"@replica"] = toSource - rules[targetKeyspace+"."+table+"@rdonly"] = toSource - rules[targetKeyspace+"."+table] = toSource - rules[sourceKeyspace+"."+table+"@replica"] = toSource - rules[sourceKeyspace+"."+table+"@rdonly"] = toSource - } - if err := topotools.SaveRoutingRules(ctx, wr.ts, rules); err != nil { - return err - } - - if vschema != nil { - // We added to the vschema. - if err := wr.ts.SaveVSchema(ctx, targetKeyspace, vschema); err != nil { + for _, table := range tables { + toSource := []string{sourceKeyspace + "." + table} + rules[table] = toSource + rules[table+"@replica"] = toSource + rules[table+"@rdonly"] = toSource + rules[targetKeyspace+"."+table] = toSource + rules[targetKeyspace+"."+table+"@replica"] = toSource + rules[targetKeyspace+"."+table+"@rdonly"] = toSource + rules[targetKeyspace+"."+table] = toSource + rules[sourceKeyspace+"."+table+"@replica"] = toSource + rules[sourceKeyspace+"."+table+"@rdonly"] = toSource + } + if err := topotools.SaveRoutingRules(ctx, wr.ts, rules); err != nil { return err } + + if vschema != nil { + // We added to the vschema. + if err := wr.ts.SaveVSchema(ctx, targetKeyspace, vschema); err != nil { + return err + } + } } } if err := wr.ts.RebuildSrvVSchema(ctx, nil); err != nil { diff --git a/go/vt/wrangler/materializer_test.go b/go/vt/wrangler/materializer_test.go index 0350d8f00fd..323769ae908 100644 --- a/go/vt/wrangler/materializer_test.go +++ b/go/vt/wrangler/materializer_test.go @@ -49,6 +49,35 @@ const mzCheckJournal = "/select val from _vt.resharding_journal where id=" var defaultOnDDL = binlogdatapb.OnDDLAction_IGNORE.String() +// TestMoveTablesNoRoutingRules confirms that MoveTables does not create routing rules if --no-routing-rules is specified. +func TestMoveTablesNoRoutingRules(t *testing.T) { + ms := &vtctldatapb.MaterializeSettings{ + Workflow: "workflow", + SourceKeyspace: "sourceks", + TargetKeyspace: "targetks", + TableSettings: []*vtctldatapb.TableMaterializeSettings{{ + TargetTable: "t1", + SourceExpression: "select * from t1", + }}, + } + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + env := newTestMaterializerEnv(t, ctx, ms, []string{"0"}, []string{"0"}) + defer env.close() + + env.tmc.expectVRQuery(100, mzCheckJournal, &sqltypes.Result{}) + env.tmc.expectVRQuery(200, mzSelectFrozenQuery, &sqltypes.Result{}) + env.tmc.expectVRQuery(200, insertPrefix, &sqltypes.Result{}) + env.tmc.expectVRQuery(200, mzSelectIDQuery, &sqltypes.Result{}) + env.tmc.expectVRQuery(200, mzUpdateQuery, &sqltypes.Result{}) + + err := env.wr.MoveTables(ctx, "workflow", "sourceks", "targetks", "t1", "", "", false, "", true, false, "", false, false, "", defaultOnDDL, nil, true) + require.NoError(t, err) + rr, err := env.wr.ts.GetRoutingRules(ctx) + require.NoError(t, err) + require.Equal(t, 0, len(rr.Rules)) +} + func TestMigrateTables(t *testing.T) { ms := &vtctldatapb.MaterializeSettings{ Workflow: "workflow", @@ -70,7 +99,7 @@ func TestMigrateTables(t *testing.T) { env.tmc.expectVRQuery(200, mzSelectIDQuery, &sqltypes.Result{}) env.tmc.expectVRQuery(200, mzUpdateQuery, &sqltypes.Result{}) - err := env.wr.MoveTables(ctx, "workflow", "sourceks", "targetks", "t1", "", "", false, "", true, false, "", false, false, "", defaultOnDDL, nil) + err := env.wr.MoveTables(ctx, "workflow", "sourceks", "targetks", "t1", "", "", false, "", true, false, "", false, false, "", defaultOnDDL, nil, false) require.NoError(t, err) vschema, err := env.wr.ts.GetSrvVSchema(ctx, env.cell) require.NoError(t, err) @@ -113,11 +142,11 @@ func TestMissingTables(t *testing.T) { env.tmc.expectVRQuery(200, mzSelectIDQuery, &sqltypes.Result{}) env.tmc.expectVRQuery(200, mzUpdateQuery, &sqltypes.Result{}) - err := env.wr.MoveTables(ctx, "workflow", "sourceks", "targetks", "t1,tyt", "", "", false, "", true, false, "", false, false, "", defaultOnDDL, nil) + err := env.wr.MoveTables(ctx, "workflow", "sourceks", "targetks", "t1,tyt", "", "", false, "", true, false, "", false, false, "", defaultOnDDL, nil, false) require.EqualError(t, err, "table(s) not found in source keyspace sourceks: tyt") - err = env.wr.MoveTables(ctx, "workflow", "sourceks", "targetks", "t1,tyt,t2,txt", "", "", false, "", true, false, "", false, false, "", defaultOnDDL, nil) + err = env.wr.MoveTables(ctx, "workflow", "sourceks", "targetks", "t1,tyt,t2,txt", "", "", false, "", true, false, "", false, false, "", defaultOnDDL, nil, false) require.EqualError(t, err, "table(s) not found in source keyspace sourceks: tyt,txt") - err = env.wr.MoveTables(ctx, "workflow", "sourceks", "targetks", "t1", "", "", false, "", true, false, "", false, false, "", defaultOnDDL, nil) + err = env.wr.MoveTables(ctx, "workflow", "sourceks", "targetks", "t1", "", "", false, "", true, false, "", false, false, "", defaultOnDDL, nil, false) require.NoError(t, err) } @@ -175,7 +204,7 @@ func TestMoveTablesAllAndExclude(t *testing.T) { env.tmc.expectVRQuery(200, insertPrefix, &sqltypes.Result{}) env.tmc.expectVRQuery(200, mzSelectIDQuery, &sqltypes.Result{}) env.tmc.expectVRQuery(200, mzUpdateQuery, &sqltypes.Result{}) - err = env.wr.MoveTables(ctx, "workflow", "sourceks", "targetks", "", "", "", tcase.allTables, tcase.excludeTables, true, false, "", false, false, "", defaultOnDDL, nil) + err = env.wr.MoveTables(ctx, "workflow", "sourceks", "targetks", "", "", "", tcase.allTables, tcase.excludeTables, true, false, "", false, false, "", defaultOnDDL, nil, false) require.NoError(t, err) require.EqualValues(t, tcase.want, targetTables(ctx, env)) }) @@ -211,7 +240,7 @@ func TestMoveTablesStopFlags(t *testing.T) { env.tmc.expectVRQuery(200, mzSelectIDQuery, &sqltypes.Result{}) // -auto_start=false is tested by NOT expecting the update query which sets state to RUNNING err = env.wr.MoveTables(ctx, "workflow", "sourceks", "targetks", "t1", "", - "", false, "", false, true, "", false, false, "", defaultOnDDL, nil) + "", false, "", false, true, "", false, false, "", defaultOnDDL, nil, false) require.NoError(t, err) env.tmc.verifyQueries(t) }) @@ -239,7 +268,7 @@ func TestMigrateVSchema(t *testing.T) { env.tmc.expectVRQuery(200, mzSelectIDQuery, &sqltypes.Result{}) env.tmc.expectVRQuery(200, mzUpdateQuery, &sqltypes.Result{}) - err := env.wr.MoveTables(ctx, "workflow", "sourceks", "targetks", `{"t1":{}}`, "", "", false, "", true, false, "", false, false, "", defaultOnDDL, nil) + err := env.wr.MoveTables(ctx, "workflow", "sourceks", "targetks", `{"t1":{}}`, "", "", false, "", true, false, "", false, false, "", defaultOnDDL, nil, false) require.NoError(t, err) vschema, err := env.wr.ts.GetSrvVSchema(ctx, env.cell) require.NoError(t, err) @@ -2942,7 +2971,7 @@ func TestMoveTablesDDLFlag(t *testing.T) { env.tmc.expectVRQuery(200, mzUpdateQuery, &sqltypes.Result{}) err := env.wr.MoveTables(ctx, "workflow", "sourceks", "targetks", "t1", "", - "", false, "", false, true, "", false, false, "", onDDLAction, nil) + "", false, "", false, true, "", false, false, "", onDDLAction, nil, false) require.NoError(t, err) }) } diff --git a/go/vt/wrangler/workflow.go b/go/vt/wrangler/workflow.go index d6a8f542ed1..66c11f5c6ae 100644 --- a/go/vt/wrangler/workflow.go +++ b/go/vt/wrangler/workflow.go @@ -71,6 +71,9 @@ type VReplicationWorkflowParams struct { // Migrate specific ExternalCluster string + + // MoveTables only + NoRoutingRules bool } // VReplicationWorkflow stores various internal objects for a workflow @@ -433,7 +436,7 @@ func (vrw *VReplicationWorkflow) initMoveTables() error { return vrw.wr.MoveTables(vrw.ctx, vrw.params.Workflow, vrw.params.SourceKeyspace, vrw.params.TargetKeyspace, vrw.params.Tables, vrw.params.Cells, vrw.params.TabletTypes, vrw.params.AllTables, vrw.params.ExcludeTables, vrw.params.AutoStart, vrw.params.StopAfterCopy, vrw.params.ExternalCluster, vrw.params.DropForeignKeys, - vrw.params.DeferSecondaryKeys, vrw.params.SourceTimeZone, vrw.params.OnDDL, vrw.params.SourceShards) + vrw.params.DeferSecondaryKeys, vrw.params.SourceTimeZone, vrw.params.OnDDL, vrw.params.SourceShards, vrw.params.NoRoutingRules) } func (vrw *VReplicationWorkflow) initReshard() error {