Skip to content

Commit

Permalink
VReplication: Remove Deprecated V1 Client Commands (#11705)
Browse files Browse the repository at this point in the history
* Remove SwitchReads/SwitchWrites client commands

Signed-off-by: Matt Lord <mattalord@gmail.com>

* Remove Reshard/MoveTables v1 client commands

Signed-off-by: Matt Lord <mattalord@gmail.com>

* Misc text clarifications

Signed-off-by: Matt Lord <mattalord@gmail.com>

* Fix TestVreplicationCopyThrottling

Signed-off-by: Matt Lord <mattalord@gmail.com>

* Remove DropSources

Signed-off-by: Matt Lord <mattalord@gmail.com>

* Add removal info to release notes

Signed-off-by: Matt Lord <mattalord@gmail.com>

* Reduce flakiness of copy_state optimization check

Signed-off-by: Matt Lord <mattalord@gmail.com>

* Minor tweaks after self review

Signed-off-by: Matt Lord <mattalord@gmail.com>

* De-flake sharded pitr test

Signed-off-by: Matt Lord <mattalord@gmail.com>

Signed-off-by: Matt Lord <mattalord@gmail.com>
  • Loading branch information
mattlord authored Nov 30, 2022
1 parent 0f30178 commit dd49816
Show file tree
Hide file tree
Showing 13 changed files with 237 additions and 449 deletions.
4 changes: 3 additions & 1 deletion doc/releasenotes/16_0_0_summary.md
Original file line number Diff line number Diff line change
Expand Up @@ -151,10 +151,12 @@ The `RestoreFromBackup --restore_to_pos` ends with:
An issue in versions `<= v14.0.3` and `<= v15.0.0` that generated corrupted results for non-full-group-by queries with a JOIN
is now fixed. The full issue can be found [here](https://github.com/vitessio/vitess/issues/11625), and its fix [here](https://github.com/vitessio/vitess/pull/11633).

### Deprecations
### Deprecations and Removals

The V3 planner is deprecated as of the V16 release, and will be removed in the V17 release of Vitess.

The [VReplication v1 commands](https://vitess.io/docs/15.0/reference/vreplication/v1/) — which were deprecated in Vitess 11.0 — have been removed. You will need to use the [VReplication v2 commands](https://vitess.io/docs/16.0/reference/vreplication/v2/) instead.

### MySQL Compatibility

#### Transaction Isolation Level
Expand Down
5 changes: 2 additions & 3 deletions examples/region_sharding/205_switch_writes.sh
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,8 @@ source ./env.sh
vtctlclient Reshard -- --tablet_types=primary SwitchTraffic main.main2regions

# to go back to unsharded
# call SwitchReads and SwitchWrites with workflow main.main2regions_reverse
# delete vreplication rows from sharded tablets
# drop all the tables
# call Reshard ReverseTraffic with all tablet types
# call Reshard Cancel
# change vschema back to unsharded
# drop lookup table

48 changes: 38 additions & 10 deletions go/test/endtoend/recovery/pitr/shardedpitr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"testing"
"time"

"github.com/buger/jsonparser"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

Expand Down Expand Up @@ -89,6 +90,9 @@ var (
"--lock_tables_timeout", "5s",
"--watch_replication_stream",
"--serving_state_grace_period", "1s"}

defaultTimeout = 30 * time.Second
defaultTick = 1 * time.Second
)

// Test pitr (Point in time recovery).
Expand Down Expand Up @@ -296,23 +300,23 @@ func performResharding(t *testing.T) {
err = clusterInstance.VtctlProcess.ExecuteCommand("InitShardPrimary", "--", "--force", "ks/80-", shard1Primary.Alias)
require.NoError(t, err)

// we need to create the schema, and the worker will do data copying
for _, keyspaceShard := range []string{"ks/-80", "ks/80-"} {
err = clusterInstance.VtctlclientProcess.ExecuteCommand("CopySchemaShard", "ks/0", keyspaceShard)
require.NoError(t, err)
}

err = clusterInstance.VtctlclientProcess.ExecuteCommand("Reshard", "--", "--v1", "ks.reshardWorkflow", "0", "--", "-80,80-")
err = clusterInstance.VtctlclientProcess.ExecuteCommand("Reshard", "--", "--source_shards=0", "--target_shards=-80,80-", "Create", "ks.reshardWorkflow")
require.NoError(t, err)

err = clusterInstance.VtctlclientProcess.ExecuteCommand("SwitchReads", "--", "--tablet_types=rdonly", "ks.reshardWorkflow")
waitTimeout := 30 * time.Second
shard0Primary.VttabletProcess.WaitForVReplicationToCatchup(t, "ks.reshardWorkflow", dbName, waitTimeout)
shard1Primary.VttabletProcess.WaitForVReplicationToCatchup(t, "ks.reshardWorkflow", dbName, waitTimeout)

waitForNoWorkflowLag(t, clusterInstance, "ks.reshardWorkflow")

err = clusterInstance.VtctlclientProcess.ExecuteCommand("Reshard", "--", "--tablet_types=rdonly", "SwitchTraffic", "ks.reshardWorkflow")
require.NoError(t, err)

err = clusterInstance.VtctlclientProcess.ExecuteCommand("SwitchReads", "--", "--tablet_types=replica", "ks.reshardWorkflow")
err = clusterInstance.VtctlclientProcess.ExecuteCommand("Reshard", "--", "--tablet_types=replica", "SwitchTraffic", "ks.reshardWorkflow")
require.NoError(t, err)

// then serve primary from the split shards
err = clusterInstance.VtctlclientProcess.ExecuteCommand("SwitchWrites", "ks.reshardWorkflow")
err = clusterInstance.VtctlclientProcess.ExecuteCommand("Reshard", "--", "--tablet_types=primary", "SwitchTraffic", "ks.reshardWorkflow")
require.NoError(t, err)

// remove the original tablets in the original shard
Expand Down Expand Up @@ -552,3 +556,27 @@ func launchRecoveryTablet(t *testing.T, tablet *cluster.Vttablet, binlogServer *

tablet.VttabletProcess.WaitForTabletStatusesForTimeout([]string{"SERVING"}, 20*time.Second)
}

// waitForNoWorkflowLag waits for the VReplication workflow's MaxVReplicationTransactionLag
// value to be 0.
func waitForNoWorkflowLag(t *testing.T, vc *cluster.LocalProcessCluster, ksWorkflow string) {
lag := int64(0)
timer := time.NewTimer(defaultTimeout)
defer timer.Stop()
for {
output, err := vc.VtctlclientProcess.ExecuteCommandWithOutput("Workflow", "--", ksWorkflow, "show")
require.NoError(t, err)
lag, err = jsonparser.GetInt([]byte(output), "MaxVReplicationTransactionLag")
require.NoError(t, err)
if lag == 0 {
return
}
select {
case <-timer.C:
require.FailNow(t, fmt.Sprintf("workflow %q did not eliminate VReplication lag before the timeout of %s; last seen MaxVReplicationTransactionLag: %d",
ksWorkflow, defaultTimeout, lag))
default:
time.Sleep(defaultTick)
}
}
}
47 changes: 28 additions & 19 deletions go/test/endtoend/vreplication/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -456,13 +456,6 @@ func printShardPositions(vc *VitessCluster, ksShards []string) {
}
}

func clearRoutingRules(t *testing.T, vc *VitessCluster) error {
if _, err := vc.VtctlClient.ExecuteCommandWithOutput("ApplyRoutingRules", "--", "--rules={}"); err != nil {
return err
}
return nil
}

func printRoutingRules(t *testing.T, vc *VitessCluster, msg string) error {
var output string
var err error
Expand Down Expand Up @@ -567,16 +560,32 @@ func verifyCopyStateIsOptimized(t *testing.T, tablet *cluster.VttabletProcess) {
_, err := tablet.QueryTablet("analyze table _vt.copy_state", "", false)
require.NoError(t, err)

// Verify that there's no delete marked rows and we reset the auto-inc value
res, err := tablet.QueryTablet("select data_free, auto_increment from information_schema.tables where table_schema='_vt' and table_name='copy_state'",
"", false)
require.NoError(t, err)
require.NotNil(t, res)
require.Equal(t, 1, len(res.Rows))
dataFree, err := res.Rows[0][0].ToInt64()
require.NoError(t, err)
require.Equal(t, int64(0), dataFree, "data_free should be 0")
autoIncrement, err := res.Rows[0][1].ToInt64()
require.NoError(t, err)
require.Equal(t, int64(1), autoIncrement, "auto_increment should be 1")
// Verify that there's no delete marked rows and we reset the auto-inc value.
// MySQL doesn't always immediately update information_schema so we wait.
tmr := time.NewTimer(defaultTimeout)
defer tmr.Stop()
query := "select data_free, auto_increment from information_schema.tables where table_schema='_vt' and table_name='copy_state'"
var dataFree, autoIncrement int64
for {
res, err := tablet.QueryTablet(query, "", false)
require.NoError(t, err)
require.NotNil(t, res)
require.Equal(t, 1, len(res.Rows))
dataFree, err = res.Rows[0][0].ToInt64()
require.NoError(t, err)
autoIncrement, err = res.Rows[0][1].ToInt64()
require.NoError(t, err)
if dataFree == 0 && autoIncrement == 1 {
return
}

select {
case <-tmr.C:
require.FailNowf(t, "timed out waiting for copy_state table to be optimized",
"data_free should be 0 and auto_increment should be 1, last seen values were %d and %d respectively",
dataFree, autoIncrement)
default:
time.Sleep(defaultTick)
}
}
}
2 changes: 1 addition & 1 deletion go/test/endtoend/vreplication/performance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ create table customer(cid int, name varbinary(128), meta json default null, typ
}
}

moveTables(t, defaultCell.Name, "stress_workflow", sourceKs, targetKs, "largebin")
moveTablesAction(t, "Create", defaultCell.Name, "stress_workflow", sourceKs, targetKs, "largebin")

keyspaceTgt := defaultCell.Keyspaces[targetKs]
for _, shard := range keyspaceTgt.Shards {
Expand Down
79 changes: 37 additions & 42 deletions go/test/endtoend/vreplication/resharding_workflows_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,13 +149,13 @@ func tstWorkflowReverseWrites(t *testing.T) {
require.NoError(t, tstWorkflowAction(t, workflowActionReverseTraffic, "primary", ""))
}

// tstWorkflowSwitchReadsAndWrites tests that SwitchWrites w/o any user provided --tablet_types
// tstWorkflowSwitchReadsAndWrites tests that switching traffic w/o any user provided --tablet_types
// value switches all traffic
func tstWorkflowSwitchReadsAndWrites(t *testing.T) {
require.NoError(t, tstWorkflowAction(t, workflowActionSwitchTraffic, "", ""))
}

// tstWorkflowReversesReadsAndWrites tests that SwitchWrites w/o any user provided --tablet_types
// tstWorkflowReversesReadsAndWrites tests that ReverseTraffic w/o any user provided --tablet_types
// value switches all traffic in reverse
func tstWorkflowReverseReadsAndWrites(t *testing.T) {
require.NoError(t, tstWorkflowAction(t, workflowActionReverseTraffic, "", ""))
Expand Down Expand Up @@ -205,25 +205,15 @@ func validateWritesRouteToTarget(t *testing.T) {
execVtgateQuery(t, vtgateConn, "customer", "delete from customer where cid > 100")
}

func revert(t *testing.T) {
switchWrites(t, reverseKsWorkflow, false)
func revert(t *testing.T, workflowType string) {
switchWrites(t, workflowType, ksWorkflow, true)
validateWritesRouteToSource(t)
switchReadsNew(t, allCellNames, ksWorkflow, true)
switchReadsNew(t, workflowType, allCellNames, ksWorkflow, true)
validateReadsRouteToSource(t, "replica")
queries := []string{
"delete from _vt.vreplication",
"delete from _vt.resharding_journal",
}

for _, query := range queries {
targetTab1.QueryTablet(query, "customer", true)
targetTab2.QueryTablet(query, "customer", true)
sourceTab.QueryTablet(query, "product", true)
}
targetTab1.QueryTablet("drop table vt_customer.customer", "customer", true)
targetTab2.QueryTablet("drop table vt_customer.customer", "customer", true)

clearRoutingRules(t, vc)
// cancel the workflow to cleanup
_, err := vc.VtctlClient.ExecuteCommandWithOutput(workflowType, "--", "Cancel", ksWorkflow)
require.NoError(t, err, fmt.Sprintf("%s Cancel error: %v", workflowType, err))
}

func checkStates(t *testing.T, startState, endState string) {
Expand Down Expand Up @@ -648,9 +638,13 @@ func TestSwitchReadsWritesInAnyOrder(t *testing.T) {
moveCustomerTableSwitchFlows(t, []*Cell{vc.Cells["zone1"]}, "zone1")
}

func switchReadsNew(t *testing.T, cells, ksWorkflow string, reverse bool) {
output, err := vc.VtctlClient.ExecuteCommandWithOutput("SwitchReads", "--", "--cells="+cells,
"--tablet_types=rdonly,replica", fmt.Sprintf("--reverse=%t", reverse), ksWorkflow)
func switchReadsNew(t *testing.T, workflowType, cells, ksWorkflow string, reverse bool) {
command := "SwitchTraffic"
if reverse {
command = "ReverseTraffic"
}
output, err := vc.VtctlClient.ExecuteCommandWithOutput(workflowType, "--", "--cells="+cells,
"--tablet_types=rdonly,replica", command, ksWorkflow)
require.NoError(t, err, fmt.Sprintf("SwitchReads Error: %s: %s", err, output))
if output != "" {
fmt.Printf("SwitchReads output: %s\n", output)
Expand All @@ -664,83 +658,84 @@ func moveCustomerTableSwitchFlows(t *testing.T, cells []*Cell, sourceCellOrAlias
ksWorkflow := fmt.Sprintf("%s.%s", targetKs, workflow)
tables := "customer"
setupCustomerKeyspace(t)
workflowType := "MoveTables"

var moveTablesAndWait = func() {
moveTables(t, sourceCellOrAlias, workflow, sourceKs, targetKs, tables)
catchup(t, targetTab1, workflow, "MoveTables")
catchup(t, targetTab2, workflow, "MoveTables")
moveTablesAction(t, "Create", sourceCellOrAlias, workflow, sourceKs, targetKs, tables)
catchup(t, targetTab1, workflow, workflowType)
catchup(t, targetTab2, workflow, workflowType)
vdiff1(t, ksWorkflow, "")
}

var switchReadsFollowedBySwitchWrites = func() {
moveTablesAndWait()

validateReadsRouteToSource(t, "replica")
switchReadsNew(t, allCellNames, ksWorkflow, false)
switchReadsNew(t, workflowType, allCellNames, ksWorkflow, false)
validateReadsRouteToTarget(t, "replica")

validateWritesRouteToSource(t)
switchWrites(t, ksWorkflow, false)
switchWrites(t, workflowType, ksWorkflow, false)
validateWritesRouteToTarget(t)

revert(t)
revert(t, workflowType)
}
var switchWritesFollowedBySwitchReads = func() {
moveTablesAndWait()

validateWritesRouteToSource(t)
switchWrites(t, ksWorkflow, false)
switchWrites(t, workflowType, ksWorkflow, false)
validateWritesRouteToTarget(t)

validateReadsRouteToSource(t, "replica")
switchReadsNew(t, allCellNames, ksWorkflow, false)
switchReadsNew(t, workflowType, allCellNames, ksWorkflow, false)
validateReadsRouteToTarget(t, "replica")

revert(t)
revert(t, workflowType)
}

var switchReadsReverseSwitchWritesSwitchReads = func() {
moveTablesAndWait()

validateReadsRouteToSource(t, "replica")
switchReadsNew(t, allCellNames, ksWorkflow, false)
switchReadsNew(t, workflowType, allCellNames, ksWorkflow, false)
validateReadsRouteToTarget(t, "replica")

switchReadsNew(t, allCellNames, ksWorkflow, true)
switchReadsNew(t, workflowType, allCellNames, ksWorkflow, true)
validateReadsRouteToSource(t, "replica")
printRoutingRules(t, vc, "After reversing SwitchReads")
printRoutingRules(t, vc, "After reversing read traffic")

validateWritesRouteToSource(t)
switchWrites(t, ksWorkflow, false)
switchWrites(t, workflowType, ksWorkflow, false)
validateWritesRouteToTarget(t)

printRoutingRules(t, vc, "After SwitchWrites and reversing SwitchReads")
printRoutingRules(t, vc, "After switching writes and reversing reads")
validateReadsRouteToSource(t, "replica")
switchReadsNew(t, allCellNames, ksWorkflow, false)
switchReadsNew(t, workflowType, allCellNames, ksWorkflow, false)
validateReadsRouteToTarget(t, "replica")

revert(t)
revert(t, workflowType)
}

var switchWritesReverseSwitchReadsSwitchWrites = func() {
moveTablesAndWait()

validateWritesRouteToSource(t)
switchWrites(t, ksWorkflow, false)
switchWrites(t, workflowType, ksWorkflow, false)
validateWritesRouteToTarget(t)

switchWrites(t, reverseKsWorkflow, true)
switchWrites(t, workflowType, reverseKsWorkflow, true)
validateWritesRouteToSource(t)

validateReadsRouteToSource(t, "replica")
switchReadsNew(t, allCellNames, ksWorkflow, false)
switchReadsNew(t, workflowType, allCellNames, ksWorkflow, false)
validateReadsRouteToTarget(t, "replica")

validateWritesRouteToSource(t)
switchWrites(t, ksWorkflow, false)
switchWrites(t, workflowType, ksWorkflow, false)
validateWritesRouteToTarget(t)

revert(t)
revert(t, workflowType)

}
switchReadsFollowedBySwitchWrites()
Expand Down
Loading

0 comments on commit dd49816

Please sign in to comment.