diff --git a/doc/design-docs/AtomicTransactionsWithDisruptions.md b/doc/design-docs/AtomicTransactionsWithDisruptions.md index 4ca60778436..7b3e050ae0d 100644 --- a/doc/design-docs/AtomicTransactionsWithDisruptions.md +++ b/doc/design-docs/AtomicTransactionsWithDisruptions.md @@ -48,4 +48,14 @@ If it finds no prepared transaction of the table, it moves forward with the cut- In the Prepare code, we check the query rules before adding the transaction to the prepared list and re-check the rules before storing the transaction logs in the transaction redo table. Any transaction that went past the first check will fail the second check if the cutover proceeds. -The check on both sides prevents either the cutover from proceeding or the transaction from being prepared. \ No newline at end of file +The check on both sides prevents either the cutover from proceeding or the transaction from being prepared. + +## MoveTables + +The only step of a `MoveTables` workflow that needs to synchronize with atomic transactions is `SwitchTraffic` for writes. As part of this step, we want to disallow writes to only the tables involved. We use `DeniedTables` in `ShardInfo` to accomplish this. After we update the topo server with the new `DeniedTables`, we make all the vttablets refresh their topo to ensure that they've registered the change. + +On vttablet, the `DeniedTables` are used to add query rules very similar to the ones in Online DDL. The only difference is that in Online DDL, we buffer the queries, but for `SwitchTraffic` we fail them altogether. Addition of these query rules, prevents any new atomic transactions from being prepared. + +Next, we try locking the tables to ensure no existing write is pending. This step blocks until all open prepared transactions have succeeded. + +After this step, `SwitchTraffic` can proceed without any issues, since we are guaranteed to reject any new atomic transactions until the `DeniedTables` has been reset, and having acquired the table lock, we know no write is currently in progress. diff --git a/go/test/endtoend/cluster/move_tables.go b/go/test/endtoend/cluster/move_tables.go new file mode 100644 index 00000000000..c80f22cf5f5 --- /dev/null +++ b/go/test/endtoend/cluster/move_tables.go @@ -0,0 +1,94 @@ +/* +Copyright 2024 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cluster + +import ( + "fmt" + "testing" + "time" +) + +// MoveTablesWorkflow is used to store the information needed to run +// MoveTables commands. +type MoveTablesWorkflow struct { + t *testing.T + clusterInstance *LocalProcessCluster + workflowName string + targetKs string + srcKs string + tables string +} + +// NewMoveTables creates a new MoveTablesWorkflow. +func NewMoveTables(t *testing.T, clusterInstance *LocalProcessCluster, workflowName, targetKs, srcKs, tables string) *MoveTablesWorkflow { + return &MoveTablesWorkflow{ + t: t, + clusterInstance: clusterInstance, + workflowName: workflowName, + tables: tables, + targetKs: targetKs, + srcKs: srcKs, + } +} + +func (mtw *MoveTablesWorkflow) Create() (string, error) { + args := []string{"Create", "--source-keyspace=" + mtw.srcKs} + if mtw.tables != "" { + args = append(args, "--tables="+mtw.tables) + } else { + args = append(args, "--all-tables") + } + return mtw.exec(args...) +} + +func (mtw *MoveTablesWorkflow) exec(args ...string) (string, error) { + args2 := []string{"MoveTables", "--workflow=" + mtw.workflowName, "--target-keyspace=" + mtw.targetKs} + args2 = append(args2, args...) + return mtw.clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput(args2...) +} + +func (mtw *MoveTablesWorkflow) SwitchReadsAndWrites() (string, error) { + return mtw.exec("SwitchTraffic") +} + +func (mtw *MoveTablesWorkflow) ReverseReadsAndWrites() (string, error) { + return mtw.exec("ReverseTraffic") +} + +func (mtw *MoveTablesWorkflow) Cancel() (string, error) { + return mtw.exec("Cancel") +} + +func (mtw *MoveTablesWorkflow) Complete() (string, error) { + return mtw.exec("Complete") +} + +func (mtw *MoveTablesWorkflow) Show() (string, error) { + return mtw.exec("Show") +} + +func (mtw *MoveTablesWorkflow) WaitForVreplCatchup(timeToWait time.Duration) { + for _, ks := range mtw.clusterInstance.Keyspaces { + if ks.Name != mtw.targetKs { + continue + } + for _, shard := range ks.Shards { + vttablet := shard.PrimaryTablet().VttabletProcess + vttablet.WaitForVReplicationToCatchup(mtw.t, mtw.workflowName, fmt.Sprintf("vt_%s", vttablet.Keyspace), "", timeToWait) + } + } +} diff --git a/go/test/endtoend/cluster/vttablet_process.go b/go/test/endtoend/cluster/vttablet_process.go index d4f0e3f1963..2c6f467bb08 100644 --- a/go/test/endtoend/cluster/vttablet_process.go +++ b/go/test/endtoend/cluster/vttablet_process.go @@ -611,6 +611,7 @@ func (vttablet *VttabletProcess) getDBSystemValues(placeholder string, value str // WaitForVReplicationToCatchup waits for "workflow" to finish copying func (vttablet *VttabletProcess) WaitForVReplicationToCatchup(t testing.TB, workflow, database string, sidecarDBName string, duration time.Duration) { + t.Helper() if sidecarDBName == "" { sidecarDBName = sidecar.DefaultName } diff --git a/go/test/endtoend/onlineddl/vtgate_util.go b/go/test/endtoend/onlineddl/vtgate_util.go index 6c26320b472..639b3ce8eb4 100644 --- a/go/test/endtoend/onlineddl/vtgate_util.go +++ b/go/test/endtoend/onlineddl/vtgate_util.go @@ -257,7 +257,8 @@ func CheckForceMigrationCutOver(t *testing.T, vtParams *mysql.ConnParams, shards // CheckMigrationStatus verifies that the migration indicated by given UUID has the given expected status func CheckMigrationStatus(t *testing.T, vtParams *mysql.ConnParams, shards []cluster.Shard, uuid string, expectStatuses ...schema.OnlineDDLStatus) bool { - query, err := sqlparser.ParseAndBind("show vitess_migrations like %a", + ksName := shards[0].PrimaryTablet().VttabletProcess.Keyspace + query, err := sqlparser.ParseAndBind(fmt.Sprintf("show vitess_migrations from %s like %%a", ksName), sqltypes.StringBindVariable(uuid), ) require.NoError(t, err) diff --git a/go/test/endtoend/transaction/twopc/stress/fuzzer_test.go b/go/test/endtoend/transaction/twopc/stress/fuzzer_test.go index 932fcae1217..3fc1858fae4 100644 --- a/go/test/endtoend/transaction/twopc/stress/fuzzer_test.go +++ b/go/test/endtoend/transaction/twopc/stress/fuzzer_test.go @@ -29,6 +29,7 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "golang.org/x/exp/rand" @@ -102,12 +103,12 @@ func TestTwoPCFuzzTest(t *testing.T) { timeForTesting: 5 * time.Second, }, { - name: "Multiple Threads - Multiple Set - PRS, ERS, and MySQL & Vttablet restart, OnlineDDL disruptions", - threads: 15, - updateSets: 15, + name: "Multiple Threads - Multiple Set - PRS, ERS, and MySQL & Vttablet restart, OnlineDDL, MoveTables disruptions", + threads: 4, + updateSets: 4, timeForTesting: 5 * time.Second, - clusterDisruptions: []func(t *testing.T){prs, ers, mysqlRestarts, vttabletRestarts, onlineDDLFuzzer}, - disruptionProbability: []int{5, 5, 5, 5, 5}, + clusterDisruptions: []func(t *testing.T){prs, ers, mysqlRestarts, vttabletRestarts, onlineDDLFuzzer, moveTablesFuzzer}, + disruptionProbability: []int{5, 5, 5, 5, 5, 5}, }, } @@ -443,7 +444,39 @@ func onlineDDLFuzzer(t *testing.T) { return } fmt.Println("Running online DDL with uuid: ", output) - WaitForMigrationStatus(t, &vtParams, clusterInstance.Keyspaces[0].Shards, strings.TrimSpace(output), 2*time.Minute, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed) + waitForMigrationStatus(t, &vtParams, clusterInstance.Keyspaces[0].Shards, strings.TrimSpace(output), 2*time.Minute, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed) +} + +var moveTablesCount int + +// moveTablesFuzzer runs a MoveTables workflow. +func moveTablesFuzzer(t *testing.T) { + workflow := "TestTwoPCFuzzTest" + srcKeyspace := keyspaceName + targetKeyspace := unshardedKeyspaceName + if moveTablesCount%2 == 1 { + srcKeyspace = unshardedKeyspaceName + targetKeyspace = keyspaceName + // We apply the vschema again because previous move tables would have removed the entry for `twopc_fuzzer_update`. + err := clusterInstance.VtctldClientProcess.ApplyVSchema(keyspaceName, VSchema) + require.NoError(t, err) + } + log.Errorf("MoveTables from - %v to %v", srcKeyspace, targetKeyspace) + mtw := cluster.NewMoveTables(t, clusterInstance, workflow, targetKeyspace, srcKeyspace, "twopc_fuzzer_update") + // Initiate MoveTables for twopc_fuzzer_update. + output, err := mtw.Create() + if err != nil { + log.Errorf("error creating MoveTables - %v, output - %v", err, output) + return + } + moveTablesCount++ + // Wait for vreplication to catchup. Should be very fast since we don't have a lot of rows. + mtw.WaitForVreplCatchup(1 * time.Minute) + // SwitchTraffic + output, err = mtw.SwitchReadsAndWrites() + assert.NoError(t, err, output) + output, err = mtw.Complete() + assert.NoError(t, err, output) } func mysqlRestarts(t *testing.T) { diff --git a/go/test/endtoend/transaction/twopc/stress/main_test.go b/go/test/endtoend/transaction/twopc/stress/main_test.go index ef8e454be15..05525171d2d 100644 --- a/go/test/endtoend/transaction/twopc/stress/main_test.go +++ b/go/test/endtoend/transaction/twopc/stress/main_test.go @@ -32,13 +32,13 @@ import ( ) var ( - clusterInstance *cluster.LocalProcessCluster - vtParams mysql.ConnParams - vtgateGrpcAddress string - keyspaceName = "ks" - cell = "zone1" - hostname = "localhost" - sidecarDBName = "vt_ks" + clusterInstance *cluster.LocalProcessCluster + vtParams mysql.ConnParams + vtgateGrpcAddress string + keyspaceName = "ks" + unshardedKeyspaceName = "uks" + cell = "zone1" + hostname = "localhost" //go:embed schema.sql SchemaSQL string @@ -79,18 +79,28 @@ func TestMain(m *testing.M) { Name: keyspaceName, SchemaSQL: SchemaSQL, VSchema: VSchema, - SidecarDBName: sidecarDBName, DurabilityPolicy: "semi_sync", } if err := clusterInstance.StartKeyspace(*keyspace, []string{"-40", "40-80", "80-"}, 2, false); err != nil { return 1 } + // Start an unsharded keyspace + unshardedKeyspace := &cluster.Keyspace{ + Name: unshardedKeyspaceName, + SchemaSQL: "", + VSchema: "{}", + DurabilityPolicy: "semi_sync", + } + if err := clusterInstance.StartUnshardedKeyspace(*unshardedKeyspace, 2, false); err != nil { + return 1 + } + // Start Vtgate if err := clusterInstance.StartVtgate(); err != nil { return 1 } - vtParams = clusterInstance.GetVTParams(keyspaceName) + vtParams = clusterInstance.GetVTParams("") vtgateGrpcAddress = fmt.Sprintf("%s:%d", clusterInstance.Hostname, clusterInstance.VtgateGrpcPort) return m.Run() diff --git a/go/test/endtoend/transaction/twopc/stress/stress_test.go b/go/test/endtoend/transaction/twopc/stress/stress_test.go index 4dae0156b9d..5baee342bec 100644 --- a/go/test/endtoend/transaction/twopc/stress/stress_test.go +++ b/go/test/endtoend/transaction/twopc/stress/stress_test.go @@ -48,6 +48,7 @@ func TestDisruptions(t *testing.T) { disruptionName string commitDelayTime string disruption func(t *testing.T) error + resetFunc func(t *testing.T) }{ { disruptionName: "No Disruption", @@ -76,6 +77,17 @@ func TestDisruptions(t *testing.T) { commitDelayTime: "20", disruption: onlineDDL, }, + { + disruptionName: "MoveTables - Complete", + commitDelayTime: "10", + disruption: moveTablesComplete, + resetFunc: moveTablesReset, + }, + { + disruptionName: "MoveTables - Cancel", + commitDelayTime: "10", + disruption: moveTablesCancel, + }, { disruptionName: "EmergencyReparentShard", commitDelayTime: "5", @@ -136,6 +148,10 @@ func TestDisruptions(t *testing.T) { waitForResults(t, "select id, col from twopc_t1 where col = 4 order by id", `[[INT64(4) INT64(4)] [INT64(6) INT64(4)] [INT64(9) INT64(4)]]`, 30*time.Second) writeCancel() writerWg.Wait() + + if tt.resetFunc != nil { + tt.resetFunc(t) + } }) } } @@ -239,6 +255,61 @@ func mysqlRestartShard3(t *testing.T) error { return syscallutil.Kill(pid, syscall.SIGKILL) } +// moveTablesCancel runs a move tables command that we cancel in the end. +func moveTablesCancel(t *testing.T) error { + workflow := "TestDisruptions" + mtw := cluster.NewMoveTables(t, clusterInstance, workflow, unshardedKeyspaceName, keyspaceName, "twopc_t1") + // Initiate MoveTables for twopc_t1. + output, err := mtw.Create() + require.NoError(t, err, output) + // Wait for vreplication to catchup. Should be very fast since we don't have a lot of rows. + mtw.WaitForVreplCatchup(10 * time.Second) + // SwitchTraffic + output, err = mtw.SwitchReadsAndWrites() + require.NoError(t, err, output) + output, err = mtw.ReverseReadsAndWrites() + require.NoError(t, err, output) + output, err = mtw.Cancel() + require.NoError(t, err, output) + return nil +} + +// moveTablesComplete runs a move tables command that we complete in the end. +func moveTablesComplete(t *testing.T) error { + workflow := "TestDisruptions" + mtw := cluster.NewMoveTables(t, clusterInstance, workflow, unshardedKeyspaceName, keyspaceName, "twopc_t1") + // Initiate MoveTables for twopc_t1. + output, err := mtw.Create() + require.NoError(t, err, output) + // Wait for vreplication to catchup. Should be very fast since we don't have a lot of rows. + mtw.WaitForVreplCatchup(10 * time.Second) + // SwitchTraffic + output, err = mtw.SwitchReadsAndWrites() + require.NoError(t, err, output) + output, err = mtw.Complete() + require.NoError(t, err, output) + return nil +} + +// moveTablesReset moves the table back from the unsharded keyspace to sharded +func moveTablesReset(t *testing.T) { + // We apply the vschema again because previous move tables would have removed the entry for `twopc_t1`. + err := clusterInstance.VtctldClientProcess.ApplyVSchema(keyspaceName, VSchema) + require.NoError(t, err) + workflow := "TestDisruptions" + mtw := cluster.NewMoveTables(t, clusterInstance, workflow, keyspaceName, unshardedKeyspaceName, "twopc_t1") + // Initiate MoveTables for twopc_t1. + output, err := mtw.Create() + require.NoError(t, err, output) + // Wait for vreplication to catchup. Should be very fast since we don't have a lot of rows. + mtw.WaitForVreplCatchup(10 * time.Second) + // SwitchTraffic + output, err = mtw.SwitchReadsAndWrites() + require.NoError(t, err, output) + output, err = mtw.Complete() + require.NoError(t, err, output) +} + var orderedDDL = []string{ "alter table twopc_t1 add column extra_col1 varchar(20)", "alter table twopc_t1 add column extra_col2 varchar(20)", @@ -256,18 +327,18 @@ func onlineDDL(t *testing.T) error { require.NoError(t, err) count++ fmt.Println("uuid: ", output) - status := WaitForMigrationStatus(t, &vtParams, clusterInstance.Keyspaces[0].Shards, strings.TrimSpace(output), 2*time.Minute, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed) + status := waitForMigrationStatus(t, &vtParams, clusterInstance.Keyspaces[0].Shards, strings.TrimSpace(output), 2*time.Minute, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed) onlineddl.CheckMigrationStatus(t, &vtParams, clusterInstance.Keyspaces[0].Shards, strings.TrimSpace(output), status) require.Equal(t, schema.OnlineDDLStatusComplete, status) return nil } -func WaitForMigrationStatus(t *testing.T, vtParams *mysql.ConnParams, shards []cluster.Shard, uuid string, timeout time.Duration, expectStatuses ...schema.OnlineDDLStatus) schema.OnlineDDLStatus { +func waitForMigrationStatus(t *testing.T, vtParams *mysql.ConnParams, shards []cluster.Shard, uuid string, timeout time.Duration, expectStatuses ...schema.OnlineDDLStatus) schema.OnlineDDLStatus { shardNames := map[string]bool{} for _, shard := range shards { shardNames[shard.Name] = true } - query := fmt.Sprintf("show vitess_migrations like '%s'", uuid) + query := fmt.Sprintf("show vitess_migrations from %s like '%s'", keyspaceName, uuid) statusesMap := map[string]bool{} for _, status := range expectStatuses { @@ -280,6 +351,11 @@ func WaitForMigrationStatus(t *testing.T, vtParams *mysql.ConnParams, shards []c lastKnownStatus := "" for { + select { + case <-ctx.Done(): + return schema.OnlineDDLStatus(lastKnownStatus) + case <-ticker.C: + } countMatchedShards := 0 conn, err := mysql.Connect(ctx, vtParams) if err != nil { @@ -304,10 +380,5 @@ func WaitForMigrationStatus(t *testing.T, vtParams *mysql.ConnParams, shards []c if countMatchedShards == len(shards) { return schema.OnlineDDLStatus(lastKnownStatus) } - select { - case <-ctx.Done(): - return schema.OnlineDDLStatus(lastKnownStatus) - case <-ticker.C: - } } }