From 07dee3c183200da9de446d0a2908d84a58035646 Mon Sep 17 00:00:00 2001
From: Manan Gupta
Date: Thu, 29 Aug 2024 09:45:05 +0530
Subject: [PATCH 1/6] feat: run move tables in stress test

Signed-off-by: Manan Gupta
---
 go/test/endtoend/cluster/move_tables.go      | 94 +++++++++++++++++++
 .../transaction/twopc/stress/fuzzer_test.go  |  2 +-
 .../transaction/twopc/stress/main_test.go    | 28 ++++--
 .../transaction/twopc/stress/stress_test.go  | 32 ++++++-
 4 files changed, 144 insertions(+), 12 deletions(-)
 create mode 100644 go/test/endtoend/cluster/move_tables.go

diff --git a/go/test/endtoend/cluster/move_tables.go b/go/test/endtoend/cluster/move_tables.go
new file mode 100644
index 00000000000..96bc0108ce2
--- /dev/null
+++ b/go/test/endtoend/cluster/move_tables.go
@@ -0,0 +1,94 @@
+/*
+Copyright 2024 The Vitess Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package cluster
+
+import (
+	"fmt"
+	"testing"
+	"time"
+)
+
+// MoveTablesWorkflow is used to store the information needed to run
+// MoveTables commands.
+type MoveTablesWorkflow struct {
+	t               *testing.T
+	clusterInstance *LocalProcessCluster
+	workflowName    string
+	targetKs        string
+	srcKs           string
+	tables          string
+}
+
+// NewMoveTables creates a new MoveTablesWorkflow.
+func NewMoveTables(t *testing.T, clusterInstance *LocalProcessCluster, workflowName, targetKs, srcKs, tables string) *MoveTablesWorkflow {
+	return &MoveTablesWorkflow{
+		t:               t,
+		clusterInstance: clusterInstance,
+		workflowName:    workflowName,
+		tables:          tables,
+		targetKs:        targetKs,
+		srcKs:           srcKs,
+	}
+}
+
+func (mtw *MoveTablesWorkflow) Create() (string, error) {
+	args := []string{"Create", "--source-keyspace=" + mtw.srcKs}
+	if mtw.tables != "" {
+		args = append(args, "--tables="+mtw.tables)
+	} else {
+		args = append(args, "--all-tables")
+	}
+	return mtw.exec(args...)
+}
+
+func (mtw *MoveTablesWorkflow) exec(args ...string) (string, error) {
+	args2 := []string{"MoveTables", "--workflow=" + mtw.workflowName, "--target-keyspace=" + mtw.targetKs}
+	args2 = append(args2, args...)
+	return mtw.clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput(args2...)
+}
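+
+// The helpers below wrap the remaining MoveTables sub-commands so that tests
+// can drive the full workflow lifecycle (switch traffic, reverse it, then
+// complete or cancel) through vtctldclient.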
+
+func (mtw *MoveTablesWorkflow) SwitchReadsAndWrites() (string, error) {
+	return mtw.exec("SwitchTraffic")
+}
+
+func (mtw *MoveTablesWorkflow) ReverseReadsAndWrites() (string, error) {
+	return mtw.exec("ReverseTraffic")
+}
+
+func (mtw *MoveTablesWorkflow) Cancel() (string, error) {
+	return mtw.exec("Cancel")
+}
+
+func (mtw *MoveTablesWorkflow) Complete() (string, error) {
+	return mtw.exec("Complete")
+}
+
+func (mtw *MoveTablesWorkflow) Show() (string, error) {
+	return mtw.exec("Show")
+}
+
+func (mtw *MoveTablesWorkflow) WaitForVreplCatchup() {
+	for _, ks := range mtw.clusterInstance.Keyspaces {
+		if ks.Name != mtw.targetKs {
+			continue
+		}
+		for _, shard := range ks.Shards {
+			vttablet := shard.PrimaryTablet().VttabletProcess
+			vttablet.WaitForVReplicationToCatchup(mtw.t, mtw.workflowName, fmt.Sprintf("vt_%s", vttablet.Keyspace), "", 10*time.Second)
+		}
+	}
+}
diff --git a/go/test/endtoend/transaction/twopc/stress/fuzzer_test.go b/go/test/endtoend/transaction/twopc/stress/fuzzer_test.go
index 932fcae1217..3ba29e50713 100644
--- a/go/test/endtoend/transaction/twopc/stress/fuzzer_test.go
+++ b/go/test/endtoend/transaction/twopc/stress/fuzzer_test.go
@@ -443,7 +443,7 @@ func onlineDDLFuzzer(t *testing.T) {
 		return
 	}
 	fmt.Println("Running online DDL with uuid: ", output)
-	WaitForMigrationStatus(t, &vtParams, clusterInstance.Keyspaces[0].Shards, strings.TrimSpace(output), 2*time.Minute, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed)
+	waitForMigrationStatus(t, &vtParams, clusterInstance.Keyspaces[0].Shards, strings.TrimSpace(output), 2*time.Minute, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed)
 }
 
 func mysqlRestarts(t *testing.T) {
diff --git a/go/test/endtoend/transaction/twopc/stress/main_test.go b/go/test/endtoend/transaction/twopc/stress/main_test.go
index ef8e454be15..05525171d2d 100644
--- a/go/test/endtoend/transaction/twopc/stress/main_test.go
+++ b/go/test/endtoend/transaction/twopc/stress/main_test.go
@@ -32,13 +32,13 @@ import (
 )
 
 var (
-	clusterInstance   *cluster.LocalProcessCluster
-	vtParams          mysql.ConnParams
-	vtgateGrpcAddress string
-	keyspaceName      = "ks"
-	cell              = "zone1"
-	hostname          = "localhost"
-	sidecarDBName     = "vt_ks"
+	clusterInstance       *cluster.LocalProcessCluster
+	vtParams              mysql.ConnParams
+	vtgateGrpcAddress     string
+	keyspaceName          = "ks"
+	unshardedKeyspaceName = "uks"
+	cell                  = "zone1"
+	hostname              = "localhost"
 
 	//go:embed schema.sql
 	SchemaSQL string
@@ -79,18 +79,28 @@ func TestMain(m *testing.M) {
 			Name:             keyspaceName,
 			SchemaSQL:        SchemaSQL,
 			VSchema:          VSchema,
-			SidecarDBName:    sidecarDBName,
 			DurabilityPolicy: "semi_sync",
 		}
 		if err := clusterInstance.StartKeyspace(*keyspace, []string{"-40", "40-80", "80-"}, 2, false); err != nil {
 			return 1
 		}
 
+		// Start an unsharded keyspace
+		unshardedKeyspace := &cluster.Keyspace{
+			Name:             unshardedKeyspaceName,
+			SchemaSQL:        "",
+			VSchema:          "{}",
+			DurabilityPolicy: "semi_sync",
+		}
+		if err := clusterInstance.StartUnshardedKeyspace(*unshardedKeyspace, 2, false); err != nil {
+			return 1
+		}
+
 		// Start Vtgate
 		if err := clusterInstance.StartVtgate(); err != nil {
 			return 1
 		}
-		vtParams = clusterInstance.GetVTParams(keyspaceName)
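+		// Deliberately pass an empty keyspace name: the connection then has
+		// no default keyspace, and queries can target both ks and uks.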
+		vtParams = clusterInstance.GetVTParams("")
 		vtgateGrpcAddress = fmt.Sprintf("%s:%d", clusterInstance.Hostname, clusterInstance.VtgateGrpcPort)
 
 		return m.Run()
diff --git a/go/test/endtoend/transaction/twopc/stress/stress_test.go b/go/test/endtoend/transaction/twopc/stress/stress_test.go
index 4dae0156b9d..6db0687a5ab 100644
--- a/go/test/endtoend/transaction/twopc/stress/stress_test.go
+++ b/go/test/endtoend/transaction/twopc/stress/stress_test.go
@@ -76,6 +76,11 @@ func TestDisruptions(t *testing.T) {
 			commitDelayTime: "20",
 			disruption:      onlineDDL,
 		},
+		{
+			disruptionName:  "MoveTables",
+			commitDelayTime: "20",
+			disruption:      moveTables,
+		},
 		{
 			disruptionName:  "EmergencyReparentShard",
 			commitDelayTime: "5",
@@ -239,6 +244,29 @@ func mysqlRestartShard3(t *testing.T) error {
 	return syscallutil.Kill(pid, syscall.SIGKILL)
 }
 
+// moveTables runs a move tables command.
+func moveTables(t *testing.T) error {
+	workflow := "TestDisruptions"
+	mtw := cluster.NewMoveTables(t, clusterInstance, workflow, unshardedKeyspaceName, keyspaceName, "twopc_t1")
+	// Initiate MoveTables for twopc_t1.
+	_, err := mtw.Create()
+	require.NoError(t, err)
+	// Wait for vreplication to catch up. Should be very fast since we don't have a lot of rows.
+	mtw.WaitForVreplCatchup()
+	// SwitchTraffic
+	_, err = mtw.SwitchReadsAndWrites()
+	require.NoError(t, err)
+	// Wait for a couple of seconds and then switch the traffic back
+	time.Sleep(2 * time.Second)
+	_, err = mtw.ReverseReadsAndWrites()
+	require.NoError(t, err)
+	// Wait another couple of seconds and then cancel the workflow
+	time.Sleep(2 * time.Second)
+	_, err = mtw.Cancel()
+	require.NoError(t, err)
+	return nil
+}
+
 var orderedDDL = []string{
 	"alter table twopc_t1 add column extra_col1 varchar(20)",
 	"alter table twopc_t1 add column extra_col2 varchar(20)",
@@ -256,13 +284,13 @@ func onlineDDL(t *testing.T) error {
 	require.NoError(t, err)
 	count++
 	fmt.Println("uuid: ", output)
-	status := WaitForMigrationStatus(t, &vtParams, clusterInstance.Keyspaces[0].Shards, strings.TrimSpace(output), 2*time.Minute, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed)
+	status := waitForMigrationStatus(t, &vtParams, clusterInstance.Keyspaces[0].Shards, strings.TrimSpace(output), 2*time.Minute, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed)
 	onlineddl.CheckMigrationStatus(t, &vtParams, clusterInstance.Keyspaces[0].Shards, strings.TrimSpace(output), status)
 	require.Equal(t, schema.OnlineDDLStatusComplete, status)
 	return nil
 }
 
-func WaitForMigrationStatus(t *testing.T, vtParams *mysql.ConnParams, shards []cluster.Shard, uuid string, timeout time.Duration, expectStatuses ...schema.OnlineDDLStatus) schema.OnlineDDLStatus {
+func waitForMigrationStatus(t *testing.T, vtParams *mysql.ConnParams, shards []cluster.Shard, uuid string, timeout time.Duration, expectStatuses ...schema.OnlineDDLStatus) schema.OnlineDDLStatus {
 	shardNames := map[string]bool{}
 	for _, shard := range shards {
 		shardNames[shard.Name] = true

From 627191e32a67d2d23654ed664a0d4aaa1109b4f9 Mon Sep 17 00:00:00 2001
From: Manan Gupta
Date: Thu, 29 Aug 2024 12:40:38 +0530
Subject: [PATCH 2/6] docs: add docs for MoveTables

Signed-off-by: Manan Gupta
---
 doc/design-docs/AtomicTransactionsWithDisruptions.md | 12 +++++++++++-
 1 file changed, 11 insertions(+), 1 deletion(-)

diff --git a/doc/design-docs/AtomicTransactionsWithDisruptions.md b/doc/design-docs/AtomicTransactionsWithDisruptions.md
index 4ca60778436..7b3e050ae0d 100644
--- a/doc/design-docs/AtomicTransactionsWithDisruptions.md
+++ b/doc/design-docs/AtomicTransactionsWithDisruptions.md
@@ -48,4 +48,14 @@ If it finds no prepared transaction of the table, it moves forward with the cut-
 
 In the Prepare code, we check the query rules before adding the transaction to the prepared list and re-check the rules before storing the transaction logs in the transaction redo table.
 Any transaction that went past the first check will fail the second check if the cutover proceeds.
-The check on both sides prevents either the cutover from proceeding or the transaction from being prepared.
\ No newline at end of file
+The check on both sides prevents either the cutover from proceeding or the transaction from being prepared.
+
+## MoveTables
+
+The only step of a `MoveTables` workflow that needs to synchronize with atomic transactions is `SwitchTraffic` for writes. As part of this step, we want to disallow writes only to the tables involved. We use `DeniedTables` in `ShardInfo` to accomplish this. After we update the topo server with the new `DeniedTables`, we make all the vttablets refresh their topo to ensure that they've registered the change.
+
+On vttablet, the `DeniedTables` are used to add query rules very similar to the ones in Online DDL. The only difference is that in Online DDL we buffer the queries, but for `SwitchTraffic` we fail them altogether. The addition of these query rules prevents any new atomic transactions from being prepared.
+
+Next, we try locking the tables to ensure no existing write is pending. This step blocks until all open prepared transactions have succeeded.
+
+After this step, `SwitchTraffic` can proceed without any issues, since we are guaranteed to reject any new atomic transactions until `DeniedTables` has been reset, and, having acquired the table lock, we know no write is currently in progress.
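+
+As an illustration, the double check on the prepare path can be sketched as follows. This is a toy model of the flow described above, not the actual vttablet code:
+
+```go
+package main
+
+import (
+	"errors"
+	"fmt"
+	"sync"
+)
+
+// deniedTables mimics the query rules that a SwitchTraffic cutover installs.
+type deniedTables struct {
+	mu     sync.RWMutex
+	denied map[string]bool
+}
+
+func (d *deniedTables) deny(table string) {
+	d.mu.Lock()
+	defer d.mu.Unlock()
+	d.denied[table] = true
+}
+
+func (d *deniedTables) allowed(table string) bool {
+	d.mu.RLock()
+	defer d.mu.RUnlock()
+	return !d.denied[table]
+}
+
+// prepare checks the rules once before the transaction joins the prepared
+// list, and once more before its redo logs are persisted. A cutover that
+// lands between the two checks fails the transaction instead of racing it.
+func (d *deniedTables) prepare(table string) error {
+	if !d.allowed(table) { // first check
+		return errors.New("prepare failed: writes to table are denied")
+	}
+	// ... the transaction is added to the prepared list here ...
+	if !d.allowed(table) { // re-check before storing the redo logs
+		return errors.New("prepare failed: cutover started mid-prepare")
+	}
+	return nil
+}
+
+func main() {
+	rules := &deniedTables{denied: map[string]bool{}}
+	fmt.Println(rules.prepare("twopc_t1")) // <nil>
+	rules.deny("twopc_t1")                 // SwitchTraffic sets DeniedTables
+	fmt.Println(rules.prepare("twopc_t1")) // prepare failed: writes to table are denied
+}
+```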
From 176b57657ac267d19a79f5fd688b0aa452bebcd9 Mon Sep 17 00:00:00 2001
From: Manan Gupta
Date: Fri, 30 Aug 2024 13:46:47 +0530
Subject: [PATCH 3/6] test: add move tables to the fuzzer

Signed-off-by: Manan Gupta
---
 go/test/endtoend/cluster/move_tables.go      |  4 +-
 go/test/endtoend/cluster/vttablet_process.go |  1 +
 .../transaction/twopc/stress/fuzzer_test.go  | 39 ++++++++-
 .../transaction/twopc/stress/stress_test.go  | 79 ++++++++++++++-----
 4 files changed, 100 insertions(+), 23 deletions(-)

diff --git a/go/test/endtoend/cluster/move_tables.go b/go/test/endtoend/cluster/move_tables.go
index 96bc0108ce2..c80f22cf5f5 100644
--- a/go/test/endtoend/cluster/move_tables.go
+++ b/go/test/endtoend/cluster/move_tables.go
@@ -81,14 +81,14 @@ func (mtw *MoveTablesWorkflow) Show() (string, error) {
 	return mtw.exec("Show")
 }
 
-func (mtw *MoveTablesWorkflow) WaitForVreplCatchup() {
+func (mtw *MoveTablesWorkflow) WaitForVreplCatchup(timeToWait time.Duration) {
 	for _, ks := range mtw.clusterInstance.Keyspaces {
 		if ks.Name != mtw.targetKs {
 			continue
 		}
 		for _, shard := range ks.Shards {
 			vttablet := shard.PrimaryTablet().VttabletProcess
-			vttablet.WaitForVReplicationToCatchup(mtw.t, mtw.workflowName, fmt.Sprintf("vt_%s", vttablet.Keyspace), "", 10*time.Second)
+			vttablet.WaitForVReplicationToCatchup(mtw.t, mtw.workflowName, fmt.Sprintf("vt_%s", vttablet.Keyspace), "", timeToWait)
 		}
 	}
 }
diff --git a/go/test/endtoend/cluster/vttablet_process.go b/go/test/endtoend/cluster/vttablet_process.go
index d4f0e3f1963..2c6f467bb08 100644
--- a/go/test/endtoend/cluster/vttablet_process.go
+++ b/go/test/endtoend/cluster/vttablet_process.go
@@ -611,6 +611,7 @@ func (vttablet *VttabletProcess) getDBSystemValues(placeholder string, value str
 
 // WaitForVReplicationToCatchup waits for "workflow" to finish copying
 func (vttablet *VttabletProcess) WaitForVReplicationToCatchup(t testing.TB, workflow, database string, sidecarDBName string, duration time.Duration) {
+	t.Helper()
 	if sidecarDBName == "" {
 		sidecarDBName = sidecar.DefaultName
 	}
diff --git a/go/test/endtoend/transaction/twopc/stress/fuzzer_test.go b/go/test/endtoend/transaction/twopc/stress/fuzzer_test.go
index 3ba29e50713..d4fe6d8d091 100644
--- a/go/test/endtoend/transaction/twopc/stress/fuzzer_test.go
+++ b/go/test/endtoend/transaction/twopc/stress/fuzzer_test.go
@@ -29,6 +29,7 @@ import (
 	"testing"
 	"time"
 
+	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 
 	"golang.org/x/exp/rand"
@@ -102,12 +103,12 @@ func TestTwoPCFuzzTest(t *testing.T) {
 			timeForTesting: 5 * time.Second,
 		},
 		{
-			name:                  "Multiple Threads - Multiple Set - PRS, ERS, and MySQL & Vttablet restart, OnlineDDL disruptions",
+			name:                  "Multiple Threads - Multiple Set - PRS, ERS, and MySQL & Vttablet restart, OnlineDDL, MoveTables disruptions",
 			threads:               15,
 			updateSets:            15,
 			timeForTesting:        5 * time.Second,
-			clusterDisruptions:    []func(t *testing.T){prs, ers, mysqlRestarts, vttabletRestarts, onlineDDLFuzzer},
-			disruptionProbability: []int{5, 5, 5, 5, 5},
+			clusterDisruptions:    []func(t *testing.T){prs, ers, mysqlRestarts, vttabletRestarts, onlineDDLFuzzer, moveTablesFuzzer},
+			disruptionProbability: []int{5, 5, 5, 5, 5, 5},
 		},
 	}
@@ -446,6 +447,38 @@ func onlineDDLFuzzer(t *testing.T) {
 	waitForMigrationStatus(t, &vtParams, clusterInstance.Keyspaces[0].Shards, strings.TrimSpace(output), 2*time.Minute, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed)
 }
 
+var moveTablesCount int
+
+// moveTablesFuzzer runs a MoveTables workflow.
+func moveTablesFuzzer(t *testing.T) {
+	workflow := "TestTwoPCFuzzTest"
+	srcKeyspace := keyspaceName
+	targetKeyspace := unshardedKeyspaceName
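+	// Alternate the direction on every invocation so that the tables keep
+	// moving back and forth between the sharded and unsharded keyspaces.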
+	if moveTablesCount%2 == 1 {
+		srcKeyspace = unshardedKeyspaceName
+		targetKeyspace = keyspaceName
+		// We apply the vschema again because the previous MoveTables run would have removed the entry for `twopc_fuzzer_update`.
+		err := clusterInstance.VtctldClientProcess.ApplyVSchema(keyspaceName, VSchema)
+		require.NoError(t, err)
+	}
+	log.Errorf("MoveTables from - %v to %v", srcKeyspace, targetKeyspace)
+	mtw := cluster.NewMoveTables(t, clusterInstance, workflow, targetKeyspace, srcKeyspace, "twopc_fuzzer_update")
+	// Initiate MoveTables for twopc_fuzzer_update.
+	output, err := mtw.Create()
+	if err != nil {
+		log.Errorf("error creating MoveTables - %v, output - %v", err, output)
+		return
+	}
+	moveTablesCount++
+	// Wait for vreplication to catch up. Should be very fast since we don't have a lot of rows.
+	mtw.WaitForVreplCatchup(1 * time.Minute)
+	// SwitchTraffic
+	output, err = mtw.SwitchReadsAndWrites()
+	assert.NoError(t, err, output)
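+	// Complete the workflow (rather than cancelling it) so that the tables
+	// really change keyspaces; the next invocation then moves them back.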
+	output, err = mtw.Complete()
+	assert.NoError(t, err, output)
+}
+
 func mysqlRestarts(t *testing.T) {
 	shards := clusterInstance.Keyspaces[0].Shards
 	shard := shards[rand.Intn(len(shards))]
diff --git a/go/test/endtoend/transaction/twopc/stress/stress_test.go b/go/test/endtoend/transaction/twopc/stress/stress_test.go
index 6db0687a5ab..62ae98bd9dc 100644
--- a/go/test/endtoend/transaction/twopc/stress/stress_test.go
+++ b/go/test/endtoend/transaction/twopc/stress/stress_test.go
@@ -48,6 +48,7 @@ func TestDisruptions(t *testing.T) {
 		disruptionName  string
 		commitDelayTime string
 		disruption      func(t *testing.T) error
+		resetFunc       func(t *testing.T)
 	}{
 		{
 			disruptionName:  "No Disruption",
@@ -77,9 +78,15 @@ func TestDisruptions(t *testing.T) {
 			disruption:      onlineDDL,
 		},
 		{
-			disruptionName:  "MoveTables",
-			commitDelayTime: "20",
-			disruption:      moveTables,
+			disruptionName:  "MoveTables - Complete",
+			commitDelayTime: "10",
+			disruption:      moveTablesComplete,
+			resetFunc:       moveTablesReset,
+		},
+		{
+			disruptionName:  "MoveTables - Cancel",
+			commitDelayTime: "10",
+			disruption:      moveTablesCancel,
 		},
 		{
 			disruptionName:  "EmergencyReparentShard",
 			commitDelayTime: "5",
@@ -141,6 +148,10 @@ func TestDisruptions(t *testing.T) {
 			waitForResults(t, "select id, col from twopc_t1 where col = 4 order by id", `[[INT64(4) INT64(4)] [INT64(6) INT64(4)] [INT64(9) INT64(4)]]`, 30*time.Second)
 			writeCancel()
 			writerWg.Wait()
+
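+			// Let the disruption undo its side effects (e.g. move the tables
+			// back) before the next test case runs.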
+			if tt.resetFunc != nil {
+				tt.resetFunc(t)
+			}
 		})
 	}
 }
@@ -244,29 +255,61 @@ func mysqlRestartShard3(t *testing.T) error {
 	return syscallutil.Kill(pid, syscall.SIGKILL)
 }
 
-// moveTables runs a move tables command.
-func moveTables(t *testing.T) error {
+// moveTablesCancel runs a move tables command that we cancel at the end.
+func moveTablesCancel(t *testing.T) error {
 	workflow := "TestDisruptions"
 	mtw := cluster.NewMoveTables(t, clusterInstance, workflow, unshardedKeyspaceName, keyspaceName, "twopc_t1")
 	// Initiate MoveTables for twopc_t1.
-	_, err := mtw.Create()
-	require.NoError(t, err)
+	output, err := mtw.Create()
+	require.NoError(t, err, output)
 	// Wait for vreplication to catch up. Should be very fast since we don't have a lot of rows.
-	mtw.WaitForVreplCatchup()
+	mtw.WaitForVreplCatchup(10 * time.Second)
 	// SwitchTraffic
-	_, err = mtw.SwitchReadsAndWrites()
-	require.NoError(t, err)
-	// Wait for a couple of seconds and then switch the traffic back
-	time.Sleep(2 * time.Second)
-	_, err = mtw.ReverseReadsAndWrites()
-	require.NoError(t, err)
-	// Wait another couple of seconds and then cancel the workflow
-	time.Sleep(2 * time.Second)
-	_, err = mtw.Cancel()
-	require.NoError(t, err)
+	output, err = mtw.SwitchReadsAndWrites()
+	require.NoError(t, err, output)
+	output, err = mtw.ReverseReadsAndWrites()
+	require.NoError(t, err, output)
+	output, err = mtw.Cancel()
+	require.NoError(t, err, output)
 	return nil
 }
 
+// moveTablesComplete runs a move tables command that we complete at the end.
+func moveTablesComplete(t *testing.T) error {
+	workflow := "TestDisruptions"
+	mtw := cluster.NewMoveTables(t, clusterInstance, workflow, unshardedKeyspaceName, keyspaceName, "twopc_t1")
+	// Initiate MoveTables for twopc_t1.
+	output, err := mtw.Create()
+	require.NoError(t, err, output)
+	// Wait for vreplication to catch up. Should be very fast since we don't have a lot of rows.
+	mtw.WaitForVreplCatchup(10 * time.Second)
+	// SwitchTraffic
+	output, err = mtw.SwitchReadsAndWrites()
+	require.NoError(t, err, output)
+	output, err = mtw.Complete()
+	require.NoError(t, err, output)
 	return nil
 }
 
+// moveTablesReset moves the table back from the unsharded keyspace to the sharded one.
+func moveTablesReset(t *testing.T) {
+	// We apply the vschema again because the previous MoveTables run would have removed the entry for `twopc_t1`.
+	err := clusterInstance.VtctldClientProcess.ApplyVSchema(keyspaceName, VSchema)
+	require.NoError(t, err)
+	workflow := "TestDisruptions"
+	mtw := cluster.NewMoveTables(t, clusterInstance, workflow, keyspaceName, unshardedKeyspaceName, "twopc_t1")
+	// Initiate MoveTables for twopc_t1.
+	output, err := mtw.Create()
+	require.NoError(t, err, output)
+	// Wait for vreplication to catch up. Should be very fast since we don't have a lot of rows.
+	mtw.WaitForVreplCatchup(10 * time.Second)
+	// SwitchTraffic
+	output, err = mtw.SwitchReadsAndWrites()
+	require.NoError(t, err, output)
+	output, err = mtw.Complete()
+	require.NoError(t, err, output)
+}
+
 var orderedDDL = []string{
 	"alter table twopc_t1 add column extra_col1 varchar(20)",
 	"alter table twopc_t1 add column extra_col2 varchar(20)",

From dc426877636991a745929a00c6275a59144fc1d2 Mon Sep 17 00:00:00 2001
From: Manan Gupta
Date: Fri, 30 Aug 2024 14:00:16 +0530
Subject: [PATCH 4/6] test: reduce number of writes to allow move tables to
 catch up

Signed-off-by: Manan Gupta
---
 go/test/endtoend/transaction/twopc/stress/fuzzer_test.go | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/go/test/endtoend/transaction/twopc/stress/fuzzer_test.go b/go/test/endtoend/transaction/twopc/stress/fuzzer_test.go
index d4fe6d8d091..3fc1858fae4 100644
--- a/go/test/endtoend/transaction/twopc/stress/fuzzer_test.go
+++ b/go/test/endtoend/transaction/twopc/stress/fuzzer_test.go
@@ -104,8 +104,8 @@ func TestTwoPCFuzzTest(t *testing.T) {
 		},
 		{
 			name:                  "Multiple Threads - Multiple Set - PRS, ERS, and MySQL & Vttablet restart, OnlineDDL, MoveTables disruptions",
-			threads:               15,
-			updateSets:            15,
+			threads:               4,
+			updateSets:            4,
 			timeForTesting:        5 * time.Second,
 			clusterDisruptions:    []func(t *testing.T){prs, ers, mysqlRestarts, vttabletRestarts, onlineDDLFuzzer, moveTablesFuzzer},
 			disruptionProbability: []int{5, 5, 5, 5, 5, 5},

From 225567926622a98f24563f4e5068a2ca90bfb0b3 Mon Sep 17 00:00:00 2001
From: Manan Gupta
Date: Mon, 2 Sep 2024 12:45:59 +0530
Subject: [PATCH 5/6] test: check for context timeout in online ddl fuzzer
 before

Signed-off-by: Manan Gupta
---
 .../endtoend/transaction/twopc/stress/stress_test.go | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/go/test/endtoend/transaction/twopc/stress/stress_test.go b/go/test/endtoend/transaction/twopc/stress/stress_test.go
index 62ae98bd9dc..56b6832d1c7 100644
--- a/go/test/endtoend/transaction/twopc/stress/stress_test.go
+++ b/go/test/endtoend/transaction/twopc/stress/stress_test.go
@@ -351,6 +351,11 @@ func waitForMigrationStatus(t *testing.T, vtParams *mysql.ConnParams, shards []c
 	lastKnownStatus := ""
 	for {
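+		// Check the timeout first so that a cancelled context returns
+		// promptly instead of issuing one more status query.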
+		select {
+		case <-ctx.Done():
+			return schema.OnlineDDLStatus(lastKnownStatus)
+		case <-ticker.C:
+		}
 		countMatchedShards := 0
 		conn, err := mysql.Connect(ctx, vtParams)
 		if err != nil {
@@ -375,10 +380,5 @@ func waitForMigrationStatus(t *testing.T, vtParams *mysql.ConnParams, shards []c
 		if countMatchedShards == len(shards) {
 			return schema.OnlineDDLStatus(lastKnownStatus)
 		}
-		select {
-		case <-ctx.Done():
-			return schema.OnlineDDLStatus(lastKnownStatus)
-		case <-ticker.C:
-		}
 	}
 }

From 5880d765f5c3a569e5e22d22b625615753de0fc0 Mon Sep 17 00:00:00 2001
From: Manan Gupta
Date: Mon, 2 Sep 2024 16:22:56 +0530
Subject: [PATCH 6/6] test: now that we are running multiple keyspaces, specify
 it in the onlineddl queries to see the status

Signed-off-by: Manan Gupta
---
 go/test/endtoend/onlineddl/vtgate_util.go                | 3 ++-
 go/test/endtoend/transaction/twopc/stress/stress_test.go | 2 +-
 2 files changed, 3 insertions(+), 2 deletions(-)

diff --git a/go/test/endtoend/onlineddl/vtgate_util.go b/go/test/endtoend/onlineddl/vtgate_util.go
index 6c26320b472..639b3ce8eb4 100644
--- a/go/test/endtoend/onlineddl/vtgate_util.go
+++ b/go/test/endtoend/onlineddl/vtgate_util.go
@@ -257,7 +257,8 @@ func CheckForceMigrationCutOver(t *testing.T, vtParams *mysql.ConnParams, shards
 
 // CheckMigrationStatus verifies that the migration indicated by given UUID has the given expected status
 func CheckMigrationStatus(t *testing.T, vtParams *mysql.ConnParams, shards []cluster.Shard, uuid string, expectStatuses ...schema.OnlineDDLStatus) bool {
-	query, err := sqlparser.ParseAndBind("show vitess_migrations like %a",
+	ksName := shards[0].PrimaryTablet().VttabletProcess.Keyspace
+	query, err := sqlparser.ParseAndBind(fmt.Sprintf("show vitess_migrations from %s like %%a", ksName),
 		sqltypes.StringBindVariable(uuid),
 	)
 	require.NoError(t, err)
diff --git a/go/test/endtoend/transaction/twopc/stress/stress_test.go b/go/test/endtoend/transaction/twopc/stress/stress_test.go
index 56b6832d1c7..5baee342bec 100644
--- a/go/test/endtoend/transaction/twopc/stress/stress_test.go
+++ b/go/test/endtoend/transaction/twopc/stress/stress_test.go
@@ -338,7 +338,7 @@ func waitForMigrationStatus(t *testing.T, vtParams *mysql.ConnParams, shards []c
 	for _, shard := range shards {
 		shardNames[shard.Name] = true
 	}
-	query := fmt.Sprintf("show vitess_migrations like '%s'", uuid)
+	query := fmt.Sprintf("show vitess_migrations from %s like '%s'", keyspaceName, uuid)
 
 	statusesMap := map[string]bool{}
 	for _, status := range expectStatuses {