Skip to content

Commit

Permalink
VReplication: Initialize Sequence Tables Used By Tables Being Moved (#…
Browse files Browse the repository at this point in the history
…13656)

Signed-off-by: Matt Lord <mattalord@gmail.com>
  • Loading branch information
mattlord authored Aug 9, 2023
1 parent f502ed2 commit 755f771
Show file tree
Hide file tree
Showing 33 changed files with 1,564 additions and 543 deletions.
46 changes: 27 additions & 19 deletions go/cmd/vtctldclient/command/movetables.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,6 @@ var (
TabletTypes []topodatapb.TabletType
TabletTypesInPreferenceOrder bool
SourceShards []string
ExternalClusterName string
AllTables bool
IncludeTables []string
ExcludeTables []string
Expand All @@ -237,13 +236,14 @@ var (
StopAfterCopy bool
}{}
moveTablesSwitchTrafficOptions = struct {
Cells []string
TabletTypes []topodatapb.TabletType
MaxReplicationLagAllowed time.Duration
EnableReverseReplication bool
Timeout time.Duration
DryRun bool
Direction workflow.TrafficSwitchDirection
Cells []string
TabletTypes []topodatapb.TabletType
MaxReplicationLagAllowed time.Duration
EnableReverseReplication bool
Timeout time.Duration
DryRun bool
InitializeTargetSequences bool
Direction workflow.TrafficSwitchDirection
}{}
)

Expand Down Expand Up @@ -271,13 +271,16 @@ func commandMoveTablesCreate(cmd *cobra.Command, args []string) error {
Workflow: moveTablesOptions.Workflow,
TargetKeyspace: moveTablesOptions.TargetKeyspace,
SourceKeyspace: moveTablesCreateOptions.SourceKeyspace,
SourceShards: moveTablesCreateOptions.SourceShards,
SourceTimeZone: moveTablesCreateOptions.SourceTimeZone,
Cells: moveTablesCreateOptions.Cells,
TabletTypes: moveTablesCreateOptions.TabletTypes,
TabletSelectionPreference: tsp,
AllTables: moveTablesCreateOptions.AllTables,
IncludeTables: moveTablesCreateOptions.IncludeTables,
ExcludeTables: moveTablesCreateOptions.ExcludeTables,
OnDdl: moveTablesCreateOptions.OnDDL,
DeferSecondaryKeys: moveTablesCreateOptions.DeferSecondaryKeys,
AutoStart: moveTablesCreateOptions.AutoStart,
StopAfterCopy: moveTablesCreateOptions.StopAfterCopy,
}
Expand Down Expand Up @@ -450,14 +453,15 @@ func commandMoveTablesSwitchTraffic(cmd *cobra.Command, args []string) error {
cli.FinishedParsing(cmd)

req := &vtctldatapb.WorkflowSwitchTrafficRequest{
Keyspace: moveTablesOptions.TargetKeyspace,
Workflow: moveTablesOptions.Workflow,
TabletTypes: moveTablesSwitchTrafficOptions.TabletTypes,
MaxReplicationLagAllowed: protoutil.DurationToProto(moveTablesSwitchTrafficOptions.MaxReplicationLagAllowed),
Timeout: protoutil.DurationToProto(moveTablesSwitchTrafficOptions.Timeout),
DryRun: moveTablesSwitchTrafficOptions.DryRun,
EnableReverseReplication: moveTablesSwitchTrafficOptions.EnableReverseReplication,
Direction: int32(moveTablesSwitchTrafficOptions.Direction),
Keyspace: moveTablesOptions.TargetKeyspace,
Workflow: moveTablesOptions.Workflow,
TabletTypes: moveTablesSwitchTrafficOptions.TabletTypes,
MaxReplicationLagAllowed: protoutil.DurationToProto(moveTablesSwitchTrafficOptions.MaxReplicationLagAllowed),
Timeout: protoutil.DurationToProto(moveTablesSwitchTrafficOptions.Timeout),
DryRun: moveTablesSwitchTrafficOptions.DryRun,
EnableReverseReplication: moveTablesSwitchTrafficOptions.EnableReverseReplication,
InitializeTargetSequences: moveTablesSwitchTrafficOptions.InitializeTargetSequences,
Direction: int32(moveTablesSwitchTrafficOptions.Direction),
}
resp, err := client.WorkflowSwitchTraffic(commandCtx, req)
if err != nil {
Expand Down Expand Up @@ -510,12 +514,14 @@ func init() {
MoveTablesCreate.MarkPersistentFlagRequired("source-keyspace")
MoveTablesCreate.Flags().StringSliceVarP(&moveTablesCreateOptions.Cells, "cells", "c", nil, "Cells and/or CellAliases to copy table data from")
MoveTablesCreate.Flags().StringSliceVar(&moveTablesCreateOptions.SourceShards, "source-shards", nil, "Source shards to copy data from when performing a partial MoveTables (experimental)")
MoveTablesCreate.Flags().StringVar(&moveTablesCreateOptions.SourceTimeZone, "source-time-zone", "", "Specifying this causes any DATETIME fields to be converted from the given time zone into UTC")
MoveTablesCreate.Flags().Var((*topoproto.TabletTypeListFlag)(&moveTablesCreateOptions.TabletTypes), "tablet-types", "Source tablet types to replicate table data from (e.g. PRIMARY,REPLICA,RDONLY)")
MoveTablesCreate.Flags().BoolVar(&moveTablesCreateOptions.TabletTypesInPreferenceOrder, "tablet-types-in-preference-order", true, "When performing source tablet selection, look for candidates in the type order as they are listed in the tablet-types flag")
MoveTablesCreate.Flags().BoolVar(&moveTablesCreateOptions.AllTables, "all-tables", false, "Copy all tables from the source")
MoveTablesCreate.Flags().StringSliceVar(&moveTablesCreateOptions.IncludeTables, "tables", nil, "Source tables to copy")
MoveTablesCreate.Flags().StringSliceVar(&moveTablesCreateOptions.ExcludeTables, "exclude-tables", nil, "Source tables to exclude from copying")
MoveTablesCreate.Flags().StringVar(&moveTablesCreateOptions.OnDDL, "on-ddl", onDDLDefault, "What to do when DDL is encountered in the VReplication stream. Possible values are IGNORE, STOP, EXEC, and EXEC_IGNORE")
MoveTablesCreate.Flags().BoolVar(&moveTablesCreateOptions.DeferSecondaryKeys, "defer-secondary-keys", false, "Defer secondary index creation for a table until after it has been copied")
MoveTablesCreate.Flags().BoolVar(&moveTablesCreateOptions.AutoStart, "auto-start", true, "Start the MoveTables workflow after creating it")
MoveTablesCreate.Flags().BoolVar(&moveTablesCreateOptions.StopAfterCopy, "stop-after-copy", false, "Stop the MoveTables workflow after it's finished copying the existing rows and before it starts replicating changes")
MoveTables.AddCommand(MoveTablesCreate)
Expand All @@ -532,14 +538,16 @@ func init() {
MoveTablesSwitchTraffic.Flags().Var((*topoproto.TabletTypeListFlag)(&moveTablesSwitchTrafficOptions.TabletTypes), "tablet-types", "Tablet types to switch traffic for")
MoveTablesSwitchTraffic.Flags().DurationVar(&moveTablesSwitchTrafficOptions.Timeout, "timeout", timeoutDefault, "Specifies the maximum time to wait, in seconds, for VReplication to catch up on primary tablets. The traffic switch will be cancelled on timeout.")
MoveTablesSwitchTraffic.Flags().DurationVar(&moveTablesSwitchTrafficOptions.MaxReplicationLagAllowed, "max-replication-lag-allowed", maxReplicationLagDefault, "Allow traffic to be switched only if VReplication lag is below this")
MoveTablesSwitchTraffic.Flags().BoolVar(&moveTablesSwitchTrafficOptions.EnableReverseReplication, "enable-reverse-replication", true, "Setup replication going back to the original source keyspace to support rolling back the traffic cutover")
MoveTablesSwitchTraffic.Flags().BoolVar(&moveTablesSwitchTrafficOptions.DryRun, "dry-run", false, "Print the actions that would be taken and report any known errors that would have occurred")
MoveTablesSwitchTraffic.Flags().BoolVar(&moveTablesSwitchTrafficOptions.InitializeTargetSequences, "initialize-target-sequences", false, "When moving tables from an unsharded keyspace to a sharded keyspace, initialize any sequences that are being used on the target when switching writes.")
MoveTables.AddCommand(MoveTablesSwitchTraffic)

MoveTablesReverseTraffic.Flags().StringSliceVarP(&moveTablesSwitchTrafficOptions.Cells, "cells", "c", nil, "Cells and/or CellAliases to switch traffic in")
MoveTablesReverseTraffic.Flags().BoolVar(&moveTablesSwitchTrafficOptions.DryRun, "dry-run", false, "Print the actions that would be taken and report any known errors that would have occurred")
MoveTablesReverseTraffic.Flags().DurationVar(&moveTablesSwitchTrafficOptions.MaxReplicationLagAllowed, "max-replication-lag-allowed", maxReplicationLagDefault, "Allow traffic to be switched only if VReplication lag is below this")
MoveTablesReverseTraffic.Flags().BoolVar(&moveTablesSwitchTrafficOptions.EnableReverseReplication, "enable-reverse-replication", true, "Setup replication going back to the original source keyspace to support rolling back the traffic cutover")
MoveTablesReverseTraffic.Flags().Var((*topoproto.TabletTypeListFlag)(&moveTablesSwitchTrafficOptions.TabletTypes), "tablet-types", "Tablet types to switch traffic for")
MoveTablesReverseTraffic.Flags().DurationVar(&moveTablesSwitchTrafficOptions.Timeout, "timeout", timeoutDefault, "Specifies the maximum time to wait, in seconds, for VReplication to catch up on primary tablets. The traffic switch will be cancelled on timeout.")
MoveTablesReverseTraffic.Flags().DurationVar(&moveTablesSwitchTrafficOptions.MaxReplicationLagAllowed, "max-replication-lag-allowed", maxReplicationLagDefault, "Allow traffic to be switched only if VReplication lag is below this")
MoveTablesReverseTraffic.Flags().BoolVar(&moveTablesSwitchTrafficOptions.EnableReverseReplication, "enable-reverse-replication", true, "Setup replication going back to the original target keyspace to support switching traffic again")
MoveTablesReverseTraffic.Flags().BoolVar(&moveTablesSwitchTrafficOptions.DryRun, "dry-run", false, "Print the actions that would be taken and report any known errors that would have occurred")
MoveTables.AddCommand(MoveTablesReverseTraffic)
}
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ func tstWorkflowExec(t *testing.T, cells, workflow, sourceKs, targetKs, tables,
switch currentWorkflowType {
case wrangler.MoveTablesWorkflow, wrangler.MigrateWorkflow, wrangler.ReshardWorkflow:
args = append(args, "--defer-secondary-keys")
args = append(args, "--initialize-target-sequences") // Only used for MoveTables
}
}
if cells != "" {
Expand Down
3 changes: 2 additions & 1 deletion go/test/endtoend/vreplication/unsharded_init_data.sql
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
insert into customer(cid, name, typ, sport, meta) values(1, 'Jøhn "❤️" Rizzolo',1,'football,baseball','{}');
insert into customer(cid, name, typ, sport, meta) values(2, 'Paül','soho','cricket',convert(x'7b7d' using utf8mb4));
insert into customer(cid, name, typ, sport, blb) values(3, 'ringo','enterprise','','blob data');
-- We use a high cid value here to test the target sequence initialization.
insert into customer(cid, name, typ, sport, blb) values(999999, 'ringo','enterprise','','blob data');
insert into merchant(mname, category) values('Monoprice', 'eléctronics');
insert into merchant(mname, category) values('newegg', 'elec†ronics');
insert into product(pid, description) values(1, 'keyböard ⌨️');
Expand Down
12 changes: 6 additions & 6 deletions go/test/endtoend/vreplication/vdiff2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,9 @@ var testCases = []*testCase{
tabletBaseID: 200,
tables: "customer,Lead,Lead-1",
autoRetryError: true,
retryInsert: `insert into customer(cid, name, typ) values(91234, 'Testy McTester', 'soho')`,
retryInsert: `insert into customer(cid, name, typ) values(1991234, 'Testy McTester', 'soho')`,
resume: true,
resumeInsert: `insert into customer(cid, name, typ) values(92234, 'Testy McTester (redux)', 'enterprise')`,
resumeInsert: `insert into customer(cid, name, typ) values(1992234, 'Testy McTester (redux)', 'enterprise')`,
testCLIErrors: true, // test for errors in the simplest workflow
testCLICreateWait: true, // test wait on create feature against simplest workflow
},
Expand All @@ -81,9 +81,9 @@ var testCases = []*testCase{
targetShards: "-40,40-a0,a0-",
tabletBaseID: 400,
autoRetryError: true,
retryInsert: `insert into customer(cid, name, typ) values(93234, 'Testy McTester Jr', 'enterprise'), (94234, 'Testy McTester II', 'enterprise')`,
retryInsert: `insert into customer(cid, name, typ) values(1993234, 'Testy McTester Jr', 'enterprise'), (1993235, 'Testy McTester II', 'enterprise')`,
resume: true,
resumeInsert: `insert into customer(cid, name, typ) values(95234, 'Testy McTester III', 'enterprise')`,
resumeInsert: `insert into customer(cid, name, typ) values(1994234, 'Testy McTester III', 'enterprise')`,
stop: true,
},
{
Expand All @@ -96,9 +96,9 @@ var testCases = []*testCase{
targetShards: "0",
tabletBaseID: 700,
autoRetryError: true,
retryInsert: `insert into customer(cid, name, typ) values(96234, 'Testy McTester IV', 'enterprise')`,
retryInsert: `insert into customer(cid, name, typ) values(1995234, 'Testy McTester IV', 'enterprise')`,
resume: true,
resumeInsert: `insert into customer(cid, name, typ) values(97234, 'Testy McTester V', 'enterprise'), (98234, 'Testy McTester VI', 'enterprise')`,
resumeInsert: `insert into customer(cid, name, typ) values(1996234, 'Testy McTester V', 'enterprise'), (1996235, 'Testy McTester VI', 'enterprise')`,
stop: true,
},
}
Expand Down
71 changes: 46 additions & 25 deletions go/test/endtoend/vreplication/vreplication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -791,8 +791,15 @@ func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAl
}
switchWritesDryRun(t, workflowType, ksWorkflow, dryRunResultsSwitchWritesCustomerShard)
switchWrites(t, workflowType, ksWorkflow, false)

checkThatVDiffFails(t, targetKs, workflow)

// The original unsharded customer data included an insert with the
// vindex column (cid) of 999999, so the backing sequence table should
// now have a next_id of 1000000 after SwitchTraffic.
res := execVtgateQuery(t, vtgateConn, sourceKs, "select next_id from customer_seq where id = 0")
require.Equal(t, "1000000", res.Rows[0][0].ToString())

if withOpenTx && commit != nil {
commit(t)
}
Expand Down Expand Up @@ -1358,11 +1365,18 @@ func catchup(t *testing.T, vttablet *cluster.VttabletProcess, workflow, info str
func moveTablesAction(t *testing.T, action, cell, workflow, sourceKs, targetKs, tables string, extraFlags ...string) {
var err error
args := []string{"MoveTables", "--workflow=" + workflow, "--target-keyspace=" + targetKs, action}
if strings.EqualFold(action, strings.ToLower(workflowActionCreate)) {
switch strings.ToLower(action) {
case strings.ToLower(workflowActionCreate):
extraFlags = append(extraFlags, "--source-keyspace="+sourceKs, "--tables="+tables, "--cells="+cell, "--tablet-types=primary,replica,rdonly")
case strings.ToLower(workflowActionSwitchTraffic):
extraFlags = append(extraFlags, "--initialize-target-sequences")
}
args = append(args, extraFlags...)
err = vc.VtctldClient.ExecuteCommand(args...)
output, err := vc.VtctldClient.ExecuteCommandWithOutput(args...)
if output != "" {
fmt.Printf("Output of vtctldclient MoveTables %s for %s workflow:\n++++++\n%s\n--------\n",
action, workflow, output)
}
if err != nil {
t.Fatalf("MoveTables %s command failed with %+v\n", action, err)
}
Expand Down Expand Up @@ -1396,8 +1410,8 @@ func switchReadsDryRun(t *testing.T, workflowType, cells, ksWorkflow string, dry
}

func switchReads(t *testing.T, workflowType, cells, ksWorkflow string, reverse bool) {
if workflowType != binlogdatapb.VReplicationWorkflowType_name[int32(binlogdatapb.VReplicationWorkflowType_MoveTables)] &&
workflowType != binlogdatapb.VReplicationWorkflowType_name[int32(binlogdatapb.VReplicationWorkflowType_Reshard)] {
if workflowType != binlogdatapb.VReplicationWorkflowType_MoveTables.String() &&
workflowType != binlogdatapb.VReplicationWorkflowType_Reshard.String() {
require.FailNowf(t, "Invalid workflow type for SwitchTraffic, must be MoveTables or Reshard",
"workflow type specified: %s", workflowType)
}
Expand All @@ -1415,6 +1429,34 @@ func switchReads(t *testing.T, workflowType, cells, ksWorkflow string, reverse b
require.NoError(t, err, fmt.Sprintf("%s Error: %s: %s", command, err, output))
}

func switchWrites(t *testing.T, workflowType, ksWorkflow string, reverse bool) {
if workflowType != binlogdatapb.VReplicationWorkflowType_MoveTables.String() &&
workflowType != binlogdatapb.VReplicationWorkflowType_Reshard.String() {
require.FailNowf(t, "Invalid workflow type for SwitchTraffic, must be MoveTables or Reshard",
"workflow type specified: %s", workflowType)
}
command := "SwitchTraffic"
if reverse {
command = "ReverseTraffic"
}
const SwitchWritesTimeout = "91s" // max: 3 tablet picker 30s waits + 1
// Use vtctldclient for MoveTables SwitchTraffic ~ 50% of the time.
if workflowType == binlogdatapb.VReplicationWorkflowType_MoveTables.String() && time.Now().Second()%2 == 0 {
parts := strings.Split(ksWorkflow, ".")
require.Equal(t, 2, len(parts))
moveTablesAction(t, command, defaultCellName, parts[1], sourceKs, parts[0], "", "--timeout="+SwitchWritesTimeout, "--tablet-types=primary")
return
}
output, err := vc.VtctlClient.ExecuteCommandWithOutput(workflowType, "--", "--tablet_types=primary",
"--timeout="+SwitchWritesTimeout, "--initialize-target-sequences", command, ksWorkflow)
if output != "" {
fmt.Printf("Output of switching writes with vtctlclient for %s:\n++++++\n%s\n--------\n", ksWorkflow, output)
}
// printSwitchWritesExtraDebug is useful when debugging failures in Switch writes due to corner cases/races
_ = printSwitchWritesExtraDebug
require.NoError(t, err, fmt.Sprintf("Switch writes Error: %s: %s", err, output))
}

func switchWritesDryRun(t *testing.T, workflowType, ksWorkflow string, dryRunResults []string) {
if workflowType != binlogdatapb.VReplicationWorkflowType_name[int32(binlogdatapb.VReplicationWorkflowType_MoveTables)] &&
workflowType != binlogdatapb.VReplicationWorkflowType_name[int32(binlogdatapb.VReplicationWorkflowType_Reshard)] {
Expand Down Expand Up @@ -1457,27 +1499,6 @@ func printSwitchWritesExtraDebug(t *testing.T, ksWorkflow, msg string) {
}
}

func switchWrites(t *testing.T, workflowType, ksWorkflow string, reverse bool) {
if workflowType != binlogdatapb.VReplicationWorkflowType_name[int32(binlogdatapb.VReplicationWorkflowType_MoveTables)] &&
workflowType != binlogdatapb.VReplicationWorkflowType_name[int32(binlogdatapb.VReplicationWorkflowType_Reshard)] {
require.FailNowf(t, "Invalid workflow type for SwitchTraffic, must be MoveTables or Reshard",
"workflow type specified: %s", workflowType)
}
command := "SwitchTraffic"
if reverse {
command = "ReverseTraffic"
}
const SwitchWritesTimeout = "91s" // max: 3 tablet picker 30s waits + 1
output, err := vc.VtctlClient.ExecuteCommandWithOutput(workflowType, "--", "--tablet_types=primary",
"--timeout="+SwitchWritesTimeout, command, ksWorkflow)
if output != "" {
fmt.Printf("Output of switching writes for %s:\n++++++\n%s\n--------\n", ksWorkflow, output)
}
// printSwitchWritesExtraDebug is useful when debugging failures in Switch writes due to corner cases/races
_ = printSwitchWritesExtraDebug
require.NoError(t, err, fmt.Sprintf("Switch writes Error: %s: %s", err, output))
}

// generateInnoDBRowHistory generates at least maxSourceTrxHistory rollback segment entries.
// This allows us to confirm two behaviors:
// 1. MoveTables blocks on starting its first copy phase until we rollback
Expand Down
Loading

0 comments on commit 755f771

Please sign in to comment.